State-Management Integration for SSE

An SSE connection is a raw byte stream. Turning it into reliable application state requires answering questions the EventSource API deliberately ignores: where do events land in your store, how do you deduplicate replayed events after a reconnect, who wins when an optimistic update conflicts with a server event, and what happens to your store when 300 events arrive in two seconds? This guide covers the architectural decisions and production-ready code for wiring SSE into Redux Toolkit, Pinia, and Zustand, with attention to normalization, ordering guarantees, and memory bounds.

[Figure: SSE Server (text/event-stream) → EventSource (onmessage / addEventListener) → middleware / composable (parse, dedup, batch) → store (Redux / Pinia / Zustand) → UI components (selectors / reactivity). Annotations: auto-reconnect sends the Last-Event-ID header; dedup by event id (seenIds Set / LRU cache); batching / throttle prevents render storms.]
Figure: SSE event flow from server through EventSource, middleware, and into the application store, with reconnection, dedup, and batching annotated.

How the Integration Works

The core tension

EventSource pushes events as they arrive. State managers are pull-based: components subscribe to slices and re-render when those slices change. Bridging them means solving three problems simultaneously:

  1. Routing — which event type maps to which slice and action?
  2. Ordering / idempotency — reconnects replay events from Last-Event-ID; you must not apply the same mutation twice.
  3. Throughput control — 50 events per second will cause 50 renders per second unless you batch.

The SSE wire format (covered in Understanding the Event Stream Format) provides id, event, and data fields. The event field maps cleanly to store action types. The id field is your deduplication key and replay cursor.

Event field → action type mapping

id: 42
event: order_updated
data: {"orderId":"ORD-99","status":"shipped","ts":1718900000}

id: 43
event: inventory_adjusted
data: {"sku":"WIDGET-7","delta":-1,"ts":1718900001}

The event field becomes the action type (or a lookup key for one). The data field is the payload. The id field gates deduplication and is stored as lastEventId for reconnect — see Event ID & Retry Mechanism Design for the protocol semantics.
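To make the mapping concrete, here is a minimal sketch of how the three fields could be pulled out of a complete SSE text body and turned into actions. In the browser, EventSource does this parsing for you; the `parseSseText` and `frameToAction` helpers and the `meta.eventId` field are illustrative names, not part of any library, and the parser handles only a fully received body rather than a live stream.

```typescript
interface SseFrame {
  id: string;    // last event ID in effect when the frame was dispatched
  event: string; // "message" when the event field is absent
  data: string;  // data lines joined with "\n"
}

// Parses a complete SSE body. A blank line dispatches the pending event;
// lines starting with ":" are comments (keepalives) and are ignored.
function parseSseText(text: string): SseFrame[] {
  const frames: SseFrame[] = [];
  let lastEventId = "";
  let event = "";
  let dataLines: string[] = [];

  for (const line of text.split(/\r\n|\n|\r/)) {
    if (line === "") {
      if (dataLines.length > 0) {
        frames.push({ id: lastEventId, event: event || "message", data: dataLines.join("\n") });
      }
      event = "";
      dataLines = [];
      continue;
    }
    if (line.startsWith(":")) continue;

    const colon = line.indexOf(":");
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? "" : line.slice(colon + 1);
    if (value.startsWith(" ")) value = value.slice(1); // strip one leading space

    if (field === "event") event = value;
    else if (field === "data") dataLines.push(value);
    else if (field === "id" && !value.includes("\0")) lastEventId = value;
  }
  return frames;
}

// Hypothetical action shape: event → type, data → payload, id → meta.eventId.
interface SseAction {
  type: string;
  payload: unknown;
  meta: { eventId: string };
}

function frameToAction(frame: SseFrame): SseAction {
  return { type: frame.event, payload: JSON.parse(frame.data), meta: { eventId: frame.id } };
}
```

The id persists across frames until the server sends a new one, which is why it is tracked outside the per-event buffers.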

Redux Toolkit Integration

Middleware approach

The cleanest Redux integration uses a custom middleware that owns the EventSource lifecycle. This keeps SSE concerns out of components and thunks.

// store/sseMiddleware.ts
import type { Middleware } from "@reduxjs/toolkit";

type SseAction = { type: string; payload: unknown };

interface SseOptions {
  url: string;
  // Map SSE event names to RTK action creators. Payloads are parsed JSON and
  // are not validated here, hence `any` on the creator's parameter.
  eventMap: Record<string, (payload: any) => SseAction>;
  // Max actions to dispatch per animation frame
  batchSize?: number;
}

export function createSseMiddleware(opts: SseOptions): Middleware {
  // The connection lives in the middleware closure rather than in an action
  // payload: EventSource is not serializable, and the disconnect handler
  // needs access to the same instance.
  let es: EventSource | null = null;
  let rafHandle = 0;

  return (store) => (next) => (action) => {
    const type = (action as { type?: unknown }).type;

    // Intercept a start action to open the connection (ignored if already open)
    if (type === "sse/connect" && !es) {
      const source = new EventSource(opts.url, { withCredentials: true });
      es = source;
      // Track seen IDs to dedup replayed events (bounded, oldest-first eviction)
      const seenIds = new Set<string>();
      const pendingBatch: SseAction[] = [];

      const flush = () => {
        rafHandle = 0;
        const batch = pendingBatch.splice(0, opts.batchSize ?? 50);
        // Dispatch synchronously within one frame so React 18 batches the re-renders
        batch.forEach((a) => store.dispatch(a));
        // Anything over batchSize is processed on the next frame
        if (pendingBatch.length > 0) rafHandle = requestAnimationFrame(flush);
      };

      Object.entries(opts.eventMap).forEach(([eventName, creator]) => {
        source.addEventListener(eventName, (e: MessageEvent) => {
          const id = e.lastEventId;

          // Dedup: skip events we have already processed
          if (id && seenIds.has(id)) return;
          if (id) {
            seenIds.add(id);
            // Evict the oldest entry to bound memory (keep last 1 000)
            if (seenIds.size > 1000) {
              seenIds.delete(seenIds.values().next().value as string);
            }
          }

          let payload: unknown;
          try {
            payload = JSON.parse(e.data);
          } catch {
            console.warn("[sse] unparseable data", e.data);
            return;
          }

          pendingBatch.push(creator(payload));

          // Coalesce into one render frame
          if (!rafHandle) {
            rafHandle = requestAnimationFrame(flush);
          }
        });
      });

      source.onerror = () => {
        store.dispatch({ type: "sse/error" });
        // EventSource auto-reconnects; no manual retry needed
      };
    }

    if (type === "sse/disconnect") {
      if (rafHandle) cancelAnimationFrame(rafHandle);
      rafHandle = 0;
      es?.close();
      es = null;
    }

    return next(action);
  };
}

// store/index.ts
import { configureStore } from "@reduxjs/toolkit";
import { createSseMiddleware } from "./sseMiddleware";
// Assumes ./slices re-exports each slice's reducer and action creators
import {
  ordersReducer,
  inventoryReducer,
  orderUpdated,
  inventoryAdjusted,
} from "./slices";

const sseMiddleware = createSseMiddleware({
  url: "/api/events",
  eventMap: {
    order_updated: orderUpdated,       // RTK action creator
    inventory_adjusted: inventoryAdjusted,
  },
  batchSize: 30,
});

export const store = configureStore({
  reducer: { orders: ordersReducer, inventory: inventoryReducer },
  middleware: (getDefault) => getDefault().concat(sseMiddleware),
});

// Used by slice selectors
export type RootState = ReturnType<typeof store.getState>;

Normalizing incoming events

Append-only lists balloon memory. Use RTK’s createEntityAdapter to upsert by ID; duplicate events from replays become no-ops at the reducer level.

// store/slices/ordersSlice.ts
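import { createSlice, createEntityAdapter, type PayloadAction } from "@reduxjs/toolkit";
// RootState is assumed to be exported from store/index.ts
import type { RootState } from "../index";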

interface Order {
  orderId: string;
  status: string;
  ts: number;
}

// Typing the selectId parameter lets the entity (and ID) types be inferred
const adapter = createEntityAdapter({ selectId: (o: Order) => o.orderId });

const ordersSlice = createSlice({
  name: "orders",
  initialState: adapter.getInitialState(),
  reducers: {
    // RTK action creator exported and used in sseMiddleware eventMap
    orderUpdated: (state, action: PayloadAction<Order>) => {
      const existing = state.entities[action.payload.orderId];
      // Server-truth wins only if the incoming event is newer
      if (!existing || action.payload.ts > existing.ts) {
        adapter.upsertOne(state, action.payload);
      }
    },
  },
});

export const { orderUpdated } = ordersSlice.actions;
export const ordersSelectors = adapter.getSelectors(
  (s: RootState) => s.orders
);
export default ordersSlice.reducer;

Pinia Integration (Vue 3)

For Vue applications, a Pinia setup store can own the EventSource and expose connect and disconnect actions, while components bind the connection lifecycle to their mount and unmount hooks. See Vue EventSource Composables for the lower-level composable pattern; here we lift it into a Pinia action.

// stores/orders.ts
import { defineStore } from "pinia";
import { ref, shallowRef } from "vue";

interface Order { orderId: string; status: string; ts: number }

export const useOrdersStore = defineStore("orders", () => {
  // shallowRef: Vue does not need deep reactivity on the Map itself
  const orders = shallowRef(new Map<string, Order>());
  const connectionStatus = ref<"connecting" | "open" | "error" | "closed">("closed");
  let es: EventSource | null = null;
  const seenIds = new Set<string>();

  function connect(url: string) {
    if (es) return; // already connected
    connectionStatus.value = "connecting";
    es = new EventSource(url, { withCredentials: true });

    es.addEventListener("order_updated", (e: MessageEvent) => {
      const id = e.lastEventId;
      if (id && seenIds.has(id)) return;
      if (id) {
        seenIds.add(id);
        // Bound memory: evict the oldest id once past 1 000 entries
        if (seenIds.size > 1000) seenIds.delete(seenIds.values().next().value as string);
      }

      let order: Order;
      try {
        order = JSON.parse(e.data);
      } catch {
        console.warn("[sse] unparseable data", e.data);
        return;
      }

      const prev = orders.value.get(order.orderId);
      // Stale or duplicate: return before paying for the Map copy
      if (prev && order.ts <= prev.ts) return;
      // Immutable swap: replace Map to trigger shallowRef reactivity
      const next = new Map(orders.value);
      next.set(order.orderId, order);
      orders.value = next;
    });

    es.onopen = () => { connectionStatus.value = "open"; };
    es.onerror = () => { connectionStatus.value = "error"; };
  }

  function disconnect() {
    es?.close();
    es = null;
    connectionStatus.value = "closed";
  }

  return { orders, connectionStatus, connect, disconnect };
});

<!-- OrderDashboard.vue -->
<script setup lang="ts">
import { onMounted, onUnmounted } from "vue";
import { storeToRefs } from "pinia";
import { useOrdersStore } from "@/stores/orders";

const store = useOrdersStore();
const { orders, connectionStatus } = storeToRefs(store);

onMounted(() => store.connect("/api/events"));
onUnmounted(() => store.disconnect());
</script>

<template>
  <p>Status: {{ connectionStatus }}</p>
  <ul>
    <li v-for="[id, o] in orders" :key="id">
      {{ o.orderId }} — {{ o.status }}
    </li>
  </ul>
</template>

Why shallowRef over ref

A ref(new Map()) wraps the Map in a deep reactive proxy, so every order read through it is proxied and tracked as well. For a store holding thousands of orders, that is overhead on every SSE event for reactivity the list view never uses. shallowRef limits reactivity to the reference itself: swapping the Map reference notifies dependents once, at the cost of copying the Map on each accepted update.

Zustand Integration

Zustand’s minimal API is a natural fit for SSE: the store holds both data and the connection.

// store/useOrderStream.ts
import { create } from "zustand";
import { immer } from "zustand/middleware/immer";

interface Order { orderId: string; status: string; ts: number }

interface OrderStreamState {
  orders: Record<string, Order>;
  status: "idle" | "connecting" | "open" | "error";
  es: EventSource | null;
  connect: (url: string) => void;
  disconnect: () => void;
  _upsertOrder: (order: Order) => void;
}

// Dedup bookkeeping lives outside the store: it is mutated in place, and Immer
// may freeze objects reachable from state (drafting a Set also needs enableMapSet()).
const seenIds = new Set<string>();
const MAX_SEEN_IDS = 1000;

export const useOrderStream = create<OrderStreamState>()(
  immer((set, get) => ({
    orders: {},
    status: "idle",
    es: null,

    connect(url) {
      if (get().es) return; // already connected
      set((s) => { s.status = "connecting"; });
      const es = new EventSource(url, { withCredentials: true });

      es.addEventListener("order_updated", (e: MessageEvent) => {
        const id = e.lastEventId;
        if (id && seenIds.has(id)) return;
        if (id) {
          seenIds.add(id);
          // Evict the oldest id to bound memory
          if (seenIds.size > MAX_SEEN_IDS) {
            seenIds.delete(seenIds.values().next().value as string);
          }
        }

        let order: Order;
        try {
          order = JSON.parse(e.data);
        } catch {
          console.warn("[sse] unparseable data", e.data);
          return;
        }
        get()._upsertOrder(order);
      });

      es.onopen = () => set((s) => { s.status = "open"; });
      es.onerror = () => set((s) => { s.status = "error"; });

      set((s) => { s.es = es; });
    },

    disconnect() {
      get().es?.close();
      set((s) => { s.es = null; s.status = "idle"; });
    },

    _upsertOrder(order) {
      set((s) => {
        const prev = s.orders[order.orderId];
        // Server-truth: accept only if incoming event is newer
        if (!prev || order.ts > prev.ts) {
          s.orders[order.orderId] = order;
        }
      });
    },
  }))
);
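
// Component usage (React)
import { useEffect } from "react";
import { useOrderStream } from "./store/useOrderStream";

function OrderList() {
  // Select slices individually so the component re-renders only when they change
  const orders = useOrderStream((s) => s.orders);
  const status = useOrderStream((s) => s.status);
  const connect = useOrderStream((s) => s.connect);
  const disconnect = useOrderStream((s) => s.disconnect);

  useEffect(() => {
    connect("/api/events");
    return () => disconnect();
  }, [connect, disconnect]);

  return (
    <>
      <p>Status: {status}</p>
      <ul>
        {Object.values(orders).map((o) => (
          <li key={o.orderId}>{o.orderId}: {o.status}</li>
        ))}
      </ul>
    </>
  );
}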

Zustand’s immer middleware lets you write mutable-looking updates that produce new immutable state, which React’s reconciler can diff cheaply.

Optimistic Updates vs Server Truth

| Pattern | When to use | Risk | Recovery |
|---|---|---|---|
| Optimistic-first | Mutation latency < 200 ms; UX demands instant feedback | Server rejects or sends different value | Rollback on error event or on mismatch |
| Server-truth-only | Financial data, inventory, anything audited | Visible lag on slow connections | Skeleton loaders + Error Handling & Reconnection UX |
| Merge by timestamp | Mixed read/write workloads | Clock skew between clients | Use logical clocks (Lamport) or server-assigned sequence numbers |
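As an illustration of the last row, here is a minimal sketch of merging by a server-assigned sequence number instead of a wall-clock timestamp. The `seq` field and the `mergeBySeq` helper are assumptions made for this example; the event payloads shown elsewhere in this guide carry `ts` only.

```typescript
// Hypothetical payload: the server stamps every event with a per-entity,
// strictly increasing sequence number.
interface VersionedOrder {
  orderId: string;
  status: string;
  seq: number;
}

type OrderMap = Record<string, VersionedOrder>;

// Returns the same reference when the incoming event is stale or a duplicate,
// so selectors and memoized components see no change.
function mergeBySeq(current: OrderMap, incoming: VersionedOrder): OrderMap {
  const prev = current[incoming.orderId];
  if (prev && incoming.seq <= prev.seq) return current;
  return { ...current, [incoming.orderId]: incoming };
}
```

Because the comparison uses only numbers issued by one authority, client clock skew stops mattering; the trade-off is that the server must persist and emit the counter.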

For optimistic updates, flag the tentative entity and reconcile when the SSE confirmation event arrives:

// In your Redux slice (assumes Order has an optional `_optimistic?: boolean` field):
reducers: {
  // Dispatched immediately on user action
  orderOptimisticUpdate: (state, action: PayloadAction<Order>) => {
    adapter.upsertOne(state, { ...action.payload, _optimistic: true });
  },
  // Dispatched when SSE confirms the mutation
  orderUpdated: (state, action: PayloadAction<Order>) => {
    // Authoritative: overwrite regardless of _optimistic flag
    adapter.upsertOne(state, { ...action.payload, _optimistic: false });
  },
  // Dispatched on REST mutation error response
  orderOptimisticRollback: (state, action: PayloadAction<string>) => {
    adapter.removeOne(state, action.payload); // or restore previous
  },
}

Edge Cases & Network Interference

SSE streams traverse the same network stack as any HTTP request. Several layers can corrupt the integration silently.

Proxy buffering

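Nginx buffers proxied responses by default, and other intermediaries (load balancers such as AWS ALB, corporate proxies) can add their own buffering or idle timeouts. A buffered proxy accumulates your events and delivers them in bursts, or not at all while the buffer is still filling. The store then receives a flood of stale events at once, causing render storms.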

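# nginx: disable buffering for /api/events
location /api/events {
    proxy_pass          http://backend;
    proxy_http_version  1.1;                   # HTTP/1.1 to the upstream
    proxy_set_header    Connection "";         # do not send "Connection: close" upstream
    proxy_buffering     off;
    proxy_cache         off;
    add_header          X-Accel-Buffering no;  # response header, honored by any nginx in front of this one
    proxy_read_timeout  86400s;                # keep alive for long-lived streams
}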

When events arrive in bursts due to buffering, your dedup set helps, but you also need the batching/RAF pattern above to avoid committing hundreds of state updates synchronously.

CDN edge caching

Most CDNs treat text/event-stream responses as non-cacheable if Cache-Control: no-cache is set, but verify. Cloudflare, for instance, supports SSE natively on Workers but may still buffer on proxied origins; inspect the cf-cache-status response header to confirm the stream is not being served from cache (anything other than HIT). For scaling SSE across edge nodes, see Scaling SSE Across Multiple Nodes with Redis.

Reconnect replay and dedup

On reconnect, the browser sends the Last-Event-ID header once it has received at least one id (see Event ID & Retry Mechanism Design), and the server replays the events that follow it. If the replay overlaps what the client already applied, a store without client-side deduplication applies the same mutations twice:

  • Use a bounded LRU set keyed on lastEventId (1 000–5 000 entries is typically sufficient).
  • Numeric IDs: keep track of the highest seen; ignore anything ≤ that value if the ID space is monotonically increasing. See Generating Monotonic Event IDs for SSE.
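Both strategies above can be sketched in a few lines. This is an illustration under assumptions, not a drop-in library: `LruIdSet` and `createHighWaterMarkFilter` are hypothetical names, and the high-water-mark variant assumes IDs are integers small enough to be represented exactly as JavaScript numbers.

```typescript
// Bounded "seen" set with least-recently-used eviction. A Map iterates in
// insertion order, so re-inserting an id on a hit moves it to the newest end.
class LruIdSet {
  private readonly ids = new Map<string, true>();
  private readonly capacity: number;

  constructor(capacity = 1000) {
    this.capacity = capacity;
  }

  /** Returns true if the id is new (and records it), false if already seen. */
  checkAndAdd(id: string): boolean {
    if (this.ids.has(id)) {
      this.ids.delete(id);
      this.ids.set(id, true); // refresh recency
      return false;
    }
    this.ids.set(id, true);
    if (this.ids.size > this.capacity) {
      const oldest = this.ids.keys().next().value;
      if (oldest !== undefined) this.ids.delete(oldest);
    }
    return true;
  }

  get size(): number {
    return this.ids.size;
  }
}

// For monotonically increasing numeric IDs: remember only the highest one.
function createHighWaterMarkFilter(): (id: string) => boolean {
  let highest = Number.NEGATIVE_INFINITY;
  return (id: string): boolean => {
    const n = Number(id);
    // Assumption: empty or non-numeric ids are passed through rather than dropped.
    if (id === "" || !Number.isFinite(n)) return true;
    if (n <= highest) return false;
    highest = n;
    return true;
  };
}
```

The high-water mark uses constant memory but is only safe when the server never delivers a lower ID after a higher one; the LRU set tolerates out-of-order delivery within its window.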

Tab visibility and stale state

When a user switches tabs, EventSource may be throttled or closed by the browser. On resume, reconnect delivers missed events; the store may hold stale data for minutes. Apply a “freshness window”: if reconnect lag > N seconds, fetch current state via REST before resuming SSE.

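// Record when the stream dropped; EventSource fires onerror before each retry.
// Merge this with any existing onerror handler.
let disconnectedAt: number | null = null;

es.onerror = () => {
  disconnectedAt ??= Date.now();
};

es.onopen = () => {
  const gapMs = disconnectedAt === null ? 0 : Date.now() - disconnectedAt;
  disconnectedAt = null;
  if (gapMs > 30_000) {
    // Stale: fetch authoritative snapshot first
    fetchOrderSnapshot().then((orders) => store.dispatch(ordersReset(orders)));
  }
};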

Performance & Scale Considerations

Render frequency

Each set / dispatch call schedules a re-render. At 100 events/s with 50 subscribed components, you are scheduling 5 000 re-renders per second — well beyond 60 fps. Three mitigations:

  1. requestAnimationFrame batching (shown in the Redux middleware above): queue events as they arrive and dispatch the whole queue synchronously once per frame, so the UI renders at most once per frame.
  2. Debounce / throttle at the handler level: acceptable for display-only data; unacceptable for financial transactions where every event matters.
  3. React 18 automatic batching: with createRoot, state updates made inside async callbacks and event listeners are batched automatically; React 17 still needs an explicit unstable_batchedUpdates.
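The first mitigation can be expressed independently of any store. The sketch below is one possible shape, with the scheduler injected so that it can be `requestAnimationFrame` in the browser and a manual queue in tests; `createBatcher` and its parameters are hypothetical names.

```typescript
type Schedule = (callback: () => void) => void;

// Collects items and hands them to `sink` at most `maxPerFlush` at a time,
// once per scheduled tick. Any remainder is rescheduled for the next tick.
function createBatcher<T>(
  sink: (items: T[]) => void,
  schedule: Schedule,
  maxPerFlush = 50,
) {
  const queue: T[] = [];
  let scheduled = false;

  function arm(): void {
    if (scheduled) return;
    scheduled = true;
    schedule(flush);
  }

  function flush(): void {
    scheduled = false;
    if (queue.length === 0) return;
    sink(queue.splice(0, maxPerFlush));
    if (queue.length > 0) arm(); // process the rest on the next tick
  }

  return {
    push(item: T): void {
      queue.push(item);
      arm();
    },
    get pending(): number {
      return queue.length;
    },
  };
}
```

In a browser, the scheduler would be `(cb) => requestAnimationFrame(cb)` and the sink would dispatch each queued action. Capping the flush size keeps a large burst from blocking a single frame, at the cost of a few frames of extra latency.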

Memory bounds

| Resource | Risk | Mitigation |
|---|---|---|
| seenIds Set | Unbounded growth over long sessions | LRU eviction at 1 000–5 000 entries |
| Normalized entity store | Unlimited accumulation of records | Pagination cursor: remove entities outside visible window |
| Pending batch array | Large burst before RAF fires | Cap batch at batchSize; process remainder on next frame |
| EventSource object | Leaked if component unmounts without .close() | Always clean up in useEffect return / onUnmounted |

Connection count

Each EventSource instance holds one long-lived HTTP request. Over HTTP/1.1 that request occupies a whole connection, and browsers cap per-origin connections at 6, so a single SSE stream consumes one slot permanently. Over HTTP/2 the streams are multiplexed on a single connection, so prefer it. If your store architecture would otherwise require one stream per resource type, merge them into a single multiplexed endpoint:

event: order_updated
data: {...}

event: inventory_adjusted
data: {...}

Route by event field in the middleware rather than opening multiple EventSource objects.

Validation & Debugging

Verify the event stream with curl

# Confirm events arrive and carry correct ids
curl -N -H "Accept: text/event-stream" https://api.example.com/api/events

Expected output:

: keepalive

id: 100
event: order_updated
data: {"orderId":"ORD-1","status":"processing","ts":1718900100}

id: 101
event: inventory_adjusted
data: {"sku":"WIDGET-7","delta":-1,"ts":1718900101}

Chrome DevTools

  1. Network → select your SSE request → EventStream tab: shows each message as it arrives. Verify id and event fields appear on each message.
  2. Console: keep a reference to the EventSource and check es.readyState (0 connecting, 1 open, 2 closed); log event.lastEventId in a listener to confirm the replay cursor advances.
  3. Performance profiler: record a 5-second interval; check for “Long Task” markers caused by synchronous store updates.

Structured logging in the middleware

// Add inside the event listener before dispatching:
if (process.env.NODE_ENV !== "production") {
  console.debug("[sse]", {
    event: eventName,
    id,
    payload,
    seenIds: seenIds.size,
    pending: pendingBatch.length,
    ts: Date.now(),
  });
}

In production, emit these fields to your observability pipeline (Datadog, Grafana, etc.) sampled at 1% of events:

if (Math.random() < 0.01) {
  telemetry.count("sse.event", 1, { event: eventName });
  telemetry.gauge("sse.seen_ids_size", seenIds.size);
}

Confirming dedup is working

Force a reconnect (disable network in DevTools, re-enable) and watch the event log. Events with IDs already in seenIds should not produce new store entries. Add a Redux DevTools trace: the action list should not show duplicate orderUpdated actions for the same order ID and timestamp.

⚡ Production Directives

  • Disable proxy buffering (`proxy_buffering off` / `X-Accel-Buffering: no`) before deploying SSE behind nginx or any other buffering proxy; buffering is a common cause of delayed or missing events.
  • Bound your seenIds Set to 1 000–5 000 entries with LRU eviction; unbounded growth is a memory leak in long-lived sessions.
  • Batch store dispatches with requestAnimationFrame or React 18 automatic batching; on high-frequency streams, never dispatch straight from each SSE listener.
  • After a reconnect gap longer than 30 seconds, fetch an authoritative REST snapshot before resuming SSE to prevent serving stale state.
  • Use a single multiplexed SSE endpoint with multiple event types rather than multiple EventSource instances to preserve HTTP/1.1 connection budget.

Frequently Asked Questions

Should I open one EventSource per feature (orders, inventory) or one shared connection?

One shared connection, almost always. Over HTTP/1.1, browsers allow only 6 connections per origin, and each EventSource permanently holds one. With HTTP/2 the connection limit is less pressing, but a shared stream simplifies reconnect logic and reduces server-side connection tracking overhead. Route by the event field in your middleware to separate concerns.

What happens to optimistic updates when the network drops?

The EventSource fires onerror and starts reconnecting. During that window, any pending optimistic state is visible but unconfirmed. Set a staleness timer: if no confirming SSE event arrives within N seconds (try 10 s), roll back the optimistic entry and surface an error to the user. On reconnect, the server replay will deliver the authoritative state.
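One way to implement the staleness timer is to track a deadline per optimistic entry and sweep on an interval. The sketch below takes the current time as a parameter so it stays deterministic; `PendingOptimistic` is a hypothetical helper, and wiring `expire()` to a rollback action is left to the store.

```typescript
// Tracks optimistic entries awaiting SSE confirmation.
class PendingOptimistic {
  // entityId → deadline in epoch milliseconds
  private readonly deadlines = new Map<string, number>();
  private readonly ttlMs: number;

  constructor(ttlMs = 10_000) {
    this.ttlMs = ttlMs;
  }

  /** Call when the optimistic update is applied. */
  track(entityId: string, now: number): void {
    this.deadlines.set(entityId, now + this.ttlMs);
  }

  /** Call when the confirming SSE event for the entity arrives. */
  confirm(entityId: string): void {
    this.deadlines.delete(entityId);
  }

  /** Returns (and forgets) the ids whose deadline has passed. */
  expire(now: number): string[] {
    const expired: string[] = [];
    this.deadlines.forEach((deadline, entityId) => {
      if (now >= deadline) expired.push(entityId);
    });
    expired.forEach((entityId) => this.deadlines.delete(entityId));
    return expired;
  }
}
```

A periodic sweep, for example once per second with `Date.now()`, would then dispatch a rollback for each id that `expire` returns.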

How do I handle SSE events that arrive before the Redux store is ready?

The custom middleware approach avoids this: the middleware only opens the connection when it sees an sse/connect action, which can only be dispatched after the store is configured. If you open EventSource in module-level code before configureStore, you risk lost events. Delay connecting until after the store is initialised, or buffer events in a module-level queue and drain it once the store exists.
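If the module-level queue is the route you take, it can be as small as the sketch below. `createReadyGate` is a hypothetical helper: items pushed before `ready()` is called are held in order, then drained, and everything afterwards passes straight through.

```typescript
function createReadyGate<T>() {
  const queue: T[] = [];
  let sink: ((item: T) => void) | null = null;

  return {
    // Called from the SSE listener; safe before the store exists.
    push(item: T): void {
      if (sink) sink(item);
      else queue.push(item);
    },
    // Called once the store is configured, e.g. with store.dispatch.
    ready(next: (item: T) => void): void {
      sink = next;
      queue.splice(0).forEach(next); // drain in arrival order
    },
    get buffered(): number {
      return queue.length;
    },
  };
}
```

The queue here is unbounded; for a real stream you would likely cap it, since events that pile up before the store is ready are usually better replaced by a snapshot fetch.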

Can I use RTK Query alongside SSE middleware?

Yes. RTK Query handles REST queries and mutations; the SSE middleware handles push updates. To keep both writing to the same cache, have the SSE handler dispatch api.util.updateQueryData for the affected endpoint. On an optimistic mutation, onQueryStarted applies a patch with updateQueryData and calls the returned patch's undo() if the request fails; the SSE confirmation event then dispatches updateQueryData again to commit the server state. RTK Query also supports streaming updates directly through an endpoint's onCacheEntryAdded lifecycle.

How do I test the SSE middleware without a real server?

jsdom does not implement EventSource, so stub globalThis.EventSource with a fake class or a mock EventSource library that lets you programmatically emit events. Wire the mock in during test setup, dispatch sse/connect, emit synthetic events, and assert store state. For integration tests, use msw (Mock Service Worker) with its http.get handler returning a ReadableStream of SSE-formatted text.
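A hand-rolled fake can be quite small. The sketch below implements only the parts of the EventSource surface used in this guide (`addEventListener`, `onopen`, `onerror`, `close`, `readyState`) plus two test-only helpers; `FakeEventSource`, `open`, and `emit` are hypothetical names, and the event objects are plain objects rather than real MessageEvent instances.

```typescript
interface FakeMessage {
  type: string;
  data: string;
  lastEventId: string;
}

type FakeListener = (event: FakeMessage) => void;

class FakeEventSource {
  // Lets a test grab the instance the code under test created.
  static instances: FakeEventSource[] = [];

  readonly url: string;
  readyState = 0; // 0 connecting, 1 open, 2 closed
  onopen: (() => void) | null = null;
  onerror: (() => void) | null = null;
  private readonly listeners = new Map<string, FakeListener[]>();

  constructor(url: string) {
    this.url = url;
    FakeEventSource.instances.push(this);
  }

  addEventListener(type: string, listener: FakeListener): void {
    const list = this.listeners.get(type) ?? [];
    list.push(listener);
    this.listeners.set(type, list);
  }

  close(): void {
    this.readyState = 2;
  }

  // Test helper: simulate the connection opening.
  open(): void {
    this.readyState = 1;
    this.onopen?.();
  }

  // Test helper: simulate a server event. `data` is serialized like the wire format.
  emit(type: string, data: unknown, lastEventId = ""): void {
    if (this.readyState === 2) return; // closed sources deliver nothing
    const event: FakeMessage = { type, data: JSON.stringify(data), lastEventId };
    (this.listeners.get(type) ?? []).forEach((listener) => listener(event));
  }
}
```

In test setup you would assign the class to `globalThis.EventSource` (with a type cast) before dispatching sse/connect, then drive the store through `FakeEventSource.instances`. Emitting the same id twice is a quick way to assert that dedup holds.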
