# Agent SSE Events 本文档描述 `GET /api/v1/agent/runs/{thread_id}/events` 的事件协议。 --- ## 1) 事件管道 后端事件流转如下: 1. Runtime 直接产出 AG-UI 事件(如 `RUN_STARTED`、`TOOL_CALL_RESULT`) 2. `agui_codec` 仅做协议对齐与字段净化(例如移除仅后端内部统计字段) 3. 事件同时: - 持久化到数据库(用于 history) - 发布到 Redis Stream(用于 SSE) 4. `/runs/{thread_id}/events` 从 Redis Stream 读取并输出 SSE --- ## 2) SSE 帧格式 每条事件遵循标准 SSE: ```text id: event: data: ``` - `id` 可用于断点续流(`Last-Event-ID`) - `event` 与 JSON 内 `type` 一致(例如 `RUN_STARTED`) - 空闲时可能出现 keep-alive 注释帧: ```text : keep-alive ``` --- ## 3) 事件类型(当前实现) ### 3.1 Run 生命周期 #### `RUN_STARTED` ```json { "type": "RUN_STARTED", "threadId": "...", "runId": "..." } ``` #### `RUN_FINISHED` ```json { "type": "RUN_FINISHED", "threadId": "...", "runId": "..." } ``` #### `RUN_ERROR` ```json { "type": "RUN_ERROR", "threadId": "...", "runId": "...", "message": "runtime execution failed", "code": null } ``` ### 3.2 阶段事件 #### `STEP_STARTED` ```json { "type": "STEP_STARTED", "threadId": "...", "runId": "...", "stepName": "router" | "worker" } ``` #### `STEP_FINISHED` ```json { "type": "STEP_FINISHED", "threadId": "...", "runId": "...", "stepName": "router" | "worker" } ``` ### 3.3 Tool 事件 #### `TOOL_CALL_START` ```json { "type": "TOOL_CALL_START", "threadId": "...", "runId": "...", "messageId": "...", "toolCallId": "...", "toolCallName": "...", "stage": "worker" } ``` #### `TOOL_CALL_ARGS` ```json { "type": "TOOL_CALL_ARGS", "threadId": "...", "runId": "...", "messageId": "...", "toolCallId": "...", "toolCallName": "...", "args": {}, "stage": "worker" } ``` #### `TOOL_CALL_END` ```json { "type": "TOOL_CALL_END", "threadId": "...", "runId": "...", "messageId": "...", "toolCallId": "...", "toolCallName": "...", "stage": "worker" } ``` #### `TOOL_CALL_RESULT` ```json { "type": "TOOL_CALL_RESULT", "threadId": "...", "runId": "...", "messageId": "...", "role": "tool", "stage": "worker", "tool_name": "...", "tool_call_id": "...", "tool_call_args": {}, "status": "success" | "failure" | "partial", "result": "...", "error": null } ``` 说明:`TOOL_CALL_RESULT` 不再携带 `ui_schema`。tool 结果通过 `result` 字段提供紧凑、结构化、可执行的信息(优先包含 id/status/count 等关键事实),用于 agent 后续推理与工具编排。 补充约束: - `tool_call_id` 必须与同次调用的 `TOOL_CALL_START/ARGS/END.toolCallId` 一致,并在每次工具调用中保持唯一。 - `tool_call_args` 仅表示输入参数快照。 - `result` 仅表示执行输出事实,不重复 `tool_call_args` 已包含的输入参数。 ### 3.4 文本完成事件 #### `TEXT_MESSAGE_END` 当前实现仅在 worker 输出完成后发送完整结果,不发送 token delta 事件。 ```json { "type": "TEXT_MESSAGE_END", "threadId": "...", "runId": "...", "messageId": "...", "role": "assistant", "stage": "worker", "status": "success" | "partial_success" | "failed", "answer": "...", "key_points": [], "result_type": "execution_report" | "clarification" | "error_report" | "unknown", "suggested_actions": [], "error": null, "ui_schema": {} } ``` `inputTokens`、`outputTokens`、`cost`、`latencyMs`、`model` 属于后端内部统计字段,不在 SSE 对外协议中暴露。 --- ## 5) Usage 审计协议(后端内部) 本节描述后端对 LLM usage 的内部审计与计费策略。该协议用于数据库持久化、成本统计与运行观测,不对 SSE 外部协议直接暴露。 ### 5.1 当前厂商范围 - DashScope(Qwen) - DeepSeek 当前实现仅针对上述两家做深度适配。 ### 5.2 原始字段采集(Provider -> Runtime) `TrackingChatModel` 会优先读取 provider 直接字段,读取不到时再从 metadata 补齐。 优先级如下: 1. 直接字段(优先) - `usage.input_tokens` - `usage.output_tokens` - `usage.total_tokens` - `usage.time`(秒) - `usage.cost`(若存在) 2. metadata 字段(补齐) - `metadata.prompt_tokens` - `metadata.completion_tokens` - `metadata.total_tokens` - `metadata.prompt_tokens_details.cached_tokens` - `metadata.prompt_cache_hit_tokens` - `metadata.prompt_cache_miss_tokens` - `metadata.completion_tokens_details.reasoning_tokens` - `metadata.cost` / `metadata.total_cost`(若存在) ### 5.3 归一化后的内部 usage_summary 字段 `TrackingChatModel.usage_summary()` 当前输出: - `input_tokens` - `output_tokens` - `total_tokens` - `latency_ms`(由 `usage.time * 1000` 转换) - `cached_prompt_tokens` - `prompt_cache_hit_tokens` - `prompt_cache_miss_tokens` - `reasoning_tokens` - `direct_cost` - `direct_cost_observed`(0/1) - `direct_cost_complete`(0/1) - `model_call_records` - `usage_records` - `direct_cost_records` - `cost_source`(`provider` | `catalog_fallback`) ### 5.4 成本计算策略(严谨优先) 核心原则:**能直接用 provider 返回就直接用;缺失才 fallback。** `LiteLLMService.build_usage_metadata()` 执行规则: 1. 仅当以下条件同时满足时使用 provider 直出成本: - `usageComplete == true`(`model_call_records == usage_records`) - `direct_cost_observed == 1` - `direct_cost_complete == 1` - `direct_cost` 为有效非负数 2. 否则使用 catalog 价格回退计算(`calculate_cost`) ### 5.5 Fallback 计费细节 - 档位选择:按 `prompt_tokens` 命中 `pricing_tiers.max_prompt_tokens` - 公式: ```text cost = uncached_prompt_tokens * input_cost_per_token + cached_prompt_tokens * cached_token_rate + completion_tokens * output_cost_per_token ``` - `cached_token_rate` 规则: - 若 tier 配置了 `cache_hit_cost_per_token` 且 > 0,使用该值 - 否则回退为 `input_cost_per_token` ### 5.6 内部 costSource 语义 - `provider`: 使用 provider 直接成本 - `catalog_fallback`: 正常使用价格表回退 - `catalog_fallback_incomplete_provider_cost`: provider 返回了部分 direct cost,但不完整,回退价格表 - `incomplete_usage_fallback`: usage 本身不完整,回退价格表 ### 5.7 DeepSeek / DashScope 当前观测到的返回特征 根据当前线上探针与运行结果: - 两家都稳定返回:`input_tokens`、`output_tokens`、`time` - `usage.total_tokens` 顶层可能为空,但 `metadata.total_tokens` 可用 - DeepSeek 常见 `prompt_tokens_details.cached_tokens`、`prompt_cache_hit_tokens`、`prompt_cache_miss_tokens` - DashScope 常见 `completion_tokens_details.reasoning_tokens`(可能为 `null`) - 两家当前都未稳定提供直接 `cost` 字段,因此多数场景为 catalog fallback ## 6) 快照事件 编码器支持以下 AG-UI 类型映射: - `STATE_SNAPSHOT` - `MESSAGES_SNAPSHOT` 当前 `/runs/{thread_id}/events` 主流程通常不主动产出这两类事件;历史查询请使用 `/history`。 --- ## 7) 字段命名约定 - 事件顶层通用字段使用 AG-UI 风格:`type`、`threadId`、`runId` - 部分业务字段沿运行时模型历史命名保留下划线: - `tool_name` - `tool_call_id` - `tool_call_args` - `ui_schema` 这部分命名属于当前后端实现约束,文档与实现保持一致。