Assistant Transport
If you've built an agent as a Python or TypeScript script and want to add a UI to it, you need to solve two problems: streaming updates to the frontend and integrating with the UI framework. Assistant Transport handles both.
Assistant Transport streams your agent's complete state to the frontend in real-time. Unlike traditional approaches that only stream predefined message types (like text or tool calls), it streams your entire agent state—whatever structure your agent uses internally.
It consists of:
- State streaming: Efficiently streams updates to your agent state (supports any JSON object)
- UI integration: Converts your agent's state into assistant-ui components that render in the browser
- Command handling: Sends user actions (messages, tool executions, custom commands) back to your agent
When to Use Assistant Transport
Use Assistant Transport when:
- You don't have a streaming protocol yet and need one
- You want your agent's native state to be directly accessible in the frontend
- You're building a custom agent framework or one without a streaming protocol (e.g. OSS LangGraph)
Mental Model
The frontend receives state snapshots and converts them to React components. The goal is to have the UI be a stateless view on top of the agent framework state.
The agent server receives commands from the frontend. When a user interacts with the UI (sends a message, clicks a button, etc.), the frontend queues a command and sends it to the backend. Assistant Transport defines standard commands like add-message and add-tool-result, and you can define custom commands.
Command Lifecycle
Commands go through the following lifecycle:
The runtime alternates between idle (no active backend request) and sending (request in flight). When a new command is created while idle, it's immediately sent. Otherwise, it's queued until the current request completes.
To implement this architecture, you need to build 2 pieces:
- Backend endpoint on the agent server that accepts commands and returns a stream of state snapshots
- Frontend-side state converter that converts state snapshots to assistant-ui's data format so that the UI primitives work
Building a Backend Endpoint
Let's build the backend endpoint step by step. You'll need to handle incoming commands, update your agent state, and stream the updates back to the frontend.
The backend endpoint receives POST requests with the following payload:
{
state: T, // The previous state that the frontend has access to
commands: AssistantTransportCommand[],
system?: string,
tools?: ToolDefinition[]
}The backend endpoint returns a stream of state snapshots using the assistant-stream library (npm / PyPI).
Handling Commands
The backend endpoint processes commands from the commands array:
for command in request.commands:
if command.type == "add-message":
# Handle adding a user message
elif command.type == "add-tool-result":
# Handle tool execution result
elif command.type == "my-custom-command":
# Handle your custom commandStreaming Updates
To stream state updates, modify controller.state within your run callback:
from assistant_stream import RunController, create_run
from assistant_stream.serialization import DataStreamResponse
@app.post("/assistant")
async def chat_endpoint(request: ChatRequest):
async def run_callback(controller: RunController):
# Emits "set" at path ["message"] with value "Hello"
controller.state["message"] = "Hello"
# Emits "append-text" at path ["message"] with value " World"
controller.state["message"] += " World"
# Create and return the stream
stream = create_run(run_callback, state=request.state)
return DataStreamResponse(stream)The state snapshots are automatically streamed to the frontend using the operations described in Streaming Protocol.
Backend Reference Implementation
from assistant_stream import RunController, create_run
from assistant_stream.serialization import DataStreamResponse
async def run_callback(controller: RunController):
# Initialize state
if controller.state is None:
controller.state = {}
# Process commands
for command in request.commands:
# Handle commands...
# Run your agent and stream updates
async for event in agent.stream():
# update controller.state
pass
# Create and return the stream
stream = create_run(run_callback, state=request.state)
return DataStreamResponse(stream)from assistant_stream.serialization import DataStreamResponse
from assistant_stream import RunController, create_run
@app.post("/assistant")
async def chat_endpoint(request: ChatRequest):
"""Chat endpoint with custom agent streaming."""
async def run_callback(controller: RunController):
# Initialize controller state
if controller.state is None:
controller.state = {"messages": []}
# Process commands
for command in request.commands:
if command.type == "add-message":
# Add message to messages array
controller.state["messages"].append(command.message)
# Run your custom agent and stream updates
async for message in your_agent.stream():
# Push message to messages array
controller.state["messages"].append(message)
# Create streaming response
stream = create_run(run_callback, state=request.state)
return DataStreamResponse(stream)from assistant_stream.serialization import DataStreamResponse
from assistant_stream import RunController, create_run
from assistant_stream.modules.langgraph import append_langgraph_event
@app.post("/assistant")
async def chat_endpoint(request: ChatRequest):
"""Chat endpoint using LangGraph with streaming."""
async def run_callback(controller: RunController):
# Initialize controller state
if controller.state is None:
controller.state = {}
if "messages" not in controller.state:
controller.state["messages"] = []
input_messages = []
# Process commands
for command in request.commands:
if command.type == "add-message":
text_parts = [
part.text for part in command.message.parts
if part.type == "text" and part.text
]
if text_parts:
input_messages.append(HumanMessage(content=" ".join(text_parts)))
# Create initial state for LangGraph
input_state = {"messages": input_messages}
# Stream events from LangGraph
async for namespace, event_type, chunk in graph.astream(
input_state,
stream_mode=["messages", "updates"],
subgraphs=True
):
append_langgraph_event(
controller.state,
namespace,
event_type,
chunk
)
# Create streaming response
stream = create_run(run_callback, state=request.state)
return DataStreamResponse(stream)Full example: python/assistant-transport-backend-langgraph
Streaming Protocol
The assistant-stream state replication protocol allows for streaming updates to an arbitrary JSON object.
Operations
The protocol supports two operations:
Note: We've found that these two operations are enough to handle all sorts of complex state operations efficiently. set handles value updates and nested structures, while append-text enables efficient streaming of text content.
set
Sets a value at a specific path in the JSON object.
// Operation
{ "type": "set", "path": ["status"], "value": "completed" }
// Before
{ "status": "pending" }
// After
{ "status": "completed" }append-text
Appends text to an existing string value at a path.
// Operation
{ "type": "append-text", "path": ["message"], "value": " World" }
// Before
{ "message": "Hello" }
// After
{ "message": "Hello World" }Wire Format
The wire format will be migrated to Server-Sent Events (SSE) in a future release.
The wire format is inspired by AI SDK's data stream protocol.
State Update:
aui-state:ObjectStreamOperation[]aui-state:[{"type":"set","path":["status"],"value":"completed"}]Error:
3:string3:"error message"Building a Frontend
Now let's set up the frontend. The state converter is the heart of the integration—it transforms your agent's state into the format assistant-ui expects.
The useAssistantTransportRuntime hook is used to configure the runtime. It accepts the following config:
{
initialState: T,
api: string,
resumeApi?: string,
converter: (state: T, connectionMetadata: ConnectionMetadata) => AssistantTransportState,
headers?: Record<string, string> | (() => Promise<Record<string, string>>),
body?: object,
onResponse?: (response: Response) => void,
onFinish?: () => void,
onError?: (error: Error) => void,
onCancel?: () => void
}State Converter
The state converter is the core of your frontend integration. It transforms your agent's state into assistant-ui's message format.
(
state: T, // Your agent's state
connectionMetadata: {
pendingCommands: Command[], // Commands not yet sent to backend
isSending: boolean // Whether a request is in flight
}
) => {
messages: ThreadMessage[], // Messages to display
isRunning: boolean // Whether the agent is running
}Converting Messages
Use the createMessageConverter API to transform your agent's messages to assistant-ui format:
import { unstable_createMessageConverter as createMessageConverter } from "@assistant-ui/react";
// Define your message type
type YourMessageType = {
id: string;
role: "user" | "assistant";
content: string;
timestamp: number;
};
// Define a converter function for a single message
const exampleMessageConverter = (message: YourMessageType) => {
// Transform a single message to assistant-ui format
return {
role: message.role,
content: [{ type: "text", text: message.content }]
};
};
const messageConverter = createMessageConverter(exampleMessageConverter);
const converter = (state: YourAgentState) => {
return {
messages: messageConverter.toThreadMessages(state.messages),
isRunning: false
};
};import { unstable_createMessageConverter as createMessageConverter } from "@assistant-ui/react";
import { convertLangChainMessages } from "@assistant-ui/react-langgraph";
const messageConverter = createMessageConverter(convertLangChainMessages);
const converter = (state: YourAgentState) => {
return {
messages: messageConverter.toThreadMessages(state.messages),
isRunning: false
};
};Reverse mapping:
The message converter allows you to retrieve the original message format anywhere inside assistant-ui. This lets you access your agent's native message structure from any assistant-ui component:
// Get original message(s) from a ThreadMessage anywhere in assistant-ui
const originalMessage = messageConverter.toOriginalMessage(threadMessage);Optimistic Updates from Commands
The converter also receives connectionMetadata which contains pending commands. Use this to show optimistic updates:
const converter = (state: State, connectionMetadata: ConnectionMetadata) => {
// Extract pending messages from commands
const optimisticMessages = connectionMetadata.pendingCommands
.filter((c) => c.type === "add-message")
.map((c) => c.message);
return {
messages: [...state.messages, ...optimisticMessages],
isRunning: connectionMetadata.isSending || false
};
};Handling Errors and Cancellations
The onError and onCancel callbacks receive an updateState function that allows you to update the agent state on the client side without making a server request:
const runtime = useAssistantTransportRuntime({
// ... other options
onError: (error, { commands, updateState }) => {
console.error("Error occurred:", error);
console.log("Commands in transit:", commands);
// Update state to reflect the error
updateState((currentState) => ({
...currentState,
lastError: error.message,
}));
},
onCancel: ({ commands, updateState }) => {
console.log("Request cancelled");
console.log("Commands in transit or queued:", commands);
// Update state to reflect cancellation
updateState((currentState) => ({
...currentState,
status: "cancelled",
}));
},
});Note: onError receives commands that were in transit, while onCancel receives both in-transit and queued commands.
Custom Headers and Body
You can pass custom headers and body to the backend endpoint:
const runtime = useAssistantTransportRuntime({
// ... other options
headers: {
"Authorization": "Bearer token",
"X-Custom-Header": "value",
},
body: {
customField: "value",
},
});Dynamic Headers and Body
You can also evaluate the header and body payloads on every request by passing an async function:
const runtime = useAssistantTransportRuntime({
// ... other options
headers: async () => ({
"Authorization": `Bearer ${await getAccessToken()}`,
"X-Request-ID": crypto.randomUUID(),
}),
body: {
customField: "value",
},
});Resuming from a Sync Server
We provide a sync server currently only as part of the enterprise plan. Please contact us for more information.
To enable resumability, you need to:
- Pass a
resumeApiURL touseAssistantTransportRuntimethat points to your sync server - Use the
unstable_resumeRunAPI to resume a conversation
import { useAssistantApi } from "@assistant-ui/react";
const runtime = useAssistantTransportRuntime({
// ... other options
api: "http://localhost:8010/assistant",
resumeApi: "http://localhost:8010/resume", // Sync server endpoint
// ... other options
});
// Typically called on thread switch or mount to check if sync server has anything to resume
const api = useAssistantApi();
api.thread().unstable_resumeRun({
parentId: null, // Ignored (will be removed in a future version)
});Accessing Runtime State
Use the useAssistantTransportState hook to access the current agent state from any component:
import { useAssistantTransportState } from "@assistant-ui/react";
function MyComponent() {
const state = useAssistantTransportState();
return <div>{JSON.stringify(state)}</div>;
}You can also pass a selector function to extract specific values:
function MyComponent() {
const messages = useAssistantTransportState((state) => state.messages);
return <div>Message count: {messages.length}</div>;
}Type Safety
Use module augmentation to add types for your agent state:
import "@assistant-ui/react";
declare module "@assistant-ui/react" {
namespace Assistant {
interface ExternalState {
myState: {
messages: Message[];
customField: string;
};
}
}
}Note: Place this file anywhere in your project (e.g., src/assistant.config.ts or at the project root). TypeScript will automatically pick up the type augmentation through module resolution—you don't need to import this file anywhere.
After adding the type augmentation, useAssistantTransportState will be fully typed:
function MyComponent() {
// TypeScript knows about your custom fields
const customField = useAssistantTransportState((state) => state.customField);
return <div>{customField}</div>;
}Accessing the Original Message
If you're using createMessageConverter, you can access the original message format from any assistant-ui component using the converter's toOriginalMessage method:
import { unstable_createMessageConverter as createMessageConverter } from "@assistant-ui/react";
import { useMessage } from "@assistant-ui/react";
const messageConverter = createMessageConverter(yourMessageConverter);
function MyMessageComponent() {
const message = useMessage();
// Get the original message(s) from the converted ThreadMessage
const originalMessage = messageConverter.toOriginalMessage(message);
// Access your agent's native message structure
return <div>{originalMessage.yourCustomField}</div>;
}You can also use toOriginalMessages to get all original messages when a ThreadMessage was created from multiple source messages:
const originalMessages = messageConverter.toOriginalMessages(message);Frontend Reference Implementation
"use client";
import {
AssistantRuntimeProvider,
AssistantTransportConnectionMetadata,
useAssistantTransportRuntime,
} from "@assistant-ui/react";
type State = {
messages: Message[];
};
// Converter function: transforms agent state to assistant-ui format
const converter = (
state: State,
connectionMetadata: AssistantTransportConnectionMetadata,
) => {
// Add optimistic updates for pending commands
const optimisticMessages = connectionMetadata.pendingCommands
.filter((c) => c.type === "add-message")
.map((c) => c.message);
return {
messages: [...state.messages, ...optimisticMessages],
isRunning: connectionMetadata.isSending || false,
};
};
export function MyRuntimeProvider({ children }) {
const runtime = useAssistantTransportRuntime({
initialState: {
messages: [],
},
api: "http://localhost:8010/assistant",
converter,
headers: async () => ({
"Authorization": "Bearer token",
}),
body: {
"custom-field": "custom-value",
},
onResponse: (response) => {
console.log("Response received from server");
},
onFinish: () => {
console.log("Conversation completed");
},
onError: (error, { commands, updateState }) => {
console.error("Assistant transport error:", error);
console.log("Commands in transit:", commands);
},
onCancel: ({ commands, updateState }) => {
console.log("Request cancelled");
console.log("Commands in transit or queued:", commands);
},
});
return (
<AssistantRuntimeProvider runtime={runtime}>
{children}
</AssistantRuntimeProvider>
);
}"use client";
import {
AssistantRuntimeProvider,
AssistantTransportConnectionMetadata,
unstable_createMessageConverter as createMessageConverter,
useAssistantTransportRuntime,
} from "@assistant-ui/react";
import {
convertLangChainMessages,
LangChainMessage,
} from "@assistant-ui/react-langgraph";
type State = {
messages: LangChainMessage[];
};
const LangChainMessageConverter = createMessageConverter(
convertLangChainMessages,
);
// Converter function: transforms agent state to assistant-ui format
const converter = (
state: State,
connectionMetadata: AssistantTransportConnectionMetadata,
) => {
// Add optimistic updates for pending commands
const optimisticStateMessages = connectionMetadata.pendingCommands.map(
(c): LangChainMessage[] => {
if (c.type === "add-message") {
return [
{
type: "human" as const,
content: [
{
type: "text" as const,
text: c.message.parts
.map((p) => (p.type === "text" ? p.text : ""))
.join("\n"),
},
],
},
];
}
return [];
},
);
const messages = [...state.messages, ...optimisticStateMessages.flat()];
return {
messages: LangChainMessageConverter.toThreadMessages(messages),
isRunning: connectionMetadata.isSending || false,
};
};
export function MyRuntimeProvider({ children }) {
const runtime = useAssistantTransportRuntime({
initialState: {
messages: [],
},
api: "http://localhost:8010/assistant",
converter,
headers: async () => ({
"Authorization": "Bearer token",
}),
body: {
"custom-field": "custom-value",
},
onResponse: (response) => {
console.log("Response received from server");
},
onFinish: () => {
console.log("Conversation completed");
},
onError: (error, { commands, updateState }) => {
console.error("Assistant transport error:", error);
console.log("Commands in transit:", commands);
},
onCancel: ({ commands, updateState }) => {
console.log("Request cancelled");
console.log("Commands in transit or queued:", commands);
},
});
return (
<AssistantRuntimeProvider runtime={runtime}>
{children}
</AssistantRuntimeProvider>
);
}Full example: examples/with-assistant-transport
Custom Commands
Defining Custom Commands
Use module augmentation to define a custom command:
import "@assistant-ui/react";
declare module "@assistant-ui/react" {
namespace Assistant {
interface Commands {
myCustomCommand: {
type: "my-custom-command";
data: string;
};
}
}
}Issuing Commands
Use the useAssistantTransportSendCommand hook to send custom commands:
import { useAssistantTransportSendCommand } from "@assistant-ui/react";
function MyComponent() {
const sendCommand = useAssistantTransportSendCommand();
const handleClick = () => {
sendCommand({
type: "my-custom-command",
data: "Hello, world!",
});
};
return <button onClick={handleClick}>Send Custom Command</button>;
}Backend Integration
The backend receives custom commands in the commands array, just like built-in commands:
for command in request.commands:
if command.type == "add-message":
# Handle add-message command
elif command.type == "add-tool-result":
# Handle add-tool-result command
elif command.type == "my-custom-command":
# Handle your custom command
data = command.dataOptimistic Updates
Update the state converter to optimistically handle the custom command:
const converter = (state: State, connectionMetadata: ConnectionMetadata) => {
// Filter custom commands from pending commands
const customCommands = connectionMetadata.pendingCommands.filter(
(c) => c.type === "my-custom-command"
);
// Apply optimistic updates based on custom commands
const optimisticState = {
...state,
customData: customCommands.map((c) => c.data),
};
return {
messages: state.messages,
state: optimisticState,
isRunning: connectionMetadata.isSending || false,
};
};Cancellation and Error Behavior
Custom commands follow the same lifecycle as built-in commands. You can update your onError and onCancel handlers to take custom commands into account:
const runtime = useAssistantTransportRuntime({
// ... other options
onError: (error, { commands, updateState }) => {
// Check if any custom commands were in transit
const customCommands = commands.filter((c) => c.type === "my-custom-command");
if (customCommands.length > 0) {
// Handle custom command errors
updateState((state) => ({
...state,
customCommandFailed: true,
}));
}
},
onCancel: ({ commands, updateState }) => {
// Check if any custom commands were queued or in transit
const customCommands = commands.filter((c) => c.type === "my-custom-command");
if (customCommands.length > 0) {
// Handle custom command cancellation
updateState((state) => ({
...state,
customCommandCancelled: true,
}));
}
},
});