chore: no changes needed for calendar message card

This commit is contained in:
qzl
2026-03-11 21:06:02 +08:00
parent 98f22a2127
commit a8dacbe81f
8 changed files with 590 additions and 0 deletions
@@ -0,0 +1,51 @@
from __future__ import annotations
import asyncio
from typing import Any
from services.base.supabase import supabase_service
class AgentAttachmentStorage:
def _bucket_client(self, *, bucket: str) -> Any:
client = supabase_service.get_admin_client()
storage = getattr(client, "storage", None)
if storage is None:
raise RuntimeError("Supabase storage client unavailable")
from_bucket = getattr(storage, "from_", None)
if not callable(from_bucket):
raise RuntimeError("Supabase storage bucket accessor unavailable")
return from_bucket(bucket)
async def upload_bytes(
self,
*,
bucket: str,
path: str,
content: bytes,
content_type: str,
) -> str:
def _upload() -> object:
bucket_client = self._bucket_client(bucket=bucket)
upload = getattr(bucket_client, "upload", None)
if not callable(upload):
raise RuntimeError("Supabase storage upload is unavailable")
return upload(
path,
content,
{
"content-type": content_type,
"upsert": "true",
},
)
await asyncio.to_thread(_upload)
return path
def create_attachment_storage() -> AgentAttachmentStorage | None:
try:
supabase_service.get_admin_client()
except Exception:
return None
return AgentAttachmentStorage()
+3
View File
@@ -20,6 +20,7 @@ from core.config.settings import config
from core.db import get_db from core.db import get_db
from services.base.redis import get_or_init_redis_client from services.base.redis import get_or_init_redis_client
from v1.agent.repository import AgentRepository from v1.agent.repository import AgentRepository
from v1.agent.attachment_storage import create_attachment_storage
from v1.agent.service import AgentService from v1.agent.service import AgentService
DEDUP_WAIT_RETRIES = 20 DEDUP_WAIT_RETRIES = 20
@@ -109,8 +110,10 @@ class RedisEventStream:
def get_agent_service(session: AsyncSession = Depends(get_db)) -> AgentService: def get_agent_service(session: AsyncSession = Depends(get_db)) -> AgentService:
tool_result_storage = create_tool_result_storage() tool_result_storage = create_tool_result_storage()
attachment_storage = create_attachment_storage()
return AgentService( return AgentService(
repository=AgentRepository(session, tool_result_storage=tool_result_storage), repository=AgentRepository(session, tool_result_storage=tool_result_storage),
queue=TaskiqQueueClient(), queue=TaskiqQueueClient(),
stream=RedisEventStream(), stream=RedisEventStream(),
attachment_storage=attachment_storage,
) )
+45
View File
@@ -84,6 +84,43 @@ class AgentRepository:
await self._session.delete(session) await self._session.delete(session)
await self._session.flush() await self._session.flush()
async def persist_user_message(
self,
*,
session_id: str,
run_id: str,
content_text: str,
metadata: dict[str, object] | None,
) -> None:
try:
session_uuid = UUID(session_id)
except ValueError as exc:
raise HTTPException(status_code=422, detail="Invalid session_id") from exc
stmt = (
select(AgentChatSession)
.where(AgentChatSession.id == session_uuid)
.with_for_update()
)
session_row = (await self._session.execute(stmt)).scalar_one_or_none()
if session_row is None:
raise HTTPException(status_code=404, detail="Session not found")
next_seq = int(session_row.message_count or 0) + 1
payload_metadata = dict(metadata or {})
payload_metadata["run_id"] = run_id
message = AgentChatMessage(
session_id=session_uuid,
seq=next_seq,
role=AgentChatMessageRole.USER,
content=content_text,
metadata_json=payload_metadata,
)
self._session.add(message)
session_row.message_count = next_seq
session_row.last_activity_at = datetime.now(timezone.utc)
await self._session.flush()
async def get_history_day( async def get_history_day(
self, *, session_id: str, before: date | None self, *, session_id: str, before: date | None
) -> dict[str, object] | None: ) -> dict[str, object] | None:
@@ -218,4 +255,12 @@ class AgentRepository:
payload["content"] = message.content payload["content"] = message.content
else: else:
payload["content"] = message.content payload["content"] = message.content
metadata = message.metadata_json or {}
attachments = (
metadata.get("attachments") if isinstance(metadata, dict) else None
)
if isinstance(attachments, list):
rendered = [item for item in attachments if isinstance(item, dict)]
if rendered:
payload["attachments"] = rendered
return payload return payload
+113
View File
@@ -1,8 +1,10 @@
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import base64
from dataclasses import dataclass from dataclasses import dataclass
from datetime import date from datetime import date
import hashlib
from typing import Any, Protocol from typing import Any, Protocol
import dashscope import dashscope
@@ -12,6 +14,7 @@ from fastapi import HTTPException
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
from core.auth.models import CurrentUser from core.auth.models import CurrentUser
from core.agentscope.schemas.agui_input import extract_latest_user_payload
from core.config.settings import config from core.config.settings import config
from core.logging import get_logger from core.logging import get_logger
@@ -54,6 +57,15 @@ class AgentRepositoryLike(Protocol):
async def get_latest_session_id_for_user(self, *, user_id: str) -> str | None: ... async def get_latest_session_id_for_user(self, *, user_id: str) -> str | None: ...
async def persist_user_message(
self,
*,
session_id: str,
run_id: str,
content_text: str,
metadata: dict[str, object] | None,
) -> None: ...
class QueueClientLike(Protocol): class QueueClientLike(Protocol):
async def enqueue( async def enqueue(
@@ -70,6 +82,17 @@ class EventStreamLike(Protocol):
) -> list[dict[str, object]]: ... ) -> list[dict[str, object]]: ...
class AttachmentStorageLike(Protocol):
async def upload_bytes(
self,
*,
bucket: str,
path: str,
content: bytes,
content_type: str,
) -> str: ...
def ensure_session_owner(*, owner_id: str, current_user: CurrentUser) -> None: def ensure_session_owner(*, owner_id: str, current_user: CurrentUser) -> None:
if owner_id != str(current_user.id): if owner_id != str(current_user.id):
raise HTTPException(status_code=403, detail="Forbidden") raise HTTPException(status_code=403, detail="Forbidden")
@@ -79,6 +102,7 @@ class AgentService:
_repository: AgentRepositoryLike _repository: AgentRepositoryLike
_queue: QueueClientLike _queue: QueueClientLike
_stream: EventStreamLike _stream: EventStreamLike
_attachment_storage: AttachmentStorageLike | None
def __init__( def __init__(
self, self,
@@ -86,10 +110,12 @@ class AgentService:
repository: AgentRepositoryLike, repository: AgentRepositoryLike,
queue: QueueClientLike, queue: QueueClientLike,
stream: EventStreamLike, stream: EventStreamLike,
attachment_storage: AttachmentStorageLike | None = None,
) -> None: ) -> None:
self._repository = repository self._repository = repository
self._queue = queue self._queue = queue
self._stream = stream self._stream = stream
self._attachment_storage = attachment_storage
async def enqueue_run( async def enqueue_run(
self, self,
@@ -119,6 +145,18 @@ class AgentService:
else: else:
ensure_session_owner(owner_id=owner, current_user=current_user) ensure_session_owner(owner_id=owner, current_user=current_user)
user_message_text, user_message_metadata = await self._prepare_user_message(
run_input=run_input,
current_user=current_user,
)
await self._repository.persist_user_message(
session_id=thread_id,
run_id=run_id,
content_text=user_message_text,
metadata=user_message_metadata,
)
await self._repository.commit()
task_id = await self._queue.enqueue( task_id = await self._queue.enqueue(
command={ command={
"command": "run", "command": "run",
@@ -135,6 +173,54 @@ class AgentService:
created=created, created=created,
) )
async def _prepare_user_message(
self,
*,
run_input: RunAgentInput,
current_user: CurrentUser,
) -> tuple[str, dict[str, object] | None]:
text, content_blocks = extract_latest_user_payload(run_input)
attachments: list[dict[str, object]] = []
if self._attachment_storage is not None:
for index, block in enumerate(content_blocks):
if not isinstance(block, dict):
continue
if block.get("type") != "image_url":
continue
image_value = block.get("image_url")
if not isinstance(image_value, dict):
continue
url = image_value.get("url")
if not isinstance(url, str) or not url.startswith("data:"):
continue
decoded = _decode_data_url(url)
if decoded is None:
continue
mime_type, payload = decoded
suffix = _mime_to_suffix(mime_type)
checksum = hashlib.sha1(payload).hexdigest()[:16]
path = (
f"agent-inputs/{current_user.id}/{run_input.thread_id}/"
f"{run_input.run_id}/attachment-{index}-{checksum}.{suffix}"
)
stored_path = await self._attachment_storage.upload_bytes(
bucket=config.storage.bucket,
path=path,
content=payload,
content_type=mime_type,
)
attachments.append(
{
"bucket": config.storage.bucket,
"path": stored_path,
"mimeType": mime_type,
}
)
metadata: dict[str, object] = {}
if attachments:
metadata["attachments"] = attachments
return text, metadata or None
async def enqueue_resume( async def enqueue_resume(
self, self,
*, *,
@@ -340,3 +426,30 @@ class AsrService:
asr_service = AsrService() asr_service = AsrService()
def _decode_data_url(data_url: str) -> tuple[str, bytes] | None:
if not data_url.startswith("data:"):
return None
header, sep, payload = data_url.partition(",")
if not sep:
return None
mime_type = "image/png"
if ";" in header:
maybe_mime = header[5:].split(";", 1)[0].strip()
if maybe_mime:
mime_type = maybe_mime
try:
decoded = base64.b64decode(payload, validate=True)
except ValueError:
return None
return mime_type, decoded
def _mime_to_suffix(mime_type: str) -> str:
mapping = {
"image/png": "png",
"image/jpeg": "jpg",
"image/webp": "webp",
}
return mapping.get(mime_type.lower(), "bin")
@@ -100,3 +100,105 @@ async def test_agent_sse_closed_loop_live() -> None:
) )
) )
assert len(list(rows.scalars().all())) >= 1 assert len(list(rows.scalars().all())) >= 1
@pytest.mark.asyncio
@pytest.mark.live
async def test_agent_runs_events_history_live_with_image_input() -> None:
if os.getenv("AGENT_LIVE_INTEGRATION") != "1":
pytest.skip("set AGENT_LIVE_INTEGRATION=1 to run live integration test")
async with httpx.AsyncClient(timeout=30.0) as client:
token = await _live_access_token(client)
headers = {"Authorization": f"Bearer {token}"}
thread_id = str(uuid4())
run_resp = await client.post(
f"{BASE_URL}/api/v1/agent/runs",
headers=headers,
json={
"threadId": thread_id,
"runId": "run-live-image-1",
"state": {},
"messages": [
{
"id": "u1",
"role": "user",
"content": [
{"type": "text", "text": "请描述图片里的内容"},
{
"type": "binary",
"data": "aGVsbG8=",
"mimeType": "image/png",
},
],
}
],
"tools": [],
"context": [],
"forwardedProps": {},
},
)
assert run_resp.status_code == 202
events_url = f"{BASE_URL}/api/v1/agent/runs/{thread_id}/events"
event_names: list[str] = []
async with client.stream(
"GET", events_url, headers=headers, timeout=20.0
) as sse_resp:
assert sse_resp.status_code == 200
assert sse_resp.headers.get("content-type", "").startswith(
"text/event-stream"
)
async for line in sse_resp.aiter_lines():
if line.startswith("event:"):
event_name = line.split(":", 1)[1].strip()
event_names.append(event_name)
if event_name in {"RUN_FINISHED", "RUN_ERROR"}:
break
assert "RUN_STARTED" in event_names
assert "RUN_FINISHED" in event_names or "RUN_ERROR" in event_names
history_resp = await client.get(
f"{BASE_URL}/api/v1/agent/runs/{thread_id}/history",
headers=headers,
)
assert history_resp.status_code == 200
history = history_resp.json()
assert history.get("type") == "STATE_SNAPSHOT"
snapshot = history.get("snapshot", {})
assert snapshot.get("scope") == "history_day"
messages = snapshot.get("messages", [])
user_messages = [
item
for item in messages
if isinstance(item, dict) and item.get("role") == "user"
]
assert user_messages
attachments = user_messages[0].get("attachments")
assert isinstance(attachments, list)
assert attachments and isinstance(attachments[0], dict)
assert isinstance(attachments[0].get("path"), str)
async with AsyncSessionLocal() as session:
session_row = await session.get(AgentChatSession, UUID(thread_id))
assert session_row is not None
assert session_row.message_count >= 1
assert session_row.total_tokens >= 0
assert session_row.total_cost >= 0
rows = await session.execute(
select(AgentChatMessage).where(
AgentChatMessage.session_id == UUID(thread_id)
)
)
all_messages = list(rows.scalars().all())
assert all_messages
user_rows = [row for row in all_messages if str(row.role) == "user"]
assert user_rows
metadata = user_rows[0].metadata_json or {}
attachments = metadata.get("attachments")
assert isinstance(attachments, list)
assert attachments and isinstance(attachments[0], dict)
assert isinstance(attachments[0].get("path"), str)
@@ -70,3 +70,37 @@ async def test_tool_message_keeps_inline_content_when_storage_payload_missing()
assert payload["toolCallId"] == "call-2" assert payload["toolCallId"] == "call-2"
assert payload["content"] == "inline-tool-content" assert payload["content"] == "inline-tool-content"
@pytest.mark.asyncio
async def test_user_message_snapshot_includes_renderable_attachments() -> None:
repository = AgentRepository(
session=SimpleNamespace(), # type: ignore[arg-type]
)
message = SimpleNamespace(
id=uuid4(),
role=AgentChatMessageRole.USER,
created_at=datetime.now(timezone.utc),
content="请分析这张图",
metadata_json={
"attachments": [
{
"bucket": "agent-chat-attachments",
"path": "agent-inputs/u1/t1/r1/m1/att-1.png",
"mimeType": "image/png",
}
]
},
)
payload = await repository._to_snapshot_message(message) # type: ignore[arg-type]
assert payload["role"] == "user"
assert payload["content"] == "请分析这张图"
assert payload["attachments"] == [
{
"bucket": "agent-chat-attachments",
"path": "agent-inputs/u1/t1/r1/m1/att-1.png",
"mimeType": "image/png",
}
]
+101
View File
@@ -18,6 +18,7 @@ class _FakeRepository:
self.rolled_back = False self.rolled_back = False
self.deleted_session_id: str | None = None self.deleted_session_id: str | None = None
self.created_with_session_id: str | None = None self.created_with_session_id: str | None = None
self.persisted_user_messages: list[dict[str, object]] = []
async def get_session_owner(self, *, session_id: str) -> str: async def get_session_owner(self, *, session_id: str) -> str:
if session_id == "00000000-0000-0000-0000-000000000001": if session_id == "00000000-0000-0000-0000-000000000001":
@@ -56,6 +57,23 @@ class _FakeRepository:
del user_id del user_id
return "00000000-0000-0000-0000-000000000001" return "00000000-0000-0000-0000-000000000001"
async def persist_user_message(
self,
*,
session_id: str,
run_id: str,
content_text: str,
metadata: dict[str, object] | None,
) -> None:
self.persisted_user_messages.append(
{
"session_id": session_id,
"run_id": run_id,
"content_text": content_text,
"metadata": metadata,
}
)
class _FakeQueue: class _FakeQueue:
async def enqueue( async def enqueue(
@@ -83,6 +101,29 @@ class _FakeStream:
] ]
class _FakeAttachmentStorage:
def __init__(self) -> None:
self.calls: list[dict[str, object]] = []
async def upload_bytes(
self,
*,
bucket: str,
path: str,
content: bytes,
content_type: str,
) -> str:
self.calls.append(
{
"bucket": bucket,
"path": path,
"content": content,
"content_type": content_type,
}
)
return path
def _user() -> CurrentUser: def _user() -> CurrentUser:
return CurrentUser( return CurrentUser(
id=UUID("00000000-0000-0000-0000-000000000001"), id=UUID("00000000-0000-0000-0000-000000000001"),
@@ -216,6 +257,66 @@ async def test_enqueue_run_handles_session_create_race() -> None:
assert repository.rolled_back is True assert repository.rolled_back is True
async def test_enqueue_run_uploads_user_image_to_supabase_and_injects_metadata(
monkeypatch,
) -> None:
monkeypatch.setattr(
agent_service_module.config.storage, "bucket", "agent-test-bucket"
)
repository = _FakeRepository()
attachment_storage = _FakeAttachmentStorage()
service = AgentService(
repository=repository,
queue=_FakeQueue(),
stream=_FakeStream(),
attachment_storage=attachment_storage,
)
run_input = RunAgentInput.model_validate(
{
"threadId": "00000000-0000-0000-0000-000000000001",
"runId": "run-with-image",
"state": {},
"messages": [
{
"id": "u1",
"role": "user",
"content": [
{"type": "text", "text": "帮我看下这张图"},
{
"type": "binary",
"data": "aGVsbG8=",
"mimeType": "image/png",
},
],
}
],
"tools": [],
"context": [],
"forwardedProps": {},
}
)
accepted = await service.enqueue_run(run_input=run_input, current_user=_user())
assert accepted.task_id == "task-1"
assert len(attachment_storage.calls) == 1
upload = attachment_storage.calls[0]
assert upload["bucket"] == "agent-test-bucket"
assert upload["content"] == b"hello"
assert upload["content_type"] == "image/png"
assert repository.persisted_user_messages
persisted = repository.persisted_user_messages[0]
assert persisted["session_id"] == "00000000-0000-0000-0000-000000000001"
assert persisted["run_id"] == "run-with-image"
metadata = persisted["metadata"]
assert isinstance(metadata, dict)
attachments = metadata.get("attachments")
assert isinstance(attachments, list)
assert attachments and isinstance(attachments[0], dict)
assert attachments[0]["bucket"] == "agent-test-bucket"
assert isinstance(attachments[0]["path"], str)
async def test_get_history_snapshot_wraps_history_day_as_state_snapshot_event() -> None: async def test_get_history_snapshot_wraps_history_day_as_state_snapshot_event() -> None:
service = AgentService( service = AgentService(
repository=_FakeRepository(), repository=_FakeRepository(),
@@ -0,0 +1,141 @@
# Agent Multimodal Smoke Implementation Plan
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
**Goal:** 完成 agent 三条主链路(runs/events/history)真实冒烟,并支持 RunAgentInput 图片信息在发送链路落 Supabase Storage、在 messages.metadata 持久化、在 history 返回中可渲染。
**Architecture:**`v1/agent` 服务层新增“用户消息持久化 + 图片附件上传”步骤:`enqueue_run` 时解析用户消息 content block,图片上传到 `config.storage.bucket`,将路径写入 `messages.metadata`。运行时继续通过 AgentScope pipeline 输出 AG-UI 事件,SSE 从 Redis stream 订阅,历史查询从 `messages` 回放并附带附件信息。
**Tech Stack:** FastAPI, SQLAlchemy AsyncSession, Supabase Storage Admin Client, Redis SSE stream, AG-UI, pytest/httpx。
---
### Task 1: 用户消息图片附件上传与落库
**Files:**
- Create: `backend/src/v1/agent/attachment_storage.py`
- Modify: `backend/src/v1/agent/service.py`
- Modify: `backend/src/v1/agent/repository.py`
- Test: `backend/tests/unit/v1/agent/test_service.py`
**Step 1: 写失败测试(RED**
```python
@pytest.mark.asyncio
async def test_enqueue_run_persists_user_message_with_uploaded_image_metadata() -> None:
...
```
**Step 2: 运行单测验证失败**
Run: `uv run pytest tests/unit/v1/agent/test_service.py::test_enqueue_run_persists_user_message_with_uploaded_image_metadata -q`
Expected: FAIL(缺少附件上传/metadata 持久化行为)
**Step 3: 最小实现(GREEN**
```python
class AgentAttachmentStorage:
async def upload_bytes(...):
...
class AgentService:
async def enqueue_run(...):
# 解析 user content blocks
# 上传图片到 storage
# repository 持久化 user message(metadata 包含 bucket/path)
...
```
**Step 4: 运行单测验证通过**
Run: `uv run pytest tests/unit/v1/agent/test_service.py::test_enqueue_run_persists_user_message_with_uploaded_image_metadata -q`
Expected: PASS
### Task 2: history 渲染附件路径
**Files:**
- Modify: `backend/src/v1/agent/repository.py`
- Test: `backend/tests/unit/v1/agent/test_repository.py`
**Step 1: 写失败测试(RED**
```python
@pytest.mark.asyncio
async def test_history_includes_user_message_attachments_from_metadata() -> None:
...
```
**Step 2: 运行测试验证失败**
Run: `uv run pytest tests/unit/v1/agent/test_repository.py::test_history_includes_user_message_attachments_from_metadata -q`
Expected: FAILhistory 尚未渲染 attachments
**Step 3: 最小实现(GREEN**
```python
if role == "user" and isinstance(metadata.get("attachments"), list):
payload["attachments"] = metadata["attachments"]
```
**Step 4: 运行测试验证通过**
Run: `uv run pytest tests/unit/v1/agent/test_repository.py::test_history_includes_user_message_attachments_from_metadata -q`
Expected: PASS
### Task 3: 真实冒烟 runs + SSE + history(含图片输入)
**Files:**
- Modify: `backend/tests/integration/v1/agent/test_sse_flow_live.py`
**Step 1: 写失败测试(RED**
```python
@pytest.mark.asyncio
@pytest.mark.live
async def test_agent_runs_events_history_live_with_image_input() -> None:
...
```
**Step 2: 运行 live 测试验证失败(实现前或环境不完整)**
Run: `AGENT_LIVE_INTEGRATION=1 AGENT_LIVE_EMAIL=... AGENT_LIVE_PASSWORD=... uv run pytest tests/integration/v1/agent/test_sse_flow_live.py::test_agent_runs_events_history_live_with_image_input -q -s`
Expected: FAIL(缺 metadata/path 或 history 不含附件)
**Step 3: 最小实现(GREEN**
```python
# live 测试流程:
# 1) 登录拿 token
# 2) POST /runs 发送 text + image(data)
# 3) SSE 订阅直到 RUN_FINISHED/RUN_ERROR
# 4) GET /runs/{thread_id}/history
# 5) SQL 校验 sessions/messages 字段与 metadata.attachments
```
**Step 4: 运行 live 测试验证通过**
Run: `AGENT_LIVE_INTEGRATION=1 AGENT_LIVE_EMAIL=... AGENT_LIVE_PASSWORD=... uv run pytest tests/integration/v1/agent/test_sse_flow_live.py::test_agent_runs_events_history_live_with_image_input -q -s`
Expected: PASS
### Task 4: 全量收口验证与安全门禁
**Files:**
- Modify (if needed): `backend/src/v1/agent/*`, `backend/tests/*`
**Step 1: 回归测试**
Run: `uv run pytest tests/unit/v1/agent tests/unit/core/agentscope tests/integration/v1/agent -q`
Expected: PASS
**Step 2: 静态检查**
Run: `uv run ruff check src/v1/agent src/core/agentscope tests/unit/v1/agent tests/integration/v1/agent`
Expected: PASS
Run: `uv run basedpyright src/v1/agent src/core/agentscope tests/unit/v1/agent tests/integration/v1/agent`
Expected: 0 errors
**Step 3: 评审门禁**
Run agents: `security-reviewer`, `refactor-cleaner`, `code-reviewer`
Expected: 无未解决 CRITICAL/HIGH