Redis Pub/Sub Fan-Out for SSE

Part of Backend Stream Generation & Connection Management.

A single Node.js, Go, or Python process can serve SSE connections directly — until you deploy a second instance. Then every server only knows about the connections it holds, and any event published to the wrong node is silently dropped. The fix: decouple producers from transports by routing all events through Redis pub/sub. Publishers write to a Redis channel once; every SSE server subscribes and fans the message out to every open connection it owns. This page covers the mechanics of that fan-out, channel naming conventions, at-least-once delivery concerns, and what breaks at scale.

[Figure: Redis pub/sub fan-out architecture for SSE. A producer (API or worker) publishes once to a Redis channel (events:room:42); SSE Node 1 (3 connections), SSE Node 2 (5 connections), and SSE Node 3 (4 connections) each subscribe and write the event to their own connected clients.]
One PUBLISH reaches all connected SSE clients across every node via Redis fan-out.

How Redis Pub/Sub Fan-Out Works

Redis pub/sub is a message-passing primitive: a PUBLISH call delivers a message to every client currently subscribed to the same channel, synchronously within Redis. There is no persistence, no acknowledgement, and no message queue. If no subscriber is connected at the moment of publication, the message is gone.

Each SSE server process keeps a persistent Redis subscriber connection and issues SUBSCRIBE for every channel it cares about; a single connection can carry any number of channel subscriptions. When a new SSE client connects, the handler notes which channel(s) that client should receive, adds the HTTP response to an in-memory registry, and subscribes to the channel if this is its first local client. When Redis delivers a message on the channel, the server iterates the registry and writes the SSE-formatted event to every open response.

Wire format refresher. The browser EventSource API expects Content-Type: text/event-stream and newline-delimited fields:

HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
X-Accel-Buffering: no

id: 1750491234567
event: message
data: {"type":"order_update","orderId":99}

Each field is field: value\n; the event is terminated by a blank line (\n\n). Multi-line data: fields, event IDs, and the retry mechanism are all handled at the SSE layer, not by Redis.
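The field rules above can be sketched as a small formatter. This is a minimal illustration, not part of any library: formatSseFrame is a hypothetical helper name, and it assumes id and event never contain newlines.

```javascript
// Hypothetical helper: serialise one event into an SSE frame.
function formatSseFrame({ id, event, data, retry }) {
  const lines = [];
  if (retry !== undefined) lines.push(`retry: ${retry}`);
  if (id !== undefined) lines.push(`id: ${id}`);
  if (event !== undefined) lines.push(`event: ${event}`);
  // A payload containing newlines must be split into one data: line per row,
  // otherwise the embedded line break would corrupt the frame.
  for (const row of String(data).split(/\r\n|\r|\n/)) {
    lines.push(`data: ${row}`);
  }
  // The blank line after the last field terminates the event.
  return lines.join('\n') + '\n\n';
}

const frame = formatSseFrame({
  id: '1750491234567',
  event: 'message',
  data: 'line one\nline two',
});
```

The browser joins consecutive data: lines with a newline, so the client sees the original two-line payload in event.data.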

Channel naming convention. Design channel names so every SSE server can subscribe to exactly the channels its current connections need — no more.

Pattern | Example | Use case
User-scoped | sse:user:u_9182 | Private notifications per user
Room/topic | sse:room:r_42 | Chat room, collaborative doc
Broadcast | sse:global | System-wide announcements
Wildcard (PSUBSCRIBE) | sse:room:* | One subscriber for all rooms on a node

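Prefer PSUBSCRIBE with a wildcard only when a node serves many of the active rooms. The pattern-matching CPU is negligible and the single subscription simplifies lifecycle management, but the node then receives every matching message, including those for rooms with no local clients. In ioredis, pattern matches arrive on the pmessage event rather than message.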
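Because channel names often come from request input, it can help to build and validate them in one place. The sketch below follows the naming patterns in the table above; channelFor, parseChannel, and the allowed id alphabet are assumptions made for illustration.

```javascript
// Hypothetical helpers; the scope names mirror the naming table.
const SCOPES = new Set(['user', 'room']);
const ID_PATTERN = /^[A-Za-z0-9_-]+$/;

function channelFor(scope, id) {
  if (scope === 'global') return 'sse:global';
  if (!SCOPES.has(scope)) throw new Error(`unknown scope: ${scope}`);
  // Restrict ids to a safe alphabet so a caller cannot smuggle in glob
  // characters (*, ?, [) that PSUBSCRIBE would treat as a pattern.
  if (!ID_PATTERN.test(id)) throw new Error(`invalid id: ${id}`);
  return `sse:${scope}:${id}`;
}

function parseChannel(channel) {
  if (channel === 'sse:global') return { scope: 'global', id: null };
  const match = /^sse:(user|room):([A-Za-z0-9_-]+)$/.exec(channel);
  // null signals a name the server should reject with a 400.
  return match ? { scope: match[1], id: match[2] } : null;
}
```

A server could call parseChannel on the requested channel and refuse anything that returns null, which is stricter than a plain prefix check.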

Node.js Server-Side Implementation

The example uses ioredis (recommended over the legacy redis package for its reconnect semantics) and the built-in http module. The same pattern works with Express or Fastify — replace res.writeHead / res.write accordingly.

// sse-server.js — Node.js SSE server with Redis pub/sub fan-out
import http from 'node:http';
import Redis from 'ioredis';

// One subscriber connection shared across all rooms on this process.
// A subscriber connection CANNOT issue commands while in subscribe mode.
const sub = new Redis({ host: 'redis', port: 6379, lazyConnect: true });
await sub.connect();

// Separate connection for PUBLISH (called by producer endpoints).
const pub = new Redis({ host: 'redis', port: 6379 });

// Registry: channel → Set<http.ServerResponse>
const registry = new Map();

function getOrSubscribeChannel(channel) {
  if (!registry.has(channel)) {
    registry.set(channel, new Set());
    // Subscribe only when first client joins this channel.
    sub.subscribe(channel, (err) => {
      if (err) console.error('SUBSCRIBE error', channel, err);
    });
  }
  return registry.get(channel);
}

function removeClient(channel, res) {
  const clients = registry.get(channel);
  if (!clients) return;
  clients.delete(res);
  if (clients.size === 0) {
    registry.delete(channel);
    // Release the Redis subscription when no clients remain.
    sub.unsubscribe(channel);
  }
}

// Redis delivers messages here for every subscribed channel.
sub.on('message', (channel, message) => {
  const clients = registry.get(channel);
  if (!clients || clients.size === 0) return;

  // Parse once; fan out to all connections in this process.
  let payload; // { id, event, data }
  try {
    payload = JSON.parse(message);
  } catch (err) {
    // A malformed message must not crash the process and drop every client.
    console.error('Bad payload on', channel, err.message);
    return;
  }
  const sseFrame =
    `id: ${payload.id}\n` +
    `event: ${payload.event}\n` +
    `data: ${JSON.stringify(payload.data)}\n\n`;

  for (const res of clients) {
    try {
      res.write(sseFrame);
    } catch {
      // Socket already gone; cleanup happens in the 'close' handler below.
    }
  }
});

const server = http.createServer((req, res) => {
  // Expect /events?channel=sse:room:r_42
  const url = new URL(req.url, 'http://x');
  const channel = url.searchParams.get('channel');
  if (!channel || !channel.startsWith('sse:')) {
    res.writeHead(400).end('Invalid channel');
    return;
  }

  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'X-Accel-Buffering': 'no',        // Nginx: disable proxy buffer
  });
  res.flushHeaders();

  // Send a comment as a heartbeat immediately to prevent proxy timeouts.
  res.write(': connected\n\n');

  const clients = getOrSubscribeChannel(channel);
  clients.add(res);

  // Send periodic heartbeats every 15 seconds.
  const heartbeat = setInterval(() => {
    try { res.write(': heartbeat\n\n'); } catch { /* closed */ }
  }, 15_000);

  req.on('close', () => {
    clearInterval(heartbeat);
    removeClient(channel, res);
  });
});

server.listen(3000);

Publishing from a producer (separate service or route):

// producer.js: emit an event to all SSE subscribers for a channel
import Redis from 'ioredis';

// Dedicated connection for PUBLISH; a connection in subscribe mode cannot publish.
const pub = new Redis({ host: 'redis', port: 6379 });

async function publishEvent(channel, event, data) {
  const id = Date.now().toString(); // Use a monotonic ID strategy in production
  await pub.publish(channel, JSON.stringify({ id, event, data }));
}

// Example: notify all listeners in room 42
await publishEvent('sse:room:r_42', 'order_update', { orderId: 99, status: 'shipped' });

For production-grade unique event IDs, replace Date.now() with a Redis INCR counter per channel or a UUID v7. Date.now() produces duplicate IDs when two producers publish in the same millisecond, and clock skew between producer hosts can put IDs out of order.
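To illustrate the same-millisecond problem, here is a minimal sketch of a generator that stays strictly increasing within one producer process. createEventIdGenerator is a hypothetical name, and the millisecond-sequence format is borrowed from the shape of Redis Stream IDs. It does not coordinate across producers; that still needs INCR or UUID v7.

```javascript
// Hypothetical factory; `now` is injectable so the behaviour can be tested.
function createEventIdGenerator(now = Date.now) {
  let lastMs = 0;
  let seq = 0;
  return function nextId() {
    const ms = now();
    if (ms > lastMs) {
      lastMs = ms;
      seq = 0;
    } else {
      // Same millisecond (or the clock stepped backwards): keep the
      // previous timestamp and bump the sequence so IDs never repeat.
      seq += 1;
    }
    return `${lastMs}-${seq}`;
  };
}

const nextId = createEventIdGenerator();
const firstId = nextId();
const secondId = nextId(); // differs from firstId even within the same ms
```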

Python / Go Consumer Side

Python (sse-starlette + redis-py)

# main.py: FastAPI SSE endpoint backed by Redis pub/sub
import json

from fastapi import FastAPI, HTTPException, Request
from sse_starlette.sse import EventSourceResponse
import redis.asyncio as aioredis

app = FastAPI()
redis_client = aioredis.from_url("redis://redis:6379", decode_responses=True)

async def event_generator(request: Request, channel: str):
    # Note: this opens one pub/sub connection per SSE client.
    pubsub = redis_client.pubsub()
    await pubsub.subscribe(channel)
    try:
        async for raw in pubsub.listen():
            if await request.is_disconnected():
                break
            # Skip subscribe/unsubscribe confirmations.
            if raw["type"] != "message":
                continue
            payload = json.loads(raw["data"])
            yield {
                "id": payload["id"],
                "event": payload["event"],
                "data": json.dumps(payload["data"]),
            }
    finally:
        await pubsub.unsubscribe(channel)
        await pubsub.aclose()  # use close() on redis-py versions before 5.0.1

@app.get("/events")
async def sse_endpoint(request: Request, channel: str):
    if not channel.startswith("sse:"):
        # Returning a (body, status) tuple is a Flask idiom; FastAPI would
        # send it as a 200, so raise HTTPException instead.
        raise HTTPException(status_code=400, detail="invalid channel")
    return EventSourceResponse(event_generator(request, channel))

See Streaming SSE Responses with FastAPI and sse-starlette for full middleware and CORS setup.

Go (net/http + go-redis)

// main.go — Go SSE handler with Redis pub/sub fan-out
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "net/http"

    "github.com/redis/go-redis/v9"
)

var rdb = redis.NewClient(&redis.Options{Addr: "redis:6379"})

type Payload struct {
    ID    string          `json:"id"`
    Event string          `json:"event"`
    Data  json.RawMessage `json:"data"`
}

func sseHandler(w http.ResponseWriter, r *http.Request) {
    channel := r.URL.Query().Get("channel")
    if channel == "" {
        http.Error(w, "missing channel", http.StatusBadRequest)
        return
    }

    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "streaming unsupported", http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("X-Accel-Buffering", "no")
    w.WriteHeader(http.StatusOK)
    fmt.Fprint(w, ": connected\n\n")
    flusher.Flush()

    ctx, cancel := context.WithCancel(r.Context())
    defer cancel()

    ps := rdb.Subscribe(ctx, channel)
    defer ps.Close()

    ch := ps.Channel()
    for {
        select {
        case <-ctx.Done():
            return // client disconnected
        case msg, ok := <-ch:
            if !ok {
                return // Redis subscription closed
            }
            var p Payload
            if err := json.Unmarshal([]byte(msg.Payload), &p); err != nil {
                continue
            }
            fmt.Fprintf(w, "id: %s\nevent: %s\ndata: %s\n\n", p.ID, p.Event, p.Data)
            flusher.Flush()
        }
    }
}

func main() {
    http.HandleFunc("/events", sseHandler)
    http.ListenAndServe(":3000", nil)
}

For advanced Go streaming patterns including graceful shutdown, see Go Streaming Patterns for SSE.

At-Least-Once Delivery and Message Replay

Redis pub/sub is fire-and-forget, which makes it at-most-once on its own. If a subscriber disconnects and reconnects, even for 50 ms, messages published in that window are lost. The browser's EventSource sends a Last-Event-ID header on reconnect, but your server must read it and replay the missed events explicitly.

Strategy 1 — Redis Streams as a replay buffer.
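Replace PUBLISH / SUBSCRIBE with XADD / XREAD, and use the stream entry ID as the SSE id so that Last-Event-ID maps directly onto a stream position. On reconnect, read the entries after Last-Event-ID before switching to blocking reads.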

// Replay missed events from a Redis Stream on reconnect
async function replayFromId(stream, lastId, res) {
  // XRANGE bounds are inclusive; the '(' prefix (Redis 6.2+) makes the start
  // exclusive so the event the client already received is not sent again.
  const entries = await pub.xrange(stream, `(${lastId}`, '+', 'COUNT', 500);
  for (const [id, fields] of entries) {
    // fields is a flat [name, value, name, value, ...] array.
    const idx = fields.indexOf('payload');
    if (idx === -1) continue;
    const payload = JSON.parse(fields[idx + 1]);
    res.write(`id: ${id}\nevent: ${payload.event}\ndata: ${JSON.stringify(payload.data)}\n\n`);
  }
}

Set a stream MAXLEN to cap memory: XADD stream MAXLEN ~ 10000 * payload '...'.

Strategy 2 — short-lived pub/sub + Redis cache.
Keep pub/sub for live delivery. On each PUBLISH, also write to a Redis key with a short TTL (e.g. SET sse:last:r_42 <payload> EX 30). On reconnect, serve the cached event if Last-Event-ID is stale, then switch to live subscription. This recovers only the most recent event; anything published before it during the gap is still lost.

Neither strategy guarantees exactly-once; design consumers to be idempotent. Use stable event IDs so clients can deduplicate.
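Client-side deduplication by event ID can be sketched as below. createDeduper is a hypothetical helper and the bound of 1,000 remembered IDs is an arbitrary assumption; it is one way to keep handlers idempotent when a replay resends events.

```javascript
// Hypothetical client-side helper: remembers the most recent event IDs
// and reports whether an ID has already been handled.
function createDeduper(maxRemembered = 1000) {
  const seen = new Set(); // a Set iterates in insertion order
  return function isDuplicate(id) {
    if (seen.has(id)) return true;
    seen.add(id);
    if (seen.size > maxRemembered) {
      // Evict the oldest remembered ID to bound memory.
      seen.delete(seen.values().next().value);
    }
    return false;
  };
}

const isDuplicate = createDeduper();
const firstSeen = isDuplicate('1750491234567-0');  // false: new event
const secondSeen = isDuplicate('1750491234567-0'); // true: replayed copy
```

In a browser, the handler would pass event.lastEventId to isDuplicate and return early when it reports true.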

Edge Cases and Network Interference

Proxy Buffering

Most reverse proxies buffer HTTP responses until they see Content-Length or a buffer fills. An SSE stream has neither, so without explicit config, events sit in the proxy buffer for seconds.

Proxy | Directive | Effect
Nginx | proxy_buffering off; or X-Accel-Buffering: no header | Disables per-request buffering
Apache | flushpackets=on on the ProxyPass directive | Flushes each chunk instead of buffering it
Caddy | Automatic, no config needed | Caddy flushes SSE by default
AWS ALB | Chunked transfer encoding, ≤ 1 MB per chunk | Hard limit; keep events under 1 MB
Cloudflare | Streams uncached responses; confirm the response carries cf-cache-status: DYNAMIC | Cache must be bypassed

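Always send X-Accel-Buffering: no as a response header even if you configure Nginx separately: it disables buffering for that response without depending on the proxy_buffering setting of whichever Nginx instance fronts the app. See Buffer Management & Chunked Transfer Encoding for a full treatment.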

Redis Connection Limits

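How many Redis connections a node opens depends on how it subscribes. One connection can SUBSCRIBE to any number of channels, so the Node.js example above needs a single subscriber connection per process. The Python and Go examples open one pub/sub connection per SSE client, so their Redis connection count grows with the client count. A design that opens one connection per channel is just as costly: 50 nodes and 200 active rooms means 10,000 connections, which hits the default maxclients 10000. Mitigation:

  • Share one subscriber connection per process and multiplex channels over it, or use PSUBSCRIBE sse:room:* to collapse all rooms onto one subscription per node.
  • Raise maxclients (and the Redis process's file-descriptor limit) if the count still sits close to the ceiling.
  • Monitor CLIENT LIST and INFO clients in Redis.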
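The connection arithmetic can be compared across subscription designs with a small calculation. The function and field names are hypothetical, and the input figures are the ones used in this section.

```javascript
// Hypothetical estimator: Redis subscriber connections under three designs.
function redisSubscriberConnections({ nodes, channels, clients }) {
  return {
    // One connection per subscribed channel on every node.
    perChannel: nodes * channels,
    // One pub/sub connection per SSE client.
    perClient: clients,
    // One shared (or PSUBSCRIBE) connection per node.
    sharedPerNode: nodes,
  };
}

const estimate = redisSubscriberConnections({ nodes: 50, channels: 200, clients: 10000 });
// estimate.perChannel === 10000, estimate.sharedPerNode === 50
```

The gap between the first and last figures is why multiplexing channels over one subscriber connection matters at scale.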

Connection Drops Between Node and Redis

ioredis auto-reconnects by default; messages published during the reconnect window are lost (pure pub/sub). Configure retryStrategy for exponential backoff:

const sub = new Redis({
  host: 'redis',
  retryStrategy: (times) => Math.min(times * 100, 3000), // max 3s
  maxRetriesPerRequest: null, // subscriber connections need unlimited retries
});

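During the reconnect, SSE clients remain connected to the SSE server; they receive no events but the connection stays open. When Redis reconnects, event delivery resumes. Clients can detect the gap only if event IDs are sequential (for example a per-channel INCR counter); timestamp IDs give no way to tell that an event is missing.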

Firewall and CDN Idle Timeouts

Many CDNs and corporate firewalls close TCP connections idle for 60–300 seconds. SSE channels with infrequent events will be severed silently. Counter this with keep-alive heartbeat comments:

: heartbeat\n\n

Send every 15–30 seconds. These are valid SSE comment lines the browser ignores; they keep the TCP flow active and reset idle timers.

EventSource Reconnection Loop

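When an open stream drops or the server is unreachable, the browser's EventSource reconnects after the retry interval (browser-defined, typically a few seconds, unless a retry: field overrides it) and keeps trying indefinitely. A misconfigured Redis that makes the server close streams will therefore cause a thundering herd of SSE reconnects. Send retry: 30000 in the first SSE frame while Redis is degraded to slow the herd. Per the HTML specification, a non-200 response such as 503 makes EventSource fail the connection permanently instead of retrying, and it does not honor Retry-After, so client code must recreate the EventSource itself once the error event fires with readyState CLOSED.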
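If client code ends up recreating the EventSource itself, for example after the connection has failed permanently, spreading the retries out helps avoid the thundering herd described above. The sketch below uses exponential backoff with full jitter; reconnectDelayMs and its default values are assumptions for illustration.

```javascript
// Hypothetical helper: delay before the Nth manual reconnect attempt.
// `random` is injectable so the result can be tested deterministically.
function reconnectDelayMs(attempt, { baseMs = 1000, capMs = 30000, random = Math.random } = {}) {
  // Double the ceiling on every attempt, up to the cap.
  const ceiling = Math.min(capMs, baseMs * 2 ** attempt);
  // Full jitter: pick uniformly in [0, ceiling) so clients spread out.
  return Math.floor(random() * ceiling);
}

const delay = reconnectDelayMs(3); // somewhere in [0, 8000) ms
```

A client would call setTimeout with this delay before constructing a new EventSource, and reset the attempt counter once the open event fires.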

Performance and Scale Considerations

Connection Counts

Each SSE client is a persistent HTTP connection and consumes one file descriptor. The default soft limit on many Linux systems is 1,024 open files per process; raise ulimit -n and tune fs.file-max for tens of thousands. With a shared subscriber, Redis pub/sub adds one TCP connection per node, not one per SSE client. That asymmetry is the core efficiency win.

Memory Per Client

Component | Memory per SSE connection
Node.js http.ServerResponse | ~8–16 KB (socket + headers)
Go http.ResponseWriter | ~4 KB (goroutine stack)
Python asyncio task | ~8 KB (coroutine + event loop overhead)
Redis subscriber connection | ~20–50 KB (Redis server-side)

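At 10,000 concurrent SSE clients across 5 nodes (2,000 per node), the table implies roughly 16–32 MB of per-connection memory on each Node.js node (2,000 × 8–16 KB), on top of the process's baseline heap and any buffered output. The Redis side is small when each node shares one subscriber connection: 5 × 20–50 KB is about 100–250 KB.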
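As a back-of-envelope check on the per-connection figures in the table, the per-node overhead can be computed directly. This counts connection objects only, not baseline heap or buffered output, and the function name is hypothetical.

```javascript
// Hypothetical estimator: per-connection memory for one SSE node, in MB.
function estimateConnectionMemoryMB(clientsPerNode, minKB, maxKB) {
  const toMB = (kb) => (clientsPerNode * kb) / 1024;
  return { minMB: toMB(minKB), maxMB: toMB(maxKB) };
}

// 2,000 clients on a Node.js node at 8-16 KB each.
const nodeJs = estimateConnectionMemoryMB(2000, 8, 16);
// nodeJs → { minMB: 15.625, maxMB: 31.25 }
```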

Backpressure

Redis pub/sub does not apply backpressure to publishers. When a slow SSE client cannot drain its socket, the kernel send buffer fills first and Node.js then queues further res.write data in process memory. res.write returns false once that queue passes the stream's high-water mark, but it keeps accepting data, so an ignored slow client grows memory without bound. Log slow clients and close connections that have not drained within a timeout:

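const slowTimers = new WeakMap(); // res → pending drop timer

sub.on('message', (channel, message) => {
  const clients = registry.get(channel);
  if (!clients) return;
  // Build the frame once per message, outside the write loop.
  const { id, event, data } = JSON.parse(message);
  const sseFrame = `id: ${id}\nevent: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
  for (const res of clients) {
    const ok = res.write(sseFrame);
    if (!ok && !slowTimers.has(res)) {
      // Buffer is over the high-water mark; give it 5 s to drain, then drop.
      const timer = setTimeout(() => {
        if (!res.writableEnded) res.destroy(new Error('slow client timeout'));
      }, 5_000);
      slowTimers.set(res, timer);
      // The client caught up in time: cancel the pending drop.
      res.once('drain', () => {
        clearTimeout(timer);
        slowTimers.delete(res);
      });
    }
  }
});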

For deeper backpressure strategies including token-bucket rate limiting see Rate Limiting & Backpressure Handling.

CPU: Serialisation Cost

You serialise the payload once in the producer and deserialise it once per SSE node (not once per client). The formatted SSE string is built once per node and written to N sockets as a string — efficient. Avoid JSON.stringify in the write loop; do it before the fan-out.

Redis Cluster Sharding

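With Redis Cluster, classic PUBLISH / SUBSCRIBE is not hash-slotted: a message published on any node is forwarded to every other node, so a subscriber can connect to any node and still receive it. That keeps clients simple but multiplies traffic on the cluster bus. Sharded pub/sub (SPUBLISH / SSUBSCRIBE, Redis 7.0+) does hash channels to slots, so sse:room:r_42 may live on a different shard than sse:room:r_43 and each SSE node must subscribe on the owning shard. ioredis's Redis.Cluster and go-redis's ClusterClient manage the subscriber connection for classic pub/sub; check your client version for sharded pub/sub support.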

Validation and Debugging

Verify the fan-out end-to-end with curl

# Terminal 1 — open an SSE connection
curl -N -H "Accept: text/event-stream" \
  "http://localhost:3000/events?channel=sse:room:r_42"

# Terminal 2 — publish a test event directly via redis-cli
redis-cli PUBLISH sse:room:r_42 \
  '{"id":"1750491234567","event":"test","data":{"msg":"hello"}}'

You should see the formatted SSE event appear in Terminal 1 within milliseconds.

Inspect active subscriptions

# List all channels with at least one subscriber
redis-cli PUBSUB CHANNELS "sse:*"

# Count subscribers per channel
redis-cli PUBSUB NUMSUB sse:room:r_42 sse:global

# Count the unique patterns subscribed via PSUBSCRIBE (not the number of subscribers)
redis-cli PUBSUB NUMPAT

Structured logging checklist

Log the following events at info or debug level; index by channel and nodeId:

  • Client connected: { event: "sse_connect", channel, clientId, nodeId }
  • Client disconnected: { event: "sse_disconnect", channel, clientId, durationMs, nodeId }
  • Redis message received: { event: "redis_msg", channel, payloadBytes, clientCount, nodeId }
  • Redis subscribe/unsubscribe: { event: "redis_sub", channel, action, nodeId }
  • Slow client detected: { event: "slow_client", channel, clientId, bufferBytes }
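One way to emit the events in the checklist above is a single JSON-lines helper. This is a minimal sketch: logEvent, the ts field, and the 'node-1' default are assumptions, and a real deployment would more likely use an existing structured logger.

```javascript
// Hypothetical helper: write one structured log line per event.
function logEvent(event, fields, nodeId = 'node-1') {
  const line = JSON.stringify({
    ts: new Date().toISOString(),
    event,
    nodeId,
    ...fields,
  });
  console.log(line);
  return line; // returned so callers and tests can inspect it
}

const connectLine = logEvent('sse_connect', {
  channel: 'sse:room:r_42',
  clientId: 'c_1',
});
```

Keeping channel and nodeId as top-level keys lets a log pipeline index on them without parsing nested objects.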

Browser DevTools

Open Network → Filter: text/event-stream. Select the SSE request and switch to the EventStream tab to see each event frame as it arrives with its id, event, and data fields. The Timing tab shows when the response started streaming versus when individual chunks arrived — useful for spotting proxy buffering.

⚡ Production Directives

  • Send X-Accel-Buffering: no on every SSE response; configure proxy_buffering off in Nginx upstream blocks.
  • Multiplex all subscriptions over one subscriber connection per node (shared SUBSCRIBE calls or PSUBSCRIBE sse:*) instead of opening a connection per channel or per client.
  • Store events in a Redis Stream (XADD) alongside pub/sub so reconnecting clients can replay missed messages via Last-Event-ID.
  • Send a comment heartbeat (: heartbeat\n\n) every 15–30 seconds to prevent CDN and firewall idle-timeout disconnects.
  • Monitor PUBSUB NUMSUB and Redis INFO clients in your alerting stack; set alerts when subscriber counts diverge from expected SSE connection counts.

Frequently Asked Questions

Why not use Redis keyspace notifications instead of pub/sub?

Keyspace notifications fire on Redis data changes (SET, DEL, EXPIRE) and are routed via pub/sub under the hood. They are useful for cache-invalidation events but carry overhead: you must enable notify-keyspace-events in redis.conf, every write emits a notification regardless of whether anyone listens, and the channel names are Redis-internal (e.g. __keyevent@0__:set). For application-level SSE events, explicit PUBLISH to named channels is simpler, cheaper, and gives you control over the payload schema.

Does Redis pub/sub guarantee message ordering?

Yes, within a single Redis instance: messages on a channel reach each subscriber in the order Redis processed the PUBLISH calls. In a Redis Cluster, classic PUBLISH is forwarded to every node over the cluster bus, so ordering between messages published through different nodes is not guaranteed; sharded pub/sub (SPUBLISH / SSUBSCRIBE, Redis 7.0+) pins each channel to one shard. If two producers publish to the same channel from different application servers, the order depends on which PUBLISH reaches Redis first. You have no distributed ordering guarantee without a coordination layer (e.g. a Lua script with INCR + PUBLISH in one atomic call, or Redis Streams, which provide a total-order log per stream key).

How many SSE clients can one Redis pub/sub channel support?

Redis delivers the message once per subscriber connection (i.e. once per SSE node), not once per SSE client. A single channel can support effectively unlimited SSE clients: the bottleneck is the SSE server's ability to write to N sockets simultaneously, not Redis throughput. In practice, a single Redis instance can handle hundreds of thousands of pub/sub messages per second. The SSE server fan-out loop is the hot path; profile it with synthetic load using tools like k6 or wrk before scaling horizontally.

Can I use Redis Sentinel or Redis Cluster with SSE fan-out?

Yes. ioredis supports both: pass a sentinels array for Sentinel mode or use new Redis.Cluster([...]) for Cluster. go-redis provides NewFailoverClient (a Sentinel-backed client) and NewClusterClient. In Cluster mode, classic pub/sub messages are broadcast to every node, so a subscriber connected to any node receives them and a wildcard PSUBSCRIBE needs only one connection. Only sharded pub/sub (SSUBSCRIBE) requires a connection to the shard that owns the channel's hash slot. PUBSUB CHANNELS reports only the subscriptions held on the node you query, so run it against the node your subscriber is connected to.
