Documentation

API Reference

Streaming

The A4F API allows streaming responses from any model. This is useful for building chat interfaces or other applications where the UI should update as the model generates the response.

To enable streaming, you set the stream parameter to true in your request to the /v1/chat/completions endpoint. The model will then stream the response to the client in chunks, rather than returning the entire response at once.

Here is an example of how to stream a response and process it:

import os
import requests
import json
A4F_API_KEY = os.getenv("A4F_API_KEY")
A4F_BASE_URL = "https://api.a4f.co/v1/chat/completions"
headers = {
"Authorization": f"Bearer {A4F_API_KEY}",
"Content-Type": "application/json",
}
payload = {
"model": "provider-1/chatgpt-4o-latest",
"messages": [
{"role": "user", "content": "Write a short story about a friendly robot."}
],
"stream": True
}
try:
with requests.post(A4F_BASE_URL, headers=headers, json=payload, stream=True) as response:
response.raise_for_status()
print("Streaming response:")
for chunk in response.iter_lines():
if chunk:
decoded_chunk = chunk.decode('utf-8')
if decoded_chunk.startswith("data: "):
json_data_str = decoded_chunk[len("data: "):]
if json_data_str.strip() == "[DONE]":
print("\nStream finished.")
break
try:
json_data = json.loads(json_data_str)
content = json_data.get("choices", [{}])[0].get("delta", {}).get("content", "")
if content:
print(content, end="", flush=True)
except json.JSONDecodeError:
print(f"\nError decoding JSON chunk: {json_data_str}")
print()
except requests.exceptions.RequestException as e:
print(f"Request failed: {e}")
except Exception as e:
print(f"An error occurred: {e}")

Additional Information

For Server-Sent Events (SSE) streams, A4F passes through the data format provided by the underlying model provider, which is generally compatible with the OpenAI SSE format. Some providers might occasionally send comments (lines starting with :) as keep-alive pings to prevent connection timeouts. These can usually be ignored by SSE clients.

Standard SSE client libraries and parsers should handle these streams correctly:

Each data chunk in the stream typically follows the format data: {...json...}, followed by two newlines. The stream is terminated by data: [DONE].

Stream Cancellation

Streaming requests can be cancelled by aborting the underlying HTTP connection. For supported providers, A4F will attempt to propagate this cancellation to stop model processing and billing.

Provider Support for Cancellation

Effective stream cancellation (i.e., stopping backend processing and further billing) depends on the capabilities of the underlying model provider. A4F endeavors to forward cancellation signals when feasible.

If a provider does not support immediate cancellation or if the signal is not propagated in time, the model might continue processing the request fully, and you could be billed accordingly, even if your client has stopped receiving chunks.

To implement stream cancellation in your client:

import os
import requests
import json
import threading
import time
A4F_API_KEY = os.getenv("A4F_API_KEY")
A4F_BASE_URL = "https://api.a4f.co/v1/chat/completions"
headers = {
"Authorization": f"Bearer {A4F_API_KEY}",
"Content-Type": "application/json",
}
payload = {
"model": "provider-1/chatgpt-4o-latest-long",
"messages": [{"role": "user", "content": "Tell me a very long story about a space adventure that takes at least 2 minutes to write."}],
"stream": True,
}
session = requests.Session()
response = None
cancel_event = threading.Event()
def stream_with_cancellation():
global response
try:
with session.post(A4F_BASE_URL, headers=headers, json=payload, stream=True, timeout=(3.1, 27)) as r:
response = r
response.raise_for_status()
for chunk in response.iter_lines():
if cancel_event.is_set():
print("\nStream cancelled by client.")
if hasattr(response, 'raw') and hasattr(response.raw, 'close'):
response.raw.close()
break
if chunk:
print(chunk.decode('utf-8'))
if not cancel_event.is_set():
print("\nStream finished naturally.")
except requests.exceptions.Timeout:
print("\nRequest timed out.")
except requests.exceptions.RequestException as e:
print(f"\nRequest failed: {e}")
except Exception as e:
print(f"\nAn error occurred: {e}")
finally:
if response:
response.close()
session.close()
def cancel_after_delay(delay_seconds):
time.sleep(delay_seconds)
print(f"\n[Timer] Attempting to cancel stream after {delay_seconds}s...")
cancel_event.set()
stream_thread = threading.Thread(target=stream_with_cancellation)
stream_thread.start()
cancel_timer_thread = threading.Thread(target=cancel_after_delay, args=(5,))
cancel_timer_thread.start()
stream_thread.join()
cancel_timer_thread.join()
print("Main thread finished.")

Was this page helpful?