This page describes how Aion Server adapts A2A requests to LangGraph and maps LangGraph outputs and events back into A2A Messages, Tasks, and streaming events.

1. Inbound Messages

1.1 Graph Invocation

Both SendMessage and SendStreamingMessage use the same event generation flow: AgentExecutor.execute() always produces an event stream via graph.astream(). The difference is in how DefaultRequestHandler.ResultAggregator consumes this stream:
  • SendMessage (blocking=true) collects all events and returns the final Task.
  • SendMessage (blocking=false) returns after the first event, then continues processing in background with status="submitted".
  • SendStreamingMessage yields events as they arrive and streams them to the client via SSE.
Both methods use the same SendMessageRequest payload; only the response mode differs. Older A2A examples (before 0.3) used separate invoke() and stream() paths; in A2A 0.3+, execution is unified and only the consumption strategy differs.
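The three consumption strategies can be illustrated with a plain-Python sketch. All names below (execute, send_message_blocking, send_streaming_message) and the event shapes are hypothetical stand-ins for the single event stream produced by AgentExecutor.execute(), not the Aion Server API:

```python
def execute():
    """Stand-in for the single event stream from graph.astream()."""
    yield {"kind": "status-update", "state": "working"}
    yield {"kind": "status-update", "state": "working"}
    yield {"kind": "task", "state": "completed"}


def send_message_blocking():
    # blocking=true: drain the whole stream, return only the final Task
    final = None
    for event in execute():
        final = event
    return final


def send_streaming_message():
    # SSE: forward each event to the client as it arrives
    yield from execute()
```

The point is that the producer is identical in every case; only the consumer (collect-all, first-event-then-background, or yield-as-you-go) changes.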

1.2 Message Ingress

When an inbound A2A Message arrives:
  1. Append to state.messages (LLM-facing transcript): If the inbound A2A Message contains one or more text parts and the graph state includes a messages property, Aion Server appends a LangChain HumanMessage derived from the A2A text. The default policy concatenates all A2A text parts in order into a single HumanMessage.
  2. Populate AionContext (invocation-scoped context): Aion Server populates the AionContext for the current invocation with message (normalized inbound message), thread (normalized messaging thread), event (normalized event descriptor), and inbox (raw A2AInbox snapshot with task, message, and metadata for low-level access).
  3. Idempotency and dedupe: If the inbound A2A messageId has already been ingested for the current contextId, Aion Server does not append a duplicate HumanMessage.
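A minimal sketch of steps 1 and 3 of the ingress policy, using plain dicts in place of A2A and LangChain types. The function name, the `seen` set, and the message shape are illustrative assumptions, not the Aion Server implementation:

```python
def ingest_message(state: dict, seen: set, context_id: str, message: dict) -> None:
    """Append one HumanMessage-like entry per unique (contextId, messageId)."""
    key = (context_id, message["messageId"])
    if key in seen:  # idempotency: a redelivered messageId is not appended twice
        return
    seen.add(key)
    texts = [p["text"] for p in message["parts"] if "text" in p]
    if texts:  # default policy: concatenate all text parts in order
        state.setdefault("messages", []).append(
            {"type": "human", "content": "".join(texts)}
        )
```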
Register AionContext as the graph’s context schema:
from typing import TypedDict, Annotated, Optional

from aion.shared.types import A2AOutbox
from aion.langgraph import AionContext
from langchain_core.messages import BaseMessage
from langgraph.graph import StateGraph, add_messages


class AgentState(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]
    a2a_outbox: Optional[A2AOutbox]


builder = StateGraph(AgentState, context_schema=AionContext)
Access inbound data inside a node via Runtime[AionContext]:
from langgraph.runtime import Runtime


async def my_node(state: AgentState, runtime: Runtime[AionContext]) -> dict:
    message = runtime.context.message   # normalized inbound message (or None)
    thread = runtime.context.thread     # normalized thread / conversation context
    event = runtime.context.event       # event kind: message, command, reaction, card action
    inbox = runtime.context.inbox       # raw A2AInbox escape hatch (task, message, metadata)
    agent = runtime.context.self        # agent identity for the current invocation
    return {}

2. Outbound Messages

2.1 SendMessage -> graph.astream()

A valid response to an A2A SendMessage call is a Task. Aion Server constructs the response using the following precedence:
(1) SDK-managed response buffer (authoritative when populated). The runtime maintains a request-scoped messaging buffer for the current turn. SDK helpers and other higher-level response surfaces should write to that buffer first, including streamed output that is intended to become the durable reply. When this buffer is non-empty, it is the authoritative source for A2A response compilation.
(2) a2a_outbox. If the returned dictionary contains a2a_outbox, it must be an A2AOutbox instance wrapping either a Task or a Message. Server-owned fields are enforced:
  • task_id and context_id are set to current values managed by Aion Server.
  • Canonical routing and identity metadata (for example aion:network and sender IDs) is server-controlled.
Behavior:
  • If a2a_outbox.message is set, append it to current task history.
  • If a2a_outbox.task is set, treat it as a patch to the server task: server merges or extends history and artifacts; graph-provided metadata merges shallowly; server-controlled keys take precedence.
import uuid

from a2a.types import Message, Part, Role, Task, TaskState, TaskStatus
from aion.shared.types import A2AOutbox


# Option 1: return a Message
async def reply_node(state: AgentState) -> dict:
    message = Message(
        message_id=str(uuid.uuid4()),
        role=Role.ROLE_AGENT,
        parts=[Part(text="Done!")],
    )
    return {"a2a_outbox": A2AOutbox(message=message)}


# Option 2: return a Task (patch). task_id and context_id are placeholders;
# whatever is set here is overwritten with server-managed values.
async def task_node(state: AgentState) -> dict:
    task = Task(
        id=task_id,
        context_id=context_id,
        status=TaskStatus(state=TaskState.TASK_STATE_WORKING),
        history=[...],
        artifacts=[...],
        metadata={"my_key": "my_value"},
    )
    return {"a2a_outbox": A2AOutbox(task=task)}
Aion Server also keeps state.messages in sync by appending an AIMessage and/or ToolMessage derived from the outbound A2A payload. Linkage: AIMessage.id = a2a.taskId.
(3) Framework-native fallback. If neither the SDK-managed response buffer nor a2a_outbox is populated, Aion falls back to framework-native output for the current turn:
  • first, accumulated streamed message chunks
  • then, if needed, the last agent-authored AIMessage in state.messages
The fallback path produces a Task with a terminal status. If a developer needs full control over the task shape, status events, or artifact updates, they should use a2a_outbox instead of relying on fallback inference.
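The full precedence chain of Section 2.1 can be sketched as a pure function. The name compile_response, its parameters, and the simplified return values are illustrative only, assuming dict-shaped messages:

```python
def compile_response(buffer: list, a2a_outbox, streamed_chunks: list, messages: list):
    if buffer:                    # (1) SDK-managed buffer is authoritative
        return ("buffer", buffer)
    if a2a_outbox is not None:    # (2) explicit graph-provided outbox
        return ("outbox", a2a_outbox)
    if streamed_chunks:           # (3a) accumulated streamed message chunks
        return ("fallback", "".join(streamed_chunks))
    # (3b) otherwise, the last agent-authored AIMessage in state.messages
    ai = [m for m in messages if m.get("type") == "ai"]
    return ("fallback", ai[-1]["content"] if ai else None)
```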

3. Streaming

3.1 SendStreamingMessage -> graph.astream()

Valid responses to an A2A SendStreamingMessage call are a Task, TaskStatusUpdateEvent, or TaskArtifactUpdateEvent, in the following sequence: aion-server-langgraph first dispatches a Task with a "working" status, followed by one or more TaskStatusUpdateEvent or TaskArtifactUpdateEvent as the stream progresses, and finally a Task with a terminal status. Aion Server requests LangGraph stream updates using stream_mode=["values", "messages", "custom", "updates"]. The following sections describe how each LangGraph event type maps to A2A streaming response events.

3.2 Event Type: values

The last values payload in the stream represents the final output and state snapshot. Aion Server uses it to update task state and determine the final terminal response if one has not already been sent. Output mapping follows the same precedence as Section 2.1. If neither the SDK-managed response buffer nor a2a_outbox is populated, Aion Server may construct an A2A Message using accumulated streamed deltas collected in the "aion:stream-delta" artifact via messages mode (see Section 3.3).

3.3 Event Type: messages

messages stream mode yields LLM output chunks as (message_chunk, metadata). These events are not diffs to state.messages. Multiple LLM invocations in a graph can produce messages events. To bridge this to A2A, chunks are appended into a transitory streaming artifact:
  • artifact.name = "Stream Delta"
  • artifact.id = "aion:stream-delta"
  • append=true for each chunk
  • lastChunk=true once on completion
A TaskArtifactUpdateEvent is emitted for each chunk. This artifact is transitory and is not persisted to the task’s durable state by default.
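The chunk-to-artifact bridging described above can be sketched as follows. The event dicts are simplified stand-ins for TaskArtifactUpdateEvent, not the A2A wire format:

```python
def chunks_to_artifact_events(chunks: list[str]) -> list[dict]:
    """Map streamed LLM chunks onto the transitory 'aion:stream-delta' artifact."""
    events = []
    for i, text in enumerate(chunks):
        events.append({
            "kind": "artifact-update",
            "artifactId": "aion:stream-delta",
            "name": "Stream Delta",
            "text": text,
            "append": True,                     # append=true for each chunk
            "lastChunk": i == len(chunks) - 1,  # set once, on completion
        })
    return events
```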

3.4 Event Type: custom

The Aion SDK provides helper functions via LangGraph StreamWriter to emit custom events during graph execution. Aion Server listens for these custom payloads and forwards them as A2A events, enforcing canonical taskId and contextId. Precedence rule: explicit A2A streaming events emitted via custom are authoritative. Custom payloads are mapped into A2A streaming events, including status updates and artifact updates. When helper APIs are used, Aion Server applies the same canonical ID and metadata enforcement rules. For helper APIs, parameter semantics, and usage examples, see LangGraph Streaming API.
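As a sketch, a node can emit a custom payload through an injected writer callable; in LangGraph this is done by annotating a parameter as langgraph.types.StreamWriter. The payload shape below is illustrative and does not reflect the Aion SDK helper API:

```python
def progress_node(state: dict, writer) -> dict:
    # Emitted via "custom" stream mode; Aion Server forwards it as an A2A
    # event and overwrites taskId/contextId with canonical values.
    writer({"kind": "status-update", "message": "Searching the web..."})
    return {}
```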

3.5 Event Type: updates

Used to track the currently executing node. Aion Server extracts the node name and updates execution context accordingly.

4. Summary of Responsibilities

LangGraph Graph Author

  • Register AionContext as the graph’s context schema via context_schema=AionContext.
  • Access inbound message, thread, event, and agent identity through Runtime[AionContext] in node signatures.
  • Keep state.messages as LangChain message types.
  • Prefer SDK helpers when you want to populate the shared runtime response buffer directly.
  • Optionally set a2a_outbox for full-fidelity A2A responses.
  • For streaming, optionally emit A2A-native events via custom using SDK helper functions.

Aion Server Adapter

  • Own canonical IDs and routing metadata.
  • Ensure idempotency on ingress.
  • Map LangGraph output and state into A2A Message and Task.
  • Stream A2A events as StreamResponse wrappers.