# Custom Resumable Stream Stores
URL: /docs/guides/resumable-stream-stores

Implement the ResumableStreamStore interface to back resumable streams with Postgres, Cloudflare Durable Objects, Upstash REST, InstantDB, or any other backend.

The built-in InMemory and Redis adapters cover most deployments. Write your own `ResumableStreamStore` when you need a backend you already operate (Postgres, MySQL), an edge-native primitive (Cloudflare Durable Objects, Workers KV), an HTTP-only key-value service (Upstash REST), or a realtime database (InstantDB). The contract is six asynchronous methods that operate on an opaque `streamId` and an append-only log of byte chunks, each chunk tagged with a monotonically increasing cursor.

## Interface walkthrough

The full interface lives in `assistant-stream/resumable`:

```
export interface ResumableStreamStore {
  acquire(
    streamId: string,
    options?: ResumableStreamAcquireOptions,
  ): Promise<ResumableStreamRole>;
  append(streamId: string, chunk: Uint8Array): Promise<void>;
  finalize(
    streamId: string,
    status: "done" | "error",
    error?: string,
  ): Promise<void>;
  read(
    streamId: string,
    cursor: string,
    signal: AbortSignal,
  ): AsyncIterable<ResumableStreamEntry>;
  status(streamId: string): Promise<ResumableStreamStatus>;
  delete(streamId: string): Promise<void>;
}
```

`acquire(streamId, options?)` arbitrates ownership. The first caller for a given `streamId` resolves to `"producer"`; every later caller, including those arriving after `finalize`, resolves to `"consumer"`. Implementations must perform the check and the insert atomically (see the Acquire semantics section below). `options.ttlMs` overrides the store default for this stream; honor it when you set the expiration timestamp.

`append(streamId, chunk)` adds a `Uint8Array` to the log under a fresh, monotonically increasing cursor. Callers expect the chunk to be observable to `read` before the promise resolves. Implementations should refresh the TTL on each call so a stream that is still actively producing does not expire mid-flight, and should reject when the stream is missing or already finalized.

`finalize(streamId, status, error?)` flips the stream into a terminal state. Pending and future `read` iterables drain buffered entries and then either complete (`"done"`) or throw with `error` (`"error"`). Implementations must make `finalize` idempotent, because the producer task may retry on transient errors: a duplicate call with the same status is a no-op.

`read(streamId, cursor, signal)` is the only streaming method. It yields every entry whose cursor sorts strictly after the supplied `cursor`, then waits for new appends, then completes (or throws, for an `"error"` status) when the stream finalizes. Aborting `signal` ends the iteration cleanly without throwing. Networked stores typically combine a bounded fetch loop with pub/sub, long-poll, or notify wakeups; do not busy-loop.
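The fetch-then-wait loop described for networked stores could look roughly like the sketch below. `LogBackend`, `fetchAfter`, `waitForChange`, and `readWithWakeups` are hypothetical names standing in for whatever query and wakeup primitive your backend offers; they are not part of `assistant-stream`. The entry shape `{ cursor, chunk }` is an assumption, and `fetchAfter` is assumed to return a consistent snapshot of entries and terminal state.

```typescript
// Assumed shapes for illustration; not imported from assistant-stream.
type Entry = { cursor: string; chunk: Uint8Array };
type Final = { status: "done" | "error"; error?: string };
type Page = { entries: Entry[]; final?: Final };

// Hypothetical backend surface: one bounded query plus one wakeup primitive.
interface LogBackend {
  // Returns up to `limit` entries strictly after `cursor`, oldest first,
  // plus the terminal state if the stream has been finalized.
  fetchAfter(cursor: string, limit: number): Promise<Page>;
  // Resolves on a pub/sub message, notification, long-poll timeout, or abort.
  waitForChange(signal: AbortSignal): Promise<void>;
}

async function* readWithWakeups(
  backend: LogBackend,
  cursor: string,
  signal: AbortSignal,
  pageSize = 100,
): AsyncGenerator<Entry, void, void> {
  let position = cursor;
  while (!signal.aborted) {
    const page = await backend.fetchAfter(position, pageSize);
    for (const entry of page.entries) {
      yield entry;
      position = entry.cursor; // resume point for the next fetch
    }
    // A full page may mean more entries are buffered: fetch again before waiting.
    if (page.entries.length === pageSize) continue;
    if (page.final) {
      if (page.final.status === "error") {
        throw new Error(page.final.error ?? "Stream failed");
      }
      return; // "done": buffered entries are drained, so complete normally
    }
    // Nothing new and not finalized: sleep until woken instead of busy-looping.
    await backend.waitForChange(signal);
  }
}
```

If the wakeup primitive is edge-triggered, an append that lands between `fetchAfter` and `waitForChange` can be missed. Giving `waitForChange` a timeout turns a missed notification into a short delay rather than a stalled reader.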

`status(streamId)` resolves to one of `"streaming" | "done" | "error" | "missing"`, reflecting the state of the underlying store at the time of the call without waiting for new data. It exists so the context can decide whether to start a new producer or attach a consumer without holding a `read` iterator open.

`delete(streamId)` removes all state for the stream. It must be a no-op when the stream does not exist, and it should cause active `read` iterables to terminate (treat outstanding readers as if the stream finalized).

## Acquire semantics

`acquire` is the only method that requires linearizability across processes. Two route handlers that race to start the same `streamId` must see exactly one `"producer"` result; the loser becomes a `"consumer"` and replays the winner's bytes. A single-process store can guard a `Map` with a synchronous `if (!map.has(id)) map.set(id, ...)`. Networked stores need a primitive that does the check and the insert in one round trip:

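- Redis: `SET key value NX PX ttlMs` (`PX` takes milliseconds, matching `options.ttlMs`; `EX` takes seconds), or `INCR` against a per-stream counter, where the caller that receives `1` is the producer.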
- Postgres: `INSERT ... ON CONFLICT (stream_id) DO NOTHING RETURNING ...`.
- Durable Objects: a single object instance per `streamId` plus a boolean field.
- Upstash REST: `set` with `nx=true`.

If your backend cannot offer atomicity, do not paper over it with read-then-write; you will silently produce two writers for the same stream under contention, and consumers will observe interleaved bytes.
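To make the failure mode concrete, the sketch below races two callers against a read-then-write `acquire` and against an atomic one. `FakeKv` is a hypothetical in-memory stand-in for a networked client, and `setIfAbsent` stands in for whichever atomic primitive your backend provides; none of these names come from `assistant-stream`.

```typescript
type Role = "producer" | "consumer";

// Hypothetical key-value client. Each method is async, like a network call.
class FakeKv {
  private data = new Map<string, string>();

  async get(key: string): Promise<string | undefined> {
    return this.data.get(key);
  }

  async set(key: string, value: string): Promise<void> {
    this.data.set(key, value);
  }

  // Atomic check-and-insert: the analogue of SET ... NX or
  // INSERT ... ON CONFLICT DO NOTHING.
  async setIfAbsent(key: string, value: string): Promise<boolean> {
    if (this.data.has(key)) return false;
    this.data.set(key, value);
    return true;
  }
}

// Broken: another caller can run between the check and the insert.
async function acquireRacy(kv: FakeKv, id: string): Promise<Role> {
  const existing = await kv.get(id);
  if (existing !== undefined) return "consumer";
  await kv.set(id, "streaming");
  return "producer";
}

// Correct: the check and the insert happen in a single operation.
async function acquireAtomic(kv: FakeKv, id: string): Promise<Role> {
  return (await kv.setIfAbsent(id, "streaming")) ? "producer" : "consumer";
}

// Starts two acquires for the same stream before either has completed.
async function race(
  acquire: (kv: FakeKv, id: string) => Promise<Role>,
): Promise<Role[]> {
  const kv = new FakeKv();
  return Promise.all([acquire(kv, "s1"), acquire(kv, "s1")]);
}
```

In this sketch `race(acquireRacy)` resolves with two `"producer"` roles because both callers read before either writes, while `race(acquireAtomic)` resolves with exactly one.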

## The cursor contract

Cursors are opaque strings. Callers never inspect them; the store assigns them, the context echoes them back on the next `read` call, and the store uses them to resume from the correct position. Two rules:

- Cursors must be strictly monotonic per stream. Whatever scheme you pick (sequence number, ULID, Postgres `bigserial`, Redis stream id), entry N+1 sorts after entry N.
- The empty string means start from the beginning. `read(streamId, "", signal)` yields every entry the store has, oldest first.

You do not need cross-stream ordering. You do need a deterministic mapping from cursor back to position so that `read` can resume a consumer that disconnected mid-replay.
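One scheme that satisfies both rules is a fixed-width, zero-padded sequence number, sketched below. The names `encodeCursor`, `decodeCursor`, and `entriesAfter` are illustrative only, and the sketch assumes entries are stored in an array where index `k` holds sequence number `k + 1`.

```typescript
// Illustrative scheme: fixed-width decimal sequence numbers, so that plain
// string comparison agrees with numeric order.
const CURSOR_WIDTH = 12;

function encodeCursor(seq: number): string {
  if (!Number.isInteger(seq) || seq < 1 || seq >= 10 ** CURSOR_WIDTH) {
    throw new RangeError(`Invalid sequence number: ${seq}`);
  }
  return String(seq).padStart(CURSOR_WIDTH, "0");
}

function decodeCursor(cursor: string): number {
  if (cursor === "") return 0; // the empty cursor means "from the beginning"
  if (cursor.length !== CURSOR_WIDTH || !/^\d+$/.test(cursor)) {
    throw new RangeError(`Invalid cursor: ${cursor}`);
  }
  return Number(cursor);
}

type Entry = { cursor: string; chunk: Uint8Array };

// Returns the entries strictly after `cursor`, oldest first.
function entriesAfter(entries: Entry[], cursor: string): Entry[] {
  // entries[k] holds sequence number k + 1, so the decoded sequence number
  // is also the index of the first entry the consumer has not seen.
  return entries.slice(decodeCursor(cursor));
}
```

Rejecting malformed cursors up front is a design choice in this sketch: a consumer that echoes back a corrupted cursor fails fast instead of silently replaying from the wrong position.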

## A worked example

The following `Map`-backed implementation is suitable for a single-process server. It is deliberately small and skips TTL eviction; treat it as a starting point for a custom backend rather than a replacement for `createInMemoryResumableStreamStore`.

```
import type { ResumableStreamStore } from "assistant-stream/resumable";

type State = {
  entries: { cursor: string; chunk: Uint8Array }[];
  seq: number;
  final?: { status: "done" | "error"; error?: string };
  waiters: Array<() => void>;
};

export function createMapResumableStreamStore(): ResumableStreamStore {
  const streams = new Map<string, State>();
  const wake = (s: State) => s.waiters.splice(0).forEach((fn) => fn());
  return {
    async acquire(id) {
      if (streams.has(id)) return "consumer";
      streams.set(id, { entries: [], seq: 0, waiters: [] });
      return "producer";
    },
    async append(id, chunk) {
      const s = streams.get(id);
      if (!s || s.final) throw new Error(`Cannot append: ${id}`);
      s.entries.push({ cursor: (++s.seq).toString(36), chunk });
      wake(s);
    },
    async finalize(id, status, error) {
      const s = streams.get(id);
      if (!s || s.final) return;
      s.final = { status, error };
      wake(s);
    },
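    async *read(id, cursor, signal) {
      const s = streams.get(id);
      if (!s) throw new Error(`Stream not found: ${id}`);
      // A cursor is the base-36 sequence number of the last entry seen, which
      // is also the array index of the next entry to yield.
      let i = cursor === "" ? 0 : Number.parseInt(cursor, 36);
      if (!Number.isInteger(i) || i < 0) throw new Error(`Invalid cursor: ${cursor}`);
      while (!signal.aborted) {
        while (i < s.entries.length) yield s.entries[i++]!;
        if (s.final) {
          if (s.final.status === "error") throw new Error(s.final.error ?? "Stream failed");
          return;
        }
        // Wait for the next append, finalize, or delete, or for the caller to abort.
        await new Promise<void>((resolve) => {
          const done = () => {
            // Clean up both registrations so neither accumulates on long streams.
            signal.removeEventListener("abort", done);
            const at = s.waiters.indexOf(done);
            if (at !== -1) s.waiters.splice(at, 1);
            resolve();
          };
          s.waiters.push(done);
          signal.addEventListener("abort", done, { once: true });
        });
      }
    },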
    async status(id) {
      const s = streams.get(id);
      return !s ? "missing" : s.final ? s.final.status : "streaming";
    },
    async delete(id) {
      const s = streams.get(id);
      if (!s) return;
      streams.delete(id);
      s.final ??= { status: "done" };
      wake(s);
    },
  };
}
```
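A small conformance check can exercise the rules from the walkthrough against any store you write. The sketch below is not part of `assistant-stream`: `checkStoreContract` and `expectThat` are hypothetical helpers, and the `Store` and `Entry` types are local structural copies assumed from this guide so that the block stands alone.

```typescript
// Local structural copies, assumed from this guide; in a real project you
// would import the types from "assistant-stream/resumable" instead.
type Entry = { cursor: string; chunk: Uint8Array };
type Store = {
  acquire(id: string): Promise<"producer" | "consumer">;
  append(id: string, chunk: Uint8Array): Promise<void>;
  finalize(id: string, status: "done" | "error", error?: string): Promise<void>;
  read(id: string, cursor: string, signal: AbortSignal): AsyncIterable<Entry>;
  status(id: string): Promise<"streaming" | "done" | "error" | "missing">;
  delete(id: string): Promise<void>;
};

function expectThat(condition: boolean, rule: string): void {
  if (!condition) throw new Error(`Contract violation: ${rule}`);
}

async function checkStoreContract(store: Store, id = "contract-check"): Promise<void> {
  expectThat((await store.status(id)) === "missing", "unknown stream is missing");
  expectThat((await store.acquire(id)) === "producer", "first acquire is producer");
  expectThat((await store.acquire(id)) === "consumer", "later acquire is consumer");

  await store.append(id, new Uint8Array([1]));
  await store.append(id, new Uint8Array([2]));
  await store.finalize(id, "done");
  await store.finalize(id, "done"); // a duplicate finalize must be a no-op
  expectThat((await store.status(id)) === "done", "status reflects finalize");
  expectThat((await store.acquire(id)) === "consumer", "acquire after finalize is consumer");

  // Reading from the empty cursor replays everything, oldest first.
  const signal = new AbortController().signal;
  const all: Entry[] = [];
  for await (const entry of store.read(id, "", signal)) all.push(entry);
  expectThat(all.map((e) => e.chunk[0]).join(",") === "1,2", "empty cursor replays all");

  // Resuming from the first cursor yields only what comes strictly after it.
  const first = all[0];
  if (!first) throw new Error("Contract violation: no entries replayed");
  const resumed: Entry[] = [];
  for await (const entry of store.read(id, first.cursor, signal)) resumed.push(entry);
  expectThat(resumed.map((e) => e.chunk[0]).join(",") === "2", "resume is strictly after cursor");

  await store.delete(id);
  await store.delete(id); // deleting a missing stream must be a no-op
  expectThat((await store.status(id)) === "missing", "delete removes all state");
}
```

Calling `await checkStoreContract(createMapResumableStreamStore())` in a test is one way to use it. The check covers only the sequential rules; races on `acquire` and TTL behavior need backend-specific tests.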

## TTL and eviction

`acquire` receives `options.ttlMs`; if absent, fall back to a store-level default (the built-in stores use 24 hours). Refresh the expiration on every `append` and on `finalize` so a stream that finishes near the deadline still has time to be consumed. Persist the TTL alongside the entries so a worker reading the stream much later can decide whether the data is still valid.

When a stream expires, treat it the same as `finalize(streamId, "error", "Stream expired")`: any active `read` iterable must throw or terminate, and `status` must transition to `"missing"` once the eviction has run. Stores backed by Redis or a similar TTL-aware engine can lean on the engine's own expiration; SQL-backed stores need a periodic sweep, and Durable Objects can use `setAlarm`.
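For stores without engine-level expiration, the bookkeeping might be sketched as follows. `StreamRecord`, `createRecord`, `touch`, and `sweepExpired` are hypothetical names, the in-memory `Map` stands in for a table or key space, and the clock is passed in explicitly so the logic is easy to test.

```typescript
// Hypothetical per-stream metadata, persisted alongside the entries.
type StreamRecord = {
  ttlMs: number; // stored so later readers can judge whether data is still valid
  expiresAt: number; // epoch milliseconds
};

// 24 hours, the default this guide attributes to the built-in stores.
const DEFAULT_TTL_MS = 24 * 60 * 60 * 1000;

// Called from acquire: honor options.ttlMs when given, else the store default.
function createRecord(now: number, ttlMs: number = DEFAULT_TTL_MS): StreamRecord {
  return { ttlMs, expiresAt: now + ttlMs };
}

// Called from append and finalize so an active stream does not expire mid-flight.
function touch(record: StreamRecord, now: number): void {
  record.expiresAt = now + record.ttlMs;
}

// Periodic sweep: removes expired streams and returns their ids so the caller
// can terminate any active readers for them.
function sweepExpired(streams: Map<string, StreamRecord>, now: number): string[] {
  const evicted: string[] = [];
  streams.forEach((record, id) => {
    if (record.expiresAt <= now) {
      streams.delete(id);
      evicted.push(id);
    }
  });
  return evicted;
}
```

In a SQL-backed store the same sweep would typically be a single `DELETE ... WHERE expires_at <= now` run on a timer; the column name here is only an example.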

## Wiring it up

`createResumableStreamContext` takes any object that satisfies `ResumableStreamStore`. There is no registry and no extra configuration; pass your instance as `store`:

```
import { createResumableStreamContext } from "assistant-stream/resumable";
import { createMapResumableStreamStore } from "@/lib/map-resumable-store";

export const resumableContext = createResumableStreamContext({
  store: createMapResumableStreamStore(),
});
```

From this point the route handlers in [Resumable Streams](/docs/guides/resumable-streams) work unchanged: `resumableContext.run(streamId, makeStream)` calls your `acquire`, `append`, and `finalize`, and `resumableContext.resume(streamId)` calls your `read`.