# Resumable Streams
URL: /docs/guides/resumable-streams

Persist an in-flight LLM response on the server so the client can reload, lose its connection, or open a new tab and pick up the same stream.

> For AI agents: a documentation index is available at [llms.txt](/llms.txt). Use `.md` for canonical markdown pages; `.mdx` is kept as a backwards-compatible alias on supported URL paths.

`assistant-stream/resumable` lets you continue a streaming LLM response across client reconnects. The server keeps writing to a store while the original request is in flight; if the browser reloads or loses its connection, a follow-up request replays the persisted bytes plus any new ones until the producer finalizes.

It works with any encoder that already ships in `assistant-stream` (the AI SDK UI message stream, the data stream protocol, the assistant transport SSE format, or your own), because persistence happens at the byte level after encoding.

## What it solves

A user sends a long prompt, walks away, and reloads the tab. Without resumable streams the LLM call is wasted; with them the client picks up where it left off. The same flow handles dropped mobile connections and lets a stream started on one device be read on another, gated by an opaque stream id.

If your responses are short or you do not care about reload survival, the standard `streamText().toUIMessageStreamResponse()` path is enough.

## Server side: minimum wiring

Construct a `ResumableStreamContext` once per process and reuse it across requests. The context is the seam between your route handlers and the storage backend.

```
import {
  createInMemoryResumableStreamStore,
  createResumableStreamContext,
} from "assistant-stream/resumable";

const store = createInMemoryResumableStreamStore();
export const resumableContext = createResumableStreamContext({ store });
```

In your chat route, wrap the response body in `ctx.run(streamId, makeStream)`. The first caller for `streamId` becomes the producer (your `makeStream` callback runs); later callers and reconnects become consumers that replay the persisted bytes.

```
import { streamText } from "ai";
import { RESUMABLE_STREAM_ID_HEADER } from "assistant-stream/resumable";
import { resumableContext } from "@/lib/resumable-context";

export async function POST(req: Request) {
  const { messages } = await req.json();
  const streamId = crypto.randomUUID();

  const result = streamText({ /* model, messages, tools, ... */ });
  const sourceBody = result.toUIMessageStreamResponse().body!;

  const stream = await resumableContext.run(streamId, () => sourceBody);

  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream",
      [RESUMABLE_STREAM_ID_HEADER]: streamId,
    },
  });
}
```

A separate GET endpoint replays the persisted bytes for reconnecting clients. `ctx.resume(streamId)` returns `null` when no stream exists; use `ctx.requireResume(streamId)` if you prefer to surface a `ResumableStreamError` with code `"missing"` instead.

```
import { RESUMABLE_STREAM_ID_HEADER } from "assistant-stream/resumable";
import { resumableContext } from "@/lib/resumable-context";

export async function GET(
  _req: Request,
  ctx: { params: Promise<{ streamId: string }> },
) {
  const { streamId } = await ctx.params;
  const stream = await resumableContext.resume(streamId);
  if (!stream) {
    return new Response(JSON.stringify({ error: "stream not found" }), {
      status: 404,
      headers: { "Content-Type": "application/json" },
    });
  }
  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream",
      [RESUMABLE_STREAM_ID_HEADER]: streamId,
    },
  });
}
```

The context exposes two more verbs: `ctx.status(streamId)` returns `"streaming" | "done" | "error" | "missing"`, and `ctx.delete(streamId)` removes all persisted state for a stream and terminates active readers. The remaining options on `createResumableStreamContext` (`onAcquire`, `onAppend`, `onFinalize`, `onError`) are observability hooks covered in [Resumable Stream Deployment](/docs/guides/resumable-stream-deployment).

## Client side: native integration

`@assistant-ui/react-ai-sdk` ships a `resumable` option on `AssistantChatTransport`. It captures the stream id from the response header, redirects `chat.resumeStream()` reconnects to your resume route, and clears the stored id when the response finishes naturally. Pair it with `useChatRuntime`, which fires `chat.resumeStream()` on mount whenever a pending id is present in storage.

```
"use client";

import { AssistantRuntimeProvider } from "@assistant-ui/react";
import {
  AssistantChatTransport,
  createResumableSessionStorage,
  useChatRuntime,
} from "@assistant-ui/react-ai-sdk";
import { useMemo } from "react";
import { Thread } from "@/components/assistant-ui/thread";

const storage = createResumableSessionStorage();

export default function Page() {
  const transport = useMemo(
    () =>
      new AssistantChatTransport({
        api: "/api/chat",
        resumable: {
          storage,
          resumeApi: (streamId) => `/api/chat/resume/${streamId}`,
        },
      }),
    [],
  );
  const runtime = useChatRuntime({ transport });

  return (
    <AssistantRuntimeProvider runtime={runtime}>
      <Thread />
    </AssistantRuntimeProvider>
  );
}
```

`createResumableSessionStorage` returns a `ResumableClientStorage` backed by `window.sessionStorage`. Pass `{ key }` to namespace per route or per chat surface, or supply your own implementation of the three methods (`getStreamId`, `setStreamId`, `clear`). If you are running on a transport that already wraps `fetch` or `prepareReconnectToStreamRequest`, the `resumable` option composes with your existing handlers.

The default finish detector scans the SSE body for the AI SDK `"type":"finish"` marker. Override `isFinishEvent` on the `resumable` option when you ship a custom encoder.

## Storage choices

The core package ships `createInMemoryResumableStreamStore` for development and tests. State lives in a process-local `Map`, so it does not survive a server restart. Useful options include `defaultTtlMs`, `maxChunkBytes`, `maxEntriesPerStream`, `maxStreams`, and `gcIntervalMs` for periodic eviction.

For production, use one of the optional Redis adapters via the `assistant-stream/resumable/redis` (node-redis v5) or `assistant-stream/resumable/ioredis` sub-paths. Both adapters batch the per-append `XADD` and TTL refresh into a single pipelined round trip, store chunk values as binary, and accept the same `keyPrefix`, `defaultTtlMs`, `pollIntervalMs`, and `maxChunkBytes` options. Cluster routing works because each stream's keys share a `{streamId}` hash tag.

```
import {
  createResumableStreamContext,
  type ResumableStreamStore,
} from "assistant-stream/resumable";

async function createStore(): Promise<ResumableStreamStore> {
  if (!process.env.REDIS_URL) {
    const { createInMemoryResumableStreamStore } = await import(
      "assistant-stream/resumable"
    );
    return createInMemoryResumableStreamStore();
  }
  const { createClient } = await import("redis");
  const { createRedisResumableStreamStore } = await import(
    "assistant-stream/resumable/redis"
  );
  const client = createClient({ url: process.env.REDIS_URL });
  await client.connect();
  return createRedisResumableStreamStore(client);
}

export const resumableContext = createResumableStreamContext({
  store: await createStore(),
});
```

For Postgres, Cloudflare Durable Objects, Upstash REST, or any other backend, implement the `ResumableStreamStore` interface directly. See [Custom Resumable Stream Stores](/docs/guides/resumable-stream-stores) for the contract walkthrough and a worked example.

## Production checklist

- **Auth.** The resume route in the snippets above will serve any caller that knows the stream id. Bind `streamId` to the requesting user at acquire time and verify the binding inside the resume handler. Treat the id as opaque, not as a credential; it leaks via response headers, `sessionStorage`, browser history, and access logs.
- **`waitUntil` on serverless.** On Vercel and Cloudflare the request handler is killed once the response returns, which interrupts the producer task. Pass `after` from `next/server` (or your platform's `ctx.waitUntil`) when constructing the context so the task survives past the response: `createResumableStreamContext({ store, waitUntil: after })`.
- **TTL.** Streams expire 24 hours after the last write by default. Configure with `defaultTtlMs` on the store, or override per deployment via `ttlMs` on the context. Match TTLs across the store, any owner-binding key, and any signed cookie that references a `streamId`.
- **Stream id format.** The Redis adapters validate `streamId` against `/^[A-Za-z0-9_.:-]{1,256}$/` to keep keys well-formed. UUIDv4 is fine.

For the full treatment of authorization, multi-tenant key prefixes, observability hooks, resource limits, and incident response, see [Resumable Stream Deployment](/docs/guides/resumable-stream-deployment).

A new `ResumableStreamError` class is exported from `assistant-stream/resumable` with codes `"missing" | "exists" | "finalized" | "invalid-id"`; catch it in the resume route to distinguish "stream gone" from other failures.

## Helpers for `AssistantStreamController` callbacks

If you produce streams via `createAssistantStream` rather than the AI SDK, the package ships two helpers that bridge the controller-callback style and any encoder to the store:

```
import {
  createResumableAssistantStreamResponse,
  createResumeAssistantStreamResponse,
} from "assistant-stream/resumable";
import { resumableContext } from "@/lib/resumable-context";

// POST handler
return createResumableAssistantStreamResponse({
  context: resumableContext,
  streamId,
  callback: (controller) => {
    /* same shape as createAssistantStreamResponse */
  },
});

// GET resume handler
return createResumeAssistantStreamResponse({
  context: resumableContext,
  streamId,
});
```

Both helpers default to the data-stream encoder; pass `encoder: () => new AssistantTransportEncoder()` (or any custom encoder) to override. They set the `x-resumable-stream-id` response header automatically, which is what `AssistantChatTransport`'s `resumable` adapter looks for.

## Example app

[`examples/with-resumable-stream`](https://github.com/assistant-ui/assistant-ui/tree/main/examples/with-resumable-stream) is a runnable Next.js app that uses `useChat`, the `resumable` transport option, and `useChatRuntime`. It falls back to a built-in mock when `OPENAI_API_KEY` is unset, and switches the store from in-memory to Redis when `REDIS_URL` is set.

```
npx assistant-ui create my-app -e with-resumable-stream
```