Streaming

Agent provides token-by-token streaming from LangGraph agents via Server-Sent Events (SSE). Every update lands directly in Angular Signals โ€” no subscriptions, no manual change detection.

Prerequisites

Make sure you've completed the Installation guide first.

#How streaming works

Streaming starts on the agent side. LangGraph's astream() method controls what data is sent over the SSE connection. On the Angular side, agent() consumes those events and maps them to Signals.

from langgraph.graph import END, START, MessagesState, StateGraph
from langchain_openai import ChatOpenAI
 
llm = ChatOpenAI(model="gpt-5-mini", streaming=True)
 
def call_model(state: MessagesState) -> dict:
    response = llm.invoke(state["messages"])
    return {"messages": [response]}
 
builder = StateGraph(MessagesState)
builder.add_node("call_model", call_model)
builder.add_edge(START, "call_model")
builder.add_edge("call_model", END)
 
graph = builder.compile()
 
# Stream modes control what SSE chunks contain. LangGraph Platform accepts
# one mode or a list of modes, depending on the API you call:
 
# "values" โ€” full state snapshot after each node
async for chunk in graph.astream(
    {"messages": [("user", "Hello")]},
    stream_mode="values",
):
    print(chunk)
 
# "messages" โ€” individual message tokens as generated
async for chunk in graph.astream(
    {"messages": [("user", "Hello")]},
    stream_mode="messages",
):
    print(chunk)
 
# "events" โ€” raw run events (on_chain_start, on_llm_stream, etc.)
async for event in graph.astream_events(
    {"messages": [("user", "Hello")]},
    version="v2",
):
    print(event["event"], event.get("data"))

#Stream status

The status() signal reports the current lifecycle state of the SSE connection:

1
idle

No active stream. The resource is ready to accept a new message.

2
running

Tokens are arriving over the SSE connection. Signal values update in real-time with each chunk.

3
error

The connection was interrupted or the agent returned an error. Inspect error() for the full details.

#Stream modes

By default, agent() asks LangGraph Platform for the stream modes it needs to populate its public signals: values, messages-tuple, updates, and custom. It also enables streamSubgraphs so namespaced subgraph events can reach the client.

Override streamMode per run when you need a narrower stream. streamMode is a submit option, not an agent() option.

// Receives the full agent state after every node execution.
// Best when you only need state snapshots.
const chat = agent({
  assistantId: 'chat_agent',
});
 
await chat.submit(
  { message: 'Summarize this thread.' },
  { streamMode: ['values'] },
);
 
// chat.messages() always contains the complete message list
Choosing a mode

Use the default modes for most chat UIs. They keep messages(), state(), toolCalls(), customEvents(), and subagent streams populated from the same run. Narrow the mode list only when you know which signals your UI will read.

#Error handling

If the SSE connection drops or the agent throws, status() transitions to 'error' and error() is populated. Use these signals to render fallback UI and retry.

import { Component, computed, ChangeDetectionStrategy } from '@angular/core';
import { agent } from '@ngaf/langgraph';
 
@Component({
  selector: 'app-chat',
  templateUrl: './chat.component.html',
  changeDetection: ChangeDetectionStrategy.OnPush,
})
export class ChatComponent {
  readonly chat = agent({
    assistantId: 'chat_agent',
  });
 
  readonly hasError = computed(() => this.chat.status() === 'error');
 
  retry() {
    // Re-stream using the same thread so context is preserved
    this.chat.submit();
  }
}
Network errors vs agent errors

error() surfaces both transport-level failures (lost connection, 5xx) and application-level errors returned by the agent graph. Check error().cause for the underlying HTTP status when you need to distinguish them.

#Throttle configuration

By default Agent coalesces state-like signal updates every 16 ms. That is close to a 60 fps render cadence and prevents fast SSE streams from triggering hundreds of state renders per second.

const chat = agent({
  assistantId: 'chat_agent',
  // Batch incoming chunks and flush at most once every 50 ms
  throttle: 50,
});

The value is in milliseconds. Pass false or 0 to disable batching and forward state updates immediately. Token message updates are not throttled, so live markdown and typing indicators still receive every token emission.

Use caseRecommended throttle
Token-by-token typing effectdefault 16 ms
Standard chat bubbledefault 16 ms or 50 ms
Background summarisation150 ms
SSE connection behavior

Each call to chat.submit() opens a new SSE connection. Connections are automatically closed when the agent run completes or when the Angular component is destroyed โ€” you do not need to manage the lifecycle manually.

#What's Next