# Assistant Transport
URL: /docs/runtimes/custom/assistant-transport
Stream agent state to the frontend and handle user commands for custom agents.
`AssistantTransport` is a state-streaming protocol layered on `ExternalStoreRuntime` (see [architecture](/docs/runtimes/concepts/architecture)). Instead of streaming message parts, your backend streams snapshots of its full agent state and the runtime converts them into UI messages.
Three things make this useful:
* **State streaming** — efficient updates to your agent state (any JSON object).
* **UI integration** — your agent's native state becomes assistant-ui messages.
* **Command handling** — user actions (messages, tool results, custom commands) flow back to the agent.
## When to use it \[#when-to-use-it]
Pick `AssistantTransport` when:
* Your backend does not have a streaming protocol yet and you want one.
* Your agent has internal state worth surfacing in the UI directly.
* You are building a custom agent framework or one without a streaming protocol (e.g. open-source LangGraph).
* You need bidirectional commands beyond simple message turns.
If you only need message streaming, [DataStream](/docs/runtimes/custom/data-stream) is simpler.
## Mental model \[#mental-model]
The frontend receives state snapshots and converts them to React components. The UI is a stateless view on top of the agent state.
The agent server receives commands from the frontend. When a user interacts with the UI (sends a message, clicks a button), the frontend queues a command and sends it. `AssistantTransport` defines `add-message` and `add-tool-result`; you can define more.
### Command lifecycle \[#command-lifecycle]
The runtime alternates between **idle** (no active backend request) and **sending** (request in flight). When a new command is created while idle, it is sent immediately; otherwise it is queued until the current request completes.
To implement this, you build two pieces:
1. **Backend endpoint** that accepts commands and returns a stream of state snapshots.
2. **Frontend state converter** that maps state snapshots to assistant-ui's data format.
## Building a backend endpoint \[#building-a-backend-endpoint]
The endpoint receives POST requests with this payload:
```ts
{
  state: T, // previous state the frontend has
  commands: AssistantTransportCommand[],
  system?: string,
  tools?: Record<string, Tool>, // tool definitions keyed by name
  threadId: string | null, // null for new threads
  parentId?: string | null, // present when editing or branching
  callSettings?: { maxTokens, temperature, topP, presencePenalty, frequencyPenalty, seed },
  config?: { apiKey, baseUrl, modelName },
}
```
The previous wire shape spread `callSettings` and `config` fields at the top level (e.g. `body.modelName`). Both formats are sent for compatibility, but the top-level fields are deprecated. Read from the nested objects.
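If your endpoint uses FastAPI, the sketch below shows a minimal Pydantic model for this request, matching the `ChatRequest` used in the examples that follow. The field names of the tool-result command are assumptions for illustration; extend the union with your own models for custom commands.

```python
from typing import Any, Literal, Optional, Union
from pydantic import BaseModel

class AddMessageCommand(BaseModel):
    type: Literal["add-message"]
    message: Any  # the UI message; carries a `parts` list

class AddToolResultCommand(BaseModel):
    type: Literal["add-tool-result"]
    toolCallId: str  # assumed field name, for illustration
    result: Any      # assumed field name, for illustration

class ChatRequest(BaseModel):
    state: Any  # previous state the frontend has
    commands: list[Union[AddMessageCommand, AddToolResultCommand]]
    system: Optional[str] = None
    tools: Optional[dict[str, Any]] = None
    threadId: Optional[str] = None
    parentId: Optional[str] = None
    callSettings: Optional[dict[str, Any]] = None
    config: Optional[dict[str, Any]] = None
```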
The endpoint returns a stream of state snapshots using the [`assistant-stream`](https://www.npmjs.com/package/assistant-stream) library ([PyPI](https://pypi.org/project/assistant-stream/)).
### Handling commands \[#handling-commands]
```python
for command in request.commands:
    if command.type == "add-message":
        ...  # handle adding a user message
    elif command.type == "add-tool-result":
        ...  # handle a tool execution result
    elif command.type == "my-custom-command":
        ...  # handle your custom command
```
### Streaming updates \[#streaming-updates]
Mutate `controller.state` inside your run callback:
```python
from assistant_stream import RunController, create_run
from assistant_stream.serialization import DataStreamResponse

@app.post("/assistant")
async def chat_endpoint(request: ChatRequest):
    async def run_callback(controller: RunController):
        controller.state["message"] = "Hello"    # emits "set" at ["message"]
        controller.state["message"] += " World"  # emits "append-text"

    stream = create_run(run_callback, state=request.state)
    return DataStreamResponse(stream)
```
State changes are automatically streamed using the operations described in [streaming protocol](#streaming-protocol).
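Nested mutations work the same way, with the operation path pointing at the mutated key. A sketch (the `task` key is made up for illustration):

```python
# Inside run_callback; each mutation is streamed as an operation
controller.state["task"] = {"status": "running", "log": ""}  # "set" at ["task"]
controller.state["task"]["log"] += "step 1 done\n"           # "append-text" at ["task", "log"]
controller.state["task"]["status"] = "done"                  # "set" at ["task", "status"]
```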
### Cancellation \[#cancellation]
`create_run` exposes `controller.is_cancelled` and `controller.cancelled_event`. If the response stream closes early (user cancel, client disconnect), these are set so your loop can exit cleanly. `create_run` gives callbacks a \~50ms cooperative shutdown window before forced cancellation. Put critical cleanup in `finally` blocks.
```python
import asyncio

async def run_callback(controller: RunController):
    # Poll the flag from a work loop
    while not controller.is_cancelled:
        await asyncio.sleep(0.05)
```
```python
async def run_callback(controller: RunController):
    # Or wait on the event for a cancellation-aware shutdown
    await controller.cancelled_event.wait()
```
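Putting it together with cleanup in `finally` (a sketch; `next_agent_chunk` and `close_agent_session` are hypothetical stand-ins for your agent code):

```python
async def run_callback(controller: RunController):
    controller.state["log"] = ""
    try:
        while not controller.is_cancelled:
            chunk = await next_agent_chunk()  # hypothetical unit of agent work
            controller.state["log"] += chunk
    finally:
        # Critical cleanup; still runs within the ~50ms cooperative window
        await close_agent_session()  # hypothetical cleanup
```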
### Backend reference implementation \[#backend-reference-implementation]
```python
from assistant_stream import RunController, create_run
from assistant_stream.serialization import DataStreamResponse

@app.post("/assistant")
async def chat_endpoint(request: ChatRequest):
    async def run_callback(controller: RunController):
        if controller.state is None:
            controller.state = {"messages": []}

        # Apply incoming commands to the agent state
        for command in request.commands:
            if command.type == "add-message":
                controller.state["messages"].append(command.message)

        # Each append is streamed to the client as it happens
        async for message in your_agent.stream():
            controller.state["messages"].append(message)

    stream = create_run(run_callback, state=request.state)
    return DataStreamResponse(stream)
```
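The same endpoint wired to LangGraph, using the `append_langgraph_event` helper to mirror graph events into the streamed state: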
```python
from assistant_stream import RunController, create_run
from assistant_stream.modules.langgraph import append_langgraph_event
from assistant_stream.serialization import DataStreamResponse
from langchain_core.messages import HumanMessage

@app.post("/assistant")
async def chat_endpoint(request: ChatRequest):
    async def run_callback(controller: RunController):
        if controller.state is None:
            controller.state = {"messages": []}

        # Collect the text of new user messages as LangGraph input
        input_messages = []
        for command in request.commands:
            if command.type == "add-message":
                text_parts = [
                    p.text
                    for p in command.message.parts
                    if p.type == "text" and p.text
                ]
                if text_parts:
                    input_messages.append(HumanMessage(content=" ".join(text_parts)))

        # Stream graph events into state as they arrive
        async for namespace, event_type, chunk in graph.astream(
            {"messages": input_messages},
            stream_mode=["messages", "updates"],
            subgraphs=True,
        ):
            append_langgraph_event(controller.state, namespace, event_type, chunk)

    stream = create_run(run_callback, state=request.state)
    return DataStreamResponse(stream)
```
Full LangGraph example: [`python/assistant-transport-backend-langgraph`](https://github.com/assistant-ui/assistant-ui/tree/main/python/assistant-transport-backend-langgraph).
## Streaming protocol \[#streaming-protocol]
assistant-stream replicates an arbitrary JSON object via two operations.
### Operations \[#operations]
These two operations cover all state mutations: `set` for values and structure, `append-text` for efficient streaming of text content.
#### `set` \[#set]
```json
// Operation
{ "type": "set", "path": ["status"], "value": "completed" }
// Before
{ "status": "pending" }
// After
{ "status": "completed" }
```
#### `append-text` \[#append-text]
```json
// Operation
{ "type": "append-text", "path": ["message"], "value": " World" }
// Before
{ "message": "Hello" }
// After
{ "message": "Hello World" }
```
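Conceptually, a client applies an operation by walking `path` to the parent container and mutating the final key. An illustrative sketch (the runtime does this for you; this version handles only dict containers):

```python
def apply_operation(state: dict, op: dict) -> None:
    # Walk to the parent container of the target key
    *parents, key = op["path"]
    target = state
    for p in parents:
        target = target[p]
    if op["type"] == "set":
        target[key] = op["value"]
    elif op["type"] == "append-text":
        target[key] = target.get(key, "") + op["value"]
```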
### Wire format \[#wire-format]
The wire format is inspired by [AI SDK's data stream protocol](https://sdk.vercel.ai/docs/ai-sdk-ui/stream-protocol) and will migrate to Server-Sent Events (SSE) in a future release.
**state update:**
```
aui-state:[{"type":"set","path":["status"],"value":"completed"}]
```
**error:**
```
3:"error message"
```
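A sketch of decoding these chunks line by line (it covers only the two chunk types shown above; the runtime's parser handles the full protocol):

```python
import json

def parse_chunk(line: str):
    # "aui-state:" chunks carry a JSON array of state operations
    if line.startswith("aui-state:"):
        return "state", json.loads(line[len("aui-state:"):])
    # "3:" chunks carry a JSON-encoded error message
    if line.startswith("3:"):
        return "error", json.loads(line[len("3:"):])
    raise ValueError(f"unrecognized chunk: {line!r}")
```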
## Building a frontend \[#building-a-frontend]
`useAssistantTransportRuntime` accepts:
```ts
{
  initialState: T,
  api: string,
  resumeApi?: string,
  protocol?: "data-stream" | "assistant-transport",
  converter: (state: T, connectionMetadata: ConnectionMetadata) => AssistantTransportState,
  headers?: Record<string, string> | Headers | (() => Promise<Record<string, string> | Headers>),
  body?: object | (() => Promise<object>),
}
```