Resumable Stream Deployment

Production hardening for resumable streams: authorization, serverless lifetimes, TTLs, key isolation, observability, resource limits, and incident response.

This guide assumes you have the basic wiring from Resumable Streams in place and focuses on what to add before serving production traffic.

Authentication and authorization

The default resume endpoint serves any caller that knows the streamId. Treat the id as opaque, not as a credential. Bind every newly created streamId to the requesting user at acquire time and verify the binding on every resume.

Store the binding next to the rest of your session state, or in Redis under a separate key. The example below uses a parallel <keyPrefix>:owner:<streamId> entry (with aui:resumable as the prefix) whose TTL matches the 24-hour default of the underlying stream.

/lib/resumable-context.ts
import { createResumableStreamContext } from "assistant-stream/resumable";
import { redis } from "@/lib/redis";
import { store } from "@/lib/resumable-store";

const OWNER_PREFIX = "aui:resumable:owner";
const OWNER_TTL_SEC = 24 * 60 * 60;

export const resumableContext = createResumableStreamContext({ store });

export async function bindStreamToUser(streamId: string, userId: string) {
  await redis.set(`${OWNER_PREFIX}:${streamId}`, userId, { EX: OWNER_TTL_SEC });
}

export async function assertStreamOwner(streamId: string, userId: string) {
  const owner = await redis.get(`${OWNER_PREFIX}:${streamId}`);
  if (owner !== userId) {
    throw new Response("Not Found", { status: 404 });
  }
}

Wrap resume with the ownership check. Returning 404 (not 403) avoids confirming the existence of a stream the caller does not own.

/app/api/chat/resume/[streamId]/route.ts
import { assertStreamOwner, resumableContext } from "@/lib/resumable-context";
import { getSessionUserId } from "@/lib/auth";

export async function GET(
  req: Request,
  ctx: { params: Promise<{ streamId: string }> },
) {
  const userId = await getSessionUserId(req);
  if (!userId) return new Response("Unauthorized", { status: 401 });

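  const { streamId } = await ctx.params;
  try {
    await assertStreamOwner(streamId, userId);
  } catch (err) {
    // A Next.js route handler does not turn a thrown Response into a reply,
    // so return it explicitly and rethrow anything else.
    if (err instanceof Response) return err;
    throw err;
  }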

  const stream = await resumableContext.resume(streamId);
  if (!stream) return new Response("Not Found", { status: 404 });

  return new Response(stream, {
    headers: { "Content-Type": "text/event-stream" },
  });
}
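
The bind-then-verify flow can be exercised without Redis. The following is a minimal sketch, not the library's implementation: OwnerBindings and resumeStatus are hypothetical names, and the in-memory Map stands in for the Redis owner key. It shows why unknown, expired, and foreign stream ids should all produce the same 404.

```typescript
// Hypothetical in-memory stand-in for the Redis owner binding (illustration only).
export class OwnerBindings {
  private readonly entries = new Map<string, { userId: string; expiresAt: number }>();
  private readonly ttlMs: number;
  private readonly now: () => number;

  constructor(ttlMs: number, now: () => number = () => Date.now()) {
    this.ttlMs = ttlMs;
    this.now = now;
  }

  // Call once, right after a stream has been acquired for `userId`.
  bind(streamId: string, userId: string): void {
    this.entries.set(streamId, { userId, expiresAt: this.now() + this.ttlMs });
  }

  // True only when an unexpired binding exists and matches the caller.
  isOwner(streamId: string, userId: string): boolean {
    const entry = this.entries.get(streamId);
    if (!entry) return false;
    if (entry.expiresAt <= this.now()) {
      this.entries.delete(streamId);
      return false;
    }
    return entry.userId === userId;
  }
}

// Unknown, expired, and foreign stream ids are indistinguishable to the caller.
export function resumeStatus(
  bindings: OwnerBindings,
  streamId: string,
  userId: string,
): 200 | 404 {
  return bindings.isOwner(streamId, userId) ? 200 : 404;
}
```

Injecting the clock keeps the expiry path testable; in production the Redis EX option plays that role.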

waitUntil on serverless

On Vercel and Cloudflare the request handler is torn down once the response is returned, taking the producer task with it. Without a waitUntil hook the persisted stream stops growing the moment the originating request unwinds, so reconnects only see the bytes that happened to land before the response flushed.

On Vercel, pass after from next/server:

/lib/resumable-context.ts
import { after } from "next/server";
import { createResumableStreamContext } from "assistant-stream/resumable";
import { store } from "@/lib/resumable-store";

export const resumableContext = createResumableStreamContext({
  store,
  waitUntil: after,
});

On Cloudflare Workers, take the ExecutionContext from your handler and forward ctx.waitUntil:

/src/worker.ts
import { createResumableStreamContext } from "assistant-stream/resumable";
import { store } from "./resumable-store";

export default {
  async fetch(req: Request, env: Env, ctx: ExecutionContext) {
    const resumableContext = createResumableStreamContext({
      store,
      waitUntil: (promise) => ctx.waitUntil(promise),
    });
    // handle() is your application's request router (not shown here).
    return handle(req, resumableContext);
  },
};

In long-lived Node servers (a custom Express app, a container) waitUntil can be omitted; the producer task runs on the same event loop as the handler and is not preempted.
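
Even when waitUntil is optional, a long-lived server can still cut producers short during a deploy or restart. One option, sketched here under the assumption that waitUntil has the (promise) => void shape used in the Cloudflare example, is to track in-flight producer promises so a shutdown handler can wait for them. createTrackedWaitUntil, pendingCount, and drain are hypothetical names, not part of assistant-stream.

```typescript
type WaitUntil = (promise: Promise<unknown>) => void;

// Hypothetical helper: records producer promises until they settle.
export function createTrackedWaitUntil() {
  const pending = new Set<Promise<void>>();

  const waitUntil: WaitUntil = (promise) => {
    // Remove the entry on success or failure; a rejected producer
    // must not turn into an unhandled rejection here.
    const settle = () => {
      pending.delete(tracked);
    };
    const tracked: Promise<void> = promise.then(settle, settle);
    pending.add(tracked);
  };

  return {
    waitUntil,
    // Number of producers that have not settled yet.
    pendingCount: () => pending.size,
    // Resolves once every producer tracked at call time has settled.
    drain: (): Promise<void> =>
      Promise.all(Array.from(pending)).then(() => undefined),
  };
}
```

A shutdown handler would call drain() before exiting, ideally raced against a deadline of your choosing so a stuck producer cannot block the restart forever.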

TTL strategy

By default, streams expire 24 hours after the last write. That suits typical chat workloads where a user might reload after lunch, but every deployment should pick a number deliberately.

  • Shorten when chunks contain sensitive payloads (PII, drafts, internal documents). A 5 to 30 minute window usually covers reload survival without leaving recoverable bytes around.
  • Extend for long-running agent tasks that may legitimately stretch past a day. Set the TTL above the worst-case task duration so the producer can still finalize.
  • Match TTLs across layers. The store TTL, the owner-binding TTL, and any signed cookie that references streamId should expire together; otherwise one outlives the other and either leaks or 404s unexpectedly.

Set defaultTtlMs on the store for the global default and ttlMs on the context for a per-deployment override:

import {
  createInMemoryResumableStreamStore,
  createResumableStreamContext,
} from "assistant-stream/resumable";

const store = createInMemoryResumableStreamStore({
  defaultTtlMs: 30 * 60 * 1000,
});

export const resumableContext = createResumableStreamContext({
  store,
  ttlMs: 30 * 60 * 1000,
});

The Redis adapters accept the same defaultTtlMs option.
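
To keep the store TTL, the owner-binding TTL, and any cookie lifetime from drifting apart, one approach is to derive all of them from a single value. This is a sketch with hypothetical names (deriveTtls and its fields); the only assumptions are that the store takes milliseconds and that the Redis EX option and cookie Max-Age take whole seconds.

```typescript
// Hypothetical helper: derive every streamId-related TTL from one number.
export function deriveTtls(streamTtlMs: number) {
  if (!Number.isFinite(streamTtlMs) || streamTtlMs <= 0) {
    throw new Error("streamTtlMs must be a positive number of milliseconds");
  }
  // Round up so the second-granularity layers never expire before the store.
  const seconds = Math.ceil(streamTtlMs / 1000);
  return {
    storeMs: streamTtlMs, // for defaultTtlMs / ttlMs
    ownerSec: seconds, // for the EX option on the owner-binding key
    cookieMaxAgeSec: seconds, // for Max-Age on a cookie that references streamId
  };
}

// Assumed 30-minute window, matching the example above.
export const ttls = deriveTtls(30 * 60 * 1000);
```

Passing ttls.storeMs to the store and ttls.ownerSec to the owner binding means a change to the window is made in exactly one place.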

Multi-tenant key isolation

When multiple apps or tenants share a Redis instance, set keyPrefix per app and tenant so a misconfigured stream in one tenant cannot collide with, or be deleted alongside, a stream in another. The prefix becomes the leading segment of every meta and data key.

/lib/resumable-store.ts
import { createClient } from "redis";
import { createRedisResumableStreamStore } from "assistant-stream/resumable/redis";

const client = createClient({ url: process.env.REDIS_URL });
await client.connect();

export const store = createRedisResumableStreamStore(client, {
  keyPrefix: `aui:${process.env.APP_NAME}:${process.env.TENANT_ID}`,
});

Per-tenant prefixes also make incident response cheaper. Iterating SCAN with MATCH aui:app:tenant-42:* lets you audit or purge a single tenant without touching the rest.
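
Interpolating environment variables directly into the prefix silently produces keys such as aui:undefined:undefined when a variable is missing. A small validating builder, sketched below with hypothetical names (buildKeyPrefix, requireSegment) and an assumed character set, fails at startup instead and keeps separator and glob characters out of tenant segments.

```typescript
// Assumed policy: segments are limited to letters, digits, "_" and "-",
// so a tenant id can never contain ":" or a glob character such as "*".
const SEGMENT = /^[A-Za-z0-9_-]+$/;

function requireSegment(label: string, value: string | undefined): string {
  if (value === undefined || !SEGMENT.test(value)) {
    throw new Error(`${label} is missing or not usable in a Redis key prefix`);
  }
  return value;
}

// Hypothetical helper that builds the prefix used in the example above.
export function buildKeyPrefix(
  appName: string | undefined,
  tenantId: string | undefined,
): string {
  return `aui:${requireSegment("APP_NAME", appName)}:${requireSegment("TENANT_ID", tenantId)}`;
}
```

Called as buildKeyPrefix(process.env.APP_NAME, process.env.TENANT_ID), it throws during module initialization rather than writing streams under a shared, malformed namespace.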

Observability hooks

ResumableStreamContextOptions exposes lifecycle hooks for structured logging, metrics, and tracing. Each hook is invoked synchronously around the underlying store call; throwing inside a hook surfaces as a producer error.

/lib/resumable-context.ts
import { createResumableStreamContext } from "assistant-stream/resumable";
import { logger, metrics } from "@/lib/observability";
import { store } from "@/lib/resumable-store";

export const resumableContext = createResumableStreamContext({
  store,
  onAcquire: (streamId, role) => {
    metrics.increment("resumable.acquire", { role });
    logger.info("resumable.acquire", { streamId, role });
  },
  onAppend: (streamId, byteLength) => {
    metrics.histogram("resumable.append.bytes", byteLength);
  },
  onFinalize: (streamId, status, error) => {
    metrics.increment("resumable.finalize", { status });
    logger.info("resumable.finalize", { streamId, status, error });
  },
  onError: (streamId, error) => {
    const message = error instanceof Error ? error.message : String(error);
    logger.error("resumable.error", { streamId, error: message });
  },
});

Keep hook bodies cheap. They run on the producer's hot path and any latency they add becomes streaming latency seen by the client.
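
Because a throwing hook surfaces as a producer error, a failing metrics client can end a stream that was otherwise healthy. If you would rather telemetry failures never affect the stream, one option is to wrap each hook. guardHook below is a hypothetical helper, not part of assistant-stream, and it works for any synchronous hook signature.

```typescript
// Hypothetical wrapper: runs the hook and reports, rather than rethrows, failures.
export function guardHook<Args extends unknown[]>(
  hook: (...args: Args) => void,
  onHookFailure: (error: unknown) => void = () => {},
): (...args: Args) => void {
  return (...args: Args) => {
    try {
      hook(...args);
    } catch (error) {
      try {
        onHookFailure(error);
      } catch (_ignored) {
        // Reporting the failure must not break the producer either.
      }
    }
  };
}
```

For example, onAppend: guardHook((streamId, byteLength) => metrics.histogram("resumable.append.bytes", byteLength)) keeps a metrics outage from propagating. The trade-off is that hook failures become silent unless onHookFailure records them somewhere.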

Resource limits

The in-memory store enforces three caps that the Redis adapters intentionally leave to the underlying database. Set them whenever your process can be reached by untrusted callers.

import { createInMemoryResumableStreamStore } from "assistant-stream/resumable";

const store = createInMemoryResumableStreamStore({
  maxChunkBytes: 64 * 1024,
  maxEntriesPerStream: 5000,
  maxStreams: 10_000,
});

  • maxChunkBytes rejects oversized writes from a misbehaving producer (a runaway tool result, a base64 blob accidentally piped through). The producer task fails fast instead of pinning memory.
  • maxEntriesPerStream caps the per-stream entry count. This bounds how much any single stream can grow before it starts erroring; pair it with TTLs so finalized streams clear quickly.
  • maxStreams caps total live streams. Useful as a backstop in shared development environments and in single-tenant containers; in serverless deployments the platform already constrains concurrency.

These limits exist only on the in-memory store. For Redis, configure maxmemory and an eviction policy on the server, and rely on application-level rate limiting upstream.
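
Upstream rate limiting can take many forms. As one illustration, a per-user token bucket in front of the acquire path bounds how fast any caller can create streams. This is a generic sketch: TokenBucket and allowAcquire are hypothetical names, the capacity and refill rate are assumed values, and the state is per process, so a multi-instance deployment would need a shared limiter instead.

```typescript
// Generic token bucket: `capacity` burst size, refilled at `refillPerSec`.
export class TokenBucket {
  private tokens: number;
  private updatedAt: number;
  private readonly capacity: number;
  private readonly refillPerSec: number;
  private readonly now: () => number;

  constructor(capacity: number, refillPerSec: number, now: () => number = () => Date.now()) {
    this.capacity = capacity;
    this.refillPerSec = refillPerSec;
    this.now = now;
    this.tokens = capacity;
    this.updatedAt = now();
  }

  // Refill for the elapsed time, then spend one token if available.
  tryTake(): boolean {
    const t = this.now();
    const elapsedSec = (t - this.updatedAt) / 1000;
    this.tokens = Math.min(this.capacity, this.tokens + elapsedSec * this.refillPerSec);
    this.updatedAt = t;
    if (this.tokens < 1) return false;
    this.tokens -= 1;
    return true;
  }
}

const buckets = new Map<string, TokenBucket>();

// Hypothetical gate to call before acquiring a stream for `userId`.
// Assumed policy: bursts of 5 acquires, refilling one token per second.
export function allowAcquire(userId: string): boolean {
  let bucket = buckets.get(userId);
  if (!bucket) {
    bucket = new TokenBucket(5, 1);
    buckets.set(userId, bucket);
  }
  return bucket.tryTake();
}
```

When allowAcquire returns false, the handler would typically answer 429 before touching the store.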

Incident response

The streamId can leak through response headers, browser session storage, server access logs, and (in some setups) error reports. If you suspect any of those channels were compromised, treat all in-flight stream ids as exposed.

What to log up front, so you have it when you need it:

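  • The acquiring user id, request id, and IP for every acquire call. The onAcquire hook shown earlier receives only streamId and role, so take the remaining fields from request-scoped context or log them at the acquire call site.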
  • The finalize status (and any error) for every stream (via onFinalize).
  • The owner-binding writes and reads, with the user id and the streamId.

What to rotate or invalidate during an incident:

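  • Bump keyPrefix on the store. Existing streams become unreachable through the application and new ones land under the rotated namespace. The old keys stay in Redis until their TTL expires or you purge the old prefix.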
  • Invalidate signed session cookies that reference any cached streamId.
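  • Drop the owner-binding keys for affected users so resumes are forced through a fresh acquire. DEL does not expand glob patterns, so iterate aui:resumable:owner:* with SCAN, keep the keys whose value is an affected user id, and DEL only those.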
  • Shorten defaultTtlMs temporarily so any orphaned stream rolls off quickly.
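
Dropping the owner bindings for one user means scanning the owner namespace and deleting only the keys whose value matches. The sketch below is written against a minimal, hypothetical OwnerKeyStore interface rather than a specific Redis client, because SCAN helpers differ between clients and versions; adapt the three methods to your client's real SCAN, GET, and DEL calls.

```typescript
// Hypothetical minimal surface; map these onto your Redis client.
interface OwnerKeyStore {
  scan(cursor: string, match: string): Promise<{ cursor: string; keys: string[] }>;
  get(key: string): Promise<string | null>;
  del(keys: string[]): Promise<number>;
}

// Deletes every owner-binding key under `ownerPrefix` whose value is `userId`.
// Returns the number of keys removed.
export async function purgeOwnerBindings(
  kv: OwnerKeyStore,
  ownerPrefix: string,
  userId: string,
): Promise<number> {
  let cursor = "0";
  let removed = 0;
  do {
    // SCAN is cursor based; a full pass ends when the cursor returns to "0".
    const page = await kv.scan(cursor, `${ownerPrefix}:*`);
    cursor = page.cursor;
    const owned: string[] = [];
    for (const key of page.keys) {
      if ((await kv.get(key)) === userId) owned.push(key);
    }
    if (owned.length > 0) removed += await kv.del(owned);
  } while (cursor !== "0");
  return removed;
}
```

If revocation needs to be fast, maintaining a per-user set of stream ids at bind time avoids the scan entirely, at the cost of one more key to keep in sync.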

After rotation, reissue stream ids server-side and redirect clients through a fresh acquire; do not trust any streamId the client already holds.