# Custom Resumable Stream Stores URL: /docs/guides/resumable-stream-stores Implement the ResumableStreamStore interface to back resumable streams with Postgres, Cloudflare Durable Objects, Upstash REST, InstantDB, or any other backend. The built-in InMemory and Redis adapters cover most deployments. Write your own `ResumableStreamStore` when you need a backend you already operate (Postgres, MySQL), an edge-native primitive (Cloudflare Durable Objects, Workers KV), an HTTP-only key-value service (Upstash REST), or a realtime database (InstantDB). The contract is six async methods over an opaque `streamId` and a monotonic byte log. ## Interface walkthrough \[#interface-walkthrough] The full interface lives in `assistant-stream/resumable`: ```ts title="packages/assistant-stream/src/resumable/types.ts" export interface ResumableStreamStore { acquire( streamId: string, options?: ResumableStreamAcquireOptions, ): Promise; append(streamId: string, chunk: Uint8Array): Promise; finalize( streamId: string, status: "done" | "error", error?: string, ): Promise; read( streamId: string, cursor: string, signal: AbortSignal, ): AsyncIterable; status(streamId: string): Promise; delete(streamId: string): Promise; } ``` `acquire(streamId, options?)` arbitrates ownership. The first caller for a given `streamId` resolves to `"producer"`; every later caller, including those arriving after `finalize`, resolves to `"consumer"`. Implementations must perform the check and the insert atomically (see below). `options.ttlMs` overrides the store default for this stream; honor it when you set the expiration timestamp. `append(streamId, chunk)` adds a `Uint8Array` to the log under a fresh, monotonically increasing cursor. Callers expect the chunk to be observable to `read` before the promise resolves. Implementations should refresh the TTL on each call so a stream that is still actively producing does not expire mid-flight, and should reject when the stream is missing or already finalized. `finalize(streamId, status, error?)` flips the stream into a terminal state. Pending and future `read` iterables drain buffered entries and then either complete (`"done"`) or throw with `error` (`"error"`). Implementations must make `finalize` idempotent: a duplicate call with the same status is a no-op, and the producer task may retry on transient errors. `read(streamId, cursor, signal)` is the only streaming method. It yields every entry whose cursor sorts strictly after the supplied `cursor`, then waits for new appends, then completes when the stream finalizes. Aborting `signal` resolves the iterable cleanly without throwing. Networked stores typically combine a bounded fetch loop with pub/sub, long-poll, or notify wakeups; do not busy-loop. `status(streamId)` returns one of `"streaming" | "done" | "error" | "missing"` synchronously with respect to the underlying store. It exists so the context can decide whether to start a new producer or attach a consumer without holding a `read` iterator open. `delete(streamId)` removes all state for the stream. It must be a no-op when the stream does not exist, and it should cause active `read` iterables to terminate (treat outstanding readers as if the stream finalized). ## Acquire semantics \[#acquire-semantics] `acquire` is the only method that requires linearizability across processes. Two route handlers that race to start the same `streamId` must see exactly one `"producer"` result; the loser becomes a `"consumer"` and replays the winner's bytes. A single-process store can guard a `Map` with a synchronous `if (!map.has(id)) map.set(id, ...)`. Networked stores need a primitive that does the check and the insert in one round trip: * Redis: `SET key value NX EX ttl`, or `INCR` against a per-stream counter. * Postgres: `INSERT ... ON CONFLICT (stream_id) DO NOTHING RETURNING ...`. * Durable Objects: a single object instance per `streamId` plus a boolean field. * Upstash REST: `set` with `nx=true`. If your backend cannot offer atomicity, do not paper over it with read-then-write; you will silently produce two writers for the same stream under contention, and consumers will observe interleaved bytes. ## The cursor contract \[#the-cursor-contract] Cursors are opaque strings. Callers never inspect them; the store assigns them, the context echoes them back on the next `read` call, and the store uses them to resume from the correct position. Two rules: * Cursors must be strictly monotonic per stream. Whatever scheme you pick (sequence number, ULID, Postgres `bigserial`, Redis stream id), entry N+1 sorts after entry N. * The empty string means start from the beginning. `read(streamId, "", signal)` yields every entry the store has, oldest first. You do not need cross-stream ordering. You do need a deterministic mapping from cursor back to position so that `read` can resume a consumer that disconnected mid-replay. ## A worked example \[#a-worked-example] A `Map`-backed implementation suitable for a single-process server. It is deliberately small and skips TTL eviction; treat it as a starting point for a custom backend rather than a replacement for `createInMemoryResumableStreamStore`. ```ts title="/lib/map-resumable-store.ts" import type { ResumableStreamStore } from "assistant-stream/resumable"; type State = { entries: { cursor: string; chunk: Uint8Array }[]; seq: number; final?: { status: "done" | "error"; error?: string }; waiters: Array<() => void>; }; export function createMapResumableStreamStore(): ResumableStreamStore { const streams = new Map(); const wake = (s: State) => s.waiters.splice(0).forEach((fn) => fn()); return { async acquire(id) { if (streams.has(id)) return "consumer"; streams.set(id, { entries: [], seq: 0, waiters: [] }); return "producer"; }, async append(id, chunk) { const s = streams.get(id); if (!s || s.final) throw new Error(`Cannot append: ${id}`); s.entries.push({ cursor: (++s.seq).toString(36), chunk }); wake(s); }, async finalize(id, status, error) { const s = streams.get(id); if (!s || s.final) return; s.final = { status, error }; wake(s); }, async *read(id, cursor, signal) { const s = streams.get(id); if (!s) throw new Error(`Stream not found: ${id}`); let i = cursor === "" ? 0 : Number.parseInt(cursor, 36); while (!signal.aborted) { while (i < s.entries.length) yield s.entries[i++]!; if (s.final) { if (s.final.status === "error") throw new Error(s.final.error); return; } await new Promise((r) => { s.waiters.push(r); signal.addEventListener("abort", () => r(), { once: true }); }); } }, async status(id) { const s = streams.get(id); return !s ? "missing" : s.final ? s.final.status : "streaming"; }, async delete(id) { const s = streams.get(id); if (!s) return; streams.delete(id); s.final ??= { status: "done" }; wake(s); }, }; } ``` ## TTL and eviction \[#ttl-and-eviction] `acquire` receives `options.ttlMs`; if absent, fall back to a store-level default (the built-in stores use 24 hours). Refresh the expiration on every `append` and on `finalize` so a stream that finishes near the deadline still has time to be consumed. Persist the TTL alongside the entries so a worker reading the stream much later can decide whether the data is still valid. When a stream expires, treat it the same as `finalize(streamId, "error", "Stream expired")`: any active `read` iterable must throw or terminate, and `status` must transition to `"missing"` once the eviction has run. Stores backed by Redis or a similar TTL-aware engine can lean on the engine's own expiration; SQL-backed stores need a periodic sweep, and Durable Objects can use `setAlarm`. ## Wiring it up \[#wiring-it-up] `createResumableStreamContext` takes any object that satisfies `ResumableStreamStore`. There is no registry and no extra configuration; pass your instance as `store`: ```ts title="/lib/resumable-context.ts" import { createResumableStreamContext } from "assistant-stream/resumable"; import { createMapResumableStreamStore } from "@/lib/map-resumable-store"; export const resumableContext = createResumableStreamContext({ store: createMapResumableStreamStore(), }); ``` From this point the route handlers in [Resumable Streams](/docs/guides/resumable-streams) work unchanged: `resumableContext.run(streamId, makeStream)` calls your `acquire`, `append`, and `finalize`, and `resumableContext.resume(streamId)` calls your `read`.