From 1c02503d1d0ab5222c576555133316bbf6bacc1b Mon Sep 17 00:00:00 2001 From: qzl Date: Fri, 13 Mar 2026 17:27:18 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E7=AE=80=E5=8C=96=20AgentScope=20?= =?UTF-8?q?=E8=BF=90=E8=A1=8C=E6=97=B6=E6=A8=A1=E5=9D=97=E4=B8=8E=E4=BA=8B?= =?UTF-8?q?=E4=BB=B6=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 移除冗余的 user_token 参数传递 - 重构 tool.result 事件使用 ToolAgentOutput 模型 - 重构 text.end 事件使用 WorkerAgentOutput 模型 - 简化 store 模块的 tool result 处理逻辑 - 更新 router/service 适配新事件结构 - 清理废弃的测试文件与设计文档 - 新增 AgentRuns 多模态存储设计文档 --- .../src/core/agentscope/events/agui_codec.py | 17 +- backend/src/core/agentscope/events/store.py | 123 +-- .../core/agentscope/prompts/agent_prompt.py | 26 +- .../core/agentscope/runtime/orchestrator.py | 124 ++- backend/src/core/agentscope/runtime/tasks.py | 13 - .../core/agentscope/runtime/ui_compiler.py | 14 +- .../agentscope/tools/tool_result_storage.py | 57 ++ backend/src/schemas/agent/__init__.py | 18 +- backend/src/schemas/agent/agui_input.py | 1 - backend/src/schemas/agent/runtime_models.py | 2 +- backend/src/schemas/messages/__init__.py | 4 +- backend/src/schemas/messages/chat_message.py | 18 +- backend/src/v1/agent/repository.py | 157 +--- backend/src/v1/agent/router.py | 84 +-- backend/src/v1/agent/schemas.py | 6 + backend/src/v1/agent/service.py | 121 ++- .../tests/integration/v1/agent/test_routes.py | 135 ++-- .../v1/agent/test_sse_flow_live.py | 19 +- .../core/agentscope/events/test_agui_codec.py | 40 +- .../unit/core/agentscope/events/test_store.py | 358 +-------- .../runtime/test_agent_route_runtime.py | 608 --------------- .../agentscope/runtime/test_orchestrator.py | 297 +++----- .../messages/test_chat_message_schema.py | 29 + .../tests/unit/v1/agent/test_repository.py | 270 +------ .../tests/unit/v1/agent/test_router_guards.py | 147 +--- backend/tests/unit/v1/agent/test_service.py | 709 ++---------------- ...13-agent-runs-multimodal-implementation.md | 261 +++++++ ...13-agent-runs-multimodal-storage-design.md | 87 +++ .../routes/agent-runs-events-history.md | 239 ++++++ 29 files changed, 1259 insertions(+), 2725 deletions(-) create mode 100644 backend/src/core/agentscope/tools/tool_result_storage.py delete mode 100644 backend/src/schemas/agent/agui_input.py delete mode 100644 backend/tests/unit/core/agentscope/runtime/test_agent_route_runtime.py create mode 100644 backend/tests/unit/schemas/messages/test_chat_message_schema.py create mode 100644 docs/plans/2026-03-13-agent-runs-multimodal-implementation.md create mode 100644 docs/plans/2026-03-13-agent-runs-multimodal-storage-design.md create mode 100644 docs/protocols/routes/agent-runs-events-history.md diff --git a/backend/src/core/agentscope/events/agui_codec.py b/backend/src/core/agentscope/events/agui_codec.py index 3715154..c4a07ee 100644 --- a/backend/src/core/agentscope/events/agui_codec.py +++ b/backend/src/core/agentscope/events/agui_codec.py @@ -38,16 +38,13 @@ def to_agui_wire_event(event: dict[str, Any]) -> dict[str, Any]: data = event.get("data") if isinstance(data, dict): if event_type == "tool.result": - for key in ( - "messageId", - "toolCallId", - "callId", - "toolName", - "stage", - "taskId", - "ui", - "content", - ): + for key in ("messageId", "toolCallId", "toolAgentOutput"): + value = data.get(key) + if value is not None: + payload[key] = value + return payload + if event_type == "text.end": + for key in ("messageId", "workerAgentOutput"): value = data.get(key) if value is not None: payload[key] = value diff --git a/backend/src/core/agentscope/events/store.py b/backend/src/core/agentscope/events/store.py index 2c238d0..53c3fea 100644 --- a/backend/src/core/agentscope/events/store.py +++ b/backend/src/core/agentscope/events/store.py @@ -1,16 +1,19 @@ from __future__ import annotations -import json import re from decimal import Decimal, InvalidOperation from typing import Any, Callable, Protocol from uuid import UUID, uuid4 -from core.agentscope.events.tool_result_summary import build_tool_content_summary from core.agentscope.events.persistence import MessageRepository, SessionRepository from core.logging import get_logger from models.agent_chat_message import AgentChatMessageRole from models.agent_chat_session import AgentChatSessionStatus +from schemas.agent.runtime_models import ( + ToolAgentOutput, + WorkerAgentOutputLite, + WorkerAgentOutputRich, +) class EventStore(Protocol): @@ -193,6 +196,19 @@ class SqlAlchemyEventStore: if isinstance(stage, str) and stage: metadata["stage"] = stage + worker_payload = event.get("workerAgentOutput") + if isinstance(worker_payload, dict): + try: + if "ui_hints" in worker_payload: + worker_output = WorkerAgentOutputRich.model_validate(worker_payload) + else: + worker_output = WorkerAgentOutputLite.model_validate(worker_payload) + except Exception: + worker_output = None + else: + content = worker_output.answer + metadata["worker_agent_output"] = worker_output.model_dump(mode="json") + role_value = context.get("role") if not isinstance(role_value, str): role_value = "assistant" @@ -252,6 +268,14 @@ class SqlAlchemyEventStore: if not isinstance(tool_name, str) or not tool_name: return + raw_output = event.get("toolAgentOutput") + if not isinstance(raw_output, dict): + return + try: + tool_output = ToolAgentOutput.model_validate(raw_output) + except Exception: + return + run_id = event.get("runId") run_id_value = run_id if isinstance(run_id, str) and run_id else "" task_id = event.get("taskId") @@ -264,43 +288,18 @@ class SqlAlchemyEventStore: else f"{task_id_value}-{uuid4().hex[:8]}" ) - summary = build_tool_content_summary( - tool_name=tool_name, - args=event.get("args") if isinstance(event.get("args"), dict) else None, - result=event.get("result"), - error=event.get("error"), - ) - - raw_result_value = event.get("result") - raw_result: dict[str, object] = ( - raw_result_value if isinstance(raw_result_value, dict) else {} - ) - ui_candidate = raw_result.get("ui") - ui_schema = ui_candidate if isinstance(ui_candidate, dict) else None - result_type = raw_result.get("type") - result_data = raw_result.get("data") - if ( - ui_schema is None - and isinstance(result_type, str) - and isinstance(result_data, dict) - ): - ui_schema = raw_result - payload: dict[str, object] = { - "toolName": tool_name, - "ui_schema": ui_schema, - "result": _sanitize_result(raw_result), - "error": _sanitize_error(event.get("error")), + "toolAgentOutput": tool_output.model_dump(mode="json"), "callId": call_id_value, "runId": run_id_value, "taskId": task_id_value, - "content": summary, + "content": tool_output.result_summary, } metadata: dict[str, object] = { "tool_name": tool_name, "tool_call_id": call_id_value, - "summary_version": "v1", + "tool_agent_output": tool_output.model_dump(mode="json"), } if run_id_value: metadata["run_id"] = run_id_value @@ -332,9 +331,7 @@ class SqlAlchemyEventStore: storage_path=storage_path, ) - content = summary or json.dumps( - payload, ensure_ascii=False, separators=(",", ":") - ) + content = tool_output.result_summary locked_session = await session_repo.lock_session_for_update( session_id=session_id @@ -429,63 +426,3 @@ def _sanitize_path_component(value: str) -> str: compact = re.sub(r"[^A-Za-z0-9._-]", "-", value.strip()) compact = compact.strip(".-") return compact or "id" - - -def _sanitize_error(value: object) -> str | None: - if isinstance(value, str) and value.strip(): - return " ".join(value.split())[:300] - if isinstance(value, dict): - for key in ("message", "error", "detail"): - item = value.get(key) - if isinstance(item, str) and item.strip(): - return " ".join(item.split())[:300] - return None - - -def _sanitize_result(value: object) -> dict[str, object]: - if not isinstance(value, dict): - return {} - - def _is_sensitive_key(key: str) -> bool: - normalized = key.strip().lower().replace("-", "_") - if not normalized: - return False - exact = { - "password", - "token", - "secret", - "api_key", - "apikey", - "credential", - "authorization", - "auth", - } - if normalized in exact: - return True - patterns = ( - "password", - "token", - "secret", - "auth", - "credential", - "api_key", - "apikey", - "authorization", - ) - return any(pattern in normalized for pattern in patterns) - - def _sanitize_value(item: object) -> object: - if isinstance(item, dict): - return _sanitize_result(item) - if isinstance(item, list): - return [_sanitize_value(entry) for entry in item] - return item - - sanitized: dict[str, object] = {} - for key, item in value.items(): - key_text = str(key) - if _is_sensitive_key(key_text): - sanitized[str(key)] = "[REDACTED]" - continue - sanitized[str(key)] = _sanitize_value(item) - return sanitized diff --git a/backend/src/core/agentscope/prompts/agent_prompt.py b/backend/src/core/agentscope/prompts/agent_prompt.py index 77d7d46..425e5dc 100644 --- a/backend/src/core/agentscope/prompts/agent_prompt.py +++ b/backend/src/core/agentscope/prompts/agent_prompt.py @@ -78,21 +78,19 @@ def build_intent_user_prompt( *, user_input: str | list[dict[str, Any]] ) -> str | list[dict[str, Any]]: if isinstance(user_input, list): + instruction_block = { + "type": "text", + "text": "\n\n".join( + [ + ROUTER_STAGE_INSTRUCTION, + "[Output Schema]", + _schema_json(RouterAgentOutput), + ] + ), + } return [ - { - "type": "text", - "text": "\n\n".join( - [ - ROUTER_STAGE_INSTRUCTION, - "[Output Schema]", - _schema_json(RouterAgentOutput), - "[User Input]", - json.dumps( - user_input, ensure_ascii=True, separators=(",", ":") - ), - ] - ), - } + instruction_block, + *user_input, ] return "\n\n".join( [ diff --git a/backend/src/core/agentscope/runtime/orchestrator.py b/backend/src/core/agentscope/runtime/orchestrator.py index 5f951d8..e30ae4f 100644 --- a/backend/src/core/agentscope/runtime/orchestrator.py +++ b/backend/src/core/agentscope/runtime/orchestrator.py @@ -50,11 +50,9 @@ class AgentScopeRuntimeOrchestrator: *, command: RunAgentInput, owner_id: UUID, - user_token: str, user_context: UserContext, session: AsyncSession, ) -> dict[str, Any]: - del user_token return await self._execute( command=command, owner_id=owner_id, @@ -68,11 +66,9 @@ class AgentScopeRuntimeOrchestrator: *, command: RunAgentInput, owner_id: UUID, - user_token: str, user_context: UserContext, session: AsyncSession, ) -> dict[str, Any]: - del user_token return await self._execute( command=command, owner_id=owner_id, @@ -116,7 +112,7 @@ class AgentScopeRuntimeOrchestrator: user_input = _to_resume_user_input(command) else: _, content_blocks = extract_latest_user_payload(command) - user_input = _to_user_input_payload(content_blocks) + user_input = _to_model_user_input(content_blocks) router_toolkit = build_stage_toolkit( stage="intent", session=session, @@ -159,16 +155,38 @@ class AgentScopeRuntimeOrchestrator: worker_payload = result.get("worker") if isinstance(result, dict) else None worker = worker_payload if isinstance(worker_payload, dict) else {} - response_metadata = worker.get("response_metadata") - metadata = response_metadata if isinstance(response_metadata, dict) else {} assistant_text = _resolve_worker_answer(worker) + tool_outputs_raw = worker.get("tool_outputs") + if isinstance(tool_outputs_raw, list): + for idx, item in enumerate(tool_outputs_raw, start=1): + if not isinstance(item, dict): + continue + tool_name = item.get("tool_name") + tool_call_id = item.get("tool_call_id") + if not isinstance(tool_name, str) or not tool_name: + continue + if not isinstance(tool_call_id, str) or not tool_call_id: + tool_call_id = f"{run_id}-tool-{idx}" + await self._pipeline.emit( + session_id=thread_id, + event={ + "type": "tool.result", + "threadId": thread_id, + "runId": run_id, + "data": { + "messageId": f"tool-{tool_call_id}", + "toolCallId": tool_call_id, + "toolAgentOutput": item, + }, + }, + ) await self._emit_stage_text( thread_id=thread_id, run_id=run_id, stage_name="worker", message_id=f"assistant-{run_id}", text=assistant_text, - response_metadata=metadata, + worker_agent_output=worker, ) await self._pipeline.emit( @@ -215,7 +233,7 @@ class AgentScopeRuntimeOrchestrator: stage_name: str, message_id: str, text: str, - response_metadata: dict[str, Any], + worker_agent_output: dict[str, Any], ) -> None: await self._pipeline.emit( session_id=thread_id, @@ -250,8 +268,7 @@ class AgentScopeRuntimeOrchestrator: "runId": run_id, "data": { "messageId": message_id, - "stage": stage_name, - **_text_end_telemetry_payload(response_metadata), + "workerAgentOutput": worker_agent_output, }, }, ) @@ -271,6 +288,28 @@ def _to_user_input_payload( return content_blocks +def _to_model_user_input( + content_blocks: list[dict[str, Any]], +) -> str | list[dict[str, Any]]: + normalized: list[dict[str, Any]] = [] + for block in content_blocks: + if not isinstance(block, dict): + continue + block_type = block.get("type") + if block_type == "text": + text = block.get("text") + if isinstance(text, str) and text.strip(): + normalized.append({"type": "text", "text": text}) + continue + if block_type != "binary": + continue + url = block.get("url") + if isinstance(url, str) and url: + normalized.append({"type": "image_url", "image_url": {"url": url}}) + + return _to_user_input_payload(normalized) + + def _to_resume_user_input(command: RunAgentInput) -> list[dict[str, Any]]: normalized: list[dict[str, Any]] = [] for message in command.messages: @@ -296,66 +335,3 @@ def _resolve_worker_answer(worker: dict[str, Any]) -> str: return message return "抱歉,这次没有产出可用结果,请重试。" - - -def _text_end_telemetry_payload(metadata: dict[str, Any]) -> dict[str, Any]: - payload: dict[str, Any] = {} - model = _first_non_empty_str(metadata, keys=("model", "model_code")) - if model is not None: - payload["model"] = model - - input_tokens = _first_number(metadata, keys=("inputTokens", "input_tokens")) - if input_tokens is not None: - payload["inputTokens"] = input_tokens - - output_tokens = _first_number(metadata, keys=("outputTokens", "output_tokens")) - if output_tokens is not None: - payload["outputTokens"] = output_tokens - - latency_ms = _first_number(metadata, keys=("latencyMs", "latency_ms")) - if latency_ms is not None: - payload["latencyMs"] = latency_ms - - cost = _first_number(metadata, keys=("cost", "total_cost"), allow_float=True) - if cost is not None: - payload["cost"] = cost - - return payload - - -def _first_non_empty_str( - metadata: dict[str, Any], *, keys: tuple[str, ...] -) -> str | None: - for key in keys: - value = metadata.get(key) - if isinstance(value, str) and value.strip(): - return value.strip() - return None - - -def _first_number( - metadata: dict[str, Any], - *, - keys: tuple[str, ...], - allow_float: bool = False, -) -> int | float | None: - for key in keys: - value = metadata.get(key) - if isinstance(value, bool): - continue - if isinstance(value, int): - if value < 0: - continue - return value - if isinstance(value, float): - if value < 0: - continue - return value if allow_float else int(value) - if isinstance(value, str): - try: - parsed = float(value) if allow_float else int(value) - except ValueError: - continue - if parsed >= 0: - return parsed - return None diff --git a/backend/src/core/agentscope/runtime/tasks.py b/backend/src/core/agentscope/runtime/tasks.py index 4c3622a..3baab8f 100644 --- a/backend/src/core/agentscope/runtime/tasks.py +++ b/backend/src/core/agentscope/runtime/tasks.py @@ -68,16 +68,6 @@ def _build_user_context(*, owner_id: UUID, run_input: RunAgentInput) -> UserCont ) -def _extract_user_token( - *, command: dict[str, Any], run_input: RunAgentInput -) -> str | None: - del run_input - raw_token = command.get("user_token") - if isinstance(raw_token, str) and raw_token.strip(): - return raw_token.strip() - return None - - async def _build_recent_context_messages( *, session: Any, @@ -147,7 +137,6 @@ async def run_agentscope_task(command: dict[str, Any]) -> dict[str, object]: if command_type == "resume": extract_latest_tool_result(parsed_run_input) user_context = _build_user_context(owner_id=owner_id, run_input=parsed_run_input) - user_token = _extract_user_token(command=command, run_input=parsed_run_input) or "" redis_client = await get_or_init_redis_client() bus = RedisStreamBus( @@ -189,7 +178,6 @@ async def run_agentscope_task(command: dict[str, Any]) -> dict[str, object]: await runtime.resume( command=parsed_run_input, owner_id=owner_id, - user_token=user_token, user_context=user_context, session=session, ) @@ -197,7 +185,6 @@ async def run_agentscope_task(command: dict[str, Any]) -> dict[str, object]: await runtime.run( command=parsed_run_input, owner_id=owner_id, - user_token=user_token, user_context=user_context, session=session, ) diff --git a/backend/src/core/agentscope/runtime/ui_compiler.py b/backend/src/core/agentscope/runtime/ui_compiler.py index 7859c27..7fd0274 100644 --- a/backend/src/core/agentscope/runtime/ui_compiler.py +++ b/backend/src/core/agentscope/runtime/ui_compiler.py @@ -167,16 +167,18 @@ class UiCompiler: timestamp: str | None = None, meta: dict[str, Any] | None = None, ) -> dict[str, Any]: - hints = output.ui_hints or self._build_default_worker_hints(output) - if output.error is not None and not self._contains_error_block(hints.blocks): + output_ui_hints = getattr(output, "ui_hints", None) + hints = output_ui_hints or self._build_default_worker_hints(output) + output_error = getattr(output, "error", None) + if output_error is not None and not self._contains_error_block(hints.blocks): hints = self._append_error_block( hints, UiHintErrorBlock( kind="error", - errorCode=output.error.code, - message=output.error.message, - retryable=output.error.retryable, - details=self._stringify_details(output.error.details), + errorCode=output_error.code, + message=output_error.message, + retryable=output_error.retryable, + details=self._stringify_details(output_error.details), ), ) diff --git a/backend/src/core/agentscope/tools/tool_result_storage.py b/backend/src/core/agentscope/tools/tool_result_storage.py new file mode 100644 index 0000000..a743dfc --- /dev/null +++ b/backend/src/core/agentscope/tools/tool_result_storage.py @@ -0,0 +1,57 @@ +from __future__ import annotations + +import json +from typing import Protocol + +from services.base.supabase import supabase_service + + +class ToolResultStorage(Protocol): + async def upload_json( + self, + *, + bucket: str, + path: str, + payload: dict[str, object], + ) -> str: ... + + async def read_json( + self, + *, + bucket: str, + path: str, + ) -> dict[str, object] | None: ... + + +class SupabaseToolResultStorage: + async def upload_json( + self, + *, + bucket: str, + path: str, + payload: dict[str, object], + ) -> str: + serialized = json.dumps(payload, ensure_ascii=True, separators=(",", ":")) + await supabase_service.upload_bytes( + bucket=bucket, + path=path, + content=serialized.encode("utf-8"), + content_type="application/json", + ) + return path + + async def read_json( + self, + *, + bucket: str, + path: str, + ) -> dict[str, object] | None: + raw = await supabase_service.download_bytes(bucket=bucket, path=path) + decoded = json.loads(raw.decode("utf-8")) + if isinstance(decoded, dict): + return decoded + return None + + +def create_tool_result_storage() -> ToolResultStorage: + return SupabaseToolResultStorage() diff --git a/backend/src/schemas/agent/__init__.py b/backend/src/schemas/agent/__init__.py index 6bb7d5d..485d6d0 100644 --- a/backend/src/schemas/agent/__init__.py +++ b/backend/src/schemas/agent/__init__.py @@ -1,11 +1,3 @@ -from core.agentscope.schemas.agui_input import ( - extract_latest_tool_result, - extract_latest_user_content, - extract_latest_user_payload, - extract_latest_user_text, - parse_run_input, - validate_run_request_messages_contract, -) from schemas.agent.runtime_models import ( ResultType, RouterAgentOutput, @@ -14,17 +6,17 @@ from schemas.agent.runtime_models import ( ToolAgentOutput, ToolStatus, UiMode, + WorkerAgentOutput, WorkerAgentOutputLite, WorkerAgentOutputRich, - WorkerAgentOutput, resolve_worker_output_model, ) +from schemas.agent.system_agent import AgentType, SystemAgentLLMConfig from schemas.agent.ui_hints import ( UiHintAction, UiHintBlock, UiHintsPayload, ) -from schemas.agent.system_agent import AgentType, SystemAgentLLMConfig __all__ = [ "AgentType", @@ -43,10 +35,4 @@ __all__ = [ "WorkerAgentOutputRich", "WorkerAgentOutput", "resolve_worker_output_model", - "extract_latest_tool_result", - "extract_latest_user_content", - "extract_latest_user_payload", - "extract_latest_user_text", - "parse_run_input", - "validate_run_request_messages_contract", ] diff --git a/backend/src/schemas/agent/agui_input.py b/backend/src/schemas/agent/agui_input.py deleted file mode 100644 index 5ce6bd7..0000000 --- a/backend/src/schemas/agent/agui_input.py +++ /dev/null @@ -1 +0,0 @@ -from core.agentscope.schemas.agui_input import * # noqa: F403 diff --git a/backend/src/schemas/agent/runtime_models.py b/backend/src/schemas/agent/runtime_models.py index b3abf84..b130f7a 100644 --- a/backend/src/schemas/agent/runtime_models.py +++ b/backend/src/schemas/agent/runtime_models.py @@ -374,7 +374,7 @@ class WorkerAgentOutputRich(WorkerAgentOutputLite): ) -WorkerAgentOutput = WorkerAgentOutputRich +WorkerAgentOutput = WorkerAgentOutputLite | WorkerAgentOutputRich def resolve_worker_output_model(ui_mode: UiMode) -> type[WorkerAgentOutputLite]: diff --git a/backend/src/schemas/messages/__init__.py b/backend/src/schemas/messages/__init__.py index 95a0c8d..2cf9e80 100644 --- a/backend/src/schemas/messages/__init__.py +++ b/backend/src/schemas/messages/__init__.py @@ -1,3 +1,3 @@ -from schemas.messages.chat_message import AgentChatMessageMetadata +from schemas.messages.chat_message import AgentChatMessage, AgentChatMessageMetadata -__all__ = ["AgentChatMessageMetadata"] +__all__ = ["AgentChatMessage", "AgentChatMessageMetadata"] diff --git a/backend/src/schemas/messages/chat_message.py b/backend/src/schemas/messages/chat_message.py index e9f7213..84b4e4b 100644 --- a/backend/src/schemas/messages/chat_message.py +++ b/backend/src/schemas/messages/chat_message.py @@ -1,8 +1,11 @@ from __future__ import annotations +from datetime import datetime +from decimal import Decimal from typing import ClassVar +from uuid import UUID -from pydantic import BaseModel, ConfigDict +from pydantic import BaseModel, ConfigDict, Field from ..agent import AgentType, ToolAgentOutput, WorkerAgentOutput @@ -22,3 +25,16 @@ class AgentChatMessageMetadata(BaseModel): user_message_attachments: UserMessageAttachments | None = None tool_agent_output: ToolAgentOutput | None = None worker_agent_output: WorkerAgentOutput | None = None + + +class AgentChatMessage(BaseModel): + """Canonical schema aligned with `messages` table columns.""" + + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + + id: UUID + seq: int + role: str + content: str + metadata: AgentChatMessageMetadata | dict[str, object] | None = None + timestamp: datetime diff --git a/backend/src/v1/agent/repository.py b/backend/src/v1/agent/repository.py index 46f5aff..647ed44 100644 --- a/backend/src/v1/agent/repository.py +++ b/backend/src/v1/agent/repository.py @@ -1,7 +1,6 @@ from __future__ import annotations from datetime import date, datetime, time, timedelta, timezone -import json from typing import Protocol from uuid import UUID @@ -9,10 +8,9 @@ from fastapi import HTTPException from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession -from core.config.settings import config from models.agent_chat_message import AgentChatMessage, AgentChatMessageRole from models.agent_chat_session import AgentChatSession -from services.base.supabase import supabase_service +from schemas.messages.chat_message import AgentChatMessage as AgentChatMessageSchema class ToolResultPayloadStorage(Protocol): @@ -210,132 +208,17 @@ class AgentRepository: if isinstance(message.role, AgentChatMessageRole) else str(message.role) ) - payload: dict[str, object] = { - "id": str(message.id), - "role": role, - "timestamp": message.created_at.astimezone(timezone.utc).isoformat(), - } - - if role == AgentChatMessageRole.TOOL.value: - metadata = message.metadata_json or {} - tool_call_id = metadata.get("tool_call_id") - if isinstance(tool_call_id, str) and tool_call_id: - payload["toolCallId"] = tool_call_id - - parsed_content: dict[str, object] | None = None - try: - decoded = json.loads(message.content) - if isinstance(decoded, dict): - parsed_content = decoded - except (TypeError, ValueError): - parsed_content = None - - hydrated_content: dict[str, object] | None = None - if self._tool_result_storage is not None: - storage_bucket = metadata.get("storage_bucket") - storage_path = metadata.get("storage_path") - if isinstance(storage_bucket, str) and isinstance(storage_path, str): - expected_bucket = config.storage.bucket - message_session_id = getattr(message, "session_id", None) - expected_prefix = ( - f"tool-results/{message_session_id}/" - if message_session_id is not None - else None - ) - tool_call_id = metadata.get("tool_call_id") - is_legacy_path = isinstance( - tool_call_id, str - ) and storage_path.endswith(f"/{tool_call_id}.json") - if ( - storage_bucket == expected_bucket - and _is_safe_storage_path(storage_path) - and ( - ( - expected_prefix is not None - and storage_path.startswith(expected_prefix) - ) - or ( - storage_path.startswith("tool-results/") - and is_legacy_path - ) - ) - ): - try: - hydrated_content = ( - await self._tool_result_storage.read_json( - bucket=storage_bucket, - path=storage_path, - ) - ) - except Exception: - hydrated_content = None - - resolved_content = hydrated_content or parsed_content - payload["content"] = message.content - if resolved_content is not None: - ui = resolved_content.get("ui") - if not isinstance(ui, dict): - ui = resolved_content.get("ui_schema") - if isinstance(ui, dict): - payload["ui"] = ui - display_content = resolved_content.get("content") - if not isinstance(display_content, str): - nested_result = resolved_content.get("result") - if isinstance(nested_result, dict): - nested_content = nested_result.get("content") - if isinstance(nested_content, str): - display_content = nested_content - if ( - isinstance(display_content, str) - and display_content.strip() - and ( - not payload["content"] - or _looks_like_offloaded_placeholder(str(payload["content"])) - ) - ): - payload["content"] = display_content - else: - payload["content"] = message.content - - if role == AgentChatMessageRole.USER.value: - metadata = message.metadata_json or {} - user_attachments = metadata.get("user_message_attachments") - if isinstance(user_attachments, dict): - bucket = user_attachments.get("bucket") - path = user_attachments.get("path") - mime_type = user_attachments.get("mime_type") - if ( - isinstance(bucket, str) - and isinstance(path, str) - and isinstance(mime_type, str) - ): - try: - signed_url = await supabase_service.create_signed_url( - bucket=bucket, - path=path, - expires_in_seconds=3600, - ) - attachment_block = { - "type": "binary", - "mimeType": mime_type, - "url": signed_url, - } - existing_content = message.content - if ( - isinstance(existing_content, str) - and existing_content.strip() - ): - content_blocks = [ - {"type": "text", "text": existing_content} - ] - content_blocks.append(attachment_block) - payload["content"] = content_blocks - else: - payload["content"] = [attachment_block] - except Exception: # noqa: BLE001 - pass - - return payload + payload_model = AgentChatMessageSchema.model_validate( + { + "id": str(message.id), + "seq": int(message.seq), + "role": role, + "content": message.content, + "metadata": message.metadata_json, + "timestamp": message.created_at.astimezone(timezone.utc).isoformat(), + } + ) + return payload_model.model_dump(mode="json", exclude_none=True) def _has_title(title: object) -> bool: @@ -347,19 +230,3 @@ def _derive_session_title(content_text: str) -> str | None: if not normalized: return None return normalized[:80] - - -def _is_safe_storage_path(path: str) -> bool: - normalized = path.strip() - if not normalized: - return False - if normalized.startswith("/"): - return False - if ".." in normalized: - return False - return True - - -def _looks_like_offloaded_placeholder(content: str) -> bool: - normalized = content.strip().lower() - return normalized in {'{"offloaded":true}', '{"offloaded": true}'} diff --git a/backend/src/v1/agent/router.py b/backend/src/v1/agent/router.py index eb05062..f27baa8 100644 --- a/backend/src/v1/agent/router.py +++ b/backend/src/v1/agent/router.py @@ -11,9 +11,7 @@ from typing import Annotated, Union from ag_ui.core import RunAgentInput from core.agentscope.events import to_sse_event -from core.auth.jwt_verifier import JwtVerifier, TokenValidationError from core.auth.models import CurrentUser -from core.config.settings import config from core.logging import get_logger from fastapi import ( APIRouter, @@ -38,6 +36,7 @@ from v1.agent.dependencies import get_agent_service from v1.agent.schemas import ( AsrTranscribeResponse, AttachmentReference, + AttachmentSignedUrlResponse, AttachmentUploadResponse, TaskAcceptedResponse, ) @@ -63,42 +62,6 @@ _ALLOWED_AUDIO_CONTENT_TYPES = { } -def _verified_access_token_for_user( - *, - authorization: str | None, - current_user: CurrentUser, -) -> str: - if not isinstance(authorization, str): - raise HTTPException(status_code=401, detail="Unauthorized") - normalized = authorization.strip() - if not normalized: - raise HTTPException(status_code=401, detail="Unauthorized") - if not normalized.lower().startswith("bearer "): - raise HTTPException(status_code=401, detail="Unauthorized") - token = normalized[7:].strip() - if not token: - raise HTTPException(status_code=401, detail="Unauthorized") - - jwt_secret = config.supabase.jwt_secret - if jwt_secret is None: - raise HTTPException(status_code=503, detail="Auth verifier unavailable") - - verifier = JwtVerifier( - issuer=str(config.supabase.jwt_issuer), - jwt_secret=jwt_secret.get_secret_value(), - jwt_algorithm=config.supabase.jwt_algorithm, - ) - try: - payload = verifier.verify(token) - except TokenValidationError as exc: - raise HTTPException(status_code=401, detail="Unauthorized") from exc - - subject = payload.get("sub") - if not isinstance(subject, str) or subject != str(current_user.id): - raise HTTPException(status_code=403, detail="Forbidden") - return token - - def _looks_like_wav_header(header: bytes) -> bool: if len(header) < _WAV_HEADER_MIN_BYTES: return False @@ -164,7 +127,6 @@ async def enqueue_run( request: RunAgentInput, service: Annotated[AgentService, Depends(get_agent_service)], current_user: Annotated[CurrentUser, Depends(get_current_user)], - authorization: str | None = Header(default=None, alias="Authorization"), ) -> TaskAcceptedResponse: try: normalized = parse_run_input(request.model_dump(mode="json", by_alias=True)) @@ -174,15 +136,10 @@ async def enqueue_run( allowed = await _allow_run_request(user_id=str(current_user.id)) if not allowed: raise HTTPException(status_code=429, detail="Too many run requests") - user_token = _verified_access_token_for_user( - authorization=authorization, - current_user=current_user, - ) task = await service.enqueue_run( run_input=request, current_user=current_user, - user_token=user_token, ) return TaskAcceptedResponse( taskId=task.task_id, @@ -202,7 +159,6 @@ async def enqueue_resume( request: RunAgentInput, service: Annotated[AgentService, Depends(get_agent_service)], current_user: Annotated[CurrentUser, Depends(get_current_user)], - authorization: str | None = Header(default=None, alias="Authorization"), ) -> TaskAcceptedResponse: if request.thread_id != thread_id: raise HTTPException(status_code=422, detail="thread_id path/body mismatch") @@ -214,15 +170,10 @@ async def enqueue_resume( allowed = await _allow_run_request(user_id=str(current_user.id)) if not allowed: raise HTTPException(status_code=429, detail="Too many run requests") - user_token = _verified_access_token_for_user( - authorization=authorization, - current_user=current_user, - ) task = await service.enqueue_resume( thread_id=thread_id, run_input=request, current_user=current_user, - user_token=user_token, ) return TaskAcceptedResponse( taskId=task.task_id, @@ -304,20 +255,6 @@ async def stream_events( ) -@router.get("/runs/{thread_id}/history") -async def get_history_snapshot( - thread_id: str, - service: Annotated[AgentService, Depends(get_agent_service)], - current_user: Annotated[CurrentUser, Depends(get_current_user)], - before: date | None = Query(default=None), -) -> dict[str, object]: - return await service.get_history_snapshot( - thread_id=thread_id, - before=before, - current_user=current_user, - ) - - @router.get("/history") async def get_user_history_snapshot( service: Annotated[AgentService, Depends(get_agent_service)], @@ -360,6 +297,25 @@ async def upload_attachment( ) +@router.get( + "/attachments/signed-url", + response_model=AttachmentSignedUrlResponse, + status_code=status.HTTP_200_OK, +) +async def create_attachment_signed_url( + service: Annotated[AgentService, Depends(get_agent_service)], + current_user: Annotated[CurrentUser, Depends(get_current_user)], + bucket: str = Query(min_length=1, max_length=100), + path: str = Query(min_length=1, max_length=500), +) -> AttachmentSignedUrlResponse: + signed = await service.create_attachment_signed_url( + bucket=bucket, + path=path, + current_user=current_user, + ) + return AttachmentSignedUrlResponse(**signed) + + @router.post( "/transcribe", response_model=AsrTranscribeResponse, diff --git a/backend/src/v1/agent/schemas.py b/backend/src/v1/agent/schemas.py index 2d8670c..e634364 100644 --- a/backend/src/v1/agent/schemas.py +++ b/backend/src/v1/agent/schemas.py @@ -27,3 +27,9 @@ class AttachmentReference(BaseModel): class AttachmentUploadResponse(BaseModel): attachment: AttachmentReference + + +class AttachmentSignedUrlResponse(BaseModel): + bucket: str + path: str + url: str diff --git a/backend/src/v1/agent/service.py b/backend/src/v1/agent/service.py index 0e178b2..6b945f8 100644 --- a/backend/src/v1/agent/service.py +++ b/backend/src/v1/agent/service.py @@ -5,6 +5,7 @@ from dataclasses import dataclass from datetime import date import hashlib from typing import Any, Protocol +from urllib.parse import urlparse import dashscope from ag_ui.core import RunAgentInput, StateSnapshotEvent @@ -23,19 +24,6 @@ _MAX_ATTACHMENT_BYTES = 5 * 1024 * 1024 _MAX_TOTAL_ATTACHMENT_BYTES = 12 * 1024 * 1024 -def _normalize_bearer_token(value: str | None) -> str | None: - if not isinstance(value, str): - return None - normalized = value.strip() - if not normalized: - return None - lower = normalized.lower() - if lower.startswith("bearer "): - token = normalized[7:].strip() - return token or None - return normalized - - @dataclass(frozen=True) class TaskAccepted: task_id: str @@ -70,14 +58,6 @@ class AgentRepositoryLike(Protocol): metadata: dict[str, object] | None, ) -> None: ... - async def get_message_attachment_reference( - self, - *, - session_id: str, - message_id: str, - attachment_index: int, - ) -> dict[str, str] | None: ... - class QueueClientLike(Protocol): async def enqueue( @@ -148,7 +128,6 @@ class AgentService: *, run_input: RunAgentInput, current_user: CurrentUser, - user_token: str | None = None, ) -> TaskAccepted: created = False thread_id = run_input.thread_id @@ -188,7 +167,6 @@ class AgentService: command={ "command": "run", "owner_id": str(current_user.id), - "user_token": _normalize_bearer_token(user_token), "run_input": run_input.model_dump(mode="json", by_alias=True), }, dedup_key=None, @@ -226,19 +204,28 @@ class AgentService: mime_type = "application/octet-stream" if self._attachment_storage is None: - continue + raise HTTPException( + status_code=503, + detail="Attachment storage unavailable", + ) try: - bucket, path = self._attachment_storage.parse_signed_url(url) + bucket, path = self._validate_binary_signed_url( + url=url, + thread_id=run_input.thread_id, + current_user=current_user, + ) user_attachments = UserMessageAttachments( bucket=bucket, path=path, mime_type=mime_type, ) break - except Exception: # noqa: BLE001 - logger.warning("Failed to parse signed URL", url=url) - continue + except HTTPException: + raise + except Exception as exc: # noqa: BLE001 + logger.warning("Failed to parse signed URL", url=url, error=str(exc)) + raise HTTPException(status_code=422, detail="Invalid signed image url") metadata: dict[str, object] | None = None if user_attachments is not None: @@ -329,13 +316,57 @@ class AgentService: "url": signed_url, } + async def create_attachment_signed_url( + self, + *, + bucket: str, + path: str, + current_user: CurrentUser, + ) -> dict[str, str]: + if self._attachment_storage is None: + raise HTTPException( + status_code=503, detail="Attachment storage unavailable" + ) + normalized_bucket = bucket.strip() + if normalized_bucket != config.storage.bucket: + raise HTTPException(status_code=422, detail="Invalid attachment bucket") + + normalized_path = path.strip() + expected_prefix = f"agent-inputs/{current_user.id}/" + if not _is_safe_attachment_path( + normalized_path, expected_prefix=expected_prefix + ): + raise HTTPException(status_code=422, detail="Invalid attachment path scope") + + try: + signed_url = await self._attachment_storage.create_signed_url( + bucket=normalized_bucket, + path=normalized_path, + expires_in_seconds=self._SIGNED_URL_EXPIRES_IN_SECONDS, + ) + except Exception: # noqa: BLE001 + logger.exception( + "Attachment signed URL generation failed", + extra={ + "bucket": normalized_bucket, + "path": normalized_path, + "user_id": str(current_user.id), + }, + ) + raise HTTPException(status_code=502, detail="Failed to generate signed URL") + + return { + "bucket": normalized_bucket, + "path": normalized_path, + "url": signed_url, + } + async def enqueue_resume( self, *, thread_id: str, run_input: RunAgentInput, current_user: CurrentUser, - user_token: str | None = None, ) -> TaskAccepted: owner = await self._repository.get_session_owner(session_id=thread_id) ensure_session_owner(owner_id=owner, current_user=current_user) @@ -345,7 +376,6 @@ class AgentService: command={ "command": "resume", "owner_id": str(current_user.id), - "user_token": _normalize_bearer_token(user_token), "run_input": run_input.model_dump(mode="json", by_alias=True), }, dedup_key=dedup_key, @@ -428,6 +458,37 @@ class AgentService: current_user=current_user, ) + def _validate_binary_signed_url( + self, + *, + url: str, + thread_id: str, + current_user: CurrentUser, + ) -> tuple[str, str]: + if self._attachment_storage is None: + raise HTTPException( + status_code=503, detail="Attachment storage unavailable" + ) + parsed = urlparse(url) + expected_host = urlparse(config.supabase.url).netloc + if parsed.netloc != expected_host: + raise HTTPException(status_code=422, detail="INVALID_BINARY_URL_HOST") + + try: + bucket, path = self._attachment_storage.parse_signed_url(url) + except Exception as exc: # noqa: BLE001 + raise HTTPException( + status_code=422, detail="Invalid signed image url" + ) from exc + + if bucket != config.storage.bucket: + raise HTTPException(status_code=422, detail="INVALID_BINARY_URL_BUCKET") + + expected_prefix = f"agent-inputs/{current_user.id}/{thread_id}/uploads/" + if not _is_safe_attachment_path(path, expected_prefix=expected_prefix): + raise HTTPException(status_code=422, detail="INVALID_BINARY_URL_PATH_SCOPE") + return bucket, path + class AsrService: def __init__(self) -> None: diff --git a/backend/tests/integration/v1/agent/test_routes.py b/backend/tests/integration/v1/agent/test_routes.py index 1392b52..2d1fd32 100644 --- a/backend/tests/integration/v1/agent/test_routes.py +++ b/backend/tests/integration/v1/agent/test_routes.py @@ -5,7 +5,6 @@ from types import SimpleNamespace from uuid import uuid4 from ag_ui.core import RunAgentInput -from fastapi import HTTPException from fastapi.testclient import TestClient from app import app @@ -24,9 +23,8 @@ class _FakeAgentService: *, run_input: RunAgentInput, current_user: CurrentUser, - user_token: str | None = None, ): - del current_user, user_token + del current_user return SimpleNamespace( task_id="task-run-1", thread_id=run_input.thread_id, @@ -40,9 +38,8 @@ class _FakeAgentService: thread_id: str, run_input: RunAgentInput, current_user: CurrentUser, - user_token: str | None = None, ): - del thread_id, current_user, user_token + del thread_id, current_user return SimpleNamespace( task_id="task-resume-1", thread_id=run_input.thread_id, @@ -73,31 +70,6 @@ class _FakeAgentService: } ] - async def get_history_snapshot( - self, - *, - thread_id: str, - before: str | None, - current_user: CurrentUser, - ) -> dict[str, object]: - del current_user - return { - "type": "STATE_SNAPSHOT", - "threadId": thread_id, - "snapshot": { - "scope": "history_day", - "day": before or "2026-03-07", - "hasMore": False, - "messages": [ - { - "id": "msg-h1", - "role": "assistant", - "content": "history-message", - } - ], - }, - } - async def get_user_history_snapshot( self, *, @@ -134,6 +106,20 @@ class _FakeAgentService: "url": "https://signed.example/upload.png", } + async def create_attachment_signed_url( + self, + *, + bucket: str, + path: str, + current_user: CurrentUser, + ) -> dict[str, str]: + del current_user + return { + "bucket": bucket, + "path": path, + "url": "https://signed.example/temp-url.png", + } + class _FailingStreamAgentService(_FakeAgentService): async def stream_events( @@ -151,7 +137,6 @@ def test_run_requires_auth_and_returns_202_task_id() -> None: app.dependency_overrides[get_agent_service] = lambda: _FakeAgentService() client = TestClient(app) original_allow_run = agent_router._allow_run_request - original_verify_token = agent_router._verified_access_token_for_user async def _allow_run(*, user_id: str) -> bool: del user_id @@ -159,13 +144,6 @@ def test_run_requires_auth_and_returns_202_task_id() -> None: agent_router._allow_run_request = _allow_run # type: ignore[assignment] - def _verify_token(**kwargs: object) -> str: - if kwargs.get("authorization"): - return "token-ok" - raise HTTPException(status_code=401, detail="Unauthorized") - - agent_router._verified_access_token_for_user = _verify_token # type: ignore[assignment] - try: unauthorized = client.post( "/api/v1/agent/runs", @@ -186,7 +164,6 @@ def test_run_requires_auth_and_returns_202_task_id() -> None: ) authorized = client.post( "/api/v1/agent/runs", - headers={"Authorization": "Bearer token-ok"}, json={ "threadId": "00000000-0000-0000-0000-000000000001", "runId": "run-1", @@ -202,23 +179,8 @@ def test_run_requires_auth_and_returns_202_task_id() -> None: assert authorized.json()["threadId"] == "00000000-0000-0000-0000-000000000001" assert authorized.json()["runId"] == "run-1" assert authorized.json()["created"] is False - - missing_header = client.post( - "/api/v1/agent/runs", - json={ - "threadId": "00000000-0000-0000-0000-000000000001", - "runId": "run-2", - "state": {}, - "messages": [{"id": "u2", "role": "user", "content": "hello"}], - "tools": [], - "context": [], - "forwardedProps": {}, - }, - ) - assert missing_header.status_code == 401 finally: agent_router._allow_run_request = original_allow_run # type: ignore[assignment] - agent_router._verified_access_token_for_user = original_verify_token # type: ignore[assignment] app.dependency_overrides = {} @@ -313,7 +275,8 @@ def test_history_returns_state_snapshot() -> None: try: unauthorized = client.get( - "/api/v1/agent/runs/00000000-0000-0000-0000-000000000001/history" + "/api/v1/agent/history", + params={"threadId": "00000000-0000-0000-0000-000000000001"}, ) assert unauthorized.status_code == 401 @@ -321,8 +284,11 @@ def test_history_returns_state_snapshot() -> None: id=uuid4(), email="user@example.com" ) authorized = client.get( - "/api/v1/agent/runs/00000000-0000-0000-0000-000000000001/history", - params={"before": "2026-03-07"}, + "/api/v1/agent/history", + params={ + "threadId": "00000000-0000-0000-0000-000000000001", + "before": "2026-03-07", + }, ) assert authorized.status_code == 200 payload = authorized.json() @@ -415,19 +381,10 @@ def test_resume_accepts_tool_message_without_user_message() -> None: id=uuid4(), email="user@example.com" ) client = TestClient(app) - original_verify_token = agent_router._verified_access_token_for_user - - def _verify_token(**kwargs: object) -> str: - if kwargs.get("authorization"): - return "token-ok" - raise HTTPException(status_code=401, detail="Unauthorized") - - agent_router._verified_access_token_for_user = _verify_token # type: ignore[assignment] try: response = client.post( "/api/v1/agent/runs/00000000-0000-0000-0000-000000000001/resume", - headers={"Authorization": "Bearer token-ok"}, json={ "threadId": "00000000-0000-0000-0000-000000000001", "runId": "run-resume-1", @@ -447,29 +404,7 @@ def test_resume_accepts_tool_message_without_user_message() -> None: ) assert response.status_code == 202 assert response.json()["taskId"] == "task-resume-1" - - missing_header = client.post( - "/api/v1/agent/runs/00000000-0000-0000-0000-000000000001/resume", - json={ - "threadId": "00000000-0000-0000-0000-000000000001", - "runId": "run-resume-2", - "state": {}, - "messages": [ - { - "id": "tool-2", - "role": "tool", - "toolCallId": "call-2", - "content": '{"toolName":"navigate_to_route","toolArgs":{"target":"/calendar/dayweek"},"nonce":"n2","result":{"ok":true}}', - } - ], - "tools": [], - "context": [], - "forwardedProps": {}, - }, - ) - assert missing_header.status_code == 401 finally: - agent_router._verified_access_token_for_user = original_verify_token # type: ignore[assignment] app.dependency_overrides = {} @@ -498,6 +433,30 @@ def test_upload_attachment_returns_reference() -> None: app.dependency_overrides = {} +def test_create_attachment_signed_url_returns_url() -> None: + app.dependency_overrides[get_agent_service] = lambda: _FakeAgentService() + app.dependency_overrides[get_current_user] = lambda: CurrentUser( + id=uuid4(), email="user@example.com" + ) + client = TestClient(app) + + try: + response = client.get( + "/api/v1/agent/attachments/signed-url", + params={ + "bucket": "bucket-test", + "path": "agent-inputs/user/thread/upload.png", + }, + ) + assert response.status_code == 200 + body = response.json() + assert body["bucket"] == "bucket-test" + assert body["path"] == "agent-inputs/user/thread/upload.png" + assert body["url"].startswith("https://signed.example/") + finally: + app.dependency_overrides = {} + + def test_asr_transcribe_returns_sync_transcript(monkeypatch) -> None: app.dependency_overrides[get_current_user] = lambda: CurrentUser( id=uuid4(), email="user@example.com" diff --git a/backend/tests/integration/v1/agent/test_sse_flow_live.py b/backend/tests/integration/v1/agent/test_sse_flow_live.py index f532a21..2cfb666 100644 --- a/backend/tests/integration/v1/agent/test_sse_flow_live.py +++ b/backend/tests/integration/v1/agent/test_sse_flow_live.py @@ -168,8 +168,9 @@ async def test_agent_runs_events_history_live_with_image_input() -> None: assert "RUN_FINISHED" in event_names or "RUN_ERROR" in event_names history_resp = await client.get( - f"{BASE_URL}/api/v1/agent/runs/{thread_id}/history", + f"{BASE_URL}/api/v1/agent/history", headers=headers, + params={"threadId": thread_id}, ) assert history_resp.status_code == 200 history = history_resp.json() @@ -183,10 +184,11 @@ async def test_agent_runs_events_history_live_with_image_input() -> None: if isinstance(item, dict) and item.get("role") == "user" ] assert user_messages - attachments = user_messages[0].get("attachments") - assert isinstance(attachments, list) - assert attachments and isinstance(attachments[0], dict) - assert isinstance(attachments[0].get("path"), str) + metadata = user_messages[0].get("metadata") + assert isinstance(metadata, dict) + user_attachment = metadata.get("user_message_attachments") + assert isinstance(user_attachment, dict) + assert isinstance(user_attachment.get("path"), str) async with AsyncSessionLocal() as session: session_row = await session.get(AgentChatSession, UUID(thread_id)) @@ -212,7 +214,6 @@ async def test_agent_runs_events_history_live_with_image_input() -> None: ] assert user_rows metadata = user_rows[0].metadata_json or {} - attachments = metadata.get("attachments") - assert isinstance(attachments, list) - assert attachments and isinstance(attachments[0], dict) - assert isinstance(attachments[0].get("path"), str) + user_attachment = metadata.get("user_message_attachments") + assert isinstance(user_attachment, dict) + assert isinstance(user_attachment.get("path"), str) diff --git a/backend/tests/unit/core/agentscope/events/test_agui_codec.py b/backend/tests/unit/core/agentscope/events/test_agui_codec.py index 0e3552d..aa9be89 100644 --- a/backend/tests/unit/core/agentscope/events/test_agui_codec.py +++ b/backend/tests/unit/core/agentscope/events/test_agui_codec.py @@ -50,10 +50,13 @@ def test_tool_result_wire_event_filters_sensitive_fields() -> None: "data": { "messageId": "tool-result-1", "toolCallId": "call-1", - "callId": "call-1", - "toolName": "calendar_write", - "content": "summary", - "ui": {"type": "calendar_operation.v1", "data": {"ok": True}}, + "toolAgentOutput": { + "tool_name": "calendar_write", + "tool_call_id": "call-1", + "status": "success", + "result_summary": "summary", + "tool_call_args": {}, + }, "args": {"token": "secret"}, "result": {"raw": "secret"}, "error": "stack trace", @@ -65,9 +68,32 @@ def test_tool_result_wire_event_filters_sensitive_fields() -> None: assert result["type"] == "TOOL_CALL_RESULT" assert result["messageId"] == "tool-result-1" assert result["toolCallId"] == "call-1" - assert result["toolName"] == "calendar_write" - assert result["content"] == "summary" - assert isinstance(result.get("ui"), dict) + assert isinstance(result.get("toolAgentOutput"), dict) assert "args" not in result assert "result" not in result assert "error" not in result + + +def test_text_end_event_only_keeps_protocol_fields() -> None: + internal = { + "type": "text.end", + "threadId": "thread-1", + "runId": "run-1", + "data": { + "messageId": "assistant-run-1", + "workerAgentOutput": {"answer": "done", "status": "success"}, + "stage": "worker", + "model": "qwen", + "inputTokens": 1, + "outputTokens": 2, + }, + } + + result = to_agui_wire_event(internal) + + assert result["type"] == "TEXT_MESSAGE_END" + assert result["messageId"] == "assistant-run-1" + assert isinstance(result.get("workerAgentOutput"), dict) + assert "stage" not in result + assert "model" not in result + assert "inputTokens" not in result diff --git a/backend/tests/unit/core/agentscope/events/test_store.py b/backend/tests/unit/core/agentscope/events/test_store.py index e487b6a..f2a3fc6 100644 --- a/backend/tests/unit/core/agentscope/events/test_store.py +++ b/backend/tests/unit/core/agentscope/events/test_store.py @@ -49,49 +49,11 @@ class _FakeToolResultStorage: return path -@pytest.mark.asyncio -async def test_store_marks_session_running_on_run_started( +def _patch_repositories( monkeypatch: pytest.MonkeyPatch, + captured: dict[str, object], + fake_chat_session: Any, ) -> None: - captured: dict[str, object] = {} - fake_chat_session = SimpleNamespace(state_snapshot=None) - - class _FakeSessionRepository: - def __init__(self, session: object) -> None: - del session - - async def get_session(self, *, session_id): # noqa: ANN001 - captured["session_id"] = session_id - return fake_chat_session - - async def update_runtime_state(self, **kwargs): # noqa: ANN003 - captured.update(kwargs) - - monkeypatch.setattr(store_module, "SessionRepository", _FakeSessionRepository) - monkeypatch.setattr(store_module, "AgentChatSessionStatus", _SessionStatus) - store = store_module.SqlAlchemyEventStore(session_factory=lambda: _FakeSessionCtx()) - - await store.persist( - { - "type": "RUN_STARTED", - "threadId": "00000000-0000-0000-0000-000000000001", - "runId": "run-1", - } - ) - - assert captured["status"] == _SessionStatus.RUNNING - assert captured["message_delta"] == 0 - assert captured["token_delta"] == 0 - assert captured["cost_delta"] == Decimal("0") - - -@pytest.mark.asyncio -async def test_store_persists_assistant_message_and_aggregates( - monkeypatch: pytest.MonkeyPatch, -) -> None: - captured: dict[str, object] = {} - fake_chat_session = SimpleNamespace(state_snapshot={"k": "v"}, message_count=6) - class _FakeSessionRepository: def __init__(self, session: object) -> None: del session @@ -118,6 +80,14 @@ async def test_store_persists_assistant_message_and_aggregates( monkeypatch.setattr(store_module, "MessageRepository", _FakeMessageRepository) monkeypatch.setattr(store_module, "AgentChatSessionStatus", _SessionStatus) + +@pytest.mark.asyncio +async def test_store_persists_worker_output_with_answer_as_content( + monkeypatch: pytest.MonkeyPatch, +) -> None: + captured: dict[str, object] = {} + fake_chat_session = SimpleNamespace(state_snapshot={}, message_count=6) + _patch_repositories(monkeypatch, captured, fake_chat_session) store = store_module.SqlAlchemyEventStore(session_factory=lambda: _FakeSessionCtx()) await store.persist( @@ -127,7 +97,7 @@ async def test_store_persists_assistant_message_and_aggregates( "runId": "run-1", "messageId": "assistant-run-1", "role": "assistant", - "stage": "report", + "stage": "worker", } ) await store.persist( @@ -136,7 +106,7 @@ async def test_store_persists_assistant_message_and_aggregates( "threadId": "00000000-0000-0000-0000-000000000001", "runId": "run-1", "messageId": "assistant-run-1", - "delta": "hello", + "delta": "legacy-text", } ) await store.persist( @@ -149,177 +119,34 @@ async def test_store_persists_assistant_message_and_aggregates( "outputTokens": 5, "cost": "0.123", "latencyMs": 250, + "workerAgentOutput": { + "status": "success", + "answer": "worker-answer", + "key_points": [], + "result_type": "summary", + "suggested_actions": [], + "error": None, + }, } ) append_kwargs = cast(dict[str, Any], captured["append_kwargs"]) assert append_kwargs["seq"] == 7 - assert append_kwargs["content"] == "hello" - assert append_kwargs["input_tokens"] == 3 - assert append_kwargs["output_tokens"] == 5 + assert append_kwargs["content"] == "worker-answer" + metadata = cast(dict[str, Any], append_kwargs["metadata"]) + assert metadata["worker_agent_output"]["answer"] == "worker-answer" assert append_kwargs["cost"] == Decimal("0.123") - assert append_kwargs["metadata"]["latency_ms"] == 250 - assert append_kwargs["metadata"]["stage"] == "report" - assert append_kwargs["latency_ms"] == 250 assert captured["message_delta"] == 1 assert captured["token_delta"] == 8 - assert captured["cost_delta"] == Decimal("0.123") @pytest.mark.asyncio -async def test_store_uses_canonical_thread_id_for_buffer_keys( - monkeypatch: pytest.MonkeyPatch, -) -> None: - captured: dict[str, object] = {} - fake_chat_session = SimpleNamespace(state_snapshot={}, message_count=1) - - class _FakeSessionRepository: - def __init__(self, session: object) -> None: - del session - - async def get_session(self, *, session_id): # noqa: ANN001 - del session_id - return fake_chat_session - - async def lock_session_for_update(self, *, session_id): # noqa: ANN001 - del session_id - return fake_chat_session - - async def update_runtime_state(self, **kwargs): # noqa: ANN003 - captured.update(kwargs) - - class _FakeMessageRepository: - def __init__(self, session: object) -> None: - del session - - async def append_message(self, **kwargs): # noqa: ANN003 - captured["append_kwargs"] = kwargs - - monkeypatch.setattr(store_module, "SessionRepository", _FakeSessionRepository) - monkeypatch.setattr(store_module, "MessageRepository", _FakeMessageRepository) - monkeypatch.setattr(store_module, "AgentChatSessionStatus", _SessionStatus) - - store = store_module.SqlAlchemyEventStore(session_factory=lambda: _FakeSessionCtx()) - compact_thread_id = "00000000000000000000000000000001" - - await store.persist( - { - "type": "TEXT_MESSAGE_CONTENT", - "threadId": compact_thread_id, - "runId": "run-1", - "messageId": "assistant-run-1", - "delta": "hello", - } - ) - await store.persist( - { - "type": "TEXT_MESSAGE_END", - "threadId": compact_thread_id, - "runId": "run-1", - "messageId": "assistant-run-1", - } - ) - - append_kwargs = cast(dict[str, Any], captured["append_kwargs"]) - assert append_kwargs["content"] == "hello" - - -@pytest.mark.asyncio -async def test_store_clears_buffer_on_run_finished( - monkeypatch: pytest.MonkeyPatch, -) -> None: - captured: dict[str, object] = {} - fake_chat_session = SimpleNamespace(state_snapshot={}, message_count=0) - - class _FakeSessionRepository: - def __init__(self, session: object) -> None: - del session - - async def get_session(self, *, session_id): # noqa: ANN001 - del session_id - return fake_chat_session - - async def lock_session_for_update(self, *, session_id): # noqa: ANN001 - del session_id - return fake_chat_session - - async def update_runtime_state(self, **kwargs): # noqa: ANN003 - captured.update(kwargs) - - class _FakeMessageRepository: - def __init__(self, session: object) -> None: - del session - - async def append_message(self, **kwargs): # noqa: ANN003 - captured["append_kwargs"] = kwargs - - monkeypatch.setattr(store_module, "SessionRepository", _FakeSessionRepository) - monkeypatch.setattr(store_module, "MessageRepository", _FakeMessageRepository) - monkeypatch.setattr(store_module, "AgentChatSessionStatus", _SessionStatus) - - store = store_module.SqlAlchemyEventStore(session_factory=lambda: _FakeSessionCtx()) - thread_id = "00000000-0000-0000-0000-000000000001" - - await store.persist( - { - "type": "TEXT_MESSAGE_CONTENT", - "threadId": thread_id, - "runId": "run-1", - "messageId": "assistant-run-1", - "delta": "stale", - } - ) - await store.persist( - { - "type": "RUN_FINISHED", - "threadId": thread_id, - "runId": "run-1", - } - ) - await store.persist( - { - "type": "TEXT_MESSAGE_END", - "threadId": thread_id, - "runId": "run-1", - "messageId": "assistant-run-1", - } - ) - - assert "append_kwargs" not in captured - - -@pytest.mark.asyncio -async def test_store_persists_tool_call_result_as_tool_message( +async def test_store_persists_tool_output_with_summary_as_content( monkeypatch: pytest.MonkeyPatch, ) -> None: captured: dict[str, object] = {} fake_chat_session = SimpleNamespace(state_snapshot={}, message_count=2) - - class _FakeSessionRepository: - def __init__(self, session: object) -> None: - del session - - async def get_session(self, *, session_id): # noqa: ANN001 - del session_id - return fake_chat_session - - async def lock_session_for_update(self, *, session_id): # noqa: ANN001 - del session_id - return fake_chat_session - - async def update_runtime_state(self, **kwargs): # noqa: ANN003 - captured.update(kwargs) - - class _FakeMessageRepository: - def __init__(self, session: object) -> None: - del session - - async def append_message(self, **kwargs): # noqa: ANN003 - captured["append_kwargs"] = kwargs - - monkeypatch.setattr(store_module, "SessionRepository", _FakeSessionRepository) - monkeypatch.setattr(store_module, "MessageRepository", _FakeMessageRepository) - monkeypatch.setattr(store_module, "AgentChatSessionStatus", _SessionStatus) + _patch_repositories(monkeypatch, captured, fake_chat_session) fake_storage = _FakeToolResultStorage() store = store_module.SqlAlchemyEventStore( @@ -334,128 +161,23 @@ async def test_store_persists_tool_call_result_as_tool_message( "runId": "run-1", "toolName": "calendar_write", "taskId": "t1", - "stage": "execution", - "args": {"title": "A"}, - "result": {"event_id": "evt-1", "token": "secret"}, + "stage": "worker", + "toolAgentOutput": { + "tool_name": "calendar_write", + "tool_call_id": "call-1", + "tool_call_args": {"title": "A"}, + "status": "success", + "result_summary": "已创建日程 A", + "ui_hints": None, + "error": None, + }, } ) append_kwargs = cast(dict[str, Any], captured["append_kwargs"]) assert getattr(append_kwargs["role"], "value", None) == "tool" - assert append_kwargs["tool_name"] == "calendar_write" - assert append_kwargs["metadata"]["task_id"] == "t1" - tool_call_id = append_kwargs["metadata"]["tool_call_id"] - assert isinstance(tool_call_id, str) - assert tool_call_id.startswith("run-1-t1-") - assert append_kwargs["metadata"]["storage_bucket"] == "agent-tool-results" - assert isinstance(append_kwargs["metadata"]["storage_path"], str) - assert append_kwargs["content"].startswith("已创建日程") + assert append_kwargs["content"] == "已创建日程 A" + metadata = cast(dict[str, Any], append_kwargs["metadata"]) + assert metadata["tool_agent_output"]["result_summary"] == "已创建日程 A" + assert metadata["storage_bucket"] == "agent-tool-results" assert len(fake_storage.upload_calls) == 1 - uploaded = fake_storage.upload_calls[0] - assert uploaded["bucket"] == "agent-tool-results" - payload = cast(dict[str, Any], uploaded["payload"]) - assert payload["toolName"] == "calendar_write" - assert "args" not in payload - assert isinstance(payload.get("result"), dict) - assert payload["result"]["token"] == "[REDACTED]" - assert captured["message_delta"] == 1 - - -@pytest.mark.asyncio -async def test_store_sanitizes_nested_sensitive_fields_in_result_payload( - monkeypatch: pytest.MonkeyPatch, -) -> None: - captured: dict[str, object] = {} - fake_chat_session = SimpleNamespace(state_snapshot={}, message_count=0) - - class _FakeSessionRepository: - def __init__(self, session: object) -> None: - del session - - async def get_session(self, *, session_id): # noqa: ANN001 - del session_id - return fake_chat_session - - async def lock_session_for_update(self, *, session_id): # noqa: ANN001 - del session_id - return fake_chat_session - - async def update_runtime_state(self, **kwargs): # noqa: ANN003 - captured.update(kwargs) - - class _FakeMessageRepository: - def __init__(self, session: object) -> None: - del session - - async def append_message(self, **kwargs): # noqa: ANN003 - captured["append_kwargs"] = kwargs - - monkeypatch.setattr(store_module, "SessionRepository", _FakeSessionRepository) - monkeypatch.setattr(store_module, "MessageRepository", _FakeMessageRepository) - monkeypatch.setattr(store_module, "AgentChatSessionStatus", _SessionStatus) - - fake_storage = _FakeToolResultStorage() - store = store_module.SqlAlchemyEventStore( - session_factory=lambda: _FakeSessionCtx(), - tool_result_storage=fake_storage, - tool_result_bucket="agent-tool-results", - ) - await store.persist( - { - "type": "TOOL_CALL_RESULT", - "threadId": "00000000-0000-0000-0000-000000000001", - "runId": "run-1", - "toolName": "calendar_write", - "result": { - "data": { - "ok": True, - "accessToken": "secret-a", - "nested": { - "refresh_token": "secret-b", - }, - "items": [ - {"authorizationHeader": "secret-c"}, - ], - } - }, - } - ) - - payload = cast(dict[str, Any], fake_storage.upload_calls[0]["payload"]) - stored_result = cast(dict[str, Any], payload["result"]) - data = cast(dict[str, Any], stored_result["data"]) - assert data["accessToken"] == "[REDACTED]" - nested = cast(dict[str, Any], data["nested"]) - assert nested["refresh_token"] == "[REDACTED]" - items = cast(list[Any], data["items"]) - assert isinstance(items[0], dict) - assert items[0]["authorizationHeader"] == "[REDACTED]" - - -@pytest.mark.asyncio -async def test_store_drops_buffer_when_session_missing( - monkeypatch: pytest.MonkeyPatch, -) -> None: - class _FakeSessionRepository: - def __init__(self, session: object) -> None: - del session - - async def get_session(self, *, session_id): # noqa: ANN001 - del session_id - return None - - monkeypatch.setattr(store_module, "SessionRepository", _FakeSessionRepository) - - store = store_module.SqlAlchemyEventStore(session_factory=lambda: _FakeSessionCtx()) - thread_id = "00000000-0000-0000-0000-000000000001" - - await store.persist( - { - "type": "TEXT_MESSAGE_CONTENT", - "threadId": thread_id, - "messageId": "assistant-run-1", - "delta": "orphan", - } - ) - - assert store._message_buffers == {} diff --git a/backend/tests/unit/core/agentscope/runtime/test_agent_route_runtime.py b/backend/tests/unit/core/agentscope/runtime/test_agent_route_runtime.py deleted file mode 100644 index 9f6f814..0000000 --- a/backend/tests/unit/core/agentscope/runtime/test_agent_route_runtime.py +++ /dev/null @@ -1,608 +0,0 @@ -from __future__ import annotations - -from typing import Any, cast -from uuid import uuid4 - -import pytest -from sqlalchemy.ext.asyncio import AsyncSession - -from core.agentscope.schemas.user_context import ( - UserAgentContext, - parse_profile_settings, -) -from core.agentscope.runtime.agent_route_runtime import AgentRouteRuntime -from core.agentscope.schemas import ReportOutput, RuntimeOutput -from core.agentscope.schemas.agent_runtime import RunCommand -from core.agentscope.schemas.execution import ExecutionBatchOutput, ExecutionTaskOutput -from core.agentscope.schemas.execution import ExecutionToolCall -from core.agentscope.schemas.intent import IntentOutput, IntentTask - - -def _user_context() -> UserAgentContext: - return UserAgentContext( - user_id=uuid4(), - username="tester", - bio=None, - settings=parse_profile_settings( - { - "version": 1, - "preferences": { - "interface_language": "zh-CN", - "ai_language": "zh-CN", - "timezone": "Asia/Shanghai", - "country": "CN", - }, - } - ), - ) - - -@pytest.mark.asyncio -async def test_runtime_emits_started_text_and_finished_events() -> None: - calls: list[dict[str, Any]] = [] - - class _FakePipeline: - async def emit(self, *, session_id: str, event: dict[str, object]) -> str: - assert session_id == "thread-1" - calls.append(event) - return f"{len(calls)}-0" - - class _FakeOrchestrator: - async def run(self, **_: object) -> RuntimeOutput: - return RuntimeOutput( - intent=IntentOutput( - route="TASK_EXECUTION", - intent_summary="summary", - direct_response=None, - tasks=[IntentTask(task_id="t1", title="exec", objective="do")], - complexity="complex", - response_metadata={"latencyMs": 120}, - ), - execution=ExecutionBatchOutput( - task_results=[ - ExecutionTaskOutput( - task_id="t1", - status="SUCCESS", - execution_summary="execution-ok", - execution_data={}, - user_feedback_needs=[], - response_metadata={"latencyMs": 300}, - tool_calls=[ - ExecutionToolCall( - tool_name="calendar_write", - args={"title": "A"}, - result={"event_id": "evt-1"}, - ) - ], - ) - ], - overall_status="SUCCESS", - aggregate_summary="ok", - ), - report=ReportOutput( - assistant_text="hello world", - response_metadata={ - "model": "qwen3.5-flash", - "inputTokens": 10, - "outputTokens": 5, - "cost": 0.123, - "latencyMs": 250, - }, - ), - ) - - runtime = AgentRouteRuntime( - orchestrator=_FakeOrchestrator(), pipeline=_FakePipeline() - ) - command = RunCommand(threadId="thread-1", runId="run-1", messages=[]) - - await runtime.run( - command=command, - owner_id=uuid4(), - user_token="token", - user_context=_user_context(), - session=cast(AsyncSession, object()), - ) - - assert [item["type"] for item in calls] == [ - "run.started", - "step.start", - "step.finish", - "step.start", - "text.start", - "text.delta", - "text.end", - "text.start", - "text.delta", - "text.end", - "tool.result", - "step.finish", - "step.start", - "text.start", - "text.delta", - "text.end", - "step.finish", - "run.finished", - ] - assert calls[1]["data"]["stepName"] == "intent" - assert calls[2]["data"]["stepName"] == "intent" - assert calls[3]["data"]["stepName"] == "execution" - assert calls[4]["data"]["stage"] == "intent" - assert calls[7]["data"]["stage"] == "execution" - assert calls[10]["data"]["toolName"] == "calendar_write" - assert calls[10]["data"]["toolCallId"] == "run-1-t1-1" - assert calls[10]["data"]["messageId"] == "tool-result-run-1-t1-1" - tool_content = calls[10]["data"]["content"] - assert tool_content == "calendar_write 执行完成" - assert calls[11]["data"]["stepName"] == "execution" - assert calls[12]["data"]["stepName"] == "report" - assert calls[14]["data"]["delta"] == "hello world" - assert calls[13]["data"]["messageId"] == calls[14]["data"]["messageId"] - assert calls[14]["data"]["messageId"] == calls[15]["data"]["messageId"] - assert calls[15]["data"]["model"] == "qwen3.5-flash" - assert calls[15]["data"]["inputTokens"] == 10 - assert calls[15]["data"]["outputTokens"] == 5 - assert calls[15]["data"]["cost"] == 0.123 - assert calls[15]["data"]["latencyMs"] == 250 - assert calls[16]["data"]["stepName"] == "report" - - -@pytest.mark.asyncio -async def test_runtime_emits_run_error_when_orchestrator_fails() -> None: - calls: list[dict[str, Any]] = [] - - class _FakePipeline: - async def emit(self, *, session_id: str, event: dict[str, object]) -> str: - assert session_id == "thread-1" - calls.append(event) - return f"{len(calls)}-0" - - class _FailOrchestrator: - async def run(self, **_: object) -> RuntimeOutput: - raise RuntimeError("boom") - - runtime = AgentRouteRuntime( - orchestrator=_FailOrchestrator(), - pipeline=_FakePipeline(), - ) - command = RunCommand(threadId="thread-1", runId="run-1", messages=[]) - - with pytest.raises(RuntimeError, match="boom"): - await runtime.run( - command=command, - owner_id=uuid4(), - user_token="token", - user_context=_user_context(), - session=cast(AsyncSession, object()), - ) - - assert [item["type"] for item in calls] == [ - "run.started", - "step.start", - "run.error", - ] - assert calls[1]["data"]["stepName"] == "intent" - assert calls[2]["data"]["message"] == "runtime execution failed" - - -@pytest.mark.asyncio -async def test_runtime_passes_binary_payload_to_orchestrator() -> None: - captured_user_input: object | None = None - - class _FakePipeline: - async def emit(self, *, session_id: str, event: dict[str, object]) -> str: - assert session_id == "thread-1" - return str(event.get("type", "")) - - class _CaptureOrchestrator: - async def run(self, **kwargs: object) -> RuntimeOutput: - nonlocal captured_user_input - captured_user_input = kwargs.get("user_input") - return RuntimeOutput( - intent=IntentOutput( - route="DIRECT_RESPONSE", - intent_summary="summary", - direct_response="done", - tasks=[], - complexity="simple", - ), - execution=None, - report=ReportOutput( - assistant_text="ok", - response_metadata={}, - ), - ) - - runtime = AgentRouteRuntime( - orchestrator=_CaptureOrchestrator(), - pipeline=_FakePipeline(), - ) - command = RunCommand.model_validate( - { - "threadId": "thread-1", - "runId": "run-1", - "messages": [ - { - "id": "u1", - "role": "user", - "content": [ - {"type": "text", "text": "hello"}, - { - "type": "binary", - "mimeType": "image/png", - "data": "aGVsbG8=", - }, - ], - } - ], - } - ) - - await runtime.run( - command=command, - owner_id=uuid4(), - user_token="token", - user_context=_user_context(), - session=cast(AsyncSession, object()), - ) - - assert isinstance(captured_user_input, list) - first = captured_user_input[0] - assert isinstance(first, dict) - content = first.get("content") - assert isinstance(content, list) - binary = content[1] - assert isinstance(binary, dict) - assert binary.get("data") == "aGVsbG8=" - - -@pytest.mark.asyncio -async def test_runtime_direct_response_finishes_without_report_stage() -> None: - calls: list[dict[str, Any]] = [] - - class _FakePipeline: - async def emit(self, *, session_id: str, event: dict[str, object]) -> str: - assert session_id == "thread-1" - calls.append(event) - return f"{len(calls)}-0" - - class _DirectOrchestrator: - async def run(self, **_: object) -> RuntimeOutput: - return RuntimeOutput( - intent=IntentOutput( - route="DIRECT_RESPONSE", - intent_summary="summary", - direct_response="direct-answer", - tasks=[], - complexity="simple", - response_metadata={"latencyMs": 88}, - ), - execution=None, - report=ReportOutput( - assistant_text="direct-answer", - response_metadata={"latencyMs": 88}, - ), - ) - - runtime = AgentRouteRuntime( - orchestrator=_DirectOrchestrator(), - pipeline=_FakePipeline(), - ) - command = RunCommand(threadId="thread-1", runId="run-1", messages=[]) - - await runtime.run( - command=command, - owner_id=uuid4(), - user_token="token", - user_context=_user_context(), - session=cast(AsyncSession, object()), - ) - - assert [item["type"] for item in calls] == [ - "run.started", - "step.start", - "step.finish", - "text.start", - "text.delta", - "text.end", - "run.finished", - ] - assert calls[3]["data"]["stage"] == "intent" - assert calls[4]["data"]["delta"] == "direct-answer" - - -@pytest.mark.asyncio -async def test_runtime_tool_result_parses_json_string_ui_payload() -> None: - calls: list[dict[str, Any]] = [] - - class _FakePipeline: - async def emit(self, *, session_id: str, event: dict[str, object]) -> str: - assert session_id == "thread-1" - calls.append(event) - return f"{len(calls)}-0" - - class _FakeOrchestrator: - async def run(self, **_: object) -> RuntimeOutput: - return RuntimeOutput( - intent=IntentOutput( - route="TASK_EXECUTION", - intent_summary="summary", - direct_response=None, - tasks=[IntentTask(task_id="t1", title="exec", objective="do")], - complexity="complex", - response_metadata={}, - ), - execution=ExecutionBatchOutput( - task_results=[ - ExecutionTaskOutput( - task_id="t1", - status="SUCCESS", - execution_summary="execution-ok", - execution_data={}, - user_feedback_needs=[], - response_metadata={}, - tool_calls=[ - ExecutionToolCall( - tool_name="calendar_write", - args={"title": "A"}, - result='{"type":"calendar_card.v1","version":"v1","data":{"ok":true,"title":"A"},"actions":[]}', - ) - ], - ) - ], - overall_status="SUCCESS", - aggregate_summary="ok", - ), - report=ReportOutput( - assistant_text="hello world", - response_metadata={}, - ), - ) - - runtime = AgentRouteRuntime( - orchestrator=_FakeOrchestrator(), pipeline=_FakePipeline() - ) - command = RunCommand(threadId="thread-1", runId="run-1", messages=[]) - - await runtime.run( - command=command, - owner_id=uuid4(), - user_token="token", - user_context=_user_context(), - session=cast(AsyncSession, object()), - ) - - tool_events = [item for item in calls if item.get("type") == "tool.result"] - assert len(tool_events) == 1 - data = tool_events[0]["data"] - assert isinstance(data, dict) - assert isinstance(data.get("ui"), dict) - assert data["ui"]["type"] == "calendar_card.v1" - - -@pytest.mark.asyncio -async def test_runtime_tool_result_keeps_plain_text_content() -> None: - calls: list[dict[str, Any]] = [] - - class _FakePipeline: - async def emit(self, *, session_id: str, event: dict[str, object]) -> str: - assert session_id == "thread-1" - calls.append(event) - return f"{len(calls)}-0" - - class _FakeOrchestrator: - async def run(self, **_: object) -> RuntimeOutput: - return RuntimeOutput( - intent=IntentOutput( - route="TASK_EXECUTION", - intent_summary="summary", - direct_response=None, - tasks=[IntentTask(task_id="t1", title="exec", objective="do")], - complexity="complex", - response_metadata={}, - ), - execution=ExecutionBatchOutput( - task_results=[ - ExecutionTaskOutput( - task_id="t1", - status="SUCCESS", - execution_summary="execution-ok", - execution_data={}, - user_feedback_needs=[], - response_metadata={}, - tool_calls=[ - ExecutionToolCall( - tool_name="calendar_write", - args={"title": "A"}, - result="created successfully", - ) - ], - ) - ], - overall_status="SUCCESS", - aggregate_summary="ok", - ), - report=ReportOutput( - assistant_text="hello world", - response_metadata={}, - ), - ) - - runtime = AgentRouteRuntime( - orchestrator=_FakeOrchestrator(), pipeline=_FakePipeline() - ) - command = RunCommand(threadId="thread-1", runId="run-1", messages=[]) - - await runtime.run( - command=command, - owner_id=uuid4(), - user_token="token", - user_context=_user_context(), - session=cast(AsyncSession, object()), - ) - - tool_events = [item for item in calls if item.get("type") == "tool.result"] - assert len(tool_events) == 1 - data = tool_events[0]["data"] - assert isinstance(data, dict) - assert data["content"] == "created successfully" - - -@pytest.mark.asyncio -async def test_runtime_tool_result_sanitizes_sensitive_payload() -> None: - calls: list[dict[str, Any]] = [] - - class _FakePipeline: - async def emit(self, *, session_id: str, event: dict[str, object]) -> str: - assert session_id == "thread-1" - calls.append(event) - return f"{len(calls)}-0" - - class _FakeOrchestrator: - async def run(self, **_: object) -> RuntimeOutput: - return RuntimeOutput( - intent=IntentOutput( - route="TASK_EXECUTION", - intent_summary="summary", - direct_response=None, - tasks=[IntentTask(task_id="t1", title="exec", objective="do")], - complexity="complex", - response_metadata={}, - ), - execution=ExecutionBatchOutput( - task_results=[ - ExecutionTaskOutput( - task_id="t1", - status="SUCCESS", - execution_summary="execution-ok", - execution_data={}, - user_feedback_needs=[], - response_metadata={}, - tool_calls=[ - ExecutionToolCall( - tool_name="calendar_write", - args={ - "title": "A", - "accessToken": "arg-secret", - "author": "alice", - }, - result={ - "ok": True, - "accessToken": "secret-token", - "message": "Authorization: Bearer inline-token", - "nested": [ - { - "authorizationHeader": "Bearer abc", - } - ], - }, - error="failed authorization=Bearer abc123 detail", - ) - ], - ) - ], - overall_status="SUCCESS", - aggregate_summary="ok", - ), - report=ReportOutput( - assistant_text="hello world", - response_metadata={}, - ), - ) - - runtime = AgentRouteRuntime( - orchestrator=_FakeOrchestrator(), pipeline=_FakePipeline() - ) - command = RunCommand(threadId="thread-1", runId="run-1", messages=[]) - - await runtime.run( - command=command, - owner_id=uuid4(), - user_token="token", - user_context=_user_context(), - session=cast(AsyncSession, object()), - ) - - tool_events = [item for item in calls if item.get("type") == "tool.result"] - assert len(tool_events) == 1 - data = tool_events[0]["data"] - assert isinstance(data, dict) - assert isinstance(data["result"], dict) - assert data["result"]["accessToken"] == "[REDACTED]" - assert data["result"]["message"] == "Authorization=[REDACTED]" - nested = data["result"]["nested"] - assert isinstance(nested, list) - assert nested[0]["authorizationHeader"] == "[REDACTED]" - assert isinstance(data["args"], dict) - assert data["args"]["accessToken"] == "[REDACTED]" - assert data["args"]["author"] == "alice" - assert data["error"] == "failed authorization=[REDACTED] detail" - - -@pytest.mark.asyncio -async def test_runtime_tool_result_keeps_non_object_result() -> None: - calls: list[dict[str, Any]] = [] - - class _FakePipeline: - async def emit(self, *, session_id: str, event: dict[str, object]) -> str: - assert session_id == "thread-1" - calls.append(event) - return f"{len(calls)}-0" - - class _FakeOrchestrator: - async def run(self, **_: object) -> RuntimeOutput: - return RuntimeOutput( - intent=IntentOutput( - route="TASK_EXECUTION", - intent_summary="summary", - direct_response=None, - tasks=[IntentTask(task_id="t1", title="exec", objective="do")], - complexity="complex", - response_metadata={}, - ), - execution=ExecutionBatchOutput( - task_results=[ - ExecutionTaskOutput( - task_id="t1", - status="SUCCESS", - execution_summary="execution-ok", - execution_data={}, - user_feedback_needs=[], - response_metadata={}, - tool_calls=[ - ExecutionToolCall( - tool_name="calendar_write", - args={"title": "A"}, - result=["evt-1", "evt-2"], - ) - ], - ) - ], - overall_status="SUCCESS", - aggregate_summary="ok", - ), - report=ReportOutput( - assistant_text="hello world", - response_metadata={}, - ), - ) - - runtime = AgentRouteRuntime( - orchestrator=_FakeOrchestrator(), pipeline=_FakePipeline() - ) - command = RunCommand(threadId="thread-1", runId="run-1", messages=[]) - - await runtime.run( - command=command, - owner_id=uuid4(), - user_token="token", - user_context=_user_context(), - session=cast(AsyncSession, object()), - ) - - tool_events = [item for item in calls if item.get("type") == "tool.result"] - assert len(tool_events) == 1 - data = tool_events[0]["data"] - assert isinstance(data, dict) - assert isinstance(data["result"], dict) - assert data["result"]["value"] == ["evt-1", "evt-2"] diff --git a/backend/tests/unit/core/agentscope/runtime/test_orchestrator.py b/backend/tests/unit/core/agentscope/runtime/test_orchestrator.py index b831368..c9e5371 100644 --- a/backend/tests/unit/core/agentscope/runtime/test_orchestrator.py +++ b/backend/tests/unit/core/agentscope/runtime/test_orchestrator.py @@ -1,229 +1,144 @@ from __future__ import annotations -from types import SimpleNamespace -from typing import Any, cast -from uuid import uuid4 +from typing import Any +from uuid import UUID import pytest -from sqlalchemy.ext.asyncio import AsyncSession -from core.agentscope.schemas.system_agent_config import SystemAgentLLMConfig -from core.agentscope.schemas.user_context import ( - UserAgentContext, - parse_profile_settings, -) -from core.agentscope.runtime.config_loader import RuntimeStageConfig from core.agentscope.runtime.orchestrator import AgentScopeRuntimeOrchestrator +from schemas.user import UserContext, parse_profile_settings -def _ctx() -> UserAgentContext: - return UserAgentContext( - user_id=uuid4(), - username="alice", - bio=None, - settings=parse_profile_settings( - { - "version": 1, - "preferences": { - "interface_language": "zh-CN", - "ai_language": "zh-CN", - "timezone": "Asia/Shanghai", - "country": "CN", - }, - } - ), - ) +class _FakePipeline: + def __init__(self) -> None: + self.events: list[dict[str, Any]] = [] - -def _stage_config() -> dict[str, RuntimeStageConfig]: - llm = SystemAgentLLMConfig(temperature=0.1, max_tokens=256, timeout_seconds=30) - return { - "intent": RuntimeStageConfig("intent", "qwen3.5-flash", "dashscope", llm), - "execution": RuntimeStageConfig("execution", "deepseek-chat", "deepseek", llm), - "report": RuntimeStageConfig("report", "deepseek-chat", "deepseek", llm), - } + async def emit(self, *, session_id: str, event: dict[str, Any]) -> str: + self.events.append({"session_id": session_id, "event": event}) + return "1-0" class _FakeRunner: def __init__(self) -> None: - self.intent_calls = 0 - self.execution_calls = 0 - self.report_calls = 0 + self.last_user_input: str | list[dict[str, Any]] | None = None - async def run_json_stage( + async def run_router_then_worker( self, *, - stage_config: RuntimeStageConfig, - agent_name: str, - system_prompt: str, - user_prompt: str, - toolkit: Any | None, + session, + user_context, + user_input, + router_toolkit, + worker_toolkit, + extra_context=None, ) -> dict[str, Any]: - del agent_name, system_prompt, user_prompt, toolkit - if stage_config.stage == "intent": - self.intent_calls += 1 - return { - "route": "DIRECT_RESPONSE", - "intent_summary": "直接问候", - "direct_response": "你好", - "tasks": [], - "complexity": "simple", - "response_metadata": {"model": "qwen3.5-flash", "latencyMs": 100}, - } - self.report_calls += 1 + del session, user_context, router_toolkit, worker_toolkit, extra_context + self.last_user_input = user_input return { - "assistant_text": "已完成", - "response_metadata": {"source": "report-agent"}, + "worker": { + "status": "success", + "answer": "done", + "key_points": [], + "result_type": "summary", + "suggested_actions": [], + "error": None, + "response_metadata": { + "model": "qwen3.5-flash", + "inputTokens": 10, + "outputTokens": 5, + }, + } } -class _ComplexRunner(_FakeRunner): - async def run_json_stage( - self, - *, - stage_config: RuntimeStageConfig, - agent_name: str, - system_prompt: str, - user_prompt: str, - toolkit: Any | None, - ) -> dict[str, Any]: - del agent_name, system_prompt, user_prompt, toolkit - if stage_config.stage == "intent": - self.intent_calls += 1 - return { - "route": "TASK_EXECUTION", - "intent_summary": "需要写入日历", - "direct_response": None, - "tasks": [ - {"task_id": "t1", "title": "创建事件", "objective": "写入明天会议"} - ], - "complexity": "complex", - } - if stage_config.stage == "execution": - self.execution_calls += 1 - return { - "task_id": "t1", - "status": "SUCCESS", - "execution_summary": "done", - "execution_data": {}, - "user_feedback_needs": [], - } - self.report_calls += 1 - return { - "assistant_text": "任务执行完成", - "response_metadata": {"source": "report-agent"}, - } +def _user_context() -> UserContext: + return UserContext( + id="00000000-0000-0000-0000-000000000001", + username="alice", + email="alice@example.com", + avatar_url=None, + bio=None, + settings=parse_profile_settings(None), + ) -@pytest.mark.asyncio -async def test_runtime_direct_response_skips_execution( - monkeypatch: pytest.MonkeyPatch, -) -> None: - fake_runner = _FakeRunner() +def _run_command_with_binary() -> Any: + from ag_ui.core import RunAgentInput - async def _fake_config_loader( - _session: AsyncSession, - ) -> dict[str, RuntimeStageConfig]: - return _stage_config() - - class _FakeToolkit: - def get_json_schemas(self) -> list[dict[str, Any]]: - return [ + return RunAgentInput.model_validate( + { + "threadId": "00000000-0000-0000-0000-000000000010", + "runId": "run-1", + "state": {}, + "messages": [ { - "type": "function", - "function": { - "name": "calendar_read", - "description": "read", - "parameters": {"type": "object", "properties": {}}, - }, + "id": "u1", + "role": "user", + "content": [ + {"type": "text", "text": "看这张图"}, + { + "type": "binary", + "mimeType": "image/png", + "url": "https://example.com/signed.png", + }, + ], } - ] - - async def call_tool_function(self, tool_call: dict[str, Any]): - del tool_call - if False: - yield None - - monkeypatch.setattr( - "core.agentscope.runtime.orchestrator.build_stage_toolkit", - lambda **_: _FakeToolkit(), + ], + "tools": [], + "context": [], + "forwardedProps": {}, + } ) - orchestrator = AgentScopeRuntimeOrchestrator( - runner=fake_runner, - config_loader=_fake_config_loader, - ) - result = await orchestrator.run( - session=cast(AsyncSession, SimpleNamespace()), - owner_id=uuid4(), - user_token="token", - user_context=_ctx(), - user_input="你好", - ) - - assert result.intent.route == "DIRECT_RESPONSE" - assert result.execution is None - assert result.report.assistant_text == "你好" - assert result.report.response_metadata["model"] == "qwen3.5-flash" - assert fake_runner.execution_calls == 0 - assert fake_runner.report_calls == 0 - @pytest.mark.asyncio -async def test_runtime_complex_route_runs_execution( +async def test_orchestrator_maps_binary_to_model_image_url( monkeypatch: pytest.MonkeyPatch, ) -> None: - fake_runner = _ComplexRunner() - - async def _fake_config_loader( - _session: AsyncSession, - ) -> dict[str, RuntimeStageConfig]: - return _stage_config() - - class _FakeToolkit: - def get_json_schemas(self) -> list[dict[str, Any]]: - return [ - { - "type": "function", - "function": { - "name": "calendar_read", - "description": "read", - "parameters": {"type": "object", "properties": {}}, - }, - }, - { - "type": "function", - "function": { - "name": "calendar_write", - "description": "write", - "parameters": {"type": "object", "properties": {}}, - }, - }, - ] - - async def call_tool_function(self, tool_call: dict[str, Any]): - del tool_call - if False: - yield None - + pipeline = _FakePipeline() + runner = _FakeRunner() monkeypatch.setattr( "core.agentscope.runtime.orchestrator.build_stage_toolkit", - lambda **_: _FakeToolkit(), + lambda **_: None, + ) + orchestrator = AgentScopeRuntimeOrchestrator(pipeline=pipeline, runner=runner) + + await orchestrator.run( + command=_run_command_with_binary(), + owner_id=UUID("00000000-0000-0000-0000-000000000001"), + user_context=_user_context(), + session=None, ) - orchestrator = AgentScopeRuntimeOrchestrator( - runner=fake_runner, - config_loader=_fake_config_loader, - ) - result = await orchestrator.run( - session=cast(AsyncSession, SimpleNamespace()), - owner_id=uuid4(), - user_token="token", - user_context=_ctx(), - user_input="帮我安排明天会议", + assert isinstance(runner.last_user_input, list) + assert runner.last_user_input[0]["type"] == "text" + assert runner.last_user_input[1]["type"] == "image_url" + assert ( + runner.last_user_input[1]["image_url"]["url"] + == "https://example.com/signed.png" ) - assert result.intent.route == "TASK_EXECUTION" - assert result.execution is not None - assert result.execution.overall_status == "SUCCESS" - assert fake_runner.execution_calls == 1 + +@pytest.mark.asyncio +async def test_orchestrator_emits_worker_output_on_text_end( + monkeypatch: pytest.MonkeyPatch, +) -> None: + pipeline = _FakePipeline() + runner = _FakeRunner() + monkeypatch.setattr( + "core.agentscope.runtime.orchestrator.build_stage_toolkit", + lambda **_: None, + ) + orchestrator = AgentScopeRuntimeOrchestrator(pipeline=pipeline, runner=runner) + + await orchestrator.run( + command=_run_command_with_binary(), + owner_id=UUID("00000000-0000-0000-0000-000000000001"), + user_context=_user_context(), + session=None, + ) + + emitted = [item["event"] for item in pipeline.events] + text_end = next(item for item in emitted if item.get("type") == "text.end") + assert text_end["data"]["workerAgentOutput"]["answer"] == "done" + assert any(item.get("type") == "run.finished" for item in emitted) diff --git a/backend/tests/unit/schemas/messages/test_chat_message_schema.py b/backend/tests/unit/schemas/messages/test_chat_message_schema.py new file mode 100644 index 0000000..2049fb4 --- /dev/null +++ b/backend/tests/unit/schemas/messages/test_chat_message_schema.py @@ -0,0 +1,29 @@ +from __future__ import annotations + +from datetime import UTC, datetime +from uuid import uuid4 + +from schemas.messages.chat_message import AgentChatMessage + + +def test_agent_chat_message_schema_matches_messages_columns() -> None: + now = datetime.now(UTC) + payload = { + "id": uuid4(), + "seq": 3, + "role": "assistant", + "content": "hello", + "metadata": {"run_id": "run-1"}, + "timestamp": now, + } + + message = AgentChatMessage.model_validate(payload) + + assert message.seq == 3 + assert message.role == "assistant" + assert message.content == "hello" + assert message.metadata is not None + if isinstance(message.metadata, dict): + assert message.metadata == {"run_id": "run-1"} + else: + assert message.metadata.model_dump(exclude_none=True) == {"run_id": "run-1"} diff --git a/backend/tests/unit/v1/agent/test_repository.py b/backend/tests/unit/v1/agent/test_repository.py index 0ac5685..8fbcc83 100644 --- a/backend/tests/unit/v1/agent/test_repository.py +++ b/backend/tests/unit/v1/agent/test_repository.py @@ -6,7 +6,6 @@ from uuid import uuid4 import pytest -from core.config.settings import config from models.agent_chat_message import AgentChatMessageRole from v1.agent.repository import AgentRepository @@ -36,243 +35,27 @@ class _FakeSession: self.flushed = True -class _FakeToolResultStorage: - def __init__(self, payload: dict[str, object] | None) -> None: - self._payload = payload - - async def read_json(self, *, bucket: str, path: str) -> dict[str, object] | None: - del bucket, path - return self._payload - - @pytest.mark.asyncio -async def test_tool_message_hydrates_content_from_object_storage() -> None: - repository = AgentRepository( - session=SimpleNamespace(), # type: ignore[arg-type] - tool_result_storage=_FakeToolResultStorage( - { - "toolName": "front.navigate_to_route", - "result": {"ok": True, "applied": True, "content": "已跳转"}, - } - ), - ) +async def test_snapshot_message_returns_raw_db_columns() -> None: + repository = AgentRepository(session=SimpleNamespace()) # type: ignore[arg-type] + now = datetime.now(timezone.utc) message = SimpleNamespace( id=uuid4(), + session_id=uuid4(), + seq=7, role=AgentChatMessageRole.TOOL, - created_at=datetime.now(timezone.utc), content='{"offloaded":true}', - metadata_json={ - "tool_call_id": "call-1", - "storage_bucket": config.storage.bucket, - "storage_path": "tool-results/run-1/call-1.json", - }, + metadata_json={"tool_call_id": "call-1"}, + created_at=now, ) payload = await repository._to_snapshot_message(message) # type: ignore[arg-type] - assert payload["toolCallId"] == "call-1" - assert payload["content"] == "已跳转" - - -@pytest.mark.asyncio -async def test_tool_message_hydrates_ui_from_ui_schema_field() -> None: - repository = AgentRepository( - session=SimpleNamespace(), # type: ignore[arg-type] - tool_result_storage=_FakeToolResultStorage( - { - "toolName": "calendar_write", - "ui_schema": { - "type": "calendar_operation.v1", - "version": "v1", - "data": {"ok": True, "operation": "create"}, - "actions": [], - }, - } - ), - ) - message = SimpleNamespace( - id=uuid4(), - role=AgentChatMessageRole.TOOL, - created_at=datetime.now(timezone.utc), - content="已创建日程:项目评审(明天 10:00)", - metadata_json={ - "tool_call_id": "call-3", - "storage_bucket": config.storage.bucket, - "storage_path": "tool-results/run-1/call-3.json", - }, - ) - - payload = await repository._to_snapshot_message(message) # type: ignore[arg-type] - - assert payload["toolCallId"] == "call-3" - assert payload["content"] == "已创建日程:项目评审(明天 10:00)" - ui = payload.get("ui") - assert isinstance(ui, dict) - assert ui["type"] == "calendar_operation.v1" - - -@pytest.mark.asyncio -async def test_tool_message_keeps_inline_content_when_storage_payload_missing() -> None: - repository = AgentRepository( - session=SimpleNamespace(), # type: ignore[arg-type] - tool_result_storage=_FakeToolResultStorage(None), - ) - message = SimpleNamespace( - id=uuid4(), - role=AgentChatMessageRole.TOOL, - created_at=datetime.now(timezone.utc), - content="inline-tool-content", - metadata_json={ - "tool_call_id": "call-2", - "storage_bucket": config.storage.bucket, - "storage_path": "tool-results/run-1/call-2.json", - }, - ) - - payload = await repository._to_snapshot_message(message) # type: ignore[arg-type] - - assert payload["toolCallId"] == "call-2" - assert payload["content"] == "inline-tool-content" - - -@pytest.mark.asyncio -async def test_tool_message_skips_storage_when_path_not_matching_session() -> None: - repository = AgentRepository( - session=SimpleNamespace(), # type: ignore[arg-type] - tool_result_storage=_FakeToolResultStorage( - { - "ui_schema": { - "type": "calendar_operation.v1", - "version": "v1", - "data": {"ok": True}, - "actions": [], - } - } - ), - ) - message = SimpleNamespace( - id=uuid4(), - session_id=uuid4(), - role=AgentChatMessageRole.TOOL, - created_at=datetime.now(timezone.utc), - content="summary", - metadata_json={ - "tool_call_id": "call-x", - "storage_bucket": config.storage.bucket, - "storage_path": "tool-results/foreign-session/call-y.json", - }, - ) - - payload = await repository._to_snapshot_message(message) # type: ignore[arg-type] - - assert payload["content"] == "summary" - assert "ui" not in payload - - -@pytest.mark.asyncio -async def test_tool_message_rejects_path_traversal() -> None: - repository = AgentRepository( - session=SimpleNamespace(), # type: ignore[arg-type] - tool_result_storage=_FakeToolResultStorage( - { - "ui_schema": { - "type": "calendar_operation.v1", - "version": "v1", - "data": {"ok": True}, - "actions": [], - } - } - ), - ) - message = SimpleNamespace( - id=uuid4(), - session_id=uuid4(), - role=AgentChatMessageRole.TOOL, - created_at=datetime.now(timezone.utc), - content="summary", - metadata_json={ - "tool_call_id": "call-z", - "storage_bucket": config.storage.bucket, - "storage_path": "tool-results/ok/../../evil/call-z.json", - }, - ) - - payload = await repository._to_snapshot_message(message) # type: ignore[arg-type] - - assert payload["content"] == "summary" - assert "ui" not in payload - - -@pytest.mark.asyncio -async def test_tool_message_supports_legacy_storage_path() -> None: - repository = AgentRepository( - session=SimpleNamespace(), # type: ignore[arg-type] - tool_result_storage=_FakeToolResultStorage( - { - "ui_schema": { - "type": "calendar_operation.v1", - "version": "v1", - "data": {"ok": True}, - "actions": [], - }, - "content": "legacy content", - } - ), - ) - message = SimpleNamespace( - id=uuid4(), - session_id=uuid4(), - role=AgentChatMessageRole.TOOL, - created_at=datetime.now(timezone.utc), - content='{"offloaded":true}', - metadata_json={ - "tool_call_id": "call-legacy", - "storage_bucket": config.storage.bucket, - "storage_path": "tool-results/old-run/call-legacy.json", - }, - ) - - payload = await repository._to_snapshot_message(message) # type: ignore[arg-type] - - assert payload["content"] == "legacy content" - ui = payload.get("ui") - assert isinstance(ui, dict) - assert ui["type"] == "calendar_operation.v1" - - -@pytest.mark.asyncio -async def test_user_message_snapshot_includes_renderable_attachments() -> None: - repository = AgentRepository( - session=SimpleNamespace(), # type: ignore[arg-type] - ) - message = SimpleNamespace( - id=uuid4(), - session_id=uuid4(), - role=AgentChatMessageRole.USER, - created_at=datetime.now(timezone.utc), - content="请分析这张图", - metadata_json={ - "attachments": [ - { - "bucket": "agent-chat-attachments", - "path": "agent-inputs/u1/t1/r1/m1/att-1.png", - "mimeType": "image/png", - } - ] - }, - ) - - payload = await repository._to_snapshot_message(message) # type: ignore[arg-type] - - assert payload["role"] == "user" - assert payload["content"] == "请分析这张图" - attachments = payload.get("attachments") - assert isinstance(attachments, list) - assert len(attachments) == 1 - first = attachments[0] - assert isinstance(first, dict) - assert first["mimeType"] == "image/png" - assert isinstance(first.get("previewPath"), str) + assert payload["seq"] == 7 + assert payload["role"] == "tool" + assert payload["content"] == '{"offloaded":true}' + assert payload["metadata"] == {"tool_call_id": "call-1"} + assert "timestamp" in payload @pytest.mark.asyncio @@ -318,32 +101,3 @@ async def test_persist_user_message_keeps_existing_session_title() -> None: assert session_row.title == "已有标题" assert session_row.message_count == 2 - - -@pytest.mark.asyncio -async def test_get_message_attachment_reference_returns_item() -> None: - session_id = str(uuid4()) - message_id = str(uuid4()) - message = SimpleNamespace( - metadata_json={ - "attachments": [ - { - "bucket": "bucket-test", - "path": "agent-inputs/u/t/r/a.png", - "mimeType": "image/png", - } - ] - } - ) - fake_session = _FakeSession(message) - repository = AgentRepository(session=fake_session) # type: ignore[arg-type] - - ref = await repository.get_message_attachment_reference( - session_id=session_id, - message_id=message_id, - attachment_index=0, - ) - - assert ref is not None - assert ref["bucket"] == "bucket-test" - assert ref["mimeType"] == "image/png" diff --git a/backend/tests/unit/v1/agent/test_router_guards.py b/backend/tests/unit/v1/agent/test_router_guards.py index a28b9f2..2eaedaf 100644 --- a/backend/tests/unit/v1/agent/test_router_guards.py +++ b/backend/tests/unit/v1/agent/test_router_guards.py @@ -12,48 +12,6 @@ from core.auth.models import CurrentUser from v1.agent import router as agent_router -@pytest.mark.asyncio -async def test_allow_run_request_fails_closed_when_redis_unavailable( - monkeypatch: pytest.MonkeyPatch, -) -> None: - async def _raise_redis_error(): - raise RuntimeError("redis unavailable") - - monkeypatch.setattr(agent_router, "get_or_init_redis_client", _raise_redis_error) - - allowed = await agent_router._allow_run_request(user_id="user-1") - - assert allowed is False - - -@pytest.mark.asyncio -async def test_acquire_sse_slot_fails_closed_when_redis_unavailable( - monkeypatch: pytest.MonkeyPatch, -) -> None: - async def _raise_redis_error(): - raise RuntimeError("redis unavailable") - - monkeypatch.setattr(agent_router, "get_or_init_redis_client", _raise_redis_error) - - allowed = await agent_router._acquire_sse_slot(user_id="user-1") - - assert allowed is False - - -@pytest.mark.asyncio -async def test_allow_transcribe_request_fails_closed_when_redis_unavailable( - monkeypatch: pytest.MonkeyPatch, -) -> None: - async def _raise_redis_error(): - raise RuntimeError("redis unavailable") - - monkeypatch.setattr(agent_router, "get_or_init_redis_client", _raise_redis_error) - - allowed = await agent_router._allow_transcribe_request(user_id="user-1") - - assert allowed is False - - def _resume_input_with_tool_message() -> RunAgentInput: return RunAgentInput.model_validate( { @@ -82,13 +40,7 @@ async def test_enqueue_resume_rejects_without_tool_contract() -> None: "threadId": "00000000-0000-0000-0000-000000000001", "runId": "run-resume-invalid", "state": {}, - "messages": [ - { - "id": "u1", - "role": "user", - "content": "continue", - } - ], + "messages": [{"id": "u1", "role": "user", "content": "continue"}], "tools": [], "context": [], "forwardedProps": {}, @@ -109,10 +61,6 @@ async def test_enqueue_resume_rejects_without_tool_contract() -> None: ) assert exc_info.value.status_code == 422 - assert ( - exc_info.value.detail - == "RunAgentInput.messages requires a tool message with toolCallId for resume" - ) @pytest.mark.asyncio @@ -141,7 +89,6 @@ async def test_enqueue_resume_rejects_when_rate_limited( ) assert exc_info.value.status_code == 429 - assert exc_info.value.detail == "Too many run requests" @pytest.mark.asyncio @@ -173,96 +120,4 @@ async def test_enqueue_resume_accepts_valid_tool_contract( ) assert result.task_id == "task-resume-1" - assert result.thread_id == "00000000-0000-0000-0000-000000000001" assert result.run_id == "run-resume-1" - - -@pytest.mark.asyncio -async def test_stream_events_retries_on_redis_timeout( - monkeypatch: pytest.MonkeyPatch, -) -> None: - async def _acquire(*, user_id: str) -> bool: - del user_id - return True - - async def _release(*, user_id: str) -> None: - del user_id - - monkeypatch.setattr(agent_router, "_acquire_sse_slot", _acquire) - monkeypatch.setattr(agent_router, "_release_sse_slot", _release) - - class _Request: - async def is_disconnected(self) -> bool: - return False - - class _Service: - def __init__(self) -> None: - self.calls = 0 - - async def stream_events(self, **kwargs): # noqa: ANN003 - del kwargs - self.calls += 1 - if self.calls == 1: - raise RuntimeError("Timeout reading from localhost:6379") - if self.calls == 2: - return [{"id": "1-0", "event": {"type": "RUN_FINISHED"}}] - return [] - - response = await agent_router.stream_events( - request=cast(Any, _Request()), - thread_id="00000000-0000-0000-0000-000000000001", - service=cast(Any, _Service()), - current_user=CurrentUser(id=uuid4(), email="user@example.com"), - last_event_id=None, - idle_limit=2, - ) - - chunks: list[str] = [] - async for chunk in response.body_iterator: - chunks.append(str(chunk)) - if any("RUN_FINISHED" in item for item in chunks): - break - - merged = "".join(chunks) - assert "event: RUN_FINISHED" in merged - - -@pytest.mark.asyncio -async def test_get_attachment_preview_rejects_negative_index() -> None: - class _Service: - async def get_attachment_preview(self, **kwargs): # noqa: ANN003 - del kwargs - raise AssertionError("get_attachment_preview should not be called") - - with pytest.raises(HTTPException) as exc_info: - await agent_router.get_attachment_preview( - thread_id="00000000-0000-0000-0000-000000000001", - message_id="00000000-0000-0000-0000-000000000010", - attachment_index=-1, - service=cast(Any, _Service()), - current_user=CurrentUser(id=uuid4(), email="user@example.com"), - ) - - assert exc_info.value.status_code == 422 - - -@pytest.mark.asyncio -async def test_get_attachment_preview_returns_streaming_response() -> None: - class _Service: - async def get_attachment_preview(self, **kwargs): # noqa: ANN003 - del kwargs - return b"png-bytes", "image/png" - - response = await agent_router.get_attachment_preview( - thread_id="00000000-0000-0000-0000-000000000001", - message_id="00000000-0000-0000-0000-000000000010", - attachment_index=0, - service=cast(Any, _Service()), - current_user=CurrentUser(id=uuid4(), email="user@example.com"), - ) - chunks: list[bytes] = [] - async for chunk in response.body_iterator: - chunks.append(cast(bytes, chunk)) - - assert response.media_type == "image/png" - assert b"".join(chunks) == b"png-bytes" diff --git a/backend/tests/unit/v1/agent/test_service.py b/backend/tests/unit/v1/agent/test_service.py index f52dd4b..3cf3a09 100644 --- a/backend/tests/unit/v1/agent/test_service.py +++ b/backend/tests/unit/v1/agent/test_service.py @@ -1,25 +1,22 @@ from __future__ import annotations from datetime import date -from types import SimpleNamespace +from urllib.parse import quote from uuid import UUID from ag_ui.core import RunAgentInput from fastapi import HTTPException import pytest -from sqlalchemy.exc import IntegrityError + +import v1.agent.service as agent_service_module from core.auth.models import CurrentUser from core.config.settings import config -import v1.agent.service as agent_service_module -from v1.agent.service import AgentService, AsrService +from v1.agent.service import AgentService class _FakeRepository: def __init__(self) -> None: self.committed = False - self.rolled_back = False - self.deleted_session_id: str | None = None - self.created_with_session_id: str | None = None self.persisted_user_messages: list[dict[str, object]] = [] async def get_session_owner(self, *, session_id: str) -> str: @@ -31,33 +28,23 @@ class _FakeRepository: self, *, user_id: str, session_id: str | None = None ) -> str: del user_id - self.created_with_session_id = session_id return session_id or "00000000-0000-0000-0000-000000000999" async def commit(self) -> None: self.committed = True async def rollback(self) -> None: - self.rolled_back = True - - async def delete_session(self, *, session_id: str) -> None: - self.deleted_session_id = session_id + return None async def get_history_day( self, *, session_id: str, before: date | None ) -> dict[str, object] | None: - del session_id - if before is not None and before <= date(2026, 3, 6): - return None - return { - "day": "2026-03-06", - "hasMore": False, - "messages": [{"id": "m1", "role": "assistant", "content": "hello"}], - } + del session_id, before + return None async def get_latest_session_id_for_user(self, *, user_id: str) -> str | None: del user_id - return "00000000-0000-0000-0000-000000000001" + return None async def persist_user_message( self, @@ -76,22 +63,6 @@ class _FakeRepository: } ) - async def get_message_attachment_reference( - self, - *, - session_id: str, - message_id: str, - attachment_index: int, - ) -> dict[str, str] | None: - del session_id, message_id - if attachment_index != 0: - return None - return { - "bucket": config.storage.bucket, - "path": "agent-inputs/00000000-0000-0000-0000-000000000001/00000000-0000-0000-0000-000000000001/run-1/attachment-0-a.png", - "mimeType": "image/png", - } - class _FakeQueue: def __init__(self) -> None: @@ -100,33 +71,20 @@ class _FakeQueue: async def enqueue( self, *, command: dict[str, object], dedup_key: str | None ) -> str: - self.commands.append(command) del dedup_key + self.commands.append(command) return "task-1" -class _FailingQueue: - async def enqueue( - self, *, command: dict[str, object], dedup_key: str | None - ) -> str: - del command, dedup_key - raise RuntimeError("enqueue failed") - - class _FakeStream: async def read( self, *, session_id: str, last_event_id: str | None ) -> list[dict[str, object]]: - del session_id - return [ - {"id": "2-0", "event": {"type": "RUN_STARTED"}, "cursor": last_event_id} - ] + del session_id, last_event_id + return [] class _FakeAttachmentStorage: - def __init__(self) -> None: - self.calls: list[dict[str, object]] = [] - async def upload_bytes( self, *, @@ -135,65 +93,12 @@ class _FakeAttachmentStorage: content: bytes, content_type: str, ) -> str: - self.calls.append( - { - "bucket": bucket, - "path": path, - "content": content, - "content_type": content_type, - } - ) + del bucket, content, content_type return path - async def download_bytes(self, *, bucket: str, path: str) -> bytes: - self.calls.append( - { - "bucket": bucket, - "path": path, - "download": True, - } - ) - return b"png-bytes" - - async def create_signed_url( - self, - *, - bucket: str, - path: str, - expires_in_seconds: int, - ) -> str: - self.calls.append( - { - "bucket": bucket, - "path": path, - "signed": True, - "expires_in_seconds": expires_in_seconds, - } - ) - return f"https://signed.example/{path}?exp={expires_in_seconds}" - - def parse_signed_url(self, url: str) -> tuple[str, str]: - if url.startswith("https://signed.example/"): - path = url.replace("https://signed.example/", "").split("?")[0] - return "agent-test-bucket", path - raise RuntimeError("Invalid signed URL") - - -class _AlwaysFailAttachmentStorage: - async def upload_bytes( - self, - *, - bucket: str, - path: str, - content: bytes, - content_type: str, - ) -> str: - del bucket, path, content, content_type - raise RuntimeError("upload failed") - async def download_bytes(self, *, bucket: str, path: str) -> bytes: del bucket, path - raise RuntimeError("download failed") + return b"" async def create_signed_url( self, @@ -202,12 +107,16 @@ class _AlwaysFailAttachmentStorage: path: str, expires_in_seconds: int, ) -> str: - del bucket, path, expires_in_seconds - raise RuntimeError("sign failed") + del expires_in_seconds + return f"https://signed.example/{bucket}/{path}" def parse_signed_url(self, url: str) -> tuple[str, str]: - del url - raise RuntimeError("parse failed") + parsed = url.split("/storage/v1/object/sign/") + if len(parsed) != 2: + raise RuntimeError("invalid") + bucket, path = parsed[1].split("/", 1) + path = path.split("?", 1)[0] + return bucket, path def _user() -> CurrentUser: @@ -217,13 +126,22 @@ def _user() -> CurrentUser: ) -def _build_run_input(*, thread_id: str, run_id: str) -> RunAgentInput: +def _build_run_input(*, url: str) -> RunAgentInput: return RunAgentInput.model_validate( { - "threadId": thread_id, - "runId": run_id, + "threadId": "00000000-0000-0000-0000-000000000001", + "runId": "run-1", "state": {}, - "messages": [{"id": "u1", "role": "user", "content": "hello"}], + "messages": [ + { + "id": "u1", + "role": "user", + "content": [ + {"type": "text", "text": "hello"}, + {"type": "binary", "mimeType": "image/png", "url": url}, + ], + } + ], "tools": [], "context": [], "forwardedProps": {}, @@ -231,454 +149,69 @@ def _build_run_input(*, thread_id: str, run_id: str) -> RunAgentInput: ) -async def test_resume_idempotency_uses_redis_lock_and_task_key() -> None: +@pytest.mark.asyncio +async def test_enqueue_run_rejects_non_project_host_signed_url(monkeypatch) -> None: + monkeypatch.setattr( + agent_service_module.config.storage, "bucket", "agent-test-bucket" + ) service = AgentService( repository=_FakeRepository(), queue=_FakeQueue(), stream=_FakeStream(), - ) - user = _user() - run_input = _build_run_input( - thread_id="00000000-0000-0000-0000-000000000001", - run_id="run-1", - ) - - first = await service.enqueue_resume( - thread_id="00000000-0000-0000-0000-000000000001", - run_input=run_input, - current_user=user, - ) - second = await service.enqueue_resume( - thread_id="00000000-0000-0000-0000-000000000001", - run_input=run_input, - current_user=user, - ) - - assert first.task_id == second.task_id - - -async def test_enqueue_run_creates_missing_thread_session() -> None: - repository = _FakeRepository() - queue = _FakeQueue() - service = AgentService( - repository=repository, - queue=queue, - stream=_FakeStream(), + attachment_storage=_FakeAttachmentStorage(), ) run_input = _build_run_input( - thread_id="00000000-0000-0000-0000-000000000999", - run_id="run-1", - ) - - accepted = await service.enqueue_run( - run_input=run_input, - current_user=_user(), - ) - - assert accepted.thread_id == "00000000-0000-0000-0000-000000000999" - assert accepted.run_id == "run-1" - assert accepted.created is True - assert repository.created_with_session_id == "00000000-0000-0000-0000-000000000999" - assert repository.committed is True - assert queue.commands[0]["user_token"] is None - - -async def test_enqueue_run_uses_explicit_user_token() -> None: - repository = _FakeRepository() - queue = _FakeQueue() - service = AgentService( - repository=repository, - queue=queue, - stream=_FakeStream(), - ) - run_input = _build_run_input( - thread_id="00000000-0000-0000-0000-000000000001", - run_id="run-1", - ) - - await service.enqueue_run( - run_input=run_input, - current_user=_user(), - user_token="Bearer access-token-1", - ) - - assert queue.commands - assert queue.commands[0]["user_token"] == "access-token-1" - - -async def test_enqueue_run_keeps_created_session_when_enqueue_fails() -> None: - repository = _FakeRepository() - service = AgentService( - repository=repository, - queue=_FailingQueue(), - stream=_FakeStream(), - ) - run_input = _build_run_input( - thread_id="00000000-0000-0000-0000-000000000999", - run_id="run-1", - ) - - try: - await service.enqueue_run( - run_input=run_input, - current_user=_user(), - ) - raise AssertionError("expected RuntimeError") - except RuntimeError as exc: - assert str(exc) == "enqueue failed" - - assert repository.deleted_session_id is None - - -async def test_enqueue_run_handles_session_create_race() -> None: - class _RaceRepository(_FakeRepository): - def __init__(self) -> None: - super().__init__() - self.create_calls = 0 - - async def get_session_owner(self, *, session_id: str) -> str: - if self.create_calls == 0: - raise HTTPException(status_code=404, detail="Session not found") - return "00000000-0000-0000-0000-000000000001" - - async def create_session_for_user( - self, *, user_id: str, session_id: str | None = None - ) -> str: - del user_id, session_id - self.create_calls += 1 - raise IntegrityError("insert", {}, Exception("duplicate key")) - - repository = _RaceRepository() - service = AgentService( - repository=repository, - queue=_FakeQueue(), - stream=_FakeStream(), - ) - run_input = _build_run_input( - thread_id="00000000-0000-0000-0000-000000000999", - run_id="run-1", - ) - - accepted = await service.enqueue_run( - run_input=run_input, - current_user=_user(), - ) - - assert accepted.created is False - assert repository.rolled_back is True - - -async def test_enqueue_run_parses_signed_url_and_injects_metadata( - monkeypatch, -) -> None: - monkeypatch.setattr( - agent_service_module.config.storage, "bucket", "agent-test-bucket" - ) - repository = _FakeRepository() - attachment_storage = _FakeAttachmentStorage() - service = AgentService( - repository=repository, - queue=_FakeQueue(), - stream=_FakeStream(), - attachment_storage=attachment_storage, - ) - run_input = RunAgentInput.model_validate( - { - "threadId": "00000000-0000-0000-0000-000000000001", - "runId": "run-with-image", - "state": {}, - "messages": [ - { - "id": "u1", - "role": "user", - "content": [ - {"type": "text", "text": "帮我看下这张图"}, - { - "type": "binary", - "mimeType": "image/png", - "url": "https://signed.example/agent-inputs/u/t/r/file.png", - }, - ], - } - ], - "tools": [], - "context": [], - } - ) - - accepted = await service.enqueue_run(run_input=run_input, current_user=_user()) - - assert accepted.task_id == "task-1" - assert repository.persisted_user_messages - persisted = repository.persisted_user_messages[0] - assert persisted["session_id"] == "00000000-0000-0000-0000-000000000001" - assert persisted["run_id"] == "run-with-image" - metadata = persisted["metadata"] - assert isinstance(metadata, dict) - attachments = metadata.get("user_message_attachments") - assert isinstance(attachments, dict) - assert attachments["bucket"] == "agent-test-bucket" - assert attachments["path"] == "agent-inputs/u/t/r/file.png" - assert attachments["mime_type"] == "image/png" - - -async def test_enqueue_run_with_invalid_signed_url_still_succeeds( - monkeypatch, -) -> None: - monkeypatch.setattr( - agent_service_module.config.storage, "bucket", "agent-test-bucket" - ) - repository = _FakeRepository() - attachment_storage = _FakeAttachmentStorage() - service = AgentService( - repository=repository, - queue=_FakeQueue(), - stream=_FakeStream(), - attachment_storage=attachment_storage, - ) - run_input = RunAgentInput.model_validate( - { - "threadId": "00000000-0000-0000-0000-000000000001", - "runId": "run-with-invalid-url", - "state": {}, - "messages": [ - { - "id": "u1", - "role": "user", - "content": [ - {"type": "text", "text": "帮我看下这张图"}, - { - "type": "binary", - "mimeType": "image/png", - "url": "invalid-url-format", - }, - ], - } - ], - "tools": [], - "context": [], - } - ) - - accepted = await service.enqueue_run(run_input=run_input, current_user=_user()) - - assert accepted.task_id == "task-1" - assert repository.persisted_user_messages - persisted = repository.persisted_user_messages[0] - metadata = persisted["metadata"] - assert metadata is None - - -async def test_enqueue_run_rejects_unsupported_attachment_type( - monkeypatch, -) -> None: - monkeypatch.setattr( - agent_service_module.config.storage, "bucket", "agent-test-bucket" - ) - repository = _FakeRepository() - attachment_storage = _FakeAttachmentStorage() - service = AgentService( - repository=repository, - queue=_FakeQueue(), - stream=_FakeStream(), - attachment_storage=attachment_storage, - ) - run_input = RunAgentInput.model_validate( - { - "threadId": "00000000-0000-0000-0000-000000000001", - "runId": "run-with-bad-image", - "state": {}, - "messages": [ - { - "id": "u1", - "role": "user", - "content": [ - {"type": "text", "text": "请看附件"}, - { - "type": "binary", - "mimeType": "image/gif", - "url": "https://signed.example/upload.gif", - }, - ], - } - ], - "tools": [], - "context": [], - "forwardedProps": { - "attachments": [ - { - "bucket": "agent-test-bucket", - "path": "agent-inputs/00000000-0000-0000-0000-000000000001/00000000-0000-0000-0000-000000000001/upload.gif", - "mimeType": "image/gif", - } - ] - }, - } + url="https://evil.example.com/storage/v1/object/sign/agent-test-bucket/a.png?token=1" ) with pytest.raises(HTTPException) as exc_info: await service.enqueue_run(run_input=run_input, current_user=_user()) assert exc_info.value.status_code == 422 - assert exc_info.value.detail == "Unsupported attachment type" - assert attachment_storage.calls == [] + assert exc_info.value.detail == "INVALID_BINARY_URL_HOST" -async def test_enqueue_run_rejects_attachment_too_large( +@pytest.mark.asyncio +async def test_enqueue_run_persists_attachment_and_queue_without_user_token( monkeypatch, ) -> None: - monkeypatch.setattr(agent_service_module, "_MAX_ATTACHMENT_BYTES", 4) monkeypatch.setattr( agent_service_module.config.storage, "bucket", "agent-test-bucket" ) - repository = _FakeRepository() - attachment_storage = _FakeAttachmentStorage() - service = AgentService( - repository=repository, - queue=_FakeQueue(), - stream=_FakeStream(), - attachment_storage=attachment_storage, - ) - run_input = RunAgentInput.model_validate( - { - "threadId": "00000000-0000-0000-0000-000000000001", - "runId": "run-with-big-image", - "state": {}, - "messages": [ - { - "id": "u1", - "role": "user", - "content": [ - {"type": "text", "text": "请看附件"}, - { - "type": "binary", - "mimeType": "image/png", - "url": "https://signed.example/upload.png", - }, - ], - } - ], - "tools": [], - "context": [], - "forwardedProps": { - "attachments": [ - { - "bucket": "agent-test-bucket", - "path": "agent-inputs/00000000-0000-0000-0000-000000000001/00000000-0000-0000-0000-000000000001/upload.png", - "mimeType": "image/png", - } - ] - }, - } - ) - - with pytest.raises(HTTPException) as exc_info: - await service.enqueue_run(run_input=run_input, current_user=_user()) - - assert exc_info.value.status_code == 413 - assert exc_info.value.detail == "Attachment too large" - assert len(attachment_storage.calls) == 1 - assert attachment_storage.calls[0]["download"] is True - - -async def test_enqueue_run_accepts_binary_url_and_persists_metadata() -> None: repository = _FakeRepository() queue = _FakeQueue() - attachment_storage = _FakeAttachmentStorage() service = AgentService( repository=repository, queue=queue, stream=_FakeStream(), - attachment_storage=attachment_storage, + attachment_storage=_FakeAttachmentStorage(), ) - run_input = RunAgentInput.model_validate( - { - "threadId": "00000000-0000-0000-0000-000000000001", - "runId": "run-with-binary-url", - "state": {}, - "messages": [ - { - "id": "u1", - "role": "user", - "content": [ - {"type": "text", "text": "请分析"}, - { - "type": "binary", - "mimeType": "image/png", - "url": "https://signed.example/upload-1.png", - }, - ], - } - ], - "tools": [], - "context": [], - "forwardedProps": { - "attachments": [ - { - "bucket": config.storage.bucket, - "path": "agent-inputs/00000000-0000-0000-0000-000000000001/00000000-0000-0000-0000-000000000001/upload-1.png", - "mimeType": "image/png", - } - ] - }, - } + base_url = str(config.supabase.url).rstrip("/") + safe_path = quote( + "agent-inputs/00000000-0000-0000-0000-000000000001/" + "00000000-0000-0000-0000-000000000001/uploads/a.png" + ) + run_input = _build_run_input( + url=f"{base_url}/storage/v1/object/sign/agent-test-bucket/{safe_path}?token=1" ) accepted = await service.enqueue_run(run_input=run_input, current_user=_user()) assert accepted.task_id == "task-1" - persisted = repository.persisted_user_messages[-1] + persisted = repository.persisted_user_messages[0] metadata = persisted["metadata"] assert isinstance(metadata, dict) - attachments = metadata.get("attachments") - assert isinstance(attachments, list) - assert attachments[0]["path"].endswith("upload-1.png") - queue_input = queue.commands[-1]["run_input"] - assert isinstance(queue_input, dict) - content = queue_input["messages"][0]["content"] - assert isinstance(content, list) - assert content[1]["type"] == "binary" - assert content[1]["url"] == "https://signed.example/upload-1.png" + attachment = metadata["user_message_attachments"] + assert attachment["bucket"] == "agent-test-bucket" + command = queue.commands[0] + assert "user_token" not in command -async def test_get_history_snapshot_wraps_history_day_as_state_snapshot_event() -> None: - service = AgentService( - repository=_FakeRepository(), - queue=_FakeQueue(), - stream=_FakeStream(), +@pytest.mark.asyncio +async def test_create_attachment_signed_url_returns_url(monkeypatch) -> None: + monkeypatch.setattr( + agent_service_module.config.storage, "bucket", "agent-test-bucket" ) - - event = await service.get_history_snapshot( - thread_id="00000000-0000-0000-0000-000000000001", - before=date(2026, 3, 7), - current_user=_user(), - ) - - assert event["type"] == "STATE_SNAPSHOT" - assert event["threadId"] == "00000000-0000-0000-0000-000000000001" - snapshot = event["snapshot"] - assert isinstance(snapshot, dict) - assert snapshot["scope"] == "history_day" - assert snapshot["day"] == "2026-03-06" - assert snapshot["messages"][0]["id"] == "m1" - - -async def test_get_user_history_snapshot_uses_latest_thread_when_absent() -> None: - service = AgentService( - repository=_FakeRepository(), - queue=_FakeQueue(), - stream=_FakeStream(), - ) - event = await service.get_user_history_snapshot( - current_user=_user(), - thread_id=None, - before=None, - ) - assert event["type"] == "STATE_SNAPSHOT" - assert event["threadId"] == "00000000-0000-0000-0000-000000000001" - - -async def test_get_attachment_preview_returns_payload_and_mime() -> None: service = AgentService( repository=_FakeRepository(), queue=_FakeQueue(), @@ -686,120 +219,36 @@ async def test_get_attachment_preview_returns_payload_and_mime() -> None: attachment_storage=_FakeAttachmentStorage(), ) - payload, mime_type = await service.get_attachment_preview( - thread_id="00000000-0000-0000-0000-000000000001", - message_id="00000000-0000-0000-0000-000000000010", - attachment_index=0, + payload = await service.create_attachment_signed_url( + bucket="agent-test-bucket", + path="agent-inputs/00000000-0000-0000-0000-000000000001/thread-x/uploads/a.png", current_user=_user(), ) - assert payload == b"png-bytes" - assert mime_type == "image/png" + assert payload["bucket"] == "agent-test-bucket" + assert payload["path"].endswith("/a.png") + assert payload["url"].startswith("https://signed.example/") -async def test_get_attachment_preview_rejects_invalid_path() -> None: - class _BadPathRepository(_FakeRepository): - async def get_message_attachment_reference( - self, - *, - session_id: str, - message_id: str, - attachment_index: int, - ) -> dict[str, str] | None: - del session_id, message_id, attachment_index - return { - "bucket": "bucket-test", - "path": "agent-inputs/other-user/other-thread/run-1/a.png", - "mimeType": "image/png", - } - +@pytest.mark.asyncio +async def test_create_attachment_signed_url_rejects_out_of_scope_path( + monkeypatch, +) -> None: + monkeypatch.setattr( + agent_service_module.config.storage, "bucket", "agent-test-bucket" + ) service = AgentService( - repository=_BadPathRepository(), + repository=_FakeRepository(), queue=_FakeQueue(), stream=_FakeStream(), attachment_storage=_FakeAttachmentStorage(), ) with pytest.raises(HTTPException) as exc_info: - await service.get_attachment_preview( - thread_id="00000000-0000-0000-0000-000000000001", - message_id="00000000-0000-0000-0000-000000000010", - attachment_index=0, + await service.create_attachment_signed_url( + bucket="agent-test-bucket", + path="agent-inputs/other-user/thread-x/uploads/a.png", current_user=_user(), ) - assert exc_info.value.status_code == 403 - - -async def test_asr_service_parses_dict_output_sentence(monkeypatch) -> None: - result = SimpleNamespace( - status_code=200, - message="ok", - output={"sentence": {"text": "你好,世界"}}, - request_id="req-test", - ) - - class _FakeRecognition: - def __init__(self, **kwargs) -> None: - del kwargs - - def call(self, *, file: str): - del file - return result - - monkeypatch.setattr(agent_service_module, "Recognition", _FakeRecognition) - monkeypatch.setattr(AsrService, "_get_api_key", lambda self: "test-key") - service = AsrService() - - transcript = await service.transcribe_file("/tmp/test.wav", "test.wav") - - assert transcript == "你好,世界" - - -async def test_asr_service_parses_sentence_when_result_is_dict(monkeypatch) -> None: - result = { - "status_code": 200, - "message": "ok", - "output": {"sentence": {"text": "字典结果"}}, - "request_id": "req-dict", - } - - class _FakeRecognition: - def __init__(self, **kwargs) -> None: - del kwargs - - def call(self, *, file: str): - del file - return result - - monkeypatch.setattr(agent_service_module, "Recognition", _FakeRecognition) - monkeypatch.setattr(AsrService, "_get_api_key", lambda self: "test-key") - service = AsrService() - - transcript = await service.transcribe_file("/tmp/test.wav", "test.wav") - - assert transcript == "字典结果" - - -async def test_asr_service_returns_empty_when_sentence_missing(monkeypatch) -> None: - result = { - "status_code": 200, - "message": "ok", - "output": {}, - } - - class _FakeRecognition: - def __init__(self, **kwargs) -> None: - del kwargs - - def call(self, *, file: str): - del file - return result - - monkeypatch.setattr(agent_service_module, "Recognition", _FakeRecognition) - monkeypatch.setattr(AsrService, "_get_api_key", lambda self: "test-key") - service = AsrService() - - transcript = await service.transcribe_file("/tmp/test.wav", "test.wav") - - assert transcript == "" + assert exc_info.value.status_code == 422 diff --git a/docs/plans/2026-03-13-agent-runs-multimodal-implementation.md b/docs/plans/2026-03-13-agent-runs-multimodal-implementation.md new file mode 100644 index 0000000..82965dd --- /dev/null +++ b/docs/plans/2026-03-13-agent-runs-multimodal-implementation.md @@ -0,0 +1,261 @@ +# Agent Runs Multimodal Refactor Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. + +**Goal:** 让 runs/resume 使用真实多模态图片输入,并将 worker/tool 按新结构化 metadata 规范落库。 + +**Architecture:** 保持现有 event pipeline,不引入旁路写库。请求入口完成 URL 安全边界校验;runtime 将 `binary` 转模型可识别 `image_url` block;event store 统一校验 `WorkerAgentOutput` / `ToolAgentOutput` 并完成 `content` 映射。 + +**Tech Stack:** FastAPI, Pydantic v2, SQLAlchemy AsyncSession, AgentScope, LiteLLM, Redis Stream + +--- + +### Task 1: Runs 输入安全边界 + +**Files:** +- Modify: `backend/src/core/agentscope/schemas/agui_input.py` +- Modify: `backend/src/v1/agent/router.py` +- Modify: `backend/src/v1/agent/service.py` +- Test: `backend/tests/unit/v1/agent/test_agent_router.py` + +**Step 1: Write the failing test** + +```python +def test_runs_rejects_non_project_signed_url(...) -> None: + payload = build_run_payload_with_binary_url("https://evil.example.com/storage/v1/object/sign/..." ) + resp = client.post("/api/v1/agent/runs", json=payload, headers=auth_headers) + assert resp.status_code == 422 +``` + +**Step 2: Run test to verify it fails** + +Run: `pytest backend/tests/unit/v1/agent/test_agent_router.py::test_runs_rejects_non_project_signed_url -v` +Expected: FAIL(当前不会拦截该 URL) + +**Step 3: Write minimal implementation** + +```python +def validate_binary_signed_url_scope(*, url: str, user_id: UUID, thread_id: UUID) -> tuple[str, str]: + bucket, path = supabase_service.parse_signed_url(url) + # check host, bucket, path prefix agent-inputs/{user_id}/{thread_id}/uploads/ + return bucket, path +``` + +在 `runs/resume` 请求入口调用校验;若请求含 binary 且当前模型不支持视觉,抛 `HTTPException(status_code=422, ...)`。 + +**Step 4: Run test to verify it passes** + +Run: `pytest backend/tests/unit/v1/agent/test_agent_router.py::test_runs_rejects_non_project_signed_url -v` +Expected: PASS + +**Step 5: Commit** + +```bash +git add backend/src/core/agentscope/schemas/agui_input.py backend/src/v1/agent/router.py backend/src/v1/agent/service.py backend/tests/unit/v1/agent/test_agent_router.py +git commit -m "fix: enforce signed image url scope on runs" +``` + +### Task 2: Runtime 多模态直传(移除文本化图片) + +**Files:** +- Modify: `backend/src/core/agentscope/runtime/orchestrator.py` +- Modify: `backend/src/core/agentscope/prompts/agent_prompt.py` +- Test: `backend/tests/unit/core/agentscope/runtime/test_orchestrator.py` + +**Step 1: Write the failing test** + +```python +async def test_orchestrator_passes_image_url_block_to_runner() -> None: + command = build_run_input_with_binary("https://project.supabase.co/storage/v1/object/sign/...") + await orchestrator.run(..., command=command, ...) + assert fake_runner.user_input[1]["type"] == "image_url" +``` + +**Step 2: Run test to verify it fails** + +Run: `pytest backend/tests/unit/core/agentscope/runtime/test_orchestrator.py::test_orchestrator_passes_image_url_block_to_runner -v` +Expected: FAIL(当前路径仍可能文本化) + +**Step 3: Write minimal implementation** + +```python +def _to_model_multimodal_blocks(content_blocks: list[dict[str, Any]]) -> list[dict[str, Any]]: + # text -> {type:"text", text:...} + # binary -> {type:"image_url", image_url:{url:...}} +``` + +将 runner 输入改为上述多模态块;禁止把图片块拼进普通字符串。 + +**Step 4: Run test to verify it passes** + +Run: `pytest backend/tests/unit/core/agentscope/runtime/test_orchestrator.py::test_orchestrator_passes_image_url_block_to_runner -v` +Expected: PASS + +**Step 5: Commit** + +```bash +git add backend/src/core/agentscope/runtime/orchestrator.py backend/src/core/agentscope/prompts/agent_prompt.py backend/tests/unit/core/agentscope/runtime/test_orchestrator.py +git commit -m "feat: pass image blocks as multimodal payload to model" +``` + +### Task 3: Worker 结构化落库(content=answer) + +**Files:** +- Modify: `backend/src/core/agentscope/events/store.py` +- Modify: `backend/src/core/agentscope/runtime/orchestrator.py` +- Test: `backend/tests/unit/core/agentscope/events/test_store.py` + +**Step 1: Write the failing test** + +```python +async def test_text_message_end_persists_worker_output_and_answer_content() -> None: + event = build_text_end_event(worker_agent_output={"answer": "ok", ...}) + await store.persist(event) + assert saved.content == "ok" + assert saved.metadata_json["worker_agent_output"]["answer"] == "ok" +``` + +**Step 2: Run test to verify it fails** + +Run: `pytest backend/tests/unit/core/agentscope/events/test_store.py::test_text_message_end_persists_worker_output_and_answer_content -v` +Expected: FAIL + +**Step 3: Write minimal implementation** + +```python +worker = WorkerAgentOutput.model_validate(event.get("workerAgentOutput") or {}) +content = worker.answer +metadata["worker_agent_output"] = worker.model_dump(mode="json") +``` + +orchestrator 在 `text.end` 事件 data 写入 `workerAgentOutput`。 + +**Step 4: Run test to verify it passes** + +Run: `pytest backend/tests/unit/core/agentscope/events/test_store.py::test_text_message_end_persists_worker_output_and_answer_content -v` +Expected: PASS + +**Step 5: Commit** + +```bash +git add backend/src/core/agentscope/events/store.py backend/src/core/agentscope/runtime/orchestrator.py backend/tests/unit/core/agentscope/events/test_store.py +git commit -m "refactor: persist worker output schema with answer as message content" +``` + +### Task 4: Tool 结构化落库(content=result_summary)并删除旧摘要逻辑 + +**Files:** +- Modify: `backend/src/core/agentscope/events/store.py` +- Modify: `backend/src/core/agentscope/runtime/orchestrator.py` +- Delete: `backend/src/core/agentscope/events/tool_result_summary.py` +- Test: `backend/tests/unit/core/agentscope/events/test_store.py` + +**Step 1: Write the failing test** + +```python +async def test_tool_result_persists_tool_output_and_summary_content() -> None: + event = build_tool_result_event(tool_agent_output={"result_summary": "done", ...}) + await store.persist(event) + assert saved.content == "done" + assert saved.metadata_json["tool_agent_output"]["result_summary"] == "done" +``` + +**Step 2: Run test to verify it fails** + +Run: `pytest backend/tests/unit/core/agentscope/events/test_store.py::test_tool_result_persists_tool_output_and_summary_content -v` +Expected: FAIL + +**Step 3: Write minimal implementation** + +```python +tool = ToolAgentOutput.model_validate(event.get("toolAgentOutput") or {}) +content = tool.result_summary +metadata["tool_agent_output"] = tool.model_dump(mode="json") +``` + +移除 `build_tool_content_summary` 相关 import/调用。 + +**Step 4: Run test to verify it passes** + +Run: `pytest backend/tests/unit/core/agentscope/events/test_store.py::test_tool_result_persists_tool_output_and_summary_content -v` +Expected: PASS + +**Step 5: Commit** + +```bash +git add backend/src/core/agentscope/events/store.py backend/src/core/agentscope/runtime/orchestrator.py backend/tests/unit/core/agentscope/events/test_store.py backend/src/core/agentscope/events/tool_result_summary.py +git commit -m "refactor: persist tool output schema and remove legacy summary builder" +``` + +### Task 5: Worker output 模型别名收敛(可选第二阶段) + +**Files:** +- Modify: `backend/src/schemas/agent/runtime_models.py` +- Modify: `backend/src/schemas/messages/chat_message.py` +- Test: `backend/tests/unit/schemas/agent/test_runtime_models.py` + +**Step 1: Write the failing test** + +```python +def test_worker_output_lite_disallows_ui_hints() -> None: + with pytest.raises(ValidationError): + WorkerAgentOutputLite.model_validate({... , "ui_hints": {...}}) +``` + +**Step 2: Run test to verify it fails** + +Run: `pytest backend/tests/unit/schemas/agent/test_runtime_models.py::test_worker_output_lite_disallows_ui_hints -v` +Expected: 根据现状决定(若已 fail 则作为守护测试) + +**Step 3: Write minimal implementation** + +```python +WorkerAgentOutput = WorkerAgentOutputLite | WorkerAgentOutputRich +``` + +如不想扩大变更,可保留现状并仅补充注释说明由 `resolve_worker_output_model` 决定运行时约束。 + +**Step 4: Run test to verify it passes** + +Run: `pytest backend/tests/unit/schemas/agent/test_runtime_models.py -v` +Expected: PASS + +**Step 5: Commit** + +```bash +git add backend/src/schemas/agent/runtime_models.py backend/src/schemas/messages/chat_message.py backend/tests/unit/schemas/agent/test_runtime_models.py +git commit -m "refactor: clarify worker output model contract for lite and rich modes" +``` + +### Task 6: 端到端回归与文档同步 + +**Files:** +- Modify: `docs/protocols/agent-chat-messages.md` +- Modify: `docs/runtime/runtime-route.md` + +**Step 1: Run targeted backend tests** + +Run: `pytest backend/tests/unit/v1/agent/test_agent_router.py backend/tests/unit/core/agentscope/runtime/test_orchestrator.py backend/tests/unit/core/agentscope/events/test_store.py -v` +Expected: PASS + +**Step 2: Run lint/type checks** + +Run: `cd backend && ruff check src tests && mypy src` +Expected: PASS + +**Step 3: Update docs for new contracts** + +- 明确 `runs` 的 URL 安全边界与 422 错误码。 +- 明确 `worker_agent_output`/`tool_agent_output` 的落库契约及 `content` 映射规则。 + +**Step 4: Final verification** + +Run: `pytest backend/tests -q` +Expected: PASS + +**Step 5: Commit** + +```bash +git add docs/protocols/agent-chat-messages.md docs/runtime/runtime-route.md +git commit -m "docs: align runs multimodal and structured persistence contracts" +``` diff --git a/docs/plans/2026-03-13-agent-runs-multimodal-storage-design.md b/docs/plans/2026-03-13-agent-runs-multimodal-storage-design.md new file mode 100644 index 0000000..4d875ad --- /dev/null +++ b/docs/plans/2026-03-13-agent-runs-multimodal-storage-design.md @@ -0,0 +1,87 @@ +# Agent Runs Multimodal 与落库重构设计 + +**目标**:让 `POST /agent/runs` 支持真实多模态直传到模型(非文本化),并将 worker/tool 结果按新 metadata 协议结构化落库。 + +**范围**:后端 `runs/resume` 请求校验、runtime 输入转换、事件落库、history 回放一致性。 + +--- + +## 1. 背景与问题 + +- 当前 `binary` 内容在运行链路中被当作普通 JSON 文本拼接进入 prompt,模型拿不到原生图像输入。 +- tool 落库仍依赖旧摘要逻辑 `build_tool_content_summary`,与最新 `ToolAgentOutput` 元数据规范不一致。 +- worker 落库当前只落文本内容,未确保 `WorkerAgentOutput` 结构化对象与 `content=answer` 的一致关系。 + +--- + +## 2. 设计原则 + +- 协议单一信源:严格遵循 `docs/protocols/agent-chat-messages.md`,只接受 `binary` 形态,不兼容旧形态。 +- 最小安全边界:仅允许本项目 Supabase 私有桶签名 URL,拒绝任意外部 URL。 +- 事件驱动持久化:以 event store 作为唯一落库入口,避免双轨逻辑。 +- 数据可回放:history 始终可按 metadata 重新签名并回填 user 附件。 + +--- + +## 3. 目标数据流 + +1. `runs` 入参校验通过后,user message 入库(附件仅存 bucket/path/mime)。 +2. runtime 执行时,将 `binary` 转为模型多模态 `image_url` content block 直传。 +3. orchestrator 产出结构化事件: + - worker 主响应通过 `TEXT_MESSAGE_*` 事件发送,`TEXT_MESSAGE_END` 携带 `workerAgentOutput`。 + - tool 执行结果通过 `TOOL_CALL_RESULT` 事件发送,携带 `toolAgentOutput`。 +4. event store 统一校验并落库: + - worker:`content = answer`,metadata 写 `worker_agent_output`。 + - tool:`content = result_summary`,metadata 写 `tool_agent_output`。 +5. history 读取 user metadata 重新签名 URL,返回 `binary` block 给前端。 + +--- + +## 4. 安全与错误策略 + +### 4.1 URL 安全边界 + +- `binary.url` 必须满足: + - host 为当前 Supabase 项目域名。 + - path 为 `/storage/v1/object/sign/{bucket}/{path}`。 + - `{bucket}` 等于 `config.storage.bucket`。 + - `{path}` 前缀匹配 `agent-inputs/{user_id}/{thread_id}/uploads/`。 + +### 4.2 运行失败 + +- 保持 AG-UI 生命周期完整:`RUN_STARTED` 后只能 `RUN_FINISHED` 或 `RUN_ERROR` 结束。 +- 运行错误时不落半结构化消息,避免脏元数据。 + +--- + +## 5. 落库契约 + +### 5.1 Worker + +- 入库角色:`assistant` +- `messages.content = worker_agent_output.answer` +- `messages.metadata.worker_agent_output = WorkerAgentOutput`(完整、schema 校验后) + +### 5.2 Tool + +- 入库角色:`tool` +- `messages.content = tool_agent_output.result_summary` +- `messages.metadata.tool_agent_output = ToolAgentOutput`(完整、schema 校验后) +- 删除旧摘要逻辑:`build_tool_content_summary` + +--- + +## 6. 兼容性策略 + +- 不兼容旧输入块形态(如 `image_url` 作为 runs 输入)。 +- 历史接口输出协议保持不变,前端无需修改消费协议。 +- 原有 user 附件回放路径保留,只强化入站 URL 校验。 + +--- + +## 7. 验收标准 + +- runs 包含合法 `binary` 时,模型收到多模态消息(非文本化 JSON)。 +- 非本项目签名 URL 返回 `422`。 +- worker/tool 落库满足 `content` 与结构化 metadata 一一对应。 +- history 仍能正确回放 user 附件(临时签名 URL)。 diff --git a/docs/protocols/routes/agent-runs-events-history.md b/docs/protocols/routes/agent-runs-events-history.md new file mode 100644 index 0000000..cea400a --- /dev/null +++ b/docs/protocols/routes/agent-runs-events-history.md @@ -0,0 +1,239 @@ +# Agent Runs Events and History Route Protocol + +> **NOTE**: This document is the single source of truth for agent runs event streaming and history snapshot routes. + +## Overview + +Defines the transport format for: + +- `POST /api/v1/agent/runs` +- `GET /api/v1/agent/runs/{thread_id}/events` +- `GET /api/v1/agent/history` +- `GET /api/v1/agent/attachments/signed-url` + +## Version + +- **Current**: `1.0` +- **Status**: Draft (pending full backend/frontend alignment) + +--- + +## Route Semantics + +### `GET /api/v1/agent/history` + +- Unified history endpoint. +- Query params: + - `threadId` (optional): target thread id. + - `before` (optional, `YYYY-MM-DD`): paginate by day. +- Behavior: + - With `threadId`: returns that thread's day snapshot. + - Without `threadId`: returns latest available thread snapshot for current user. + +### `GET /api/v1/agent/attachments/signed-url` + +- Generate temporary signed URL for attachment rendering. +- Query params: + - `bucket` (required) + - `path` (required) +- Scope rule: + - `bucket` must match current storage bucket. + - `path` must be within current user prefix `agent-inputs/{user_id}/`. + +--- + +## SSE Envelope (`/events`) + +`GET /api/v1/agent/runs/{thread_id}/events` uses `text/event-stream`. + +Each SSE frame format: + +```text +id: +event: +data: + +``` + +--- + +## Event Type Set + +- `RUN_STARTED` +- `STEP_STARTED` +- `STEP_FINISHED` +- `TEXT_MESSAGE_START` +- `TEXT_MESSAGE_CONTENT` +- `TEXT_MESSAGE_END` +- `TOOL_CALL_RESULT` +- `RUN_FINISHED` +- `RUN_ERROR` + +--- + +## Common Event Fields + +```typescript +interface EventBase { + type: string; + threadId: string; + runId?: string; +} +``` + +--- + +## Event Payload Schemas + +### Run Lifecycle + +```typescript +interface RunStartedEvent extends EventBase { + type: "RUN_STARTED"; + runId: string; +} + +interface RunFinishedEvent extends EventBase { + type: "RUN_FINISHED"; + runId: string; +} + +interface RunErrorEvent extends EventBase { + type: "RUN_ERROR"; + runId: string; + message: string; +} +``` + +### Step Lifecycle + +```typescript +interface StepStartedEvent extends EventBase { + type: "STEP_STARTED"; + runId: string; + stepName: string; +} + +interface StepFinishedEvent extends EventBase { + type: "STEP_FINISHED"; + runId: string; + stepName: string; +} +``` + +### Text Streaming + +```typescript +interface TextMessageStartEvent extends EventBase { + type: "TEXT_MESSAGE_START"; + runId: string; + messageId: string; + role: "assistant" | "system" | "user" | "tool"; + stage?: string; +} + +interface TextMessageContentEvent extends EventBase { + type: "TEXT_MESSAGE_CONTENT"; + runId: string; + messageId: string; + delta: string; // incremental text chunk +} + +interface TextMessageEndEvent extends EventBase { + type: "TEXT_MESSAGE_END"; + runId: string; + messageId: string; + workerAgentOutput: WorkerAgentOutput; + // stage/model are intentionally excluded from this event +} +``` + +### Tool Result + +```typescript +interface ToolCallResultEvent extends EventBase { + type: "TOOL_CALL_RESULT"; + messageId: string; + toolCallId: string; + toolAgentOutput: ToolAgentOutput; // required +} +``` + +### Worker/Tool Payloads + +```typescript +interface WorkerAgentOutput { + status: "success" | "partial_success" | "failed"; + answer: string; + key_points?: string[]; + result_type?: string; + suggested_actions?: string[]; + error?: { + code: string; + message: string; + retryable?: boolean; + details?: Record; + }; + ui_hints?: Record; +} + +interface ToolAgentOutput { + tool_name: string; + tool_call_id: string; + tool_call_args?: Record; + status: "success" | "partial" | "failure"; + result_summary: string; + ui_hints?: Record; + error?: { + code: string; + message: string; + retryable?: boolean; + details?: Record; + }; +} +``` + +--- + +## History Response Schema + +`GET /api/v1/agent/history` returns `STATE_SNAPSHOT` payload. + +```typescript +interface AgentHistoryResponse { + type: "STATE_SNAPSHOT"; + threadId?: string; + snapshot: { + scope: "history_day"; + threadId: string | null; + day: string | null; // YYYY-MM-DD + hasMore: boolean; + messages: SnapshotMessage[]; + }; +} + +interface SnapshotMessage { + id: string; + seq: number; + role: "user" | "assistant" | "system" | "tool"; + content: string; + metadata?: Record; + timestamp: string; // ISO-8601 +} + +interface AttachmentSignedUrlResponse { + bucket: string; + path: string; + url: string; +} +``` + +--- + +## Compatibility Notes + +- For `TOOL_CALL_RESULT`, clients should treat `toolAgentOutput` as canonical payload. +- `TEXT_MESSAGE_CONTENT.delta` is defined as incremental text chunk. Implementations should emit multiple chunks for real streaming UX. +- `TEXT_MESSAGE_END` must not include `stage` or `model` in this protocol version. +- History snapshot `messages[]` strictly follows `backend/src/schemas/messages/chat_message.py` `AgentChatMessage` schema. +- Attachment URL rendering is decoupled from history; client should call `/api/v1/agent/attachments/signed-url` using metadata fields.