Applying Token Bucket Rate Limiting to Event Streams

Incident Symptoms & Triage

Clients report intermittent ERR_CONNECTION_RESET, stalled UI updates, or net::ERR_HTTP2_STREAM_ERROR during high-throughput SSE broadcasts. The connection drops silently under load, causing state desynchronization and forcing full page reloads.

Immediate Triage:

  1. Open Chrome DevTools → Network tab → Filter by event-stream.
  2. Inspect failed requests. Look for net::ERR_HTTP2_STREAM_ERROR or abrupt 200 OK terminations mid-stream.
  3. Check server logs for ECONNRESET or write after end errors.
  4. Verify upstream producer throughput. If events are produced faster than clients drain the stream, the kernel's TCP send buffers saturate.
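For step 4, a rough events/sec probe can be improvised in the browser console. A minimal sketch; the `/stream` path and the EventSource wiring in the comment are assumptions about your setup:

```javascript
// Counts events in a rolling interval; illustrative helper, not a library API.
function makeRateProbe() {
  let count = 0;
  return {
    tick() { count++; },
    readAndReset() { const n = count; count = 0; return n; },
  };
}

// Browser usage (assumes the stream lives at /stream):
//   const probe = makeRateProbe();
//   new EventSource('/stream').onmessage = () => probe.tick();
//   setInterval(() => console.log('events/sec:', probe.readAndReset()), 1000);
```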

Root Cause Analysis

Unbounded event producers flood the HTTP response stream faster than clients or intermediate proxies (NGINX, Cloudflare) can consume it. Fixed-window rate limiters create artificial bursts at interval boundaries, violating the continuous delivery model of SSE. Without a pacing mechanism, the Backend Stream Generation & Connection Management pipeline saturates, triggering OS-level socket backpressure and forced disconnects. The stream needs a leaky-bucket or token-bucket model to pace writes against actual network capacity.
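The boundary-burst problem with fixed windows is easy to demonstrate with a minimal simulation (a standalone sketch; the helper name and the numbers are illustrative):

```javascript
// A fixed-window limiter resets its counter at each window boundary, so two
// bursts straddling a boundary can pass 2x the nominal limit back-to-back.
function fixedWindowAllows(timestamps, limit, windowMs) {
  const counts = new Map();
  let allowed = 0;
  for (const t of timestamps) {
    const w = Math.floor(t / windowMs);         // which window this event falls in
    const c = (counts.get(w) || 0) + 1;
    counts.set(w, c);
    if (c <= limit) allowed++;                  // admitted if under this window's quota
  }
  return allowed;
}

// 100 events at t=59.9s plus 100 at t=60.1s: all 200 pass a "100/min" limit
const burst = [...Array(100).fill(59900), ...Array(100).fill(60100)];
console.log(fixedWindowAllows(burst, 100, 60000)); // 200
```

A token bucket avoids this spike because tokens refill continuously rather than resetting at a boundary.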

Implementation: Per-Session Token Bucket

Deploy a token bucket algorithm to smooth event emission, prevent TCP buffer saturation, and maintain persistent connections without dropping critical state updates.

1. Core Token Bucket Logic

Initialize a bucket per client session. Define capacity (max burst tolerance) and refill_rate (tokens/sec).

class TokenBucket {
  constructor(capacity, refillRate) {
    this.capacity = capacity;     // max burst tolerance
    this.tokens = capacity;
    this.refillRate = refillRate; // tokens replenished per second
    this.lastRefill = Date.now();
  }

  consume() {
    this._refill();
    if (this.tokens >= 1) { // tokens are fractional; require a whole token
      this.tokens--;
      return true;
    }
    return false;
  }

  _refill() {
    const now = Date.now();
    const elapsed = (now - this.lastRefill) / 1000;
    this.tokens = Math.min(this.capacity, this.tokens + elapsed * this.refillRate);
    this.lastRefill = now;
  }
}
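To sanity-check the refill arithmetic deterministically, the same math can be isolated with an injectable clock (a standalone sketch, not part of the production class):

```javascript
// Same refill formula as TokenBucket._refill(), with time passed in explicitly
// so the result is deterministic.
function refill(tokens, capacity, refillRate, lastRefill, now) {
  const elapsed = (now - lastRefill) / 1000;
  return Math.min(capacity, tokens + elapsed * refillRate);
}

// With capacity 10 and 5 tokens/sec, an empty bucket holds 5 tokens after 1s
console.log(refill(0, 10, 5, 0, 1000));  // 5
// ...and clamps at capacity after a long idle period
console.log(refill(0, 10, 5, 0, 60000)); // 10
```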

2. SSE Emitter Wrapper & Priority Buffering

Wrap the SSE emitter in a middleware layer. Attempt to consume a token before each res.write(). When the bucket empties, buffer high-priority events in a fixed-size queue and discard low-priority telemetry.

const HIGH_PRIORITY = ['state_update', 'auth', 'transaction'];
const MAX_QUEUE_DEPTH = 500;

function createSSEEmitter(res, bucket) {
  const pendingQueue = [];
  let isDraining = false;

  res.on('drain', () => {
    isDraining = false;
    flushQueue();
  });

  // Retry periodically so queued events flush once tokens refill,
  // even when no 'drain' event fires.
  const flushTimer = setInterval(flushQueue, 100);
  res.on('close', () => clearInterval(flushTimer));

  function flushQueue() {
    while (!isDraining && pendingQueue.length > 0 && bucket.consume()) {
      const event = pendingQueue.shift();
      if (!res.write(`data: ${JSON.stringify(event)}\n\n`)) {
        // TCP buffer full: pause until 'drain'
        isDraining = true;
      }
    }
  }

  return function emit(event, priority = 'normal') {
    // Preserve ordering: never bypass a non-empty queue or a saturated socket
    if (isDraining || pendingQueue.length > 0 || !bucket.consume()) {
      if (priority === 'high' || HIGH_PRIORITY.includes(event.type)) {
        if (pendingQueue.length < MAX_QUEUE_DEPTH) pendingQueue.push(event);
      }
      // Low-priority telemetry is discarded to bound memory
      return;
    }
    if (!res.write(`data: ${JSON.stringify(event)}\n\n`)) isDraining = true;
  };
}

3. Integration & Flow Control

Tie queue flushes to the underlying stream drain event to respect TCP flow control and avoid blocking the event loop. Schedule precise replenishment implicitly via the _refill() call on each consume() attempt. Integrate this pacing logic into your broader Rate Limiting & Backpressure Handling strategy to ensure consistent client-side parsing and predictable latency.

Server Setup Snippet:

const http = require('http');

http.createServer((req, res) => {
  if (req.url !== '/stream') {
    res.writeHead(404); // don't leave other requests hanging
    res.end();
    return;
  }

  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'X-Accel-Buffering': 'no' // Disable NGINX buffering
  });

  const bucket = new TokenBucket(10, 5); // 10-token burst, 5 tokens/sec
  const emit = createSSEEmitter(res, bucket);

  // Simulate an upstream producer at 20 events/sec
  const interval = setInterval(() => {
    emit({ type: 'telemetry', ts: Date.now() }, 'normal');
  }, 50);

  req.on('close', () => clearInterval(interval));
}).listen(3000);

Validation & Monitoring

Do not deploy without instrumentation. Track the following metrics to verify pacing stability:

Metric                   | Target                      | Alert Threshold
tokens_consumed_per_sec  | matches refill_rate ±10%    | > refill_rate * 1.2
queue_depth              | < 50 ms of buffered latency | > MAX_QUEUE_DEPTH * 0.8
dropped_events           | < 0.1% of total events      | > 1% sustained
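One minimal way to surface these three metrics is a plain counter object (illustrative names, not tied to any metrics library; wire the callbacks into consume/drop sites in the emitter):

```javascript
// Tracks the three pacing metrics from the table above.
function createPacingMetrics() {
  const m = { tokensConsumed: 0, queueDepth: 0, droppedEvents: 0, totalEvents: 0 };
  return {
    onConsume() { m.tokensConsumed++; m.totalEvents++; },   // token spent on a write
    onDrop() { m.droppedEvents++; m.totalEvents++; },       // low-priority event discarded
    setQueueDepth(d) { m.queueDepth = d; },                 // sample pendingQueue.length
    dropRate() { return m.totalEvents ? m.droppedEvents / m.totalEvents : 0; },
    snapshot() { return { ...m }; },                        // export to your metrics sink
  };
}
```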

Client-Side Validation Steps:

  1. Open DevTools Console. Attach a listener to EventSource.readyState transitions.
  2. Monitor lastEventId gaps (this requires the server to set incrementing numeric id: fields). A gap > 1 indicates silent drops or out-of-order delivery.
  3. Simulate bursty upstream producers using k6 or autocannon. Verify SSE payloads remain evenly spaced without triggering proxy timeouts (e.g., NGINX proxy_read_timeout).
  4. Adjust capacity and refill_rate until buffered queue latency stabilizes below 50 ms and client disconnect rates drop below 0.1%.
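The gap check in step 2 can be factored into a small helper (illustrative; assumes the server emits incrementing numeric id: fields, which the setup snippet above does not do by default):

```javascript
// Flags a jump in lastEventId between consecutive events.
function makeGapDetector() {
  let last = null;
  return function check(id) {
    const n = Number(id);
    const gap = last !== null && n - last > 1;
    last = n;
    return gap; // true => events were silently dropped or reordered
  };
}

// DevTools usage (browser-only):
//   const check = makeGapDetector();
//   new EventSource('/stream').onmessage = (e) => {
//     if (check(e.lastEventId)) console.warn('gap before id', e.lastEventId);
//   };
```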

Production Tuning: