This page documents helper APIs for emitting A2A-compatible events over LangGraph custom stream output. These helpers are intended for streaming requests in which Aion Server consumes the graph with stream_mode=["values", "messages", "custom", "updates"].
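When several stream modes are requested at once, each streamed item arrives as a (mode, chunk) tuple, and events emitted through these helpers surface under the "custom" mode. A minimal sketch of the consumer-side filtering — the payload dicts here are illustrative stand-ins, not the actual A2A wire format:

```python
# Illustrative stream items in the (mode, chunk) tuple shape that LangGraph
# yields when multiple stream modes are requested together.
stream = [
    ("values", {"messages": []}),
    ("custom", {"kind": "artifact-update", "name": "report"}),
    ("updates", {"my_node": {}}),
]

# Events emitted via the helpers arrive under the "custom" mode.
custom_events = [chunk for mode, chunk in stream if mode == "custom"]
print(custom_events)  # [{'kind': 'artifact-update', 'name': 'report'}]
```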

Event Helpers

| Helper | Emitted A2A Event | Behavior |
| --- | --- | --- |
| emit_file_artifact | TaskArtifactUpdateEvent | Emits a file artifact with a file part. |
| emit_data_artifact | TaskArtifactUpdateEvent | Emits a structured artifact with a data part. |
| emit_message | TaskStatusUpdateEvent or TaskArtifactUpdateEvent | Emits message/status or stream artifacts. |
| emit_task_update | TaskStatusUpdateEvent | Emits one combined task update with message and/or metadata. |
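All four helpers take a LangGraph StreamWriter as their first argument. Since StreamWriter is essentially a callable that receives a payload, helper output can be captured in unit tests with a simple list-collecting stub — a sketch, with a stand-in payload rather than the real event schema:

```python
from typing import Any, Callable

# LangGraph's StreamWriter behaves like Callable[[Any], None].
StreamWriter = Callable[[Any], None]


def make_collecting_writer() -> tuple[StreamWriter, list[Any]]:
    """Return a writer stub plus the list it appends received events to."""
    events: list[Any] = []
    return events.append, events


writer, events = make_collecting_writer()
writer({"kind": "status-update", "state": "working"})  # stand-in payload
print(len(events))  # 1
```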

Functions

emit_file_artifact(...)

emit_file_artifact(writer, *, url=None, base64=None, mime_type, name=None, artifact_id=None, append=False, is_last_chunk=True)

Emits a file artifact and maps to an A2A Artifact with a file part.
| Parameter | Description |
| --- | --- |
| writer | LangGraph StreamWriter from node signature |
| url | File URL for remote files; mutually exclusive with base64 |
| base64 | File content as base64; mutually exclusive with url |
| mime_type | MIME type such as application/pdf or image/png |
| name | Artifact name (default: file) |
| artifact_id | Explicit artifact ID; auto-generated if not provided |
| append | Set true to append to a previously sent artifact |
| is_last_chunk | Set false if more chunks are coming |
Use cases: generated PDFs/images/documents, chunked file streaming, external file references. Example:
```python
from uuid import uuid4
from langgraph.types import StreamWriter
from aion.langgraph import emit_file_artifact


def my_node(state: dict, writer: StreamWriter):
    # Single file by URL
    emit_file_artifact(
        writer,
        url="https://example.com/report.pdf",
        mime_type="application/pdf",
        name="report",
    )

    # Streaming file chunks with a stable artifact_id
    file_chunks: list[str] = state["file_chunks"]  # base64-encoded chunks
    artifact_id = str(uuid4())
    for i, chunk_base64 in enumerate(file_chunks):
        emit_file_artifact(
            writer,
            base64=chunk_base64,
            mime_type="text/plain",
            artifact_id=artifact_id,
            append=True,
            is_last_chunk=(i == len(file_chunks) - 1),
        )

    return state
```
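On the consumer side, chunks that share an artifact_id and set append=True can be reassembled in arrival order. Assuming each chunk is independently base64-encoded (an assumption about the chunking scheme, not a documented guarantee), decoding and concatenating recovers the file bytes:

```python
import base64


def assemble_file(chunks: list[str]) -> bytes:
    """Decode each base64 chunk and concatenate in arrival order."""
    return b"".join(base64.b64decode(c) for c in chunks)


# Two chunks of a small text file, each base64-encoded on its own.
chunks = [
    base64.b64encode(b"hello, ").decode(),
    base64.b64encode(b"world").decode(),
]
print(assemble_file(chunks))  # b'hello, world'
```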

emit_data_artifact(...)

emit_data_artifact(writer, data, name=None, artifact_id=None, append=False, is_last_chunk=True)

Emits a structured data artifact. data must be JSON-serializable.
| Parameter | Description |
| --- | --- |
| writer | LangGraph StreamWriter from node signature |
| data | Dictionary or any JSON-serializable value |
| name | Artifact name (default: data) |
| artifact_id | Explicit artifact ID; auto-generated if not provided |
| append | Set true to append to a previously sent artifact |
| is_last_chunk | Set false if more chunks are coming |
Use cases: analysis outputs, metrics, structured response payloads. Example:
```python
from langgraph.types import StreamWriter
from aion.langgraph import emit_data_artifact


def my_node(state: dict, writer: StreamWriter):
    emit_data_artifact(
        writer,
        {"status": "success", "results": ["a", "b"]},
        name="analysis",
    )
    return state
```
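Because data must be JSON-serializable, a quick pre-flight check with the standard json module can catch bad payloads before emitting. This is a sketch, not part of the helper API:

```python
import json


def is_json_serializable(data) -> bool:
    """Return True if json.dumps accepts the value."""
    try:
        json.dumps(data)
        return True
    except (TypeError, ValueError):
        return False


print(is_json_serializable({"status": "success", "results": ["a", "b"]}))  # True
print(is_json_serializable({"when": object()}))  # False
```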

emit_message(...)

emit_message(writer, message, ephemeral=False)

Emits a programmatic message during graph execution. Supports full messages and streaming chunks.
| Parameter | Description |
| --- | --- |
| writer | LangGraph StreamWriter from node signature |
| message | LangChain AIMessage or AIMessageChunk |
| ephemeral | If true, event is sent to client but not persisted in task history |
ephemeral=False (default):
  • AIMessage -> TaskStatusUpdateEvent(working, message=...); persisted in history.
  • AIMessageChunk -> TaskArtifactUpdateEvent(STREAM_DELTA); streamed and not persisted.
ephemeral=True:
  • AIMessage or AIMessageChunk -> TaskArtifactUpdateEvent(EPHEMERAL_MESSAGE).
  • Emitted to client and filtered out by task store (not persisted).
  • Does not change streaming accumulation or final response fallback logic.
Use cases: progress notifications, “thinking” indicators, transient status events. Example:
```python
from langgraph.types import StreamWriter
from langchain_core.messages import AIMessage, AIMessageChunk
from aion.langgraph import emit_message


def my_node(state: dict, writer: StreamWriter):
    # Streamed delta (not persisted)
    emit_message(writer, AIMessageChunk(content="Processing..."))
    # Ephemeral status (sent to client, filtered out of task history)
    emit_message(writer, AIMessage(content="Searching knowledge base..."), ephemeral=True)
    # Persisted status update (default ephemeral=False)
    emit_message(writer, AIMessage(content="Search finished."))
    return state
```

emit_task_update(...)

emit_task_update(writer, message=None, metadata=None)

Emits one combined task update containing message and/or metadata. Only AIMessage is accepted for message; for chunk streaming, use emit_message().
| Parameter | Description |
| --- | --- |
| writer | LangGraph StreamWriter from node signature |
| message | Optional full message (AIMessage only) |
| metadata | Optional metadata dictionary to merge into task metadata |
At least one of message or metadata must be provided. Metadata keys with the aion: prefix are reserved and ignored.
Use cases: step-completion updates with metadata, progress and a message in one event. Example:
```python
from langgraph.types import StreamWriter
from langchain_core.messages import AIMessage
from aion.langgraph import emit_task_update


def my_node(state: dict, writer: StreamWriter):
    emit_task_update(
        writer,
        message=AIMessage(content="Processing complete"),
        metadata={"progress": 100},
    )
    return state
```
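The reserved-prefix rule above (aion:-prefixed metadata keys are ignored) can be sketched as a simple filter. This mirrors the documented behavior but is not the actual server implementation:

```python
def drop_reserved_keys(metadata: dict) -> dict:
    # Keys with the reserved "aion:" prefix are ignored, per the rule above.
    return {k: v for k, v in metadata.items() if not k.startswith("aion:")}


merged = drop_reserved_keys({"progress": 100, "aion:internal": "x"})
print(merged)  # {'progress': 100}
```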