Python FastAPI SSE Implementation Guide
Part of Backend Stream Generation & Connection Management.
FastAPI has no first-class SSE primitive — you compose one from three building blocks: an async generator that yields formatted frames, StreamingResponse that wraps it, and an ASGI server (Uvicorn or Hypercorn) that flushes each chunk without waiting for the body to close. Get any one of those wrong and you end up with a response that works in curl but blocks for 30 seconds in a browser, or one that leaks file descriptors under load. This guide covers the full path from wire format through ASGI tuning, proxy config, backpressure, and load-test validation.
How It Works
FastAPI’s StreamingResponse (re-exported from Starlette) implements the ASGI streaming body protocol. Uvicorn calls the application, and the response’s __call__ iterates the wrapped async generator, calling await send({"type": "http.response.body", "body": chunk, "more_body": True}) for each yielded chunk and sending a final message with more_body set to False when the generator finishes. Uvicorn writes each body message to the socket as soon as it receives it rather than collecting the whole body, so every yield reaches the network without waiting for the response to end.
The SSE wire format (defined in the WHATWG HTML spec §9.2) is line-oriented:
```text
id: 42\r\n
event: price\r\n
data: {"symbol":"BTC","price":67200}\r\n
retry: 3000\r\n
\r\n
```
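Each field is a line of the form field-name: value, terminated by CRLF, LF, or a lone CR, and a blank line terminates the event. The value is everything after the first colon; if it starts with a space, exactly one space is removed, so data: x and data:x are equivalent. Lines that start with a colon are comments and are ignored. A payload that spans several lines must be sent as several data: lines, which the client joins with \n; a raw newline inside a data: value would end the line, and the remainder would be parsed as a separate field.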
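These field rules are easiest to check against a parser. The following is a minimal sketch of the client-side parsing steps, simplified from the WHATWG algorithm: it omits retry: handling and BOM stripping, and parse_sse is an illustrative name rather than a library API.

```python
import re

# The spec allows CRLF, LF, or a lone CR as the line terminator
_LINE_END = re.compile(r"\r\n|\r|\n")


def parse_sse(text: str) -> list[tuple[str, str, str]]:
    """Parse decoded SSE text into (event_type, data, last_event_id) tuples."""
    events = []
    data: list[str] = []
    event_type, last_id = "", ""
    # The final split element is an unterminated (incomplete) line, so drop it
    for line in _LINE_END.split(text)[:-1]:
        if line == "":
            # Blank line: dispatch only if at least one data: line was seen
            if data:
                events.append((event_type or "message", "\n".join(data), last_id))
            data, event_type = [], ""
            continue
        if line.startswith(":"):
            continue  # comment line such as ": ping"
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # strip exactly one leading space
        if field == "data":
            data.append(value)
        elif field == "event":
            event_type = value
        elif field == "id" and "\0" not in value:
            last_id = value  # persists across events, like EventSource.lastEventId
        # retry: and unknown field names are ignored in this sketch
    return events
```

Note that the ID persists across events while the event type resets after each dispatch, which is why the second event in a stream without its own id: line still reports the earlier ID.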
Because StreamingResponse sends no Content-Length (an infinite stream cannot have one), Uvicorn frames the body with Transfer-Encoding: chunked on HTTP/1.1; on HTTP/2 the protocol’s own framing replaces chunked encoding. Either way, Content-Type must be text/event-stream: if the browser’s EventSource receives any other MIME type, it fails the connection, fires error, and does not reconnect.
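Reconnection is also part of the wire protocol. A retry: field sets the client’s reconnection delay in milliseconds, and an id: field sets the last event ID. When the connection drops, the browser waits that long, reconnects, and sends the last ID it saw in a Last-Event-ID request header; your endpoint must read that header and replay the events the client missed.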
Server-Side Implementation with StreamingResponse
Minimal Working Endpoint
```python
import asyncio
import json
import time

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()


async def event_generator(request: Request):
    event_id = 0
    try:
        while True:
            # Non-blocking check for an ASGI http.disconnect message
            if await request.is_disconnected():
                break
            event_id += 1
            payload = {"ts": time.time(), "id": event_id}
            # SSE frame: id, event type, data, then a blank line
            yield (
                f"id: {event_id}\n"
                "event: heartbeat\n"
                f"data: {json.dumps(payload)}\n"
                "\n"
            )
            await asyncio.sleep(1.0)
    finally:
        # Runs on normal exit and when the task is cancelled after a disconnect.
        # Release per-connection resources here; do not swallow CancelledError.
        pass


@app.get("/stream")
async def stream(request: Request):
    return StreamingResponse(
        event_generator(request),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # tell Nginx not to buffer this response
            "Connection": "keep-alive",  # HTTP/1.1 only; HTTP/2 forbids this header
        },
    )
```
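request.is_disconnected() checks, without blocking, whether an http.disconnect message is waiting on the ASGI receive channel, and returns True once one has arrived. Polling it between events lets the generator exit on its own instead of depending only on cancellation: depending on your Starlette version and the ASGI spec version your server reports, StreamingResponse either watches for the disconnect and cancels the streaming task (raising CancelledError at the generator’s current await) or only notices when the next write to the closed socket fails.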
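Both exit paths can be exercised without a server. This sketch uses a hypothetical FakeRequest that stands in for Starlette’s Request (only is_disconnected() is implemented) and shows that a finally block runs both when the generator notices the disconnect by polling and when the task consuming it is cancelled mid-await.

```python
import asyncio


class FakeRequest:
    """Hypothetical test double for starlette.requests.Request."""

    def __init__(self, disconnect_after: int):
        self.checks = 0
        self.disconnect_after = disconnect_after

    async def is_disconnected(self) -> bool:
        self.checks += 1
        return self.checks > self.disconnect_after


async def ticker(request, log: list, delay: float = 0.01):
    """Minimal SSE generator: polls for disconnect, cleans up in finally."""
    n = 0
    try:
        while not await request.is_disconnected():
            n += 1
            yield f"id: {n}\ndata: tick\n\n"
            await asyncio.sleep(delay)
    finally:
        log.append("cleanup")  # release subscriptions, cursors, etc. here


async def demo():
    # Path 1: the generator notices the disconnect by polling
    log1: list = []
    frames = [f async for f in ticker(FakeRequest(disconnect_after=3), log1)]

    # Path 2: the consuming task is cancelled while the generator is in sleep()
    log2: list = []

    async def consume():
        async for _ in ticker(FakeRequest(disconnect_after=10**6), log2, delay=10):
            pass

    task = asyncio.create_task(consume())
    await asyncio.sleep(0.05)
    task.cancel()  # CancelledError is raised at the generator's await
    try:
        await task
    except asyncio.CancelledError:
        pass
    return frames, log1, log2


if __name__ == "__main__":
    print(asyncio.run(demo()))
```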
Resumption via Last-Event-ID
```python
from fastapi import FastAPI, Header, Request
from fastapi.responses import StreamingResponse

app = FastAPI()


@app.get("/stream")
async def stream(
    request: Request,
    # FastAPI maps last_event_id to the Last-Event-ID request header
    last_event_id: str | None = Header(default=None),
):
    # last_event_id is None on first connect, e.g. "42" on reconnect
    try:
        start_id = int(last_event_id) + 1 if last_event_id else 0
    except ValueError:
        start_id = 0  # malformed header: fall back to a full replay
    return StreamingResponse(
        replay_and_follow(request, start_id),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )


async def replay_and_follow(request: Request, start_id: int):
    # fetch_missed_events and live_stream are application-specific helpers
    # (e.g. backed by Redis XRANGE / XREAD); they are not defined here.
    async for event in fetch_missed_events(start_id):
        yield format_sse(event)
    # Then follow the live stream
    async for event in live_stream(request):
        yield format_sse(event)


def format_sse(event: dict) -> str:
    parts = [f"id: {event['id']}\n"]
    if "type" in event:
        parts.append(f"event: {event['type']}\n")
    # One data: line per payload line; an empty payload still needs one data: line
    for line in event["data"].splitlines() or [""]:
        parts.append(f"data: {line}\n")
    parts.append("\n")
    return "".join(parts)
```
Idempotent event ID generation describes monotonic ID strategies — use a Redis INCR counter or a database sequence, not time.time(), to guarantee strict ordering across restarts.
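fetch_missed_events is left undefined above. For a single process, a bounded in-memory log is enough to experiment with. The ReplayBuffer class below is a hypothetical sketch, not part of FastAPI, and its counter resets on restart, which is exactly why a Redis INCR counter or database sequence is the production choice. It also exposes the oldest retained ID so an endpoint can tell when a client has fallen further behind than the buffer reaches.

```python
from collections import deque
from typing import Iterator, Optional


class ReplayBuffer:
    """Hypothetical in-memory event log for Last-Event-ID replay (one process only)."""

    def __init__(self, maxlen: int = 1000):
        self._events: deque = deque(maxlen=maxlen)  # oldest events fall off the left
        self._next_id = 1  # in-process counter: resets on restart

    def append(self, data: str, event_type: Optional[str] = None) -> dict:
        event = {"id": self._next_id, "data": data}
        if event_type:
            event["type"] = event_type
        self._events.append(event)
        self._next_id += 1
        return event

    @property
    def oldest_id(self) -> int:
        """ID of the oldest retained event (the next ID if the buffer is empty)."""
        return self._events[0]["id"] if self._events else self._next_id

    def since(self, start_id: int) -> Iterator[dict]:
        """Yield retained events with id >= start_id, oldest first."""
        for event in list(self._events):  # snapshot, so appends during iteration are safe
            if event["id"] >= start_id:
                yield event
```

The event dicts use the same id, type, and data keys that format_sse expects. If start_id is lower than buffer.oldest_id, some events are gone for good; the endpoint could then send a full-state snapshot before resuming the live stream.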
Using sse-starlette
sse-starlette wraps Starlette to provide a typed ServerSentEvent model and automatic frame formatting, plus a built-in ping task that keeps the connection alive through idle proxies.
```bash
pip install sse-starlette
```
```python
import asyncio
import json

from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse, ServerSentEvent

app = FastAPI()


async def price_stream(request: Request):
    """Yields typed SSE events; sse-starlette handles frame formatting."""
    event_id = 0
    # market_data_feed() is an application-specific async iterator (not shown)
    async for price in market_data_feed():
        if await request.is_disconnected():
            break
        event_id += 1
        yield ServerSentEvent(
            data=json.dumps(price),  # serialize dicts yourself rather than relying on str()
            event="price",
            id=str(event_id),
            retry=3000,  # ms; optional, and sending it once is enough
        )
        await asyncio.sleep(0)  # give other tasks a turn after each event


@app.get("/prices")
async def prices(request: Request):
    return EventSourceResponse(
        price_stream(request),
        ping=15,  # send a comment frame every 15 s to keep idle proxies alive
        ping_message_factory=lambda: ServerSentEvent(comment="ping"),
    )
```
EventSourceResponse subclasses Starlette’s Response and by default sets the text/event-stream media type, a Cache-Control header that disables caching (no-cache or no-store, depending on the sse-starlette version), and X-Accel-Buffering: no. The ping parameter starts a background task that interleaves comment-only frames (": ping" with the factory above) so idle proxies do not close the connection after a 60-second timeout.
Comparison: StreamingResponse vs EventSourceResponse
| Feature | StreamingResponse | sse-starlette EventSourceResponse |
|---|---|---|
| Frame formatting | Manual string concatenation | ServerSentEvent typed model |
| Ping / keepalive | Manual | Built-in, configurable interval |
| Disconnect detection | request.is_disconnected() | Automatic in iteration loop |
| retry: field | Manual in each frame | retry= param on ServerSentEvent |
| SSE headers | Set manually | Set by default |
| CORS | Starlette/FastAPI middleware | Same |
| Extra dependency | None | sse-starlette |
| Streaming guarantee | One ASGI body message per yield | Same |
For production systems with complex event types, sse-starlette reduces boilerplate. For maximum control — custom serializers, binary-safe data, unusual event shapes — raw StreamingResponse is simpler to audit.
Client-Side EventSource Consumption
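The browser EventSource API handles reconnection and the Last-Event-ID header automatically. Give your events names: a frame with event: price is delivered only to listeners registered for "price", so each event type gets its own handler instead of everything being funneled through the generic onmessage handler (which receives only unnamed events):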
```javascript
const es = new EventSource('/prices', { withCredentials: true });

// Named event listener (matches the `event: price` frame field)
es.addEventListener('price', (e) => {
  const { symbol, price } = JSON.parse(e.data);
  updateUI(symbol, price); // app-specific render function
});

es.addEventListener('error', () => {
  if (es.readyState === EventSource.CLOSED) {
    // Browser gave up; reconnect manually with backoff (app-specific)
    scheduleReconnect();
  }
  // readyState === EventSource.CONNECTING means the browser is auto-retrying
});

// Always clean up
window.addEventListener('beforeunload', () => es.close());
```
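For authentication, same-origin streams send cookies automatically; for a cross-origin stream, set withCredentials: true, and the server must respond with Access-Control-Allow-Credentials: true and an explicit (non-wildcard) Access-Control-Allow-Origin. Bearer tokens require either a cookie or a query-parameter workaround, since EventSource does not support custom request headers. See the guide on authenticating SSE streams for the token-in-cookie pattern.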
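The query-parameter workaround is only safe if the token is short-lived and single-use. Below is a minimal standard-library sketch under stated assumptions: issue_stream_token and redeem_stream_token are hypothetical names, SECRET would come from configuration, user IDs contain no dots, and the in-memory nonce set only works within one process (across workers you would need a shared store such as Redis).

```python
import base64
import hashlib
import hmac
import secrets
import time
from typing import Optional

SECRET = secrets.token_bytes(32)  # assumption: load a stable key from config in practice
_used_nonces: set = set()  # single process only; grows until restart in this sketch


def _sign(payload: str) -> str:
    digest = hmac.new(SECRET, payload.encode(), hashlib.sha256).digest()
    return base64.urlsafe_b64encode(digest).decode().rstrip("=")


def issue_stream_token(user_id: str, ttl_s: int = 60) -> str:
    """Create a short-lived, single-use token: user.expiry.nonce.signature."""
    payload = f"{user_id}.{int(time.time()) + ttl_s}.{secrets.token_urlsafe(12)}"
    return f"{payload}.{_sign(payload)}"


def redeem_stream_token(token: str) -> Optional[str]:
    """Return the user ID if the token is authentic, unexpired, and unused; else None."""
    try:
        user_id, expires, nonce, sig = token.split(".")
    except ValueError:
        return None  # wrong number of parts (user IDs must not contain dots)
    payload = f"{user_id}.{expires}.{nonce}"
    if not hmac.compare_digest(sig.encode(), _sign(payload).encode()):
        return None
    if int(expires) < time.time() or nonce in _used_nonces:
        return None
    _used_nonces.add(nonce)  # invalidate after first use
    return user_id
```

A FastAPI route would typically accept token as a query parameter, call redeem_stream_token before returning the StreamingResponse, and answer 401 when it returns None. The page that opens the EventSource fetches a fresh token over a normally authenticated request just before connecting.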
For React applications, encapsulate this in a hook — Building a useEventSource React Hook covers cleanup, state integration, and dependency tracking.
Edge Cases & Network Interference
Proxy Buffering
The most common production failure is proxy buffering. Nginx buffers proxied responses by default (proxy_buffering on), holding chunks until its buffers fill or the response ends, and CDNs, load balancers, or corporate proxies in the path can behave the same way depending on their configuration. Result: events arrive in bursts after minutes of silence, or not at all until the connection closes.
```nginx
# nginx.conf: SSE location block
location /stream {
    proxy_pass http://uvicorn_upstream;
    proxy_http_version 1.1;          # HTTP/1.1 to upstream (chunked responses, keepalive)
    proxy_set_header Connection "";  # clear hop-by-hop Connection header for upstream keepalive
    proxy_buffering off;             # disable response buffering
    proxy_cache off;
    proxy_read_timeout 3600s;        # allow up to 1 h between reads from upstream
    gzip off;                        # compression buffers output
    # Cache-Control and X-Accel-Buffering come from the FastAPI response;
    # add_header here would send the client a second Cache-Control header.
}
```
Sending X-Accel-Buffering: no in the response tells Nginx to disable buffering for that specific response even when proxy_buffering on is the global default. FastAPI only sends it if you include it in headers={}, as the examples above do; Nginx consumes the header and, by default, does not forward it to the client. For buffer management details and chunked transfer encoding specific to the ASGI stack, see the dedicated guide.
CDN and Load Balancer Timeouts
| Layer | Default idle timeout | SSE mitigation |
|---|---|---|
| AWS ALB | 60 s | Set idle timeout to 3600 s; send ping ≤ 55 s |
| Nginx proxy_read_timeout | 60 s | Set to 3600 s |
| Cloudflare (proxied) | 100 s | Enterprise plan needed for >100 s streams |
| HAProxy timeout tunnel | 1 h by default | Usually fine; verify timeout client |
| GCP HTTP(S) LB | 30 s | Backend service timeoutSec = 3600 |
Send a comment frame every 30–55 seconds to reset idle timers at every hop:
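```python
import asyncio

from fastapi import Request


async def keepalive_generator(request: Request, inner_gen, interval: float = 30.0):
    """Interleave SSE comment pings with real events, even while inner_gen is idle."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)  # producer waits for the consumer

    async def pump():
        async for frame in inner_gen:
            await queue.put(frame)
        await queue.put(None)  # sentinel: inner stream finished

    task = asyncio.create_task(pump())
    try:
        while not await request.is_disconnected():
            try:
                frame = await asyncio.wait_for(queue.get(), timeout=interval)
            except asyncio.TimeoutError:
                yield ": ping\n\n"  # no frame for `interval` s; EventSource ignores comments
                continue
            if frame is None:
                break
            yield frame
    finally:
        task.cancel()  # stop pulling from inner_gen once we are done
```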
Firewall and Corporate Proxy Interference
Enterprise HTTP proxies (Squid, Zscaler, BlueCoat) frequently buffer SSE because they expect request-response semantics. Mitigation options in priority order:
- Use HTTPS: a proxy that cannot decrypt the traffic can only tunnel it, so it has no chance to parse or buffer the stream at the HTTP level. Proxies configured for TLS interception (common in Zscaler and BlueCoat deployments) can still buffer.
- Set a short reconnection delay: a retry: 1000 frame sent on connect tells the browser how long to wait before reconnecting; keep it short (1–3 s) for interactive UIs.
- Fall back to long-polling: detect EventSource failure (onerror with readyState === CLOSED) and switch to a polling endpoint that accepts Last-Event-ID as a query param.
gzip Compression
Never enable gzip on SSE endpoints. gzip encoders in Nginx, Apache, or Python middleware accumulate output in an internal compression buffer and only emit compressed bytes when that buffer fills, the filter explicitly flushes, or the response ends; small SSE frames can sit in that buffer indefinitely. Either exclude text/event-stream from your compression settings or add gzip off to the SSE location block.
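The effect is easy to reproduce with zlib, which implements the deflate format that gzip uses. In this standard-library sketch, gzip_visibility is an illustrative helper: a compressor fed one SSE frame emits nothing a client could decode until it is explicitly flushed, which is how a gzip filter that does not flush after each write holds events back.

```python
import zlib

FRAME = b'data: {"symbol":"BTC","price":67200}\n\n'


def gzip_visibility(frame: bytes = FRAME) -> tuple[bytes, bytes]:
    """Return what a gzip client can decode before and after an explicit flush."""
    compressor = zlib.compressobj(wbits=31)  # wbits=31 selects the gzip container
    decompressor = zlib.decompressobj(wbits=31)

    # Z_NO_FLUSH (the default): the frame stays in the encoder's internal buffer
    before = decompressor.decompress(compressor.compress(frame))

    # Z_SYNC_FLUSH pushes out everything compressed so far
    after = decompressor.decompress(compressor.flush(zlib.Z_SYNC_FLUSH))
    return before, after


if __name__ == "__main__":
    before, after = gzip_visibility()
    print(before)  # b'' : nothing decodable yet
    print(after)   # the complete SSE frame
```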
Performance & Scale Considerations
Connection Counts and Worker Model
Uvicorn runs a single-threaded async event loop per worker process. Each SSE connection is a suspended async generator, roughly 2–4 KB of Python frame and generator state, plus about 1 KB of asyncio task overhead. At 10,000 concurrent connections on one Uvicorn worker, that works out to roughly 30–50 MB of generator and task overhead (10,000 × 3–5 KB), before kernel socket buffers and application state. Thread-per-connection servers instead reserve a full thread stack for every connection, which is why the async model scales much further.
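These per-connection figures vary with the Python version and with what each generator holds, so it is worth measuring your own. This sketch parks n idle async generators, each driven by its own task (standing in for idle SSE connections), and reports the Python heap growth per connection as traced by tracemalloc. Kernel socket buffers and Uvicorn’s protocol objects are not included, and idle_stream, consume, and measure are illustrative names.

```python
import asyncio
import tracemalloc


async def idle_stream(stop: asyncio.Event):
    """Stand-in for an SSE generator parked between events."""
    while not stop.is_set():
        yield "data: tick\n\n"
        await stop.wait()


async def consume(gen) -> None:
    async for _ in gen:
        pass


async def measure(n: int = 1000) -> float:
    """Average traced Python-heap bytes per idle generator plus its task."""
    stop = asyncio.Event()
    tracemalloc.start()
    before, _ = tracemalloc.get_traced_memory()
    tasks = [asyncio.create_task(consume(idle_stream(stop))) for _ in range(n)]
    await asyncio.sleep(0)  # let every task run until it parks on stop.wait()
    after, _ = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    stop.set()
    await asyncio.gather(*tasks)  # let the generators finish cleanly
    return (after - before) / n


if __name__ == "__main__":
    print(f"~{asyncio.run(measure()):.0f} bytes per idle connection")
```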
Run multiple Uvicorn workers behind Gunicorn:
```bash
# -w 4: 1–2× CPU cores for IO-bound SSE
# --timeout 0: disable the worker timeout (long-lived connections)
# --keep-alive 65: longer than any proxy idle timeout
gunicorn app:app \
  -k uvicorn.workers.UvicornWorker \
  -w 4 \
  --timeout 0 \
  --keep-alive 65 \
  --bind 0.0.0.0:8000
```
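--timeout 0 disables Gunicorn’s worker timeout entirely. Per the Gunicorn documentation, for non-sync workers such as UvicornWorker that timeout measures how long a worker goes without checking in with the master process, not how long a request lasts, so a healthy event loop keeps its streams open even at the default 30 seconds. The risk is a stalled event loop, which gets the worker killed along with every SSE connection it holds. With --timeout 0 that restart never happens (the trade-off is that a genuinely hung worker stays hung), and only the OS TCP keepalive and your application-level pings govern connection lifetime.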
For connection pooling strategies and per-worker capacity calculations, see the dedicated guide. File descriptor limits (ulimit -n) must be raised on the OS before you can hold >1024 connections per process.
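A process can raise its own soft limit up to the hard limit without root, which is handy at worker startup; raising the hard limit itself still needs /etc/security/limits.conf or, under systemd, LimitNOFILE= in the unit file. A minimal sketch using the standard resource module (Unix only), where raise_nofile_limit is a hypothetical helper name:

```python
import resource  # Unix only


def raise_nofile_limit(target: int = 65535) -> tuple[int, int]:
    """Raise the soft RLIMIT_NOFILE toward target, never above the hard limit."""
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    new_soft = target if hard == resource.RLIM_INFINITY else min(target, hard)
    if soft != resource.RLIM_INFINITY and new_soft > soft:
        try:
            resource.setrlimit(resource.RLIMIT_NOFILE, (new_soft, hard))
        except (ValueError, OSError):
            pass  # some platforms cap the soft limit below the hard limit; keep current
    return resource.getrlimit(resource.RLIMIT_NOFILE)


if __name__ == "__main__":
    print("RLIMIT_NOFILE (soft, hard):", raise_nofile_limit())
```

Calling it at the top of the application module runs it in every worker before Uvicorn starts accepting connections.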
Backpressure and Slow Consumers
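StreamingResponse only pulls the next item after the previous send has completed, and Uvicorn’s send waits while the socket’s write buffer is full, so an async generator that reads directly from its source (database, Redis) is naturally backpressured by a slow client. The risk appears when a separate producer pushes events into a per-connection queue faster than the consumer drains it; an unbounded queue then grows without limit: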
```python
import asyncio

from fastapi import Request


async def bounded_stream(request: Request, channel: asyncio.Queue):
    """Stream events from a per-connection queue capped at 100 items."""
    while not await request.is_disconnected():
        try:
            # Wake on the next event, or every second to re-check for disconnect
            event = await asyncio.wait_for(channel.get(), timeout=1.0)
        except asyncio.TimeoutError:
            continue
        yield format_sse(event)  # format_sse as defined earlier


# Producer side: drop the oldest event if the queue is full
async def publish(channel: asyncio.Queue, event: dict):
    if channel.full():
        try:
            channel.get_nowait()  # discard oldest
        except asyncio.QueueEmpty:
            pass
    await channel.put(event)


# Create a per-connection queue with a cap
queue: asyncio.Queue = asyncio.Queue(maxsize=100)
```
For token-bucket rate limiting applied per SSE connection, see Rate Limiting & Backpressure Handling.
Fan-out Architecture
Looping over a list of per-connection queues in Python only reaches clients connected to the same worker process, so it cannot broadcast across multiple workers or hosts. Use Redis pub/sub to fan out events at the broker layer:
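```python
import redis.asyncio as redis
from fastapi import Request

# One client (and connection pool) per process; each pubsub still uses its own connection
r = redis.Redis.from_url("redis://localhost:6379")


async def redis_sse_stream(request: Request, channel: str):
    pubsub = r.pubsub()
    await pubsub.subscribe(channel)
    try:
        async for message in pubsub.listen():
            if await request.is_disconnected():
                break
            if message["type"] != "message":
                continue  # skip subscribe/unsubscribe confirmations
            # Assumes single-line payloads (e.g. compact JSON)
            yield f"data: {message['data'].decode()}\n\n"
    finally:
        await pubsub.unsubscribe(channel)
        await pubsub.aclose()  # redis-py >= 5.0.1
```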
In this pattern each SSE connection holds its own Redis subscription, so Redis delivers every published message to every subscriber regardless of which worker or host holds the connection. That lets you scale to many workers without sticky sessions, at the cost of one Redis connection per client. See Redis Pub/Sub Fan-Out for SSE for patterns including channel namespacing, ACL security, and connection pool sizing.
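A common way to avoid one Redis connection per client is a single subscription per worker that fans messages out in-process. In the sketch below, Broadcaster and client_stream are hypothetical names, and it assumes one background task per worker (for example, started from a FastAPI lifespan handler) reads pubsub.listen() and calls hub.publish() for each message.

```python
import asyncio


class Broadcaster:
    """Hypothetical per-worker hub: one upstream subscription, many local SSE clients."""

    def __init__(self, maxsize: int = 100):
        self._subscribers: set = set()
        self._maxsize = maxsize

    def subscribe(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue(maxsize=self._maxsize)
        self._subscribers.add(queue)
        return queue

    def unsubscribe(self, queue: asyncio.Queue) -> None:
        self._subscribers.discard(queue)

    def publish(self, message: str) -> None:
        """Fan out to every local subscriber, dropping the oldest item for slow ones."""
        for queue in self._subscribers:
            if queue.full():
                queue.get_nowait()
            queue.put_nowait(message)


async def client_stream(hub: Broadcaster, request):
    """Per-connection generator: subscribe on connect, unsubscribe in finally."""
    queue = hub.subscribe()
    try:
        while not await request.is_disconnected():
            try:
                message = await asyncio.wait_for(queue.get(), timeout=15.0)
            except asyncio.TimeoutError:
                yield ": ping\n\n"  # idle: keep proxies from timing out
                continue
            yield f"data: {message}\n\n"
    finally:
        hub.unsubscribe(queue)
```

Slow clients lose their oldest messages instead of growing memory, the same drop-oldest policy used for the bounded queue earlier in this guide.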
Validation & Debugging
curl Verification
```bash
# Basic SSE stream: confirm chunked delivery and frame format
curl -N -H "Accept: text/event-stream" http://localhost:8000/stream

# Include verbose headers to inspect Content-Type and Transfer-Encoding
curl -Nv -H "Accept: text/event-stream" http://localhost:8000/stream 2>&1 | head -40

# Simulate a reconnect with Last-Event-ID
curl -N \
  -H "Accept: text/event-stream" \
  -H "Last-Event-ID: 17" \
  http://localhost:8000/stream
```
-N disables curl’s own output buffering. Without it, you won’t see events until curl’s internal buffer fills.
Expected output shape:
```text
HTTP/1.1 200 OK
content-type: text/event-stream; charset=utf-8
cache-control: no-cache
transfer-encoding: chunked
x-accel-buffering: no

id: 1
event: heartbeat
data: {"ts": 1718870400.123, "id": 1}

id: 2
event: heartbeat
data: {"ts": 1718870401.124, "id": 2}
```
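curl shows the frame format, but not whether events arrive spaced out or in one burst, which is the telltale symptom of a buffering proxy. A small standard-library reader can record when each event arrives. read_event_timings is a hypothetical helper that only collects data: lines; running it once against Uvicorn directly and once through the proxy makes buffering visible as collapsed arrival times.

```python
import time
import urllib.request


def read_event_timings(url: str, max_events: int, timeout: float = 30.0):
    """Return (data, seconds_since_request) for the first max_events SSE events."""
    req = urllib.request.Request(url, headers={"Accept": "text/event-stream"})
    start = time.monotonic()
    events, data_lines = [], []
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        for raw in resp:  # the response yields one line at a time as bytes arrive
            line = raw.decode("utf-8").rstrip("\r\n")
            if line.startswith("data:"):
                data_lines.append(line[5:].removeprefix(" "))
            elif line == "" and data_lines:
                events.append(("\n".join(data_lines), time.monotonic() - start))
                data_lines = []
                if len(events) >= max_events:
                    break
    return events


if __name__ == "__main__":
    for data, t in read_event_timings("http://localhost:8000/stream", max_events=5):
        print(f"{t:6.2f}s  {data}")
```

With the one-second heartbeat endpoint, timestamps should climb by about a second per event; several events sharing nearly the same timestamp point to a buffer somewhere in the path.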
Browser DevTools
- Open the Network tab and filter by the EventStream type (Chrome) or eventsource (Firefox).
- Click the request row; the EventStream sub-tab shows each parsed event with its id, type, data, and timestamp.
- Timing tab: Waiting (TTFB) should be low, and the request never completes (Status remains pending).
- Check Response Headers for transfer-encoding: chunked and the absence of content-length.
Structured Logging
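```python
import time

import structlog
from fastapi import Request
from fastapi.responses import StreamingResponse

logger = structlog.get_logger()


@app.get("/stream")  # app and event_generator from the minimal endpoint above
async def stream(request: Request):
    client = request.client.host if request.client else "unknown"
    log = logger.bind(client=client, path="/stream")
    log.info("sse_connect")

    async def instrumented_gen():
        count = 0
        started = time.monotonic()
        try:
            async for frame in event_generator(request):
                count += 1
                yield frame
        finally:
            # Runs on normal completion and on cancellation after a disconnect
            log.info(
                "sse_disconnect",
                events_sent=count,
                duration_s=round(time.monotonic() - started, 3),
            )

    return StreamingResponse(
        instrumented_gen(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
```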
Emit sse_connect and sse_disconnect events with client IP, connection duration, and event count. Alert on events_sent == 0 with a short duration — this indicates a proxy or auth rejection before the first frame.
Load Testing
```bash
# k6's core http.get buffers the whole response, so this check only passes if
# the endpoint ends the stream before the request timeout; k6/http cannot read
# frames from a stream that stays open.
k6 run - <<'EOF'
import http from 'k6/http';
import { check } from 'k6';

export const options = { vus: 200, duration: '60s' };

export default function () {
  const res = http.get('http://localhost:8000/stream', {
    headers: { Accept: 'text/event-stream' },
    timeout: '30s',
  });
  // Check that at least one SSE frame arrived
  check(res, {
    'has event-stream body': (r) => typeof r.body === 'string' && r.body.includes('data:'),
  });
}
EOF
```
Monitor during the test: ss -s for socket counts, ps aux for Uvicorn worker memory, /proc/<pid>/fd count for file descriptor usage. Watch for memory growth proportional to VU count — that indicates unbounded per-connection state.
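Because k6’s http.get waits for the complete response body, it cannot observe events on a stream that stays open. A small asyncio probe can instead hold n connections open for a fixed time and count events on each. This is a hypothetical standard-library sketch (hold_stream and probe are illustrative names) that assumes plain HTTP/1.1 without TLS and counts raw data: lines, so treat its numbers as approximate; the probe process itself is also bounded by its own file descriptor limit.

```python
import asyncio
import contextlib
import time
from typing import Optional


async def hold_stream(host: str, port: int, path: str, hold_s: float):
    """Hold one SSE connection for hold_s seconds.

    Returns (seconds to first data: line or None, number of data: lines seen).
    Counts raw lines, so chunk-size lines from chunked encoding are simply skipped.
    """
    start = time.monotonic()
    reader, writer = await asyncio.open_connection(host, port)
    writer.write(
        f"GET {path} HTTP/1.1\r\nHost: {host}\r\n"
        "Accept: text/event-stream\r\n\r\n".encode()
    )
    await writer.drain()
    first: Optional[float] = None
    events = 0
    try:
        while (remaining := start + hold_s - time.monotonic()) > 0:
            try:
                line = await asyncio.wait_for(reader.readline(), timeout=remaining)
            except asyncio.TimeoutError:
                break
            if not line:
                break  # server closed the connection
            if line.startswith(b"data:"):
                events += 1
                if first is None:
                    first = time.monotonic() - start
    finally:
        writer.close()
        with contextlib.suppress(OSError):
            await writer.wait_closed()
    return first, events


async def probe(host: str, port: int, path: str = "/stream", n: int = 200, hold_s: float = 60.0):
    """Open n concurrent streams; summarize failures, silent streams, and latency."""
    results = await asyncio.gather(
        *(hold_stream(host, port, path, hold_s) for _ in range(n)), return_exceptions=True
    )
    ok = [r for r in results if not isinstance(r, BaseException)]
    firsts = sorted(f for f, _ in ok if f is not None)
    return {
        "failed": n - len(ok),
        "no_events": sum(1 for f, _ in ok if f is None),
        "p50_first_event_s": firsts[len(firsts) // 2] if firsts else None,
        "events": sum(e for _, e in ok),
    }


if __name__ == "__main__":
    print(asyncio.run(probe("localhost", 8000, n=200, hold_s=60.0)))
```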
⚡ Production Directives
- Set --timeout 0 on Gunicorn and proxy_read_timeout 3600s on Nginx, so neither a worker-timeout restart nor Nginx’s default 60-second read timeout cuts long-lived SSE connections.
- Always include X-Accel-Buffering: no in the response headers and proxy_buffering off in the Nginx location block; either one disables buffering for the response, and setting both keeps buffering off if one of them is later changed.
- Send a comment ping (: ping\n\n) every 30 seconds: this resets idle timers at every proxy hop and prevents ALB, Nginx, and CDN from closing "idle" connections.
- Raise OS file descriptor limits (ulimit -n 65535 and a matching /etc/security/limits.conf entry) before serving more than ~900 concurrent connections per worker.
- Never enable gzip on SSE endpoints: the compressor holds small frames in its buffer, breaking real-time delivery.
Frequently Asked Questions
Why do my SSE events arrive all at once when the stream closes, not in real time?
This is almost always proxy buffering. Nginx buffers proxied responses by default, and some load balancers and CDNs do the same depending on configuration. Add proxy_buffering off to the Nginx location block and X-Accel-Buffering: no to the FastAPI response headers. Also confirm gzip is disabled for the endpoint, since the compressor holds output in its buffer instead of flushing each event. Test with curl -N directly against Uvicorn (bypassing the proxy) to isolate which layer is buffering.
How do I authenticate SSE connections when EventSource doesn't support custom headers?
The two standard approaches are: (1) use withCredentials: true on EventSource and send the token as an HttpOnly SameSite=Strict cookie — FastAPI reads it via request.cookies; (2) pass a short-lived signed token as a query parameter (/stream?token=eyJ...) that FastAPI validates and immediately invalidates after use. Never put long-lived secrets in query parameters — they appear in server logs and browser history. See the guide on authenticating SSE streams for the full implementation.
What happens when a client disconnects mid-stream in FastAPI?
When the client goes away, the ASGI server delivers an http.disconnect message on the connection’s receive channel. Depending on your Starlette version and the ASGI spec version your server reports, StreamingResponse either watches for that message and cancels the streaming task, raising asyncio.CancelledError at the generator’s current await point (for example inside await asyncio.sleep() or a Redis listen call), or only notices the disconnect when the next write to the closed socket fails. Use a try/finally block to release subscriptions, database cursors, and queue references. Also poll await request.is_disconnected() between events so the generator exits promptly on either path.
Should I use sse-starlette or raw StreamingResponse?
Use sse-starlette if you want typed ServerSentEvent objects, built-in ping tasks, and automatic disconnect handling — it removes significant boilerplate. Use raw StreamingResponse if you need full control over frame serialization, want zero extra dependencies, or are building a custom multiplexing layer. Both use the same Starlette/Uvicorn flush mechanism and perform identically at the network level.
How many concurrent SSE connections can one Uvicorn worker handle?
Typically 5,000–20,000 depending on per-connection state. Each connection consumes an async generator frame (~2–4 KB), an asyncio Task (~1 KB), and any application state (queues, Redis subscriptions). File descriptor limits are usually the first ceiling: each connection uses one FD, so ulimit -n 65535 sets a practical cap of ~64,000 per process (leaving headroom for logs, sockets, etc.). Run load tests at 10× your expected peak to find the actual limit on your instance type. For higher concurrency, add Uvicorn workers and distribute with Nginx upstream or a service mesh.