# backend/src/v1/agent/attachment_storage.py
from __future__ import annotations

import asyncio
import logging
from typing import Any

from services.base.supabase import supabase_service

logger = logging.getLogger(__name__)


class AgentAttachmentStorage:
    """Upload agent attachment bytes to a Supabase Storage bucket.

    The admin client is resolved through ``supabase_service`` on every call,
    so instances carry no state of their own.
    """

    def _bucket_client(self, *, bucket: str) -> Any:
        """Return the storage accessor for ``bucket`` from the admin client.

        Raises:
            RuntimeError: If the admin client has no ``storage`` attribute, or
                that attribute exposes no callable ``from_``.
        """
        client = supabase_service.get_admin_client()
        storage = getattr(client, "storage", None)
        if storage is None:
            raise RuntimeError("Supabase storage client unavailable")
        from_bucket = getattr(storage, "from_", None)
        if not callable(from_bucket):
            raise RuntimeError("Supabase storage bucket accessor unavailable")
        return from_bucket(bucket)

    async def upload_bytes(
        self,
        *,
        bucket: str,
        path: str,
        content: bytes,
        content_type: str,
    ) -> str:
        """Upload ``content`` to ``bucket`` under ``path`` and return ``path``.

        The upload call runs in a worker thread (``asyncio.to_thread``) so the
        event loop is not held while it executes.

        Raises:
            RuntimeError: If the bucket accessor or its ``upload`` callable is
                unavailable. Errors raised by the upload call itself propagate
                unchanged.
        """

        def _upload() -> object:
            bucket_client = self._bucket_client(bucket=bucket)
            upload = getattr(bucket_client, "upload", None)
            if not callable(upload):
                raise RuntimeError("Supabase storage upload is unavailable")
            # Options are sent as strings. NOTE(review): "upsert" presumably
            # asks the storage API to overwrite an existing object at the same
            # path -- confirm against the installed storage client.
            return upload(
                path,
                content,
                {
                    "content-type": content_type,
                    "upsert": "true",
                },
            )

        await asyncio.to_thread(_upload)
        return path


def create_attachment_storage() -> AgentAttachmentStorage | None:
    """Return an attachment storage, or ``None`` when Supabase is unavailable.

    Acquiring the admin client is used as an availability probe. Failure stays
    best-effort (``None`` is returned so callers can run without attachment
    uploads), but the reason is logged instead of being swallowed silently.
    """
    try:
        supabase_service.get_admin_client()
    except Exception as exc:  # deliberate boundary: degrade, do not crash
        logger.warning(
            "Supabase admin client unavailable; agent attachment uploads "
            "are disabled: %s",
            exc,
        )
        return None
    return AgentAttachmentStorage()
# AgentRepository method (backend/src/v1/agent/repository.py).
async def persist_user_message(
    self,
    *,
    session_id: str,
    run_id: str,
    content_text: str,
    metadata: dict[str, object] | None,
) -> None:
    """Append a USER message to a chat session and bump its counters.

    The session row is selected with ``with_for_update()`` before the next
    sequence number is derived from ``message_count`` (presumably so that
    concurrent writers serialise on the row -- confirm against the database
    in use). The new message and the updated session are flushed, not
    committed; committing is left to the caller.

    Args:
        session_id: Session id as a string; must parse as a UUID.
        run_id: Run identifier, stored under ``"run_id"`` in the message
            metadata (overriding any value already present there).
        content_text: Text content of the user message.
        metadata: Extra metadata to store with the message; the mapping
            passed in is copied, never mutated.

    Raises:
        HTTPException: 422 when ``session_id`` is not a valid UUID, 404 when
            no session with that id exists.
    """
    try:
        thread_uuid = UUID(session_id)
    except ValueError as exc:
        raise HTTPException(status_code=422, detail="Invalid session_id") from exc

    locked_query = (
        select(AgentChatSession)
        .where(AgentChatSession.id == thread_uuid)
        .with_for_update()
    )
    result = await self._session.execute(locked_query)
    chat_session = result.scalar_one_or_none()
    if chat_session is None:
        raise HTTPException(status_code=404, detail="Session not found")

    # A NULL/0 counter means this is the first message of the session.
    seq = int(chat_session.message_count or 0) + 1
    self._session.add(
        AgentChatMessage(
            session_id=thread_uuid,
            seq=seq,
            role=AgentChatMessageRole.USER,
            content=content_text,
            metadata_json={**(metadata or {}), "run_id": run_id},
        )
    )
    chat_session.message_count = seq
    chat_session.last_activity_at = datetime.now(timezone.utc)
    await self._session.flush()
metadata = message.metadata_json or {} + attachments = ( + metadata.get("attachments") if isinstance(metadata, dict) else None + ) + if isinstance(attachments, list): + rendered = [item for item in attachments if isinstance(item, dict)] + if rendered: + payload["attachments"] = rendered return payload diff --git a/backend/src/v1/agent/service.py b/backend/src/v1/agent/service.py index b415fb8..386d03f 100644 --- a/backend/src/v1/agent/service.py +++ b/backend/src/v1/agent/service.py @@ -1,8 +1,10 @@ from __future__ import annotations import asyncio +import base64 from dataclasses import dataclass from datetime import date +import hashlib from typing import Any, Protocol import dashscope @@ -12,6 +14,7 @@ from fastapi import HTTPException from sqlalchemy.exc import IntegrityError from core.auth.models import CurrentUser +from core.agentscope.schemas.agui_input import extract_latest_user_payload from core.config.settings import config from core.logging import get_logger @@ -54,6 +57,15 @@ class AgentRepositoryLike(Protocol): async def get_latest_session_id_for_user(self, *, user_id: str) -> str | None: ... + async def persist_user_message( + self, + *, + session_id: str, + run_id: str, + content_text: str, + metadata: dict[str, object] | None, + ) -> None: ... + class QueueClientLike(Protocol): async def enqueue( @@ -70,6 +82,17 @@ class EventStreamLike(Protocol): ) -> list[dict[str, object]]: ... +class AttachmentStorageLike(Protocol): + async def upload_bytes( + self, + *, + bucket: str, + path: str, + content: bytes, + content_type: str, + ) -> str: ... 
# AgentService method (backend/src/v1/agent/service.py).
async def _prepare_user_message(
    self,
    *,
    run_input: RunAgentInput,
    current_user: CurrentUser,
) -> tuple[str, dict[str, object] | None]:
    """Extract the latest user text and upload its inline image attachments.

    Every ``image_url`` content block whose URL is a ``data:`` URL is decoded
    and uploaded through ``self._attachment_storage`` to
    ``config.storage.bucket`` under
    ``agent-inputs/<user id>/<thread id>/<run id>/attachment-<index>-<checksum>.<suffix>``.
    Blocks that are not inline images, cannot be decoded, or decode to zero
    bytes are skipped. When no attachment storage is configured nothing is
    uploaded.

    Returns:
        ``(text, metadata)`` where ``metadata`` is
        ``{"attachments": [{"bucket", "path", "mimeType"}, ...]}`` or ``None``
        when nothing was uploaded.

    Raises:
        Whatever ``upload_bytes`` raises; upload failures are not swallowed.
    """
    text, content_blocks = extract_latest_user_payload(run_input)
    if self._attachment_storage is None:
        return text, None

    attachments: list[dict[str, object]] = []
    for index, block in enumerate(content_blocks):
        data_url = _inline_image_data_url(block)
        if data_url is None:
            continue
        decoded = _decode_data_url(data_url)
        if decoded is None:
            continue
        mime_type, payload = decoded
        if not payload:
            # An empty object is not a usable attachment; skip it rather than
            # store a zero-byte file.
            continue
        suffix = _mime_to_suffix(mime_type)
        # The checksum only disambiguates object names; it is not a security
        # control.
        checksum = hashlib.sha1(payload).hexdigest()[:16]
        # NOTE(review): thread_id and run_id originate from the request;
        # confirm they are validated before being embedded in the object key.
        path = (
            f"agent-inputs/{current_user.id}/{run_input.thread_id}/"
            f"{run_input.run_id}/attachment-{index}-{checksum}.{suffix}"
        )
        bucket = config.storage.bucket
        stored_path = await self._attachment_storage.upload_bytes(
            bucket=bucket,
            path=path,
            content=payload,
            content_type=mime_type,
        )
        attachments.append(
            {
                "bucket": bucket,
                "path": stored_path,
                "mimeType": mime_type,
            }
        )
    return text, ({"attachments": attachments} if attachments else None)


def _inline_image_data_url(block: object) -> str | None:
    """Return the ``data:`` URL of an ``image_url`` content block, else ``None``."""
    if not isinstance(block, dict) or block.get("type") != "image_url":
        return None
    image_value = block.get("image_url")
    if not isinstance(image_value, dict):
        return None
    url = image_value.get("url")
    if isinstance(url, str) and url.startswith("data:"):
        return url
    return None


def _decode_data_url(data_url: str) -> tuple[str, bytes] | None:
    """Decode a ``data:`` URL into ``(mime_type, payload_bytes)``.

    The media type is taken from the header whether or not parameters follow
    it, and falls back to ``"image/png"`` when the header names none. The
    payload is base64-decoded only when the ``;base64`` flag is present
    (matched case-insensitively); otherwise it is percent-decoded, as the
    data URL scheme specifies.

    Returns:
        ``None`` when ``data_url`` is not a ``data:`` URL, has no ``,``
        separator, or carries an invalid base64 payload.
    """
    # Local import: the name is not part of the module's import block.
    from urllib.parse import unquote_to_bytes

    if not data_url.startswith("data:"):
        return None
    header, sep, payload = data_url.partition(",")
    if not sep:
        return None

    media_type, *params = header[len("data:"):].split(";")
    mime_type = media_type.strip() or "image/png"
    is_base64 = any(param.strip().lower() == "base64" for param in params)
    if not is_base64:
        return mime_type, unquote_to_bytes(payload)
    try:
        # binascii.Error is a ValueError subclass, so one handler covers it.
        return mime_type, base64.b64decode(payload, validate=True)
    except ValueError:
        return None


# File suffix per supported image media type; anything else is stored as ".bin".
_MIME_SUFFIXES = {
    "image/png": "png",
    "image/jpeg": "jpg",
    "image/webp": "webp",
    "image/gif": "gif",
}


def _mime_to_suffix(mime_type: str) -> str:
    """Map a media type to a file suffix (case-insensitive), default ``"bin"``."""
    return _MIME_SUFFIXES.get(mime_type.strip().lower(), "bin")
AGENT_LIVE_INTEGRATION=1 to run live integration test") + + async with httpx.AsyncClient(timeout=30.0) as client: + token = await _live_access_token(client) + headers = {"Authorization": f"Bearer {token}"} + thread_id = str(uuid4()) + + run_resp = await client.post( + f"{BASE_URL}/api/v1/agent/runs", + headers=headers, + json={ + "threadId": thread_id, + "runId": "run-live-image-1", + "state": {}, + "messages": [ + { + "id": "u1", + "role": "user", + "content": [ + {"type": "text", "text": "请描述图片里的内容"}, + { + "type": "binary", + "data": "aGVsbG8=", + "mimeType": "image/png", + }, + ], + } + ], + "tools": [], + "context": [], + "forwardedProps": {}, + }, + ) + assert run_resp.status_code == 202 + + events_url = f"{BASE_URL}/api/v1/agent/runs/{thread_id}/events" + event_names: list[str] = [] + async with client.stream( + "GET", events_url, headers=headers, timeout=20.0 + ) as sse_resp: + assert sse_resp.status_code == 200 + assert sse_resp.headers.get("content-type", "").startswith( + "text/event-stream" + ) + async for line in sse_resp.aiter_lines(): + if line.startswith("event:"): + event_name = line.split(":", 1)[1].strip() + event_names.append(event_name) + if event_name in {"RUN_FINISHED", "RUN_ERROR"}: + break + + assert "RUN_STARTED" in event_names + assert "RUN_FINISHED" in event_names or "RUN_ERROR" in event_names + + history_resp = await client.get( + f"{BASE_URL}/api/v1/agent/runs/{thread_id}/history", + headers=headers, + ) + assert history_resp.status_code == 200 + history = history_resp.json() + assert history.get("type") == "STATE_SNAPSHOT" + snapshot = history.get("snapshot", {}) + assert snapshot.get("scope") == "history_day" + messages = snapshot.get("messages", []) + user_messages = [ + item + for item in messages + if isinstance(item, dict) and item.get("role") == "user" + ] + assert user_messages + attachments = user_messages[0].get("attachments") + assert isinstance(attachments, list) + assert attachments and isinstance(attachments[0], dict) + 
assert isinstance(attachments[0].get("path"), str) + + async with AsyncSessionLocal() as session: + session_row = await session.get(AgentChatSession, UUID(thread_id)) + assert session_row is not None + assert session_row.message_count >= 1 + assert session_row.total_tokens >= 0 + assert session_row.total_cost >= 0 + + rows = await session.execute( + select(AgentChatMessage).where( + AgentChatMessage.session_id == UUID(thread_id) + ) + ) + all_messages = list(rows.scalars().all()) + assert all_messages + user_rows = [row for row in all_messages if str(row.role) == "user"] + assert user_rows + metadata = user_rows[0].metadata_json or {} + attachments = metadata.get("attachments") + assert isinstance(attachments, list) + assert attachments and isinstance(attachments[0], dict) + assert isinstance(attachments[0].get("path"), str) diff --git a/backend/tests/unit/v1/agent/test_repository.py b/backend/tests/unit/v1/agent/test_repository.py index 8877016..cbfc245 100644 --- a/backend/tests/unit/v1/agent/test_repository.py +++ b/backend/tests/unit/v1/agent/test_repository.py @@ -70,3 +70,37 @@ async def test_tool_message_keeps_inline_content_when_storage_payload_missing() assert payload["toolCallId"] == "call-2" assert payload["content"] == "inline-tool-content" + + +@pytest.mark.asyncio +async def test_user_message_snapshot_includes_renderable_attachments() -> None: + repository = AgentRepository( + session=SimpleNamespace(), # type: ignore[arg-type] + ) + message = SimpleNamespace( + id=uuid4(), + role=AgentChatMessageRole.USER, + created_at=datetime.now(timezone.utc), + content="请分析这张图", + metadata_json={ + "attachments": [ + { + "bucket": "agent-chat-attachments", + "path": "agent-inputs/u1/t1/r1/m1/att-1.png", + "mimeType": "image/png", + } + ] + }, + ) + + payload = await repository._to_snapshot_message(message) # type: ignore[arg-type] + + assert payload["role"] == "user" + assert payload["content"] == "请分析这张图" + assert payload["attachments"] == [ + { + "bucket": 
"agent-chat-attachments", + "path": "agent-inputs/u1/t1/r1/m1/att-1.png", + "mimeType": "image/png", + } + ] diff --git a/backend/tests/unit/v1/agent/test_service.py b/backend/tests/unit/v1/agent/test_service.py index 4136109..420067d 100644 --- a/backend/tests/unit/v1/agent/test_service.py +++ b/backend/tests/unit/v1/agent/test_service.py @@ -18,6 +18,7 @@ class _FakeRepository: self.rolled_back = False self.deleted_session_id: str | None = None self.created_with_session_id: str | None = None + self.persisted_user_messages: list[dict[str, object]] = [] async def get_session_owner(self, *, session_id: str) -> str: if session_id == "00000000-0000-0000-0000-000000000001": @@ -56,6 +57,23 @@ class _FakeRepository: del user_id return "00000000-0000-0000-0000-000000000001" + async def persist_user_message( + self, + *, + session_id: str, + run_id: str, + content_text: str, + metadata: dict[str, object] | None, + ) -> None: + self.persisted_user_messages.append( + { + "session_id": session_id, + "run_id": run_id, + "content_text": content_text, + "metadata": metadata, + } + ) + class _FakeQueue: async def enqueue( @@ -83,6 +101,29 @@ class _FakeStream: ] +class _FakeAttachmentStorage: + def __init__(self) -> None: + self.calls: list[dict[str, object]] = [] + + async def upload_bytes( + self, + *, + bucket: str, + path: str, + content: bytes, + content_type: str, + ) -> str: + self.calls.append( + { + "bucket": bucket, + "path": path, + "content": content, + "content_type": content_type, + } + ) + return path + + def _user() -> CurrentUser: return CurrentUser( id=UUID("00000000-0000-0000-0000-000000000001"), @@ -216,6 +257,66 @@ async def test_enqueue_run_handles_session_create_race() -> None: assert repository.rolled_back is True +async def test_enqueue_run_uploads_user_image_to_supabase_and_injects_metadata( + monkeypatch, +) -> None: + monkeypatch.setattr( + agent_service_module.config.storage, "bucket", "agent-test-bucket" + ) + repository = _FakeRepository() + 
attachment_storage = _FakeAttachmentStorage() + service = AgentService( + repository=repository, + queue=_FakeQueue(), + stream=_FakeStream(), + attachment_storage=attachment_storage, + ) + run_input = RunAgentInput.model_validate( + { + "threadId": "00000000-0000-0000-0000-000000000001", + "runId": "run-with-image", + "state": {}, + "messages": [ + { + "id": "u1", + "role": "user", + "content": [ + {"type": "text", "text": "帮我看下这张图"}, + { + "type": "binary", + "data": "aGVsbG8=", + "mimeType": "image/png", + }, + ], + } + ], + "tools": [], + "context": [], + "forwardedProps": {}, + } + ) + + accepted = await service.enqueue_run(run_input=run_input, current_user=_user()) + + assert accepted.task_id == "task-1" + assert len(attachment_storage.calls) == 1 + upload = attachment_storage.calls[0] + assert upload["bucket"] == "agent-test-bucket" + assert upload["content"] == b"hello" + assert upload["content_type"] == "image/png" + assert repository.persisted_user_messages + persisted = repository.persisted_user_messages[0] + assert persisted["session_id"] == "00000000-0000-0000-0000-000000000001" + assert persisted["run_id"] == "run-with-image" + metadata = persisted["metadata"] + assert isinstance(metadata, dict) + attachments = metadata.get("attachments") + assert isinstance(attachments, list) + assert attachments and isinstance(attachments[0], dict) + assert attachments[0]["bucket"] == "agent-test-bucket" + assert isinstance(attachments[0]["path"], str) + + async def test_get_history_snapshot_wraps_history_day_as_state_snapshot_event() -> None: service = AgentService( repository=_FakeRepository(), diff --git a/docs/plans/2026-03-11-agent-multimodal-smoke-implementation.md b/docs/plans/2026-03-11-agent-multimodal-smoke-implementation.md new file mode 100644 index 0000000..891bd72 --- /dev/null +++ b/docs/plans/2026-03-11-agent-multimodal-smoke-implementation.md @@ -0,0 +1,141 @@ +# Agent Multimodal Smoke Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use 
superpowers:executing-plans to implement this plan task-by-task. + +**Goal:** 完成 agent 三条主链路(runs/events/history)真实冒烟,并支持 RunAgentInput 图片信息在发送链路落 Supabase Storage、在 messages.metadata 持久化、在 history 返回中可渲染。 + +**Architecture:** 在 `v1/agent` 服务层新增“用户消息持久化 + 图片附件上传”步骤:`enqueue_run` 时解析用户消息 content block,图片上传到 `config.storage.bucket`,将路径写入 `messages.metadata`。运行时继续通过 AgentScope pipeline 输出 AG-UI 事件,SSE 从 Redis stream 订阅,历史查询从 `messages` 回放并附带附件信息。 + +**Tech Stack:** FastAPI, SQLAlchemy AsyncSession, Supabase Storage Admin Client, Redis SSE stream, AG-UI, pytest/httpx。 + +--- + +### Task 1: 用户消息图片附件上传与落库 + +**Files:** +- Create: `backend/src/v1/agent/attachment_storage.py` +- Modify: `backend/src/v1/agent/service.py` +- Modify: `backend/src/v1/agent/repository.py` +- Test: `backend/tests/unit/v1/agent/test_service.py` + +**Step 1: 写失败测试(RED)** + +```python +@pytest.mark.asyncio +async def test_enqueue_run_uploads_user_image_to_supabase_and_injects_metadata() -> None: + ... +``` + +**Step 2: 运行单测验证失败** + +Run: `uv run pytest tests/unit/v1/agent/test_service.py::test_enqueue_run_uploads_user_image_to_supabase_and_injects_metadata -q` +Expected: FAIL(缺少附件上传/metadata 持久化行为) + +**Step 3: 最小实现(GREEN)** + +```python +class AgentAttachmentStorage: + async def upload_bytes(...): + ... + +class AgentService: + async def enqueue_run(...): + # 解析 user content blocks + # 上传图片到 storage + # repository 持久化 user message(metadata 包含 bucket/path) + ... +``` + +**Step 4: 运行单测验证通过** + +Run: `uv run pytest tests/unit/v1/agent/test_service.py::test_enqueue_run_uploads_user_image_to_supabase_and_injects_metadata -q` +Expected: PASS + +### Task 2: history 渲染附件路径 + +**Files:** +- Modify: `backend/src/v1/agent/repository.py` +- Test: `backend/tests/unit/v1/agent/test_repository.py` + +**Step 1: 写失败测试(RED)** + +```python +@pytest.mark.asyncio +async def test_user_message_snapshot_includes_renderable_attachments() -> None: + ... 
+``` + +**Step 2: 运行测试验证失败** + +Run: `uv run pytest tests/unit/v1/agent/test_repository.py::test_user_message_snapshot_includes_renderable_attachments -q` +Expected: FAIL(history 尚未渲染 attachments) + +**Step 3: 最小实现(GREEN)** + +```python +if role == "user" and isinstance(metadata.get("attachments"), list): + payload["attachments"] = metadata["attachments"] +``` + +**Step 4: 运行测试验证通过** + +Run: `uv run pytest tests/unit/v1/agent/test_repository.py::test_user_message_snapshot_includes_renderable_attachments -q` +Expected: PASS + +### Task 3: 真实冒烟 runs + SSE + history(含图片输入) + +**Files:** +- Modify: `backend/tests/integration/v1/agent/test_sse_flow_live.py` + +**Step 1: 写失败测试(RED)** + +```python +@pytest.mark.asyncio +@pytest.mark.live +async def test_agent_runs_events_history_live_with_image_input() -> None: + ... +``` + +**Step 2: 运行 live 测试验证失败(实现前或环境不完整)** + +Run: `AGENT_LIVE_INTEGRATION=1 AGENT_LIVE_EMAIL=... AGENT_LIVE_PASSWORD=... uv run pytest tests/integration/v1/agent/test_sse_flow_live.py::test_agent_runs_events_history_live_with_image_input -q -s` +Expected: FAIL(缺 metadata/path 或 history 不含附件) + +**Step 3: 最小实现(GREEN)** + +```python +# live 测试流程: +# 1) 登录拿 token +# 2) POST /runs 发送 text + image(data) +# 3) SSE 订阅直到 RUN_FINISHED/RUN_ERROR +# 4) GET /runs/{thread_id}/history +# 5) SQL 校验 sessions/messages 字段与 metadata.attachments +``` + +**Step 4: 运行 live 测试验证通过** + +Run: `AGENT_LIVE_INTEGRATION=1 AGENT_LIVE_EMAIL=... AGENT_LIVE_PASSWORD=... 
uv run pytest tests/integration/v1/agent/test_sse_flow_live.py::test_agent_runs_events_history_live_with_image_input -q -s` +Expected: PASS + +### Task 4: 全量收口验证与安全门禁 + +**Files:** +- Modify (if needed): `backend/src/v1/agent/*`, `backend/tests/*` + +**Step 1: 回归测试** + +Run: `uv run pytest tests/unit/v1/agent tests/unit/core/agentscope tests/integration/v1/agent -q` +Expected: PASS + +**Step 2: 静态检查** + +Run: `uv run ruff check src/v1/agent src/core/agentscope tests/unit/v1/agent tests/integration/v1/agent` +Expected: PASS + +Run: `uv run basedpyright src/v1/agent src/core/agentscope tests/unit/v1/agent tests/integration/v1/agent` +Expected: 0 errors + +**Step 3: 评审门禁** + +Run agents: `security-reviewer`, `refactor-cleaner`, `code-reviewer` +Expected: 无未解决 CRITICAL/HIGH