# Provider streaming internals

This document explains how token/tool streaming is normalized in `@oh-my-pi/pi-ai`, then propagated through `@oh-my-pi/pi-agent-core` and coding-agent session events.
## End-to-end flow
- `streamSimple()` (packages/ai/src/stream.ts) maps generic options and dispatches to a provider stream function.
- Provider stream functions (`anthropic.ts`, `openai-responses.ts`, `google.ts`) translate provider-native stream events into the unified `AssistantMessageEvent` sequence.
- Each provider pushes events into `AssistantMessageEventStream` (packages/ai/src/utils/event-stream.ts), which throttles delta events and exposes:
  - async iteration for incremental updates
  - `result()` for the final `AssistantMessage`
- `agentLoop` (packages/agent/src/agent-loop.ts) consumes those events, mutates in-flight assistant state, and emits `message_update` events carrying the raw `assistantMessageEvent`.
- `AgentSession` (packages/coding-agent/src/session/agent-session.ts) subscribes to agent events, persists messages, drives extension hooks, and applies session behaviors (retry, compaction, TTSR, streaming-edit abort checks).
## Unified stream contract in `@oh-my-pi/pi-ai`

All providers emit the same shape (`AssistantMessageEvent` in packages/ai/src/types.ts):
- `start`
- content block lifecycle triplets:
  - text: `text_start` → `text_delta`* → `text_end`
  - thinking: `thinking_start` → `thinking_delta`* → `thinking_end`
  - tool call: `toolcall_start` → `toolcall_delta`* → `toolcall_end`
- terminal event:
  - `done` with `reason: "stop" | "length" | "toolUse"`
  - or `error` with `reason: "aborted" | "error"`
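For orientation, here is a hedged sketch of what that union might look like; the authoritative definition lives in packages/ai/src/types.ts, and field names beyond the event `type` tags (`contentIndex`, `delta`, `partial`, `errorMessage`) are assumptions drawn from the descriptions in this document.

```ts
// Hypothetical shape of the unified event union; see packages/ai/src/types.ts
// for the real definition. Field names beyond the `type` tags are guesses.
type StopReason = "stop" | "length" | "toolUse";

type AssistantMessageEvent =
  | { type: "start" }
  | { type: "text_start"; contentIndex: number }
  | { type: "text_delta"; contentIndex: number; delta: string; partial: string }
  | { type: "text_end"; contentIndex: number }
  | { type: "thinking_start"; contentIndex: number }
  | { type: "thinking_delta"; contentIndex: number; delta: string; partial: string }
  | { type: "thinking_end"; contentIndex: number }
  | { type: "toolcall_start"; contentIndex: number; toolName: string }
  | { type: "toolcall_delta"; contentIndex: number; partial: unknown }
  | { type: "toolcall_end"; contentIndex: number; arguments: unknown }
  | { type: "done"; reason: StopReason }
  | { type: "error"; reason: "aborted" | "error"; errorMessage?: string };
```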
`AssistantMessageEventStream` guarantees:

- the final result is resolved by a terminal event (`done` or `error`)
- deltas are batched/throttled (~50ms)
- buffered deltas are flushed before non-delta events and before completion
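A minimal consumption sketch, assuming the async-iteration and `result()` surface described above (and reusing the `AssistantMessageEvent` sketch from the previous section):

```ts
// Iterate incremental events, then read the final message resolved by the
// terminal done/error event. The stream shape here mirrors the description
// above rather than the exact exported interface.
async function printAssistantText(
  stream: AsyncIterable<AssistantMessageEvent> & { result(): Promise<unknown> },
): Promise<unknown> {
  for await (const event of stream) {
    if (event.type === "text_delta") {
      process.stdout.write(event.delta); // throttled incremental update
    }
  }
  return stream.result(); // final AssistantMessage
}
```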
## Delta throttling and harmonization behavior
`AssistantMessageEventStream` treats `text_delta`, `thinking_delta`, and `toolcall_delta` as mergeable events:

- buffered deltas are merged only when type + `contentIndex` match
- a merge keeps the latest `partial` snapshot
- non-delta events force an immediate flush

This smooths high-frequency provider streams for TUI/event consumers, but it is not provider backpressure: providers still produce at full speed while the local stream buffers. A sketch of the merge rule follows.
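A minimal sketch of that rule under the field-name assumptions above; the real implementation in packages/ai/src/utils/event-stream.ts also owns the ~50ms flush timer and the ordering guarantees:

```ts
// Two buffered events merge only when they are the same delta kind on the
// same content block; the merged event simply keeps the newer snapshot,
// since `partial` (assumed field name) is cumulative.
const DELTA_TYPES = new Set(["text_delta", "thinking_delta", "toolcall_delta"]);

function mergeDeltas(
  buffered: AssistantMessageEvent,
  next: AssistantMessageEvent,
): AssistantMessageEvent | null {
  const mergeable =
    DELTA_TYPES.has(next.type) &&
    buffered.type === next.type &&
    "contentIndex" in buffered &&
    "contentIndex" in next &&
    buffered.contentIndex === next.contentIndex;
  return mergeable ? next : null; // latest partial wins; non-mergeable → flush
}
```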
## Provider normalization details

### Anthropic (anthropic-messages)

Source: `packages/ai/src/providers/anthropic.ts`

Normalization points:

- `message_start` initializes usage (input/output/cache tokens)
- `content_block_start` maps to text/thinking/toolcall starts
- `content_block_delta` maps:
  - `text_delta` → `text_delta`
  - `thinking_delta` → `thinking_delta`
  - `input_json_delta` → `toolcall_delta`
  - `signature_delta` updates `thinkingSignature` only (no event)
- `content_block_stop` emits the corresponding `*_end`
- `message_delta.stop_reason` maps via `mapStopReason()`
Tool-call argument streaming (sketched below):

- each tool block carries an internal `partialJson` buffer
- every JSON delta appends to `partialJson`
- `arguments` are reparsed on each delta via `parseStreamingJson()`
- `toolcall_end` reparses once more, then strips `partialJson`
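A hedged sketch of that accumulation pattern; `ToolCallBlock` and the handler names are illustrative, and `parseStreamingJson()` is the shared helper described later in this document:

```ts
// Illustrative state for one in-flight tool-use block.
interface ToolCallBlock {
  partialJson: string; // raw concatenated JSON fragments from input_json_delta
  arguments: unknown;  // best-effort parse of partialJson so far
}

declare function parseStreamingJson(text: string): unknown; // see json-parse.ts section

function onInputJsonDelta(block: ToolCallBlock, fragment: string): void {
  block.partialJson += fragment;                           // append raw fragment
  block.arguments = parseStreamingJson(block.partialJson); // reparse on every delta
}

function onContentBlockStop(block: ToolCallBlock): unknown {
  block.arguments = parseStreamingJson(block.partialJson); // one final reparse
  // partialJson is internal bookkeeping and is stripped before toolcall_end
  return block.arguments;
}
```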
### OpenAI Responses (openai-responses)

Source: `packages/ai/src/providers/openai-responses.ts`

Normalization points:

- `response.output_item.added` starts reasoning/text/function-call blocks
- reasoning summary events (`response.reasoning_summary_text.delta`) become `thinking_delta`
- output/refusal deltas become `text_delta`
- `response.function_call_arguments.delta` becomes `toolcall_delta`
- `response.output_item.done` emits `thinking_end`/`text_end`/`toolcall_end`
- `response.completed` maps status to the stop reason and usage
Tool-call argument streaming (sketched below):

- same `partialJson` accumulation pattern as Anthropic
- providers that send only `response.function_call_arguments.done` still populate the final args
- tool call IDs are normalized as `"<call_id>|<item_id>"`
### Google Generative AI (google-generative-ai)

Source: `packages/ai/src/providers/google.ts`

Normalization points:

- iterates `candidate.content.parts`
- text parts are split into thinking vs text by `isThinkingPart(part)`
- block transitions close the previous block before starting a new one
- `part.functionCall` is treated as a complete tool call (start/delta/end emitted immediately)
- finish reason mapped by `mapStopReason()` from `google-shared.ts`
Tool-call argument streaming:

- function call args arrive as a structured object, not as incremental JSON text
- the implementation emits one synthetic `toolcall_delta` containing `JSON.stringify(arguments)`
- no partial JSON parser is needed for Google in this path
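A sketch of that path, assuming the event shapes from the contract sketch above; `push` stands in for the stream's enqueue call:

```ts
// Gemini delivers function-call arguments as a complete object, so the whole
// start/delta/end triplet is emitted in one step. The single synthetic delta
// carries JSON.stringify(args), keeping the sequence uniform across providers.
function emitCompleteFunctionCall(
  push: (event: AssistantMessageEvent) => void,
  contentIndex: number,
  call: { name: string; args: Record<string, unknown> },
): void {
  push({ type: "toolcall_start", contentIndex, toolName: call.name });
  push({ type: "toolcall_delta", contentIndex, partial: JSON.stringify(call.args) });
  push({ type: "toolcall_end", contentIndex, arguments: call.args });
}
```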
## Partial tool-call JSON accumulation and recovery

Shared behavior for Anthropic/OpenAI Responses uses `parseStreamingJson()` (packages/ai/src/utils/json-parse.ts):

- try `JSON.parse`
- fall back to the `partial-json` parser for incomplete fragments
- if both fail, return `{}`
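A minimal sketch of that fallback chain, assuming the `partial-json` npm package provides the lenient parser (the real helper is in packages/ai/src/utils/json-parse.ts):

```ts
import { parse as parsePartial } from "partial-json"; // assumed dependency

function parseStreamingJson(text: string): unknown {
  try {
    return JSON.parse(text);     // fast path: fragment happens to be complete
  } catch {
    try {
      return parsePartial(text); // lenient parse of a truncated fragment
    } catch {
      return {};                 // both parsers failed: fall back to empty object
    }
  }
}
```

Under that assumption, a truncated fragment like `{"path": "/tmp/fi` still parses to a usable object rather than throwing, which is what keeps in-progress `arguments` readable.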
Implications:

- malformed or truncated argument deltas do not immediately crash stream processing
- in-progress `arguments` may temporarily be `{}`
- later valid deltas can recover structured arguments because parsing is retried on every append
- the final `toolcall_end` performs one more parse attempt before emission
## Stop reasons vs transport/runtime errors

Provider stop reasons are mapped to a normalized `stopReason`:

- Anthropic: `end_turn` → `stop`, `max_tokens` → `length`, `tool_use` → `toolUse`, safety/refusal cases → `error`
- OpenAI Responses: `completed` → `stop`, `incomplete` → `length`, `failed`/`cancelled` → `error`
- Google: `STOP` → `stop`, `MAX_TOKENS` → `length`, safety/prohibited/malformed-function-call classes → `error`
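As a hedged illustration, the Anthropic row might reduce to a switch like this (each provider keeps its own `mapStopReason()`, which may distinguish more raw values):

```ts
type NormalizedStopReason = "stop" | "length" | "toolUse" | "error";

// Illustrative translation table for the Anthropic case.
function mapAnthropicStopReason(raw: string | null): NormalizedStopReason {
  switch (raw) {
    case "end_turn":   return "stop";
    case "max_tokens": return "length";
    case "tool_use":   return "toolUse";
    default:           return "error"; // safety/refusal and unknown reasons
  }
}
```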
Error semantics are split into two stages:

1. Model completion semantics (provider-reported finish reason/status)
2. Transport/runtime failure (network/client/parser/abort exceptions)

If the provider stream throws or signals failure, each provider wrapper catches it and emits a terminal error event with:

- `stopReason = "aborted"` when the abort signal is set
- otherwise `stopReason = "error"`
- `errorMessage = formatErrorMessageWithRetryAfter(error)`
## Malformed chunk / SSE parse failure behavior

For these provider paths, chunk/SSE framing is handled by the vendor SDK streams (Anthropic SDK, OpenAI SDK, Google SDK); this code does not implement a custom SSE decoder.
Observed behavior in the current implementation:

- a malformed chunk or SSE parse failure at the SDK level surfaces as an exception or a stream `error` event
- the provider wrapper converts that into the unified terminal `error` event
- there is no provider-specific resume/retry inside the stream function itself
- higher-level retries are handled by `AgentSession` auto-retry logic (message-level retry, not stream-chunk replay)
## Cancellation boundaries

Cancellation is layered:

- AI provider request: `options.signal` is passed into the provider client stream call.
- Provider wrapper: after the stream loop, an aborted signal forces the error path (`"Request was aborted"`).
- Agent loop: checks `signal.aborted` before handling each provider event and can synthesize an aborted assistant message from the latest partial.
- Session/agent controls: `AgentSession.abort()` → `agent.abort()` → shared abort controller cancellation.
Tool execution cancellation is separate from model stream cancellation:

- tool runners use `AbortSignal.any([agentSignal, steeringAbortSignal])`
- steering interrupts can abort remaining tool execution while preserving already-produced tool results
## Backpressure boundaries

There is no hard backpressure mechanism between the provider SDK stream and downstream consumers:

- `EventStream` uses in-memory queues with no maximum size
- throttling reduces the UI update rate but does not slow provider intake
- if consumers lag significantly, queued events can grow until completion
Current design favors responsiveness and simple ordering over bounded-buffer flow control.
## How stream events surface as agent/session events

`agentLoop.streamAssistantResponse()` bridges `AssistantMessageEvent` to `AgentEvent` (sketched below):

- on `start`: pushes a placeholder assistant message and emits `message_start`
- on block events (`text_*`, `thinking_*`, `toolcall_*`): updates the last assistant message and emits `message_update` with the raw `assistantMessageEvent`
- on terminal (`done`/`error`): resolves the final message from `response.result()` and emits `message_end`
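A hedged sketch of that bridge; the `AgentEvent` payloads and the `emit` surface are illustrative stand-ins for the actual agent-loop.ts internals:

```ts
async function bridgeToAgentEvents(
  response: AsyncIterable<AssistantMessageEvent> & { result(): Promise<unknown> },
  emit: (event: Record<string, unknown>) => void,
): Promise<void> {
  for await (const event of response) {
    if (event.type === "start") {
      emit({ type: "message_start" }); // placeholder assistant message pushed
    } else if (event.type === "done" || event.type === "error") {
      emit({ type: "message_end", message: await response.result() });
    } else {
      // block lifecycle events update in-flight state and carry the raw event
      emit({ type: "message_update", assistantMessageEvent: event });
    }
  }
}
```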
`AgentSession` then consumes those events for session-level behaviors:

- TTSR watches `message_update.assistantMessageEvent` for `text_delta` and `toolcall_delta`
- the streaming-edit guard inspects `toolcall_delta`/`toolcall_end` on `edit` calls and can abort early
- persistence writes finalized messages at `message_end`
- auto-retry examines assistant `stopReason === "error"` plus `errorMessage` heuristics
## Unified vs provider-specific responsibilities

Unified (common contract):

- event shape (`AssistantMessageEvent`)
- final result extraction (`done`/`error`)
- delta throttling + merge rules
- agent/session event propagation model
Provider-specific (not fully abstracted):
- upstream event taxonomies and mapping logic
- stop-reason translation tables
- tool-call ID conventions
- reasoning/thinking block semantics and signatures
- usage token semantics and availability timing
- message conversion constraints per API
## Implementation files

- `../../ai/src/stream.ts` — provider dispatch, option mapping, API key/session plumbing.
- `../../ai/src/utils/event-stream.ts` — generic stream queue + assistant delta throttling.
- `../../ai/src/utils/json-parse.ts` — partial JSON parsing for streamed tool arguments.
- `../../ai/src/providers/anthropic.ts` — Anthropic event translation and tool JSON delta accumulation.
- `../../ai/src/providers/openai-responses.ts` — OpenAI Responses event translation and status mapping.
- `../../ai/src/providers/google.ts` — Gemini stream chunk-to-block translation.
- `../../ai/src/providers/google-shared.ts` — Gemini finish-reason mapping and shared conversion rules.
- `../../agent/src/agent-loop.ts` — provider stream consumption and `message_update` bridging.
- `../src/session/agent-session.ts` — session-level handling of streaming updates, abort, retry, and persistence.