Managing Memory Buffers in Go Streaming Servers
Part of Buffer Management & Chunked Transfer Encoding.
Persistent SSE connections keep goroutines alive for minutes or hours, and every uncontrolled allocation compounds across thousands of concurrent streams. The problems usually surface first as OOM kills under load, then as erratic GC pauses that stall writes, then as zombie goroutines that survive client disconnects. This guide covers the techniques (sync.Pool, bufio.Writer sizing, http.Flusher discipline, bounded channel queues, and context-scoped teardown) that bring Go SSE servers from unpredictable heap growth to flat, auditable memory profiles.
Symptoms and Developer Intent
Production symptoms of unbounded buffer growth in Go SSE servers:
- OOM kills at sustained concurrency (>5 k active connections) even when event payload sizes are small.
- GC CPU spike >15 % correlating with event burst periods, visible as PauseTotalNs deltas in runtime.MemStats.
- Stalled event delivery: clients receive partial payloads or hit read timeouts because http.Flusher.Flush() was not called after each write, leaving bytes inside net/http's internal bufio.Writer.
- Goroutine accumulation: runtime.NumGoroutine() climbs monotonically after repeated client connects and disconnects, indicating goroutines blocked on channel sends to closed clients.
The goal is deterministic allocation: each SSE connection should touch a bounded, reusable pool of memory, and that memory should be provably released when the connection closes.
Root Cause Analysis
Why net/http buffers by default
Go's net/http response writer wraps the underlying TCP connection in a bufio.Writer with a 4 KB default buffer. For normal request-response cycles this is a throughput win: fewer write(2) syscalls. For SSE it becomes a liability: unless you call http.Flusher.Flush(), serialized data: lines sit inside that buffer until it fills up or the connection is torn down. The client sees silence, then a burst, then possibly a stall if the buffer never reaches its threshold.
Chunked transfer encoding and flush boundaries
HTTP/1.1 chunked transfer encoding wraps the output of each Flush() call as one chunk. If you write a 200-byte event and flush immediately, the wire sees a single 200-byte chunk. If you write 40 events before flushing, all 40 accumulate in the buffer and the client receives them all at once, defeating the purpose of streaming. See Buffer Management & Chunked Transfer Encoding for how chunk boundaries map to TCP segments.
Allocation patterns that cause GC pressure
Each fmt.Sprintf or string(bytes) call inside the write loop allocates a new string on the heap, and converting the result back with []byte(s) allocates again. At 5 000 connections emitting one event per second, that is at least 5 000 allocations per second for event bodies alone, before header formatting, JSON marshalling, or middleware overhead. The runtime's garbage collector must trace and sweep all of it. sync.Pool breaks the cycle by reusing allocated buffers across goroutines.
Goroutine retention after disconnect
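One way to see the difference is to count allocations directly. The sketch below uses testing.AllocsPerRun, which can be called outside a test, to compare a fmt.Sprintf formatter with one that reuses a pooled bytes.Buffer. The function names are hypothetical and the exact counts depend on the Go version, so treat the numbers as indicative only.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
	"testing"
)

var sinkLen int // package-level sink so the work cannot be optimised away

var bufPool = sync.Pool{
	New: func() any { return bytes.NewBuffer(make([]byte, 0, 512)) },
}

// formatWithSprintf builds the event as a fresh string on every call.
func formatWithSprintf(id, data string) {
	s := fmt.Sprintf("id: %s\ndata: %s\n\n", id, data)
	sinkLen = len(s)
}

// formatWithPool builds the event in a reused buffer.
func formatWithPool(id, data string) {
	buf := bufPool.Get().(*bytes.Buffer)
	buf.Reset()
	buf.WriteString("id: ")
	buf.WriteString(id)
	buf.WriteString("\ndata: ")
	buf.WriteString(data)
	buf.WriteString("\n\n")
	sinkLen = buf.Len()
	bufPool.Put(buf)
}

// allocsPerEvent reports the average heap allocations per formatted event.
func allocsPerEvent(format func(id, data string)) float64 {
	return testing.AllocsPerRun(1000, func() { format("42", "hello") })
}

func main() {
	fmt.Println("Sprintf allocs/event:", allocsPerEvent(formatWithSprintf))
	fmt.Println("pooled  allocs/event:", allocsPerEvent(formatWithPool))
}
```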
A handler only learns that the client is gone when a write fails or when the request context is cancelled, so code that does neither keeps running. A goroutine blocked on a send to a dead client's chan []byte stays blocked until something receives from the channel or until a select around the send observes context cancellation; the runtime does not collect goroutines that are parked on a channel. If the hub map still holds the *client pointer, broadcasters keep sending to that channel and its queued slices stay reachable.
Step-by-Step Resolution
Step 1: Assert http.Flusher immediately and set SSE headers
func sseHandler(w http.ResponseWriter, r *http.Request) {
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming not supported", http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	// Disable nginx / proxy buffering at the HTTP layer
	w.Header().Set("X-Accel-Buffering", "no")
	w.WriteHeader(http.StatusOK)
	// Send an initial comment to flush headers and confirm the stream opened.
	fmt.Fprint(w, ": ok\n\n")
	flusher.Flush()
	// ... event loop follows
}
This must happen before any write to the body. After WriteHeader, headers are frozen.
Step 2: Pool write buffers with sync.Pool
import (
	"bytes"
	"sync"
)

var eventBufPool = sync.Pool{
	New: func() any {
		// Pre-size to a typical event: id(20) + event(30) + data(256) + delimiters
		b := bytes.NewBuffer(make([]byte, 0, 512))
		return b
	},
}

// acquireEventBuf returns a reset buffer from the pool.
func acquireEventBuf() *bytes.Buffer {
	b := eventBufPool.Get().(*bytes.Buffer)
	b.Reset()
	return b
}

// releaseEventBuf returns the buffer to the pool.
func releaseEventBuf(b *bytes.Buffer) {
	// Cap growth: don't pool buffers that grew huge (e.g., large JSON payloads).
	if b.Cap() <= 32*1024 {
		eventBufPool.Put(b)
	}
}
Why cap at 32 KB? Pooling excessively large buffers keeps memory pinned even when no connections are active. Discard outliers rather than inflating the pool's steady-state footprint.
Step 3: Zero-allocation event serialization
// formatSSEEvent writes a complete SSE event into buf and returns the raw bytes.
// Caller must call releaseEventBuf(buf) after the bytes have been flushed to the wire.
func formatSSEEvent(buf *bytes.Buffer, id, eventType, data string) []byte {
	if id != "" {
		buf.WriteString("id: ")
		buf.WriteString(id)
		buf.WriteByte('\n')
	}
	if eventType != "" && eventType != "message" {
		buf.WriteString("event: ")
		buf.WriteString(eventType)
		buf.WriteByte('\n')
	}
	buf.WriteString("data: ")
	buf.WriteString(data)
	buf.WriteString("\n\n")
	return buf.Bytes()
}
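bytes.Buffer.WriteString does not allocate when the buffer has capacity. Avoid fmt.Fprintf inside the hot loop: boxing its arguments into interface{} values typically allocates, even when the format string contains only %s verbs. Note that formatSSEEvent writes data verbatim, so a payload containing a newline would break the event framing; such payloads need one data: line per line of input.
Step 4: Bounded per-client channel queues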
const clientQueueDepth = 64 // events; tune per p99 burst size

type sseClient struct {
	ch  chan []byte
	ctx context.Context
}

// newClient creates a client with a bounded event queue.
func newClient(ctx context.Context) *sseClient {
	return &sseClient{
		ch:  make(chan []byte, clientQueueDepth),
		ctx: ctx,
	}
}

// enqueue attempts to send an event to the client without blocking.
// Returns false (event dropped) if the client is a slow consumer.
func (c *sseClient) enqueue(event []byte) bool {
	select {
	case c.ch <- event:
		return true
	default:
		// Slow consumer: drop rather than accumulate unbounded heap.
		return false
	}
}
64 slots at ~512 bytes each consume 32 KB per client, which is affordable at 10 k connections (320 MB worst case, with every queue full). See Rate Limiting & Backpressure Handling for strategies when drop rate is unacceptable (token-bucket leaky queues, client eviction).
Step 5: Write loop with explicit flush and context teardown
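One hazard when combining a pooled buffer with a queued []byte: the slice returned by buf.Bytes() aliases the buffer, so releasing the buffer while the slice still sits in a client queue lets a later event overwrite it. A minimal sketch of one way around this, assuming Go 1.20+ for bytes.Clone and using the hypothetical helper name render, is to copy the formatted bytes once and share that copy with every client.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

var bufPool = sync.Pool{
	New: func() any { return bytes.NewBuffer(make([]byte, 0, 512)) },
}

// render formats one event in a pooled buffer and returns an independent
// copy, so the buffer can go back to the pool before the event is written.
func render(data string) []byte {
	buf := bufPool.Get().(*bytes.Buffer)
	buf.Reset()
	buf.WriteString("data: ")
	buf.WriteString(data)
	buf.WriteString("\n\n")
	out := bytes.Clone(buf.Bytes()) // one allocation per event, shared by all clients
	bufPool.Put(buf)
	return out
}

func main() {
	queue := make(chan []byte, 2)
	queue <- render("one")
	queue <- render("two") // very likely reuses the same pooled buffer
	fmt.Printf("%q\n", <-queue)
	// prints "data: one\n\n"
}
```

The copy costs one allocation per event rather than one per client, which keeps the broadcast path cheap while making the queued bytes safe to hold.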
func (c *sseClient) serve(w http.ResponseWriter, flusher http.Flusher) {
	for {
		select {
		case <-c.ctx.Done():
			return // HTTP connection closed; buffers released on return
		case event, ok := <-c.ch:
			if !ok {
				return
			}
			if _, err := w.Write(event); err != nil {
				// Write error = client gone; exit promptly.
				return
			}
			flusher.Flush() // one chunk per event; maintains stream latency
		}
	}
}
Never batch multiple events before flushing unless you have an explicit latency tolerance. Each Flush() call costs one write(2) syscall, but it hands the event to the kernel immediately instead of waiting for the 4 KB buffer to fill, so the client typically receives it within milliseconds. For idempotent event IDs to work correctly, the client must receive each id: field before the connection drops, and that requires per-event flushing.
Step 6: Server timeouts for streaming endpoints
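The loop's two exit paths can be checked without a network connection, because httptest.ResponseRecorder implements both http.ResponseWriter and http.Flusher. The sketch below repeats the serve loop so that it is self-contained; drain and exitsOnCancel are hypothetical helper names.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
)

type sseClient struct {
	ch  chan []byte
	ctx context.Context
}

// serve mirrors the write loop above: one write and one flush per event.
func (c *sseClient) serve(w http.ResponseWriter, flusher http.Flusher) {
	for {
		select {
		case <-c.ctx.Done():
			return
		case event, ok := <-c.ch:
			if !ok {
				return
			}
			if _, err := w.Write(event); err != nil {
				return
			}
			flusher.Flush()
		}
	}
}

// drain queues events, closes the queue, and runs serve to completion.
func drain(events ...string) (body string, flushed bool) {
	c := &sseClient{ch: make(chan []byte, len(events)), ctx: context.Background()}
	for _, e := range events {
		c.ch <- []byte(e)
	}
	close(c.ch)
	rec := httptest.NewRecorder()
	c.serve(rec, rec)
	return rec.Body.String(), rec.Flushed
}

// exitsOnCancel runs serve with an empty queue and a cancelled context;
// it returns only if the loop honours cancellation.
func exitsOnCancel() bool {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	c := &sseClient{ch: make(chan []byte, 1), ctx: ctx}
	rec := httptest.NewRecorder()
	c.serve(rec, rec)
	return rec.Body.Len() == 0
}

func main() {
	body, flushed := drain("data: a\n\n", "data: b\n\n")
	fmt.Printf("%q flushed=%v\n", body, flushed)
	fmt.Println("exits on cancel:", exitsOnCancel())
}
```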
srv := &http.Server{
	Addr:    ":8080",
	Handler: mux,
	// Short read timeout rejects slow request bodies / malformed headers.
	ReadHeaderTimeout: 5 * time.Second,
	ReadTimeout:       10 * time.Second,
	// Do NOT set WriteTimeout on SSE handlers; it would terminate the stream.
	// Use per-request context deadlines instead.
	IdleTimeout: 90 * time.Second,
}
WriteTimeout is a field of http.Server, not of a mux, so set it only on a server that carries no streaming routes. For the SSE route, wrap the handler with a context that has a maximum stream duration:
func withMaxAge(next http.HandlerFunc, d time.Duration) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		ctx, cancel := context.WithTimeout(r.Context(), d)
		defer cancel()
		next(w, r.WithContext(ctx))
	}
}

// Force clients to reconnect every 30 min; prevents zombie connections.
mux.HandleFunc("/events", withMaxAge(sseHandler, 30*time.Minute))
Forced reconnection keeps the clients' retry logic exercised and prevents memory from accumulating in connections that should have been cleaned up.
Step 7: Deterministic hub cleanup on disconnect
type hub struct {
	mu      sync.Mutex
	clients map[*sseClient]struct{}
}

func (h *hub) register(c *sseClient) {
	h.mu.Lock()
	h.clients[c] = struct{}{}
	h.mu.Unlock()
}

func (h *hub) unregister(c *sseClient) {
	h.mu.Lock()
	delete(h.clients, c)
	h.mu.Unlock()
	// Drain any buffered events to prevent channel GC delay.
	for {
		select {
		case <-c.ch:
		default:
			return
		}
	}
}

// In the handler:
h.register(client)
defer h.unregister(client)
client.serve(w, flusher)
Deleting from the hub map removes the hub's reference to the client. Once the handler returns and drops its own reference, the channel and its buffered slices can be collected in a later GC cycle.
Validation and Monitoring
Heap profile under load
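Two details the hub needs in practice are an initialised map (writing to a nil map panics) and a broadcast path that holds the same mutex as register and unregister. The following is a minimal sketch under those assumptions; newHub, broadcast, and size are hypothetical names, and the client type is reduced to its queue.

```go
package main

import (
	"fmt"
	"sync"
)

type client struct{ ch chan []byte }

type hub struct {
	mu      sync.Mutex
	clients map[*client]struct{}
}

// newHub initialises the map so register never writes to a nil map.
func newHub() *hub { return &hub{clients: make(map[*client]struct{})} }

func (h *hub) register(c *client) {
	h.mu.Lock()
	h.clients[c] = struct{}{}
	h.mu.Unlock()
}

func (h *hub) unregister(c *client) {
	h.mu.Lock()
	delete(h.clients, c)
	h.mu.Unlock()
}

// broadcast offers event to every registered client without blocking and
// returns how many clients dropped it because their queue was full.
func (h *hub) broadcast(event []byte) (dropped int) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for c := range h.clients {
		select {
		case c.ch <- event:
		default:
			dropped++
		}
	}
	return dropped
}

func (h *hub) size() int {
	h.mu.Lock()
	defer h.mu.Unlock()
	return len(h.clients)
}

func main() {
	h := newHub()
	a := &client{ch: make(chan []byte, 1)}
	b := &client{ch: make(chan []byte, 1)}
	h.register(a)
	h.register(b)
	event := []byte("data: x\n\n")
	fmt.Println(h.broadcast(event)) // prints 0: both queues had room
	fmt.Println(h.broadcast(event)) // prints 2: both queues are now full
	h.unregister(a)
	fmt.Println(h.size()) // prints 1
}
```

Because the sends inside broadcast never block, holding the mutex for the whole loop is cheap; a blocking send under the lock would stall every register and unregister call.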
# Run the server; this assumes it serves net/http/pprof on :6060.
# Leave -race off for this measurement: the race detector inflates memory use.
go run ./cmd/server &
# Apply load in the background: 2 000 concurrent SSE connections for 60 s
wrk -t8 -c2000 -d60s --latency http://localhost:8080/events &
# Capture a heap profile mid-run
sleep 30
curl -s http://localhost:6060/debug/pprof/heap > heap.prof
go tool pprof -http=:6061 heap.prof
In the pprof flame graph, allocations attributed to the pool's New function should be a small, stable share of the total once the pool has warmed up. If bytes.(*Buffer).grow appears prominently, your initial pool capacity is undersized.
GC pause assertion
func logGCStats() {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	log.Printf("HeapAlloc=%dKB HeapInuse=%dKB GCPause=%v NumGC=%d",
		m.HeapAlloc/1024,
		m.HeapInuse/1024,
		time.Duration(m.PauseTotalNs),
		m.NumGC,
	)
}
Target: HeapInuse grows linearly with connection count at ~50 KB per connection (queue + buffers), then plateaus. PauseTotalNs growth per minute should be under 50 ms at 5 k connections.
Goroutine leak detector
func TestNoGoroutineLeak(t *testing.T) {
	before := runtime.NumGoroutine()
	// Spin up 100 clients, disconnect all
	// ... test setup ...
	time.Sleep(200 * time.Millisecond) // let context cancellations propagate
	after := runtime.NumGoroutine()
	if delta := after - before; delta > 5 {
		t.Errorf("goroutine leak: %d goroutines remain after disconnect", delta)
	}
}
Chunk boundary verification
# Confirm Transfer-Encoding: chunked and per-event flush boundaries
curl -v --no-buffer http://localhost:8080/events 2>&1 | head -40
Look for < Transfer-Encoding: chunked in the response headers. Each data: line followed by an empty line should arrive as a discrete chunk rather than batched.
| Metric | Baseline (no pool) | Target (with pool) |
|---|---|---|
| HeapAlloc at 5 k conns | ~800 MB | ~250 MB |
| GC pause / min | 200–500 ms | <50 ms |
| Goroutines at idle (post-disconnect) | climbing | stable ±5 |
| Flush latency p99 | 80 ms | <5 ms |
Verification Checklist
Frequently Asked Questions
Does using sync.Pool guarantee zero allocations inside the event loop?
Not zero, but it amortises them. The pool avoids steady-state allocations by reusing buffers that already live on the heap. You will still see allocations on the first connection burst while the pool warms up, after a GC cycle empties the pool, and whenever a buffer grows beyond its initial capacity (triggering an internal grow). Profile with go test -bench=. -benchmem to measure residual allocations per event.
Why not use bufio.NewWriterSize directly on the response writer?
net/http already wraps the TCP connection in a bufio.Writer. Wrapping it again with your own bufio.Writer creates double-buffering: your buffer fills and flushes into net/http's buffer, and that buffer must then be flushed to the wire. You end up needing to call Flush() on both layers or the event is still held. Use the response writer directly and call http.Flusher.Flush(); let net/http own the underlying buffer.
What channel depth should I use for the per-client queue?
Start at 64 and tune based on your event rate and acceptable drop budget. At 10 events/sec a depth of 64 absorbs roughly 6 seconds of backlog before drops start. At 1 000 events/sec the same depth covers only 64 ms, so you need either a deeper buffer (more memory per client) or an explicit slow-consumer eviction policy. Never use an unbounded queue: it defeats the purpose of backpressure. See Rate Limiting & Backpressure Handling for token-bucket alternatives.
Does setting WriteTimeout on http.Server break SSE?
Yes. WriteTimeout sets an absolute write deadline, measured from the end of the request-header read, and every write after that deadline fails regardless of how recently the stream was active. SSE connections are designed to stay open indefinitely, so any finite WriteTimeout eventually kills them. Instead, control stream lifetime with a context.WithTimeout or context.WithDeadline attached to the request context, and use a separate non-streaming http.Server instance (or a different port) with WriteTimeout for your REST endpoints.
How do I detect memory leaks introduced by middleware wrapping the response writer?
Some middleware (logging, tracing) wraps http.ResponseWriter in a struct that does not implement http.Flusher. The w.(http.Flusher) assertion at handler entry will catch this immediately: the assertion fails and you return a 500. Fix: ensure your middleware's wrapper type implements and forwards Flush(), or use a delegation pattern that preserves all optional interfaces. go vet does not flag a missing optional interface, so verify with a unit test that calls w.(http.Flusher) through the full middleware chain.
Production Directives
- Pre-warm sync.Pool at startup by getting a few hundred buffers, holding them, and then putting them all back before traffic arrives, so the first connection burst does not trigger a heap spike. The pool may drop idle buffers at any GC cycle, so treat this as a short-lived optimisation.
- Never use fmt.Fprintf(w, ...) inside the SSE hot loop: the variadic interface{} arguments escape to the heap. Use buf.WriteString and w.Write(buf.Bytes()).
- Set GOGC=50 (more frequent GC) in memory-sensitive deployments to keep the heap smaller at the cost of higher GC CPU; benchmark both values under your connection profile.
- Export runtime.MemStats.HeapInuse as a Prometheus gauge; alert when it grows faster than O(connection count), which is the earliest signal of a leak.
- Gate SSE endpoints behind a connection-count limiter at the load balancer layer to prevent a single traffic spike from pushing the open file descriptor count past your ulimit.