This page describes how Aion Server adapts A2A requests to LangGraph and maps LangGraph outputs and events back into A2A Messages, Tasks, and streaming events. This page uses the v1 canonical JSON-RPC method names. Aion ingress may continue to accept legacy slash-style aliases for compatibility.

1. Inbound Messages

1.1 Graph Invocation

Both SendMessage and SendStreamingMessage use the same event generation flow: AgentExecutor.execute() always produces an event stream via graph.astream(). The difference is in how DefaultRequestHandler.ResultAggregator consumes this stream:
  • SendMessage (blocking=true) collects all events and returns the final Task.
  • SendMessage (blocking=false) returns after the first event, then continues processing in background with status="working".
  • SendStreamingMessage yields events as they arrive and streams them to the client via SSE.
Both methods use the same SendMessageRequest payload; only the response mode differs. Older A2A examples before 0.3 used separate invoke() and stream() paths. In A2A 0.3+, execution is unified; only the consumption strategy differs.

1.2 Message Ingress

When an inbound A2A Message arrives:
  1. Append to state.messages (LLM-facing transcript): If the inbound A2A Message contains one or more text parts and the graph state includes a messages property, Aion Server appends a LangChain HumanMessage derived from the A2A text. The default policy concatenates all A2A text parts in order into a single HumanMessage.
  2. Store the raw A2A envelope (transport-facing context): If the graph state includes an a2a_inbox property typed as A2AInbox, Aion Server populates it with: task (current Task), message (full inbound A2A Message, including non-text parts), and metadata (SendMessageRequest-level metadata such as distribution/network and trace info).
  3. Idempotency and dedupe: If the inbound A2A messageId has already been ingested for the current contextId, Aion Server does not append a duplicate HumanMessage.
Declare the typed state fields to opt in:
from typing import TypedDict, Annotated, Optional

from aion.shared.types import A2AInbox, A2AOutbox
from langchain_core.messages import BaseMessage
from langgraph.graph import add_messages


class AgentState(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]
    a2a_inbox: Optional[A2AInbox]
    a2a_outbox: Optional[A2AOutbox]
Access the inbox inside a node:
async def my_node(state: AgentState) -> dict:
    inbox = state.get("a2a_inbox")
    if inbox:
        task = inbox.task        # current A2A Task (or None)
        message = inbox.message  # inbound A2A Message (or None)
        metadata = inbox.metadata  # request-level metadata dict

2. Outbound Messages

2.1 SendMessage -> graph.astream()

Valid responses to an A2A SendMessage call are a Message or a Task. Aion Server constructs the response using the following precedence.

(1) a2a_outbox (authoritative). If the returned dictionary contains a2a_outbox, it must be an A2AOutbox instance wrapping either a Task or a Message. Server-owned fields are enforced:
  • task_id and context_id are set to current values managed by Aion Server.
  • Canonical routing and identity metadata (for example aion:network and sender IDs) is server-controlled.
Behavior:
  • If a2a_outbox.message is set, append it to current task history.
  • If a2a_outbox.task is set, treat it as a patch to the server task: server merges or extends history and artifacts; graph-provided metadata merges shallowly; server-controlled keys take precedence.
import uuid

from a2a.types import Message, Part, Role, Task, TaskState, TaskStatus
from aion.shared.types import A2AOutbox


# Option 1: return a Message
async def reply_node(state: AgentState) -> dict:
    message = Message(
        message_id=str(uuid.uuid4()),
        role=Role.ROLE_AGENT,
        parts=[Part(text="Done!")],
    )
    return {"a2a_outbox": A2AOutbox(message=message)}


# Option 2: return a Task (patch)
async def task_node(state: AgentState) -> dict:
    # task_id and context_id are placeholders here; these fields are
    # server-owned and are overwritten with the canonical values
    # managed by Aion Server.
    task = Task(
        id=task_id,
        context_id=context_id,
        status=TaskStatus(state=TaskState.TASK_STATE_WORKING),
        history=[...],
        artifacts=[...],
        metadata={"my_key": "my_value"},
    )
    return {"a2a_outbox": A2AOutbox(task=task)}
Aion Server also keeps state.messages in sync by appending an AIMessage and/or ToolMessage derived from the outbound A2A payload. Linkage: AIMessage.id = a2a.messageId.

(2) Fallback: derive the A2A Message from state.messages. If no a2a_outbox exists and the returned dictionary contains a messages list, Aion Server uses the last AIMessage to construct an outbound A2A Message with role=Role.ROLE_AGENT and a single text part. If you need a richer A2A Message (for example data parts, rich metadata, or extension context), set a2a_outbox instead of relying on fallback inference.
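The Section 2.1 precedence can be sketched as a small resolver. The message and state shapes below are simplified stand-ins, not the real a2a.types or LangChain classes.

```python
import uuid
from dataclasses import dataclass


@dataclass
class OutboundMessage:
    """Minimal stand-in for a2a.types.Message (fields simplified)."""
    message_id: str
    role: str
    text: str


def resolve_outbound(returned: dict):
    """Sketch of the precedence: an explicit a2a_outbox is authoritative;
    otherwise infer a Message from the last AIMessage in `messages`."""
    if returned.get("a2a_outbox") is not None:
        return returned["a2a_outbox"]
    last_ai = next(
        (m for m in reversed(returned.get("messages", [])) if m.get("type") == "ai"),
        None,
    )
    if last_ai is None:
        return None  # nothing to respond with from this node's output
    # Fallback inference: role=agent with a single text part
    return OutboundMessage(
        message_id=str(uuid.uuid4()), role="agent", text=last_ai["content"]
    )
```

Note that the fallback can only ever produce a plain text Message, which is why non-trivial responses should go through a2a_outbox.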

3. Streaming

3.1 SendStreamingMessage -> graph.astream()

Valid responses to an A2A SendStreamingMessage call are:
  • TaskStatusUpdateEvent
  • TaskArtifactUpdateEvent
  • Message
  • Task
Aion Server requests LangGraph stream updates using: stream_mode=["values", "messages", "custom", "updates"]. Each outbound SSE frame is emitted as a StreamResponse wrapper containing exactly one of: { statusUpdate }, { artifactUpdate }, { message }, or { task }.
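The one-of-four wrapping can be sketched as a simple mapping from event type to StreamResponse field. The field names come from the text above; the dict-based frame shape is an assumption for illustration.

```python
# Maps an outbound A2A event type to its StreamResponse oneof field.
KIND_TO_FIELD = {
    "TaskStatusUpdateEvent": "statusUpdate",
    "TaskArtifactUpdateEvent": "artifactUpdate",
    "Message": "message",
    "Task": "task",
}


def to_stream_response(kind: str, payload: dict) -> dict:
    """Wrap one outbound event into a StreamResponse frame; exactly one
    of the four fields is set per SSE frame."""
    return {KIND_TO_FIELD[kind]: payload}
```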

3.2 Event Type: values

The last values payload in the stream represents the final output and state snapshot. Aion Server uses it to update task state and to determine the final terminal response if one has not already been sent. Output mapping follows the same precedence as Section 2.1, with one addition: (3) if neither a2a_outbox nor messages exists, Aion Server constructs an A2A Message from the streamed deltas accumulated in the "aion:stream-delta" artifact via messages mode (see Section 3.3).

3.3 Event Type: messages

messages stream mode yields LLM output chunks as (message_chunk, metadata). These events are not diffs to state.messages. Multiple LLM invocations in a graph can produce messages events. To bridge this to A2A, chunks are appended into a transitory streaming artifact:
  • artifact.name = "Stream Delta"
  • artifact.id = "aion:stream-delta"
  • append=true for each chunk
  • lastChunk=true once on completion
A TaskArtifactUpdateEvent is emitted for each chunk. This artifact is transitory and is not persisted to the task’s durable state by default.
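The chunk-to-artifact bridging can be sketched as below. The frame shape is a simplified dict, and carrying lastChunk on the final chunk's frame (rather than a separate closing frame) is an assumption; the artifact id, name, and append semantics come from the list above.

```python
def delta_events(chunks: list[str]) -> list[dict]:
    """Map messages-mode LLM chunks onto the transitory stream-delta
    artifact: one TaskArtifactUpdateEvent per chunk with append=true,
    and lastChunk=true on the final frame."""
    return [
        {
            "artifactUpdate": {
                "artifact": {
                    "id": "aion:stream-delta",
                    "name": "Stream Delta",
                    "parts": [{"text": chunk}],
                },
                "append": True,
                "lastChunk": i == len(chunks) - 1,
            }
        }
        for i, chunk in enumerate(chunks)
    ]
```

Because append=true on every frame, a client reconstructs the full delta text by concatenating parts in arrival order.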

3.4 Event Type: custom

The Aion SDK provides helper functions via LangGraph StreamWriter to emit custom events during graph execution. Aion Server listens for these custom payloads and forwards them as A2A events, enforcing canonical taskId and contextId. Precedence rule: explicit A2A streaming events emitted via custom are authoritative. Custom payloads are mapped into A2A streaming events, including status updates and artifact updates. When helper APIs are used, Aion Server applies the same canonical ID and metadata enforcement rules. For helper APIs, parameter semantics, and usage examples, see LangGraph Streaming API.
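A node-level sketch of emitting a custom payload, using a plain writer callable in place of LangGraph's injected StreamWriter; the payload shape and node name are hypothetical, and the Aion SDK helper functions are not shown here.

```python
from typing import Callable

# LangGraph injects a StreamWriter-like callable into nodes that declare it;
# modeled here as a plain function taking one payload.
StreamWriter = Callable[[dict], None]


def progress_node(state: dict, writer: StreamWriter) -> dict:
    """Emit a custom payload mid-node. Aion Server would forward it as an
    A2A streaming event, stamping the canonical taskId and contextId."""
    writer({"statusUpdate": {"state": "working", "note": "step 1 done"}})
    return {}
```

In a real graph, the payload would be built via the SDK helper functions so that it maps cleanly onto an A2A status or artifact update.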

3.5 Event Type: updates

Used to track the currently executing node. Aion Server extracts the node name and updates execution context accordingly.
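In LangGraph's updates mode, each payload maps the name of the node that just ran to its state delta, so extracting the node name is a one-liner; the payload below is illustrative.

```python
def executing_node(update: dict) -> str:
    """An updates-mode payload is keyed by the node that produced the
    state delta; the key therefore identifies the executing node."""
    return next(iter(update))
```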

4. Summary of Responsibilities

LangGraph Graph Author

  • Keep state.messages as LangChain message types.
  • Optionally set a2a_outbox for full-fidelity A2A responses.
  • For streaming, optionally emit A2A-native events via custom using SDK helper functions.

Aion Server Adapter

  • Own canonical IDs and routing metadata.
  • Ensure idempotency on ingress.
  • Map LangGraph output and state into A2A Message and Task.
  • Stream A2A events as StreamResponse wrappers.