Implement the ResumableStreamStore interface to back resumable streams with Postgres, Cloudflare Durable Objects, Upstash REST, InstantDB, or any other backend.
The built-in InMemory and Redis adapters cover most deployments. Write your own ResumableStreamStore when you need a backend you already operate (Postgres, MySQL), an edge-native primitive (Cloudflare Durable Objects, Workers KV), an HTTP-only key-value service (Upstash REST), or a realtime database (InstantDB). The contract is six async methods over an opaque streamId and a monotonic byte log.
Interface walkthrough
The full interface lives in assistant-stream/resumable:
export interface ResumableStreamStore {
  acquire(
    streamId: string,
    options?: ResumableStreamAcquireOptions,
  ): Promise<ResumableStreamRole>;
  append(streamId: string, chunk: Uint8Array): Promise<void>;
  finalize(
    streamId: string,
    status: "done" | "error",
    error?: string,
  ): Promise<void>;
  read(
    streamId: string,
    cursor: string,
    signal: AbortSignal,
  ): AsyncIterable<ResumableStreamEntry>;
  status(streamId: string): Promise<ResumableStreamStatus>;
  delete(streamId: string): Promise<void>;
}
acquire(streamId, options?) arbitrates ownership. The first caller for a given streamId resolves to "producer"; every later caller, including those arriving after finalize, resolves to "consumer". Implementations must perform the check and the insert atomically (see below). options.ttlMs overrides the store default for this stream; honor it when you set the expiration timestamp.
append(streamId, chunk) adds a Uint8Array to the log under a fresh, monotonically increasing cursor. Callers expect the chunk to be observable to read before the promise resolves. Implementations should refresh the TTL on each call so a stream that is still actively producing does not expire mid-flight, and should reject when the stream is missing or already finalized.
finalize(streamId, status, error?) flips the stream into a terminal state. Pending and future read iterables drain buffered entries and then either complete ("done") or throw with error ("error"). Implementations must make finalize idempotent: a duplicate call with the same status is a no-op, and the producer task may retry on transient errors.
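In SQL, a guarded UPDATE is the usual way to get that idempotency. A minimal sketch with node-postgres, assuming a hypothetical streams table with a status column (the schema and names are illustrative, not part of the library):
import { Pool } from "pg";

const pool = new Pool();

async function finalize(
  streamId: string,
  status: "done" | "error",
  error?: string,
): Promise<void> {
  // The status guard turns duplicate or retried calls into no-ops: only the
  // first transition out of 'streaming' writes anything.
  await pool.query(
    `UPDATE streams
     SET status = $2, error = $3
     WHERE stream_id = $1 AND status = 'streaming'`,
    [streamId, status, error ?? null],
  );
}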
read(streamId, cursor, signal) is the only streaming method. It yields every entry whose cursor sorts strictly after the supplied cursor, then waits for new appends, then completes when the stream finalizes. Aborting signal resolves the iterable cleanly without throwing. Networked stores typically combine a bounded fetch loop with pub/sub, long-poll, or notify wakeups; do not busy-loop.
status(streamId) resolves to one of "streaming" | "done" | "error" | "missing", reflecting the store's state at the moment of the call. It exists so the context can decide whether to start a new producer or attach a consumer without holding a read iterator open.
delete(streamId) removes all state for the stream. It must be a no-op when the stream does not exist, and it should cause active read iterables to terminate (treat outstanding readers as if the stream finalized).
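Of the six methods, read is the one that most shapes a networked store. One workable fetch-then-wait shape, sketched here with two hypothetical backend helpers: fetchAfter, which returns a bounded page of entries sorted after a cursor plus any final state, and waitForChange, which resolves on the next append/finalize notification (pub/sub, LISTEN/NOTIFY, long-poll) or when the signal aborts:
import type { ResumableStreamEntry } from "assistant-stream/resumable";

// Hypothetical backend helpers; their behavior is described above.
declare function fetchAfter(
  streamId: string,
  cursor: string,
  limit: number,
): Promise<{
  entries: ResumableStreamEntry[];
  final?: { status: "done" | "error"; error?: string };
}>;
declare function waitForChange(
  streamId: string,
  signal: AbortSignal,
): Promise<void>;

async function* read(
  streamId: string,
  cursor: string,
  signal: AbortSignal,
): AsyncIterable<ResumableStreamEntry> {
  while (!signal.aborted) {
    const page = await fetchAfter(streamId, cursor, 100); // bounded page size
    for (const entry of page.entries) {
      cursor = entry.cursor; // advance so the next fetch resumes after this entry
      yield entry;
    }
    if (page.entries.length > 0) continue; // the page may have been full; refetch
    if (page.final) {
      if (page.final.status === "error") throw new Error(page.final.error);
      return; // buffered entries drained and the stream is finalized
    }
    await waitForChange(streamId, signal); // block instead of busy-looping
  }
}
Aborting the signal falls out of the loop condition, so the iterable completes cleanly, as the contract requires.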
Acquire semantics
acquire is the only method that requires linearizability across processes. Two route handlers that race to start the same streamId must see exactly one "producer" result; the loser becomes a "consumer" and replays the winner's bytes. A single-process store can guard a Map with a synchronous if (!map.has(id)) map.set(id, ...). Networked stores need a primitive that does the check and the insert in one round trip:
- Redis: SET key value NX EX ttl, or INCR against a per-stream counter.
- Postgres: INSERT ... ON CONFLICT (stream_id) DO NOTHING RETURNING ....
- Durable Objects: a single object instance per streamId plus a boolean field.
- Upstash REST: set with nx=true.
If your backend cannot offer atomicity, do not paper over it with read-then-write; you will silently produce two writers for the same stream under contention, and consumers will observe interleaved bytes.
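As a concrete sketch, here is what the Postgres variant might look like with node-postgres, assuming a hypothetical streams table keyed by stream_id; the schema and names are illustrative, not part of the library:
import { Pool } from "pg";
import type { ResumableStreamRole } from "assistant-stream/resumable";

const pool = new Pool();

// Assumed schema:
//   CREATE TABLE streams (
//     stream_id  text PRIMARY KEY,
//     status     text NOT NULL DEFAULT 'streaming',
//     expires_at timestamptz NOT NULL
//   );
async function acquire(
  streamId: string,
  options?: { ttlMs?: number },
): Promise<ResumableStreamRole> {
  const ttlMs = options?.ttlMs ?? 24 * 60 * 60 * 1000; // store-level default
  // The existence check and the insert happen in one statement, so exactly
  // one of two racing callers gets a row back and becomes the producer.
  const res = await pool.query(
    `INSERT INTO streams (stream_id, expires_at)
     VALUES ($1, now() + $2::interval)
     ON CONFLICT (stream_id) DO NOTHING
     RETURNING stream_id`,
    [streamId, `${ttlMs} milliseconds`],
  );
  return res.rowCount === 1 ? "producer" : "consumer";
}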
The cursor contract
Cursors are opaque strings. Callers never inspect them; the store assigns them, the context echoes them back on the next read call, and the store uses them to resume from the correct position. Two rules:
- Cursors must be strictly monotonic per stream. Whatever scheme you pick (sequence number, ULID, Postgres bigserial, Redis stream id), entry N+1 sorts after entry N.
- The empty string means start from the beginning. read(streamId, "", signal) yields every entry the store has, oldest first.
You do not need cross-stream ordering. You do need a deterministic mapping from cursor back to position so that read can resume a consumer that disconnected mid-replay.
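For stores that compare cursors as strings (a SQL text column, a KV range scan), zero-padding a sequence number is one simple scheme that satisfies the monotonicity rule; a sketch:
// Zero-padded decimal sequence numbers sort lexicographically in the same
// order as numerically, so "10" no longer sorts before "9".
function nextCursor(seq: number): string {
  return String(seq).padStart(16, "0"); // 16 digits is plenty per stream
}

nextCursor(9);  // "0000000000000009"
nextCursor(10); // "0000000000000010", sorts strictly after
The Map store below sidesteps padding entirely by decoding its cursors back into numbers instead of comparing strings.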
A worked example
A Map-backed implementation suitable for a single-process server. It is deliberately small and skips TTL eviction; treat it as a starting point for a custom backend rather than a replacement for createInMemoryResumableStreamStore.
import type { ResumableStreamStore } from "assistant-stream/resumable";

type State = {
  entries: { cursor: string; chunk: Uint8Array }[];
  seq: number;
  final?: { status: "done" | "error"; error?: string };
  waiters: Array<() => void>;
};

export function createMapResumableStreamStore(): ResumableStreamStore {
  const streams = new Map<string, State>();
  // Resolve every parked reader so it re-checks for new entries or a final state.
  const wake = (s: State) => s.waiters.splice(0).forEach((fn) => fn());
  return {
    async acquire(id) {
      // Map mutation is synchronous, so has-then-set is atomic in one process.
      if (streams.has(id)) return "consumer";
      streams.set(id, { entries: [], seq: 0, waiters: [] });
      return "producer";
    },
    async append(id, chunk) {
      const s = streams.get(id);
      if (!s || s.final) throw new Error(`Cannot append: ${id}`);
      s.entries.push({ cursor: (++s.seq).toString(36), chunk });
      wake(s);
    },
    async finalize(id, status, error) {
      const s = streams.get(id);
      if (!s || s.final) return; // idempotent: duplicate calls are no-ops
      s.final = { status, error };
      wake(s);
    },
    async *read(id, cursor, signal) {
      const s = streams.get(id);
      if (!s) throw new Error(`Stream not found: ${id}`);
      // Cursors are 1-based sequence numbers in base 36, so resuming after
      // cursor c means starting at entry index parseInt(c, 36).
      let i = cursor === "" ? 0 : Number.parseInt(cursor, 36);
      while (!signal.aborted) {
        while (i < s.entries.length) yield s.entries[i++]!;
        if (s.final) {
          if (s.final.status === "error") throw new Error(s.final.error);
          return;
        }
        // Park until the next append/finalize/delete, or until the signal
        // aborts; drop the abort listener on wake so long streams do not
        // accumulate one listener per chunk.
        await new Promise<void>((resolve) => {
          const onAbort = () => resolve();
          signal.addEventListener("abort", onAbort, { once: true });
          s.waiters.push(() => {
            signal.removeEventListener("abort", onAbort);
            resolve();
          });
        });
      }
    },
    async status(id) {
      const s = streams.get(id);
      return !s ? "missing" : s.final ? s.final.status : "streaming";
    },
    async delete(id) {
      const s = streams.get(id);
      if (!s) return;
      streams.delete(id);
      // Outstanding readers still hold the old State; finalize it so they
      // drain and complete instead of waiting forever.
      s.final ??= { status: "done" };
      wake(s);
    },
  };
}
TTL and eviction
acquire receives options.ttlMs; if absent, fall back to a store-level default (the built-in stores use 24 hours). Refresh the expiration on every append and on finalize so a stream that finishes near the deadline still has time to be consumed. Persist the TTL alongside the entries so a worker reading the stream much later can decide whether the data is still valid.
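A sketch of the refresh-on-append pattern with ioredis, assuming the log lives in a Redis stream keyed per streamId; the key layout is illustrative, and a real store would persist ttlMs at acquire time rather than threading it through as a parameter:
import Redis from "ioredis";

const redis = new Redis();

async function append(streamId: string, chunk: Uint8Array, ttlMs: number) {
  const key = `resumable:${streamId}`;
  // XADD appends under a monotonic stream id (a ready-made cursor);
  // PEXPIRE in the same pipeline pushes the expiration out on every chunk.
  await redis
    .multi()
    .xadd(key, "*", "chunk", Buffer.from(chunk))
    .pexpire(key, ttlMs)
    .exec();
}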
When a stream expires, treat it the same as finalize(streamId, "error", "Stream expired"): any active read iterable must throw or terminate, and status must transition to "missing" once the eviction has run. Stores backed by Redis or a similar TTL-aware engine can lean on the engine's own expiration; SQL-backed stores need a periodic sweep, and Durable Objects can use setAlarm.
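For a SQL-backed store, the periodic sweep can be as small as one DELETE on a timer. This sketch reuses the hypothetical streams table from the acquire example and assumes entries are removed via ON DELETE CASCADE:
import { Pool } from "pg";

const pool = new Pool();

// Runs until the returned stop function is called. Once a row is swept,
// status reports "missing"; a real store would also notify parked readers
// so their read iterables terminate per the expiry contract.
function startExpirySweep(intervalMs = 60_000): () => void {
  const timer = setInterval(() => {
    pool
      .query(`DELETE FROM streams WHERE expires_at < now()`)
      .catch((err) => console.error("stream expiry sweep failed", err));
  }, intervalMs);
  timer.unref(); // the sweep alone should not keep the process alive
  return () => clearInterval(timer);
}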
Wiring it up
createResumableStreamContext takes any object that satisfies ResumableStreamStore. There is no registry and no extra configuration; pass your instance as store:
import { createResumableStreamContext } from "assistant-stream/resumable";
import { createMapResumableStreamStore } from "@/lib/map-resumable-store";

export const resumableContext = createResumableStreamContext({
  store: createMapResumableStreamStore(),
});
From this point the route handlers in Resumable Streams work unchanged: resumableContext.run(streamId, makeStream) calls your acquire, append, and finalize, and resumableContext.resume(streamId) calls your read.