Skip to main content
This page documents the lower-level helpers that already exist in aion.langgraph today. Use these helpers when you need direct control over emitted TaskStatusUpdateEvent or TaskArtifactUpdateEvent objects. For ordinary reply and post authoring, the higher-level Thread and Message APIs described elsewhere in this section are the intended fluent surface. These helpers still participate in the same request-scoped response accumulator. They do not replace the standard precedence of response buffer, a2a_outbox, and framework-native fallback. These helpers are intended for use with streaming requests where Aion Server consumes stream_mode=["values", "messages", "custom", "updates"].

Event Helpers

HelperEmitted A2A EventBehavior
emit_file_artifactTaskArtifactUpdateEventEmits a file artifact with a file part.
emit_data_artifactTaskArtifactUpdateEventEmits a structured artifact with a data part.
emit_messageTaskStatusUpdateEvent or TaskArtifactUpdateEventEmits message or stream updates.
emit_task_updateTaskStatusUpdateEventEmits one combined task update.
emit_reactionA2A ReactionActionPayloadEmits a reaction on an existing provider message.

Functions

emit_file_artifact(...)

emit_file_artifact(
    writer,
    *,
    url=None,
    data=None,
    mime_type,
    name=None,
    artifact_id=None,
    append=False,
    is_last_chunk=True,
)
Emits a file artifact and maps to an A2A Artifact with a file part.
ParameterDescription
writerLangGraph StreamWriter from node signature
urlFile URL for remote files; mutually exclusive with data
dataFile content as bytes; mutually exclusive with url
mime_typeMIME type such as application/pdf or image/png
nameArtifact name (default: file)
artifact_idExplicit artifact ID; auto-generated if not provided
appendSet true to append to a previously sent artifact
is_last_chunkSet false if more chunks are coming
Use cases: generated PDFs or images, chunked file streaming, external file references. Example:
from uuid import uuid4

from langgraph.types import StreamWriter

from aion.langgraph import emit_file_artifact


def my_node(state: dict, writer: StreamWriter):
    emit_file_artifact(
        writer,
        url="https://example.com/report.pdf",
        mime_type="application/pdf",
        name="report",
    )

    artifact_id = str(uuid4())
    for i, file_bytes in enumerate(file_chunks):
        emit_file_artifact(
            writer,
            data=file_bytes,
            mime_type="text/plain",
            artifact_id=artifact_id,
            append=True,
            is_last_chunk=(i == len(file_chunks) - 1),
        )

    return state

emit_data_artifact(...)

emit_data_artifact(writer, data, name=None, artifact_id=None, append=False, is_last_chunk=True) Emits a structured data artifact. data must be JSON-serializable.
ParameterDescription
writerLangGraph StreamWriter from node signature
dataDictionary or any JSON-serializable value
nameArtifact name (default: data)
artifact_idExplicit artifact ID; auto-generated if not provided
appendSet true to append to a previously sent artifact
is_last_chunkSet false if more chunks are coming
Use cases: analysis outputs, metrics, and structured response payloads. Example:
from langgraph.types import StreamWriter

from aion.langgraph import emit_data_artifact


def my_node(state: dict, writer: StreamWriter):
    emit_data_artifact(
        writer,
        {"status": "success", "results": ["a", "b"]},
        name="analysis",
    )
    return state

emit_message(...)

emit_message(writer, message, ephemeral=False, routing=None) Emits a programmatic message during graph execution. Supports full messages and streaming chunks.
ParameterDescription
writerLangGraph StreamWriter from node signature
messageLangChain AIMessage or AIMessageChunk
ephemeralIf true, event is sent to client but not persisted in task history
routingOptional MessageActionPayload for explicit delivery routing; if not provided, uses default reply target
ephemeral=False (default):
  • AIMessage -> TaskStatusUpdateEvent(working, message=...); persisted in history.
  • AIMessageChunk -> TaskArtifactUpdateEvent(STREAM_DELTA); streamed and not persisted.
ephemeral=True:
  • AIMessage or AIMessageChunk -> TaskArtifactUpdateEvent(EPHEMERAL_MESSAGE).
  • Emitted to client and filtered out by task store.
  • Does not change durable response fallback behavior.
Use cases: progress notifications, thinking indicators, and transient status events. Example:
from langchain_core.messages import AIMessage, AIMessageChunk
from langgraph.types import StreamWriter

from aion.langgraph import emit_message


def my_node(state: dict, writer: StreamWriter):
    emit_message(writer, AIMessageChunk(content="Processing..."))
    emit_message(
        writer,
        AIMessage(content="Searching knowledge base..."),
        ephemeral=True,
    )
    return state

emit_task_update(...)

emit_task_update(writer, message=None, metadata=None) Emits one combined task update containing message and or metadata. Only AIMessage is accepted for message. For chunk streaming, use emit_message().
ParameterDescription
writerLangGraph StreamWriter from node signature
messageOptional full message (AIMessage only)
metadataOptional metadata dictionary to merge into task metadata
At least one of message or metadata must be provided. Keys with the aion: prefix in metadata are ignored. Use cases: step completion updates with metadata, progress plus message in one event. Example:
from langchain_core.messages import AIMessage
from langgraph.types import StreamWriter

from aion.langgraph import emit_task_update


def my_node(state: dict, writer: StreamWriter):
    emit_task_update(
        writer,
        message=AIMessage(content="Processing complete"),
        metadata={"progress": 100},
    )
    return state

emit_reaction(...)

emit_reaction(writer, payload) Emits a reaction action on an existing provider message. Instructs the distribution to add or remove a reaction.
ParameterDescription
writerLangGraph StreamWriter from node signature
payloadReactionActionPayload with reaction details
The ReactionActionPayload must include:
  • context_id: Message context/thread identifier
  • message_id: Message identifier to react to
  • reaction_key: Reaction identifier (e.g., thumbsup, heart)
  • operation: "add" or "remove"
  • display_value: Optional human-readable reaction text
Use cases: automated reactions to messages, conditional emoji responses, approval workflows. Example:
from aion.core.types.a2a.extensions.messaging import ReactionActionPayload
from langgraph.types import StreamWriter

from aion.langgraph import emit_reaction


def my_node(state: dict, writer: StreamWriter):
    emit_reaction(
        writer,
        ReactionActionPayload(
            context_id="C06ROOM123",
            message_id="1728162300.551219",
            reaction_key="thumbsup",
            operation="add",
            display_value="Good point",
        ),
    )
    return state