State-Management Integration for SSE Permalink to this section
Part of Frontend Consumption & Client Patterns.
An SSE connection is a raw byte stream. Turning it into reliable application state requires answering questions the EventSource API deliberately ignores: where do events land in your store, how do you deduplicate replayed events after a reconnect, who wins when an optimistic update conflicts with a server event, and what happens to your store when 300 events arrive in two seconds? This guide covers the architectural decisions and production-ready code for wiring SSE into Redux Toolkit, Pinia, and Zustand, with attention to normalization, ordering guarantees, and memory bounds.
How the Integration Works Permalink to this section
The core tension Permalink to this section
EventSource pushes events as they arrive. State managers are pull-based: components subscribe to slices and re-render when those slices change. Bridging them means solving three problems simultaneously:
- Routing — which event type maps to which slice and action?
- Ordering / idempotency — reconnects replay events from
Last-Event-ID; you must not apply the same mutation twice. - Throughput control — 50 events per second will cause 50 renders per second unless you batch.
The SSE wire format (covered in Understanding the Event Stream Format) provides id, event, and data fields. The event field maps cleanly to store action types. The id field is your deduplication key and replay cursor.
Event field → action type mapping Permalink to this section
id: 42
event: order_updated
data: {"orderId":"ORD-99","status":"shipped","ts":1718900000}
id: 43
event: inventory_adjusted
data: {"sku":"WIDGET-7","delta":-1,"ts":1718900001}
The event field becomes the action type (or a lookup key for one). The data field is the payload. The id field gates deduplication and is stored as lastEventId for reconnect — see Event ID & Retry Mechanism Design for the protocol semantics.
Redux Toolkit Integration Permalink to this section
Middleware approach Permalink to this section
The cleanest Redux integration uses a custom middleware that owns the EventSource lifecycle. This keeps SSE concerns out of components and thunks.
// store/sseMiddleware.ts
import { Middleware } from "@reduxjs/toolkit";
interface SseOptions {
url: string;
// Map SSE event names to RTK action creators
eventMap: Record<string, (payload: unknown) => { type: string; payload: unknown }>;
// Max events to buffer per animation frame
batchSize?: number;
}
export function createSseMiddleware(opts: SseOptions): Middleware {
return (store) => (next) => (action) => {
// Intercept a start action to open the connection
if (action.type === "sse/connect") {
const es = new EventSource(opts.url, { withCredentials: true });
// Track seen IDs to dedup replayed events (bounded Set via LRU below)
const seenIds = new Set<string>();
let pendingBatch: Array<{ type: string; payload: unknown }> = [];
let rafHandle = 0;
function flush() {
if (pendingBatch.length === 0) return;
const batch = pendingBatch.splice(0, opts.batchSize ?? 50);
// Dispatch each action; RTK batches re-renders via React 18 automatic batching
batch.forEach((a) => store.dispatch(a));
rafHandle = 0;
}
Object.entries(opts.eventMap).forEach(([eventName, creator]) => {
es.addEventListener(eventName, (e: MessageEvent) => {
const msgEvent = e as MessageEvent & { lastEventId: string };
const id = msgEvent.lastEventId;
// Dedup: skip events we have already processed
if (id && seenIds.has(id)) return;
if (id) {
seenIds.add(id);
// Evict oldest entries to bound memory (keep last 1 000)
if (seenIds.size > 1000) {
const first = seenIds.values().next().value;
seenIds.delete(first);
}
}
let payload: unknown;
try {
payload = JSON.parse((e as MessageEvent).data);
} catch {
console.warn("[sse] unparseable data", (e as MessageEvent).data);
return;
}
pendingBatch.push(creator(payload));
// Coalesce into one render frame
if (!rafHandle) {
rafHandle = requestAnimationFrame(flush);
}
});
});
es.onerror = () => {
store.dispatch({ type: "sse/error" });
// EventSource auto-reconnects; no manual retry needed
};
// Store es on the action so the stop handler can close it
return next({ ...action, payload: { es } });
}
if (action.type === "sse/disconnect") {
const es = action.payload?.es as EventSource | undefined;
es?.close();
}
return next(action);
};
}
// store/index.ts
import { configureStore } from "@reduxjs/toolkit";
import { createSseMiddleware } from "./sseMiddleware";
import { orderUpdated, inventoryAdjusted } from "./slices";
const sseMiddleware = createSseMiddleware({
url: "/api/events",
eventMap: {
order_updated: orderUpdated, // RTK action creator
inventory_adjusted: inventoryAdjusted,
},
batchSize: 30,
});
export const store = configureStore({
reducer: { orders: ordersReducer, inventory: inventoryReducer },
middleware: (getDefault) => getDefault().concat(sseMiddleware),
});
Normalizing incoming events Permalink to this section
Append-only lists balloon memory. Use RTK’s createEntityAdapter to upsert by ID; duplicate events from replays become no-ops at the reducer level.
// store/slices/ordersSlice.ts
import { createSlice, createEntityAdapter, PayloadAction } from "@reduxjs/toolkit";
interface Order {
orderId: string;
status: string;
ts: number;
}
const adapter = createEntityAdapter<Order>({ selectId: (o) => o.orderId });
const ordersSlice = createSlice({
name: "orders",
initialState: adapter.getInitialState(),
reducers: {
// RTK action creator exported and used in sseMiddleware eventMap
orderUpdated: (state, action: PayloadAction<Order>) => {
const existing = state.entities[action.payload.orderId];
// Server-truth wins only if the incoming event is newer
if (!existing || action.payload.ts > existing.ts) {
adapter.upsertOne(state, action.payload);
}
},
},
});
export const { orderUpdated } = ordersSlice.actions;
export const ordersSelectors = adapter.getSelectors(
(s: RootState) => s.orders
);
export default ordersSlice.reducer;
Pinia Integration (Vue 3) Permalink to this section
For Vue applications, a Pinia store with a dedicated useOrdersStream composable keeps SSE lifecycle bound to a component tree. See Vue EventSource Composables for the lower-level composable pattern; here we lift it into a Pinia action.
// stores/orders.ts
import { defineStore } from "pinia";
import { ref, shallowRef } from "vue";
interface Order { orderId: string; status: string; ts: number }
export const useOrdersStore = defineStore("orders", () => {
// shallowRef: Vue does not need deep reactivity on the Map itself
const orders = shallowRef(new Map<string, Order>());
const connectionStatus = ref<"connecting" | "open" | "error" | "closed">("closed");
let es: EventSource | null = null;
const seenIds = new Set<string>();
function connect(url: string) {
if (es) return; // already connected
connectionStatus.value = "connecting";
es = new EventSource(url, { withCredentials: true });
es.addEventListener("order_updated", (e: Event) => {
const msgEvent = e as MessageEvent;
const id = (msgEvent as MessageEvent & { lastEventId: string }).lastEventId;
if (id && seenIds.has(id)) return;
if (id) seenIds.add(id);
const order: Order = JSON.parse(msgEvent.data);
// Immutable swap: replace Map to trigger shallowRef reactivity
const next = new Map(orders.value);
const prev = next.get(order.orderId);
if (!prev || order.ts > prev.ts) {
next.set(order.orderId, order);
orders.value = next;
}
});
es.onopen = () => { connectionStatus.value = "open"; };
es.onerror = () => { connectionStatus.value = "error"; };
}
function disconnect() {
es?.close();
es = null;
connectionStatus.value = "closed";
}
return { orders, connectionStatus, connect, disconnect };
});
<!-- OrderDashboard.vue -->
<script setup lang="ts">
import { onMounted, onUnmounted } from "vue";
import { storeToRefs } from "pinia";
import { useOrdersStore } from "@/stores/orders";
const store = useOrdersStore();
const { orders, connectionStatus } = storeToRefs(store);
onMounted(() => store.connect("/api/events"));
onUnmounted(() => store.disconnect());
</script>
<template>
<p>Status: {{ connectionStatus }}</p>
<ul>
<li v-for="[id, o] in orders" :key="id">
{{ o.orderId }} — {{ o.status }}
</li>
</ul>
</template>
Why shallowRef over ref Permalink to this section
A ref(new Map()) makes Vue track every nested property recursively. For a store holding thousands of orders, each SSE event triggers a deep proxy walk. shallowRef limits reactivity to the reference itself; swapping the Map reference on each mutation triggers a single shallow comparison and re-render.
Zustand Integration Permalink to this section
Zustand’s minimal API is a natural fit for SSE: the store holds both data and the connection.
// store/useOrderStream.ts
import { create } from "zustand";
import { immer } from "zustand/middleware/immer";
interface Order { orderId: string; status: string; ts: number }
interface OrderStreamState {
orders: Record<string, Order>;
status: "idle" | "connecting" | "open" | "error";
es: EventSource | null;
seenIds: Set<string>;
connect: (url: string) => void;
disconnect: () => void;
_upsertOrder: (order: Order) => void;
}
export const useOrderStream = create<OrderStreamState>()(
immer((set, get) => ({
orders: {},
status: "idle",
es: null,
seenIds: new Set(),
connect(url) {
if (get().es) return;
set((s) => { s.status = "connecting"; });
const es = new EventSource(url, { withCredentials: true });
es.addEventListener("order_updated", (raw: Event) => {
const e = raw as MessageEvent & { lastEventId: string };
const id = e.lastEventId;
const seen = get().seenIds;
if (id && seen.has(id)) return;
if (id) seen.add(id);
const order: Order = JSON.parse(e.data);
get()._upsertOrder(order);
});
es.onopen = () => set((s) => { s.status = "open"; });
es.onerror = () => set((s) => { s.status = "error"; });
set((s) => { s.es = es; });
},
disconnect() {
get().es?.close();
set((s) => { s.es = null; s.status = "idle"; });
},
_upsertOrder(order) {
set((s) => {
const prev = s.orders[order.orderId];
// Server-truth: accept only if incoming event is newer
if (!prev || order.ts > prev.ts) {
s.orders[order.orderId] = order;
}
});
},
}))
);
// Component usage (React)
function OrderList() {
const { orders, status, connect, disconnect } = useOrderStream();
useEffect(() => {
connect("/api/events");
return () => disconnect();
}, []);
return (
<ul>
{Object.values(orders).map((o) => (
<li key={o.orderId}>{o.orderId} — {o.status}</li>
))}
</ul>
);
}
Zustand’s immer middleware lets you write mutable-looking updates that produce new immutable state, which React’s reconciler can diff cheaply.
Optimistic Updates vs Server Truth Permalink to this section
| Pattern | When to use | Risk | Recovery |
|---|---|---|---|
| Optimistic-first | Mutation latency < 200 ms; UX demands instant feedback | Server rejects or sends different value | Rollback on error event or on mismatch |
| Server-truth-only | Financial data, inventory, anything audited | Visible lag on slow connections | Skeleton loaders + Error Handling & Reconnection UX |
| Merge by timestamp | Mixed read/write workloads | Clock skew between clients | Use logical clocks (Lamport) or server-assigned sequence numbers |
For optimistic updates, store the tentative state under a different key and reconcile on the SSE confirmation event:
// In your Redux slice:
reducers: {
// Dispatched immediately on user action
orderOptimisticUpdate: (state, action: PayloadAction<Order>) => {
adapter.upsertOne(state, { ...action.payload, _optimistic: true });
},
// Dispatched when SSE confirms the mutation
orderUpdated: (state, action: PayloadAction<Order>) => {
// Authoritative: overwrite regardless of _optimistic flag
adapter.upsertOne(state, { ...action.payload, _optimistic: false });
},
// Dispatched on REST mutation error response
orderOptimisticRollback: (state, action: PayloadAction<string>) => {
adapter.removeOne(state, action.payload); // or restore previous
},
}
Edge Cases & Network Interference Permalink to this section
SSE streams traverse the same network stack as any HTTP request. Several layers can corrupt the integration silently.
Proxy buffering Permalink to this section
Nginx and AWS ALB default to response buffering. A buffered proxy accumulates your events and delivers them in bursts — or not at all if the buffer fills. The store receives a flood of stale events simultaneously, causing render storms.
# nginx: disable buffering for /api/events
location /api/events {
proxy_pass http://backend;
proxy_buffering off;
proxy_cache off;
proxy_set_header X-Accel-Buffering no; # for nginx sub-proxies
proxy_read_timeout 86400s; # keep alive for long-lived streams
}
When events arrive in bursts due to buffering, your dedup set helps, but you also need the batching/RAF pattern above to avoid committing hundreds of state updates synchronously.
CDN edge caching Permalink to this section
Most CDNs treat text/event-stream responses as non-cacheable if Cache-Control: no-cache is set, but verify. Cloudflare, for instance, supports SSE natively on Workers but will still buffer by default on proxied origins — configure cf-cache-status to confirm a MISS. For scaling SSE across edge nodes, see Scaling SSE Across Multiple Nodes with Redis.
Reconnect replay and dedup Permalink to this section
The Event ID & Retry Mechanism Design spec guarantees that the browser sends Last-Event-ID on reconnect. The server replays missed events. Without client-side deduplication your store applies the same mutations twice:
- Use a bounded LRU set keyed on
lastEventId(1 000–5 000 entries is typically sufficient). - Numeric IDs: keep track of the highest seen; ignore anything ≤ that value if the ID space is monotonically increasing. See Generating Monotonic Event IDs for SSE.
Tab visibility and stale state Permalink to this section
When a user switches tabs, EventSource may be throttled or closed by the browser. On resume, reconnect delivers missed events; the store may hold stale data for minutes. Apply a “freshness window”: if reconnect lag > N seconds, fetch current state via REST before resuming SSE.
es.onopen = () => {
const lagMs = Date.now() - lastConnectedAt;
if (lagMs > 30_000) {
// Stale: fetch authoritative snapshot first
fetchOrderSnapshot().then((orders) => store.dispatch(ordersReset(orders)));
}
lastConnectedAt = Date.now();
};
Performance & Scale Considerations Permalink to this section
Render frequency Permalink to this section
Each set / dispatch call schedules a re-render. At 100 events/s with 50 subscribed components, you are scheduling 5 000 re-renders per second — well beyond 60 fps. Three mitigations:
- requestAnimationFrame batching (shown in the Redux middleware above): coalesce all events in a single frame into one dispatch call.
- Debounce / throttle at the handler level: acceptable for display-only data; unacceptable for financial transactions where every event matters.
- React 18 automatic batching: all state updates inside async callbacks are already batched in React 18+, but explicit
unstable_batchedUpdatesremains needed in React 17.
Memory bounds Permalink to this section
| Resource | Risk | Mitigation |
|---|---|---|
seenIds Set |
Unbounded growth over long sessions | LRU eviction at 1 000–5 000 entries |
| Normalized entity store | Unlimited accumulation of records | Pagination cursor: remove entities outside visible window |
| Pending batch array | Large burst before RAF fires | Cap batch at batchSize; process remainder on next frame |
EventSource object |
Leaked if component unmounts without .close() |
Always clean up in useEffect return / onUnmounted |
Connection count Permalink to this section
EventSource opens one HTTP/1.1 or HTTP/2 connection per instance. With HTTP/1.1, browsers cap per-origin connections at 6 — a single SSE stream consumes one slot permanently. Prefer HTTP/2 to multiplex. If your store architecture requires one stream per resource type, merge them into a single multiplexed endpoint:
event: order_updated
data: {...}
event: inventory_adjusted
data: {...}
Route by event field in the middleware rather than opening multiple EventSource objects.
Validation & Debugging Permalink to this section
Verify the event stream with curl Permalink to this section
# Confirm events arrive and carry correct ids
curl -N -H "Accept: text/event-stream" https://api.example.com/api/events
Expected output:
: keepalive
id: 100
event: order_updated
data: {"orderId":"ORD-1","status":"processing","ts":1718900100}
id: 101
event: inventory_adjusted
data: {"sku":"WIDGET-7","delta":-1,"ts":1718900101}
Chrome DevTools Permalink to this section
- Network → Filter: EventStream — select your SSE request to see the event log tab. Verify
idandeventfields appear on each message. - Application → EventSource (Chrome 120+) — shows connection state,
lastEventId, and a live event list with timestamps. - Performance profiler — record a 5-second interval; check for “Long Task” markers caused by synchronous store updates.
Structured logging in the middleware Permalink to this section
// Add inside the event listener before dispatching:
if (process.env.NODE_ENV !== "production") {
console.debug("[sse]", {
event: eventName,
id,
payload,
seenIds: seenIds.size,
pending: pendingBatch.length,
ts: Date.now(),
});
}
In production, emit these fields to your observability pipeline (Datadog, Grafana, etc.) sampled at 1% of events:
if (Math.random() < 0.01) {
telemetry.count("sse.event", 1, { event: eventName });
telemetry.gauge("sse.seen_ids_size", seenIds.size);
}
Confirming dedup is working Permalink to this section
Force a reconnect (disable network in DevTools, re-enable) and watch the event log. Events with IDs already in seenIds should not produce new store entries. Add a Redux DevTools trace: the action list should not show duplicate orderUpdated actions for the same order ID and timestamp.
⚡ Production Directives
- Disable proxy buffering (`proxy_buffering off` / `X-Accel-Buffering: no`) before deploying SSE behind nginx or ALB — buffering is the most common cause of event delivery failures.
- Bound your
seenIdsSet to 1 000–5 000 entries with LRU eviction; unbounded growth is a memory leak in long-lived sessions. - Batch store dispatches with
requestAnimationFrameor React 18 automatic batching; never dispatch one action per SSE event at high-frequency streams. - After a reconnect gap longer than 30 seconds, fetch an authoritative REST snapshot before resuming SSE to prevent serving stale state.
- Use a single multiplexed SSE endpoint with multiple
eventtypes rather than multipleEventSourceinstances to preserve HTTP/1.1 connection budget.
Production Checklist Permalink to this section
Frequently Asked Questions Permalink to this section
Should I open one EventSource per feature (orders, inventory) or one shared connection?
One shared connection, almost always. HTTP/1.1 browsers allow only 6 connections per origin; each EventSource permanently holds one. With HTTP/2 the connection limit is less pressing, but a shared stream simplifies reconnect logic and reduces server-side connection tracking overhead. Route by the event field in your middleware to separate concerns.
What happens to optimistic updates when the network drops?
The EventSource fires onerror and starts reconnecting. During that window, any pending optimistic state is visible but unconfirmed. Set a staleness timer: if no confirming SSE event arrives within N seconds (try 10 s), roll back the optimistic entry and surface an error to the user. On reconnect, the server replay will deliver the authoritative state.
How do I handle SSE events that arrive before the Redux store is ready?
The custom middleware approach avoids this: the middleware intercepts a sse/connect action dispatched after the store is configured. If you open EventSource in module-level code before configureStore, you risk lost events. Delay connecting until after the store is initialised, or buffer events in a module-level queue and drain them as the first middleware action.
Can I use RTK Query alongside SSE middleware?
Yes. RTK Query handles REST queries and mutations; the SSE middleware handles push updates. The two write to the same entity cache by dispatching the same action creators RTK Query generates. On optimistic mutation, RTK Query's onQueryStarted / pessimisticUpdate helper manages rollback; the SSE confirmation event then calls updateQueryData to commit the server state.
How do I test the SSE middleware without a real server?
Use jest.spyOn(window, 'EventSource') or a mock EventSource library that lets you programmatically emit events. Wire the mock into the middleware in your test setup, dispatch sse/connect, emit synthetic events, and assert store state. For integration tests, use msw (Mock Service Worker) with its http.get handler returning a ReadableStream of SSE-formatted text.