# Agent Backend Rebuild Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Rebuild the Agent runtime in the backend so that it satisfies these requirements: queue-based asynchronous execution, end-to-end CrewAI configuration, AG-UI tool interrupt and resume, LiteLLM usage metering, and persistence in `sessions.state_snapshot`.

**Architecture:** `v1/agent` handles only the API surface, authentication, parameter validation, and SSE output; `core/agent` owns orchestration and execution. The configuration used to create agents is produced by merging `system_agents` (database) with `core/config/static/crewai/*.yaml` (static templates). The entire run/resume path executes in Celery Workers, and state is written to `sessions.state_snapshot`.

**Tech Stack:** FastAPI, Celery, Redis, CrewAI, ag-ui-crewai, LiteLLM, SQLAlchemy, Alembic, pytest

---

### Task 1: Build the configuration aggregator (system_agents + static/crewai)

**Files:**
- Create: `backend/src/core/agent/infrastructure/config/resolver.py`
- Modify: `backend/src/core/config/static/crewai/agents.yaml`
- Modify: `backend/src/core/config/static/crewai/tasks.yaml`
- Create: `backend/src/core/config/static/crewai/workflow.yaml`
- Create: `backend/src/core/config/static/crewai/tools.yaml`
- Test: `backend/tests/unit/core/agent/test_config_resolver.py`

**Step 1: Write the failing test**

```python
def test_resolver_merges_system_agents_and_static_templates():
    resolved = resolve_agent_runtime_config(...)
    assert resolved.intent.llm.model_code == "deepseek-v3.2"
    assert "intent" in resolved.workflow_stages
```

**Step 2: Run test to verify it fails**

Run: `PYTHONPATH=backend/src uv run pytest backend/tests/unit/core/agent/test_config_resolver.py::test_resolver_merges_system_agents_and_static_templates -q`
Expected: FAIL with `NameError` or a failed import

**Step 3: Write minimal implementation**

```python
def resolve_agent_runtime_config(system_agents: list[dict], static_cfg: dict) -> RuntimeConfig:
    # Index the database rows by agent type so each static template can find its row.
    by_type = {item["agent_type"]: item for item in system_agents}
    return RuntimeConfig.from_sources(by_type=by_type, static_cfg=static_cfg)
```

**Step 4: Run test to verify it passes**

Run: `PYTHONPATH=backend/src uv run pytest backend/tests/unit/core/agent/test_config_resolver.py::test_resolver_merges_system_agents_and_static_templates -q`
Expected: PASS

**Step 5: Commit**

```bash
git add backend/src/core/agent/infrastructure/config/resolver.py backend/src/core/config/static/crewai backend/tests/unit/core/agent/test_config_resolver.py
git commit -m "feat: add system_agents and static crewai config resolver"
```

### Task 2: Unify the entry point for LLM keys and model configuration

**Files:**
- Modify: `backend/src/core/config/settings.py`
- Modify: `.env.example`
- Create: `backend/tests/unit/core/config/test_llm_settings.py`

**Step 1: Write the failing test**

```python
def test_llm_keys_read_from_settings(monkeypatch):
    monkeypatch.setenv("SOCIAL_LLM__PROVIDER_KEYS__DEEPSEEK", "k1")
    s = Settings()
    assert s.llm.provider_keys.deepseek == "k1"
```

**Step 2: Run test to verify it fails**

Run: `PYTHONPATH=backend/src uv run pytest backend/tests/unit/core/config/test_llm_settings.py::test_llm_keys_read_from_settings -q`
Expected: FAIL with missing `llm` field

**Step 3: Write minimal implementation**

```python
from pydantic import BaseModel


class LLMProviderKeys(BaseModel):
    deepseek: str | None = None


class LLMSettings(BaseModel):
    provider_keys: LLMProviderKeys = LLMProviderKeys()

# Settings must also expose these models through an `llm` field for the test to pass.
```

**Step 4: Run test to verify it passes**

Run:
`PYTHONPATH=backend/src uv run pytest backend/tests/unit/core/config/test_llm_settings.py::test_llm_keys_read_from_settings -q`
Expected: PASS

**Step 5: Commit**

```bash
git add backend/src/core/config/settings.py .env.example backend/tests/unit/core/config/test_llm_settings.py
git commit -m "feat: centralize llm provider keys in settings"
```

### Task 3: Land the state snapshot contract on the sessions table

**Files:**
- Create: `backend/alembic/versions/20260304_add_sessions_state_snapshot_contract.py`
- Modify: `backend/src/models/agent_chat_session.py`
- Create: `backend/tests/unit/database/test_sessions_state_snapshot_contract.py`

**Step 1: Write the failing test**

```python
def test_sessions_has_state_snapshot_column(db_inspector):
    columns = db_inspector.get_columns("sessions")
    assert "state_snapshot" in [c["name"] for c in columns]
```

**Step 2: Run test to verify it fails**

Run: `PYTHONPATH=backend/src uv run pytest backend/tests/unit/database/test_sessions_state_snapshot_contract.py::test_sessions_has_state_snapshot_column -q`
Expected: FAIL when migration not applied

**Step 3: Write minimal implementation**

```python
def upgrade() -> None:
    op.add_column("sessions", sa.Column("state_snapshot", postgresql.JSONB, nullable=True))
```

**Step 4: Run test to verify it passes**

Run: `PYTHONPATH=backend/src uv run pytest backend/tests/unit/database/test_sessions_state_snapshot_contract.py::test_sessions_has_state_snapshot_column -q`
Expected: PASS

**Step 5: Commit**

```bash
git add backend/alembic/versions/20260304_add_sessions_state_snapshot_contract.py backend/src/models/agent_chat_session.py backend/tests/unit/database/test_sessions_state_snapshot_contract.py
git commit -m "feat(db): enforce sessions state_snapshot contract"
```

### Task 3.1: Session and message persistence repositories

**Files:**
- Create: `backend/src/core/agent/infrastructure/persistence/session_repository.py`
- Create: `backend/src/core/agent/infrastructure/persistence/message_repository.py`
- Create:
`backend/tests/integration/core/agent/test_session_message_persistence.py`

**Step 1: Write the failing test**

```python
def test_run_persists_user_and_assistant_messages(db_session):
    run = execute_run(...)
    rows = list_messages(session_id=run.session_id)
    assert rows[0].role == "user"
    assert rows[1].role == "assistant"
```

**Step 2: Run test to verify it fails**

Run: `PYTHONPATH=backend/src uv run pytest backend/tests/integration/core/agent/test_session_message_persistence.py::test_run_persists_user_and_assistant_messages -q`
Expected: FAIL

**Step 3: Write minimal implementation**

```python
async def append_message(...):
    session.add(AgentChatMessage(...))


async def update_session_aggregate(...):
    session_obj.message_count = message_count
```

**Step 4: Run test to verify it passes**

Run: `PYTHONPATH=backend/src uv run pytest backend/tests/integration/core/agent/test_session_message_persistence.py::test_run_persists_user_and_assistant_messages -q`
Expected: PASS

**Step 5: Commit**

```bash
git add backend/src/core/agent/infrastructure/persistence backend/tests/integration/core/agent/test_session_message_persistence.py
git commit -m "feat: persist session lifecycle and messages for agent runs"
```

### Task 4: Define the state_snapshot structure and concurrency semantics

**Files:**
- Create: `backend/src/core/agent/domain/state_snapshot.py`
- Create: `backend/tests/unit/core/agent/test_state_snapshot.py`

**Step 1: Write the failing test**

```python
def test_pending_tool_call_snapshot_contains_correlation_fields():
    snap = StateSnapshot.new(...)
    pending = snap.pending_tool_calls[0]
    assert pending.tool_call_id
    assert pending.status == "PENDING_APPROVAL"
```

**Step 2: Run test to verify it fails**

Run: `PYTHONPATH=backend/src uv run pytest backend/tests/unit/core/agent/test_state_snapshot.py::test_pending_tool_call_snapshot_contains_correlation_fields -q`
Expected: FAIL

**Step 3: Write minimal implementation**

```python
from typing import Literal

from pydantic import BaseModel


class PendingToolCall(BaseModel):
    tool_call_id: str
    tool_name: str
    status: Literal["PENDING_APPROVAL", "APPROVED", "EXECUTED", "REJECTED", "EXPIRED"]
```

**Step 4: Run test to verify it passes**

Run: `PYTHONPATH=backend/src uv run pytest backend/tests/unit/core/agent/test_state_snapshot.py::test_pending_tool_call_snapshot_contains_correlation_fields -q`
Expected: PASS

**Step 5: Commit**

```bash
git add backend/src/core/agent/domain/state_snapshot.py backend/tests/unit/core/agent/test_state_snapshot.py
git commit -m "feat: define sessions state_snapshot schema for run and tool state"
```

### Task 5: Tool routing policy (frontend / backend / approval)

**Files:**
- Create: `backend/src/core/agent/domain/tool_policy.py`
- Create: `backend/tests/unit/core/agent/test_tool_policy.py`

**Step 1: Write the failing test**

```python
def test_frontend_tool_requires_interrupt_and_client_execution():
    decision = classify_tool_call(name="ui.navigate_to", source="request.tools")
    assert decision.mode == "FRONTEND_EXECUTE"


def test_backend_approval_tool_returns_interrupt_but_executes_on_backend_after_approve():
    decision = classify_tool_call(name="srv.transfer_funds", requires_approval=True)
    assert decision.mode == "BACKEND_APPROVAL_INTERRUPT"
```

**Step 2: Run test to verify it fails**

Run: `PYTHONPATH=backend/src uv run pytest backend/tests/unit/core/agent/test_tool_policy.py -q`
Expected: FAIL

**Step 3: Write minimal implementation**

```python
# Parameter names follow the keyword arguments used by the tests in Step 1.
def classify_tool_call(*, name: str, source: str | None = None, requires_approval: bool = False) -> ToolDecision:
    if name.startswith("ui."):
        return ToolDecision(mode="FRONTEND_EXECUTE")
    if requires_approval:
        return ToolDecision(mode="BACKEND_APPROVAL_INTERRUPT")
    return ToolDecision(mode="BACKEND_DIRECT_EXECUTE")
```

**Step 4: Run test to verify it passes**

Run: `PYTHONPATH=backend/src uv run pytest backend/tests/unit/core/agent/test_tool_policy.py -q`
Expected: PASS

**Step 5: Commit**

```bash
git add backend/src/core/agent/domain/tool_policy.py backend/tests/unit/core/agent/test_tool_policy.py
git commit -m "feat: add frontend/backend tool policy and approval routing"
```

### Task 6: tool_call and tool_result reconciliation

**Files:**
- Create: `backend/src/core/agent/domain/tool_correlation.py`
- Create: `backend/tests/unit/core/agent/test_tool_correlation.py`

**Step 1: Write the failing test**

```python
import pytest


def test_rejects_tool_result_when_tool_call_id_not_pending():
    store = PendingToolStore([])
    with pytest.raises(ToolCorrelationError):
        store.apply_result(tool_call_id="unknown", result={"ok": True})
```

**Step 2: Run test to verify it fails**

Run: `PYTHONPATH=backend/src uv run pytest backend/tests/unit/core/agent/test_tool_correlation.py::test_rejects_tool_result_when_tool_call_id_not_pending -q`
Expected: FAIL

**Step 3: Write minimal implementation**

```python
def apply_result(self, *, tool_call_id: str, result: dict) -> None:
    pending = self._pending.get(tool_call_id)
    if pending is None:
        raise ToolCorrelationError("tool_call_id not pending")
    pending.result = result
```

**Step 4: Run test to verify it passes**

Run: `PYTHONPATH=backend/src uv run pytest backend/tests/unit/core/agent/test_tool_correlation.py::test_rejects_tool_result_when_tool_call_id_not_pending -q`
Expected: PASS

**Step 5: Commit**

```bash
git add backend/src/core/agent/domain/tool_correlation.py backend/tests/unit/core/agent/test_tool_correlation.py
git commit -m "feat: add tool call/result correlation guard"
```

### Task 7: Celery run/resume asynchronous tasks

**Files:**
- Create: `backend/src/core/agent/infrastructure/queue/tasks.py`
- Create: `backend/src/core/agent/application/run_service.py`
- Create: `backend/src/core/agent/application/resume_service.py`
- Test: `backend/tests/integration/core/agent/test_queue_run_resume.py`

**Step 1: Write the failing test**

```python
def test_run_api_enqueues_celery_task(client):
    resp = client.post("/api/v1/agent/runs", json={...})
    assert resp.status_code == 202


def test_resume_updates_session_status_and_snapshot(client):
    resp = client.post("/api/v1/agent/runs/r1/resume", json={...})
    assert resp.status_code == 202
```

**Step 2: Run test to verify it fails**

Run: `PYTHONPATH=backend/src uv run pytest backend/tests/integration/core/agent/test_queue_run_resume.py::test_run_api_enqueues_celery_task -q`
Expected: FAIL

**Step 3: Write minimal implementation**

```python
def enqueue_run(cmd: RunCommand) -> str:
    # Hand the serialized command to the Celery worker and return the task id.
    task = run_agent_task.apply_async(args=[cmd.model_dump()])
    return task.id
```

**Step 4: Run test to verify it passes**

Run: `PYTHONPATH=backend/src uv run pytest backend/tests/integration/core/agent/test_queue_run_resume.py::test_run_api_enqueues_celery_task -q`
Expected: PASS

**Step 5: Commit**

```bash
git add backend/src/core/agent/application backend/src/core/agent/infrastructure/queue backend/tests/integration/core/agent/test_queue_run_resume.py
git commit -m "feat: add celery-based run and resume tasks"
```

### Task 8: CrewAI runtime loading and creation

**Files:**
- Create: `backend/src/core/agent/infrastructure/crewai/runtime.py`
- Create: `backend/src/core/agent/infrastructure/crewai/factory.py`
- Test: `backend/tests/unit/core/agent/test_crewai_runtime.py`

**Step 1: Write the failing test**

```python
def test_runtime_creates_agents_tasks_from_resolved_config():
    runtime = CrewAIRuntime(...)
    crew = runtime.build_crew(message="hello")
    assert len(crew.agents) >= 1
```

**Step 2: Run test to verify it fails**

Run: `PYTHONPATH=backend/src uv run pytest backend/tests/unit/core/agent/test_crewai_runtime.py::test_runtime_creates_agents_tasks_from_resolved_config -q`
Expected: FAIL

**Step 3: Write minimal implementation**

```python
def build_crew(self, *, message: str) -> Crew:
    agents = self._factory.build_agents(self._config)
    tasks = self._factory.build_tasks(self._config, message=message)
    return Crew(agents=agents, tasks=tasks)
```

**Step 4: Run test to verify it passes**

Run: `PYTHONPATH=backend/src uv run pytest backend/tests/unit/core/agent/test_crewai_runtime.py::test_runtime_creates_agents_tasks_from_resolved_config -q`
Expected: PASS

**Step 5: Commit**

```bash
git add backend/src/core/agent/infrastructure/crewai backend/tests/unit/core/agent/test_crewai_runtime.py
git commit -m "feat: create crewai runtime from resolved config"
```

### Task 9: AG-UI and ag-ui-crewai event bridge

**Files:**
- Create: `backend/src/core/agent/infrastructure/agui/bridge.py`
- Create: `backend/src/core/agent/infrastructure/agui/stream.py`
- Test: `backend/tests/unit/core/agent/test_agui_bridge.py`

**Step 1: Write the failing test**

```python
def test_agui_stream_emits_required_lifecycle():
    events = to_agui_events(internal_events=[...])
    assert events[0]["type"] == "RUN_STARTED"
    assert events[-1]["type"] in {"RUN_FINISHED", "RUN_ERROR"}
```

**Step 2: Run test to verify it fails**

Run: `PYTHONPATH=backend/src uv run pytest backend/tests/unit/core/agent/test_agui_bridge.py::test_agui_stream_emits_required_lifecycle -q`
Expected: FAIL

**Step 3: Write minimal implementation**

```python
def to_agui_events(internal_events: list[dict]) -> list[dict]:
    return [map_event(e) for e in internal_events]
```

**Step 4: Run test to verify it passes**

Run: `PYTHONPATH=backend/src uv run pytest backend/tests/unit/core/agent/test_agui_bridge.py::test_agui_stream_emits_required_lifecycle -q`
Expected: PASS
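
The lifecycle test in Step 1 requires the stream to open with `RUN_STARTED` and close with `RUN_FINISHED` or `RUN_ERROR`. One way a bridge could guarantee that framing regardless of what the internal events contain is sketched below. The internal event shape (a `kind` key with values such as `text_delta`, `tool_call`, and `error`) and the payload fields are assumptions made for illustration; they are not the ag-ui-crewai wire schema, and only the event type names are taken from AG-UI.

```python
# Hypothetical mapping from internal event kinds to AG-UI event type names.
_TYPE_MAP = {
    "text_delta": "TEXT_MESSAGE_CONTENT",
    "tool_call": "TOOL_CALL_START",
}


def map_event(event: dict) -> dict:
    """Translate one internal event; unknown kinds fall back to CUSTOM."""
    mapped = {key: value for key, value in event.items() if key != "kind"}
    mapped["type"] = _TYPE_MAP.get(event.get("kind"), "CUSTOM")
    return mapped


def to_agui_events(internal_events: list[dict]) -> list[dict]:
    """Wrap mapped events in RUN_STARTED ... RUN_FINISHED / RUN_ERROR framing."""
    events: list[dict] = [{"type": "RUN_STARTED"}]
    for event in internal_events:
        if event.get("kind") == "error":
            # An error terminates the stream; nothing after it is emitted.
            events.append({"type": "RUN_ERROR", "message": str(event.get("message", ""))})
            return events
        events.append(map_event(event))
    events.append({"type": "RUN_FINISHED"})
    return events
```

With this framing the first and last events are fixed by the bridge itself, so the lifecycle assertion holds even for an empty list of internal events.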

**Step 5: Commit**

```bash
git add backend/src/core/agent/infrastructure/agui backend/tests/unit/core/agent/test_agui_bridge.py
git commit -m "feat: add ag-ui and ag-ui-crewai event bridge"
```

### Task 10: LiteLLM call statistics and session aggregation

**Files:**
- Create: `backend/src/core/agent/infrastructure/litellm/client.py`
- Create: `backend/src/core/agent/infrastructure/litellm/usage_tracker.py`
- Test: `backend/tests/unit/core/agent/test_litellm_usage.py`

**Step 1: Write the failing test**

```python
def test_tracker_aggregates_per_call_usage_and_cost():
    t = UsageTracker()
    t.add({"input_tokens": 10, "output_tokens": 5, "cost": "0.1"})
    assert t.snapshot()["total_tokens"] == 15
```

**Step 2: Run test to verify it fails**

Run: `PYTHONPATH=backend/src uv run pytest backend/tests/unit/core/agent/test_litellm_usage.py::test_tracker_aggregates_per_call_usage_and_cost -q`
Expected: FAIL

**Step 3: Write minimal implementation**

```python
def add(self, usage: dict[str, object]) -> None:
    self.input_tokens += int(usage.get("input_tokens", 0))
    self.output_tokens += int(usage.get("output_tokens", 0))
```

**Step 4: Run test to verify it passes**

Run: `PYTHONPATH=backend/src uv run pytest backend/tests/unit/core/agent/test_litellm_usage.py::test_tracker_aggregates_per_call_usage_and_cost -q`
Expected: PASS

**Step 5: Commit**

```bash
git add backend/src/core/agent/infrastructure/litellm backend/tests/unit/core/agent/test_litellm_usage.py
git commit -m "feat: add litellm usage and cost tracking"
```

### Task 11: v1/agent thin API layer + SSE output

**Files:**
- Create: `backend/src/v1/agent/router.py`
- Create: `backend/src/v1/agent/schemas.py`
- Create: `backend/src/v1/agent/dependencies.py`
- Create: `backend/src/v1/agent/service.py`
- Modify: `backend/src/v1/router.py`
- Test: `backend/tests/integration/v1/agent/test_routes.py`

**Step 1: Write the failing test**

```python
def test_run_endpoint_returns_sse_and_not_blocking(client):
    resp = client.post("/api/v1/agent/runs", json={...})
    assert resp.status_code == 202
```

**Step 2: Run test to verify it fails**

Run: `PYTHONPATH=backend/src uv run pytest backend/tests/integration/v1/agent/test_routes.py::test_run_endpoint_returns_sse_and_not_blocking -q`
Expected: FAIL

**Step 3: Write minimal implementation**

```python
@router.post("/runs", status_code=202)
async def create_run(...):
    task_id = service.enqueue_run(input_data)
    return {"task_id": task_id}
```

**Step 4: Run test to verify it passes**

Run: `PYTHONPATH=backend/src uv run pytest backend/tests/integration/v1/agent/test_routes.py::test_run_endpoint_returns_sse_and_not_blocking -q`
Expected: PASS

**Step 5: Commit**

```bash
git add backend/src/v1/agent backend/src/v1/router.py backend/tests/integration/v1/agent/test_routes.py
git commit -m "feat: add thin v1 agent api and sse endpoints"
```

### Task 12: End-to-end verification and documentation backfill

**Files:**
- Modify: `docs/runtime/runtime-route.md`
- Modify: `docs/runtime/runtime-runbook.md`

**Step 1: Run unit tests**

Run: `PYTHONPATH=backend/src uv run pytest backend/tests/unit/core/agent backend/tests/unit/core/config backend/tests/unit/database -q`
Expected: PASS

**Step 2: Run integration tests**

Run: `PYTHONPATH=backend/src uv run pytest backend/tests/integration/core/agent backend/tests/integration/v1/agent -q`
Expected: PASS

**Step 3: Run lint and typecheck**

Run: `PYTHONPATH=backend/src uv run ruff check backend/src backend/tests`
Expected: PASS

Run: `PYTHONPATH=backend/src uv run basedpyright backend/src`
Expected: PASS

**Step 4: Document protocol contracts**

Add the following fixed rules to the runbook:
- Merge precedence between the `system_agents` and `static/crewai` configuration.
- Field structure and version number of `sessions.state_snapshot`.
- Insertion order for `messages` and the update rules for the aggregate fields on `sessions`.
- Sequence diagram for tool-call approval and resume.
- Error semantics when tool_call and tool_result do not match (`RUN_ERROR` plus an auditable log entry).

**Step 5: Commit**

```bash
git add docs/runtime/runtime-route.md docs/runtime/runtime-runbook.md
git commit -m "docs: add new agent runtime contracts and operational guide"
```

## Success Criteria

- [ ] The Agent creation configuration is produced by merging `system_agents` with `core/config/static/crewai`.
- [ ] run/resume executes only in Celery Workers; the web process performs no orchestration.
- [ ] `v1/agent` contains no business orchestration code.
- [ ] `sessions.state_snapshot` carries the runtime state and the tool approval/resume state.
- [ ] Every session state change during run/resume is persisted to `sessions`.
- [ ] User, assistant, and tool messages are persisted according to the `messages` constraints, with `seq` monotonically increasing.
- [ ] The policies for frontend tools and backend tools (with and without approval) are complete and testable.
- [ ] tool_call and tool_result have strict correlation checks and can either be resumed or reported as errors.
- [ ] LiteLLM per-call metering and per-run aggregation can be persisted.
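
For the metering criterion, a fuller version of the Task 10 tracker might look like the sketch below. It keeps the `UsageTracker`, `add`, and `snapshot` names and the usage keys from the Task 10 test; the `calls` counter, the snapshot layout, and the choice of `Decimal` for cost are assumptions for illustration, picked so that summed costs do not pick up binary floating-point error.

```python
from decimal import Decimal


class UsageTracker:
    """Accumulates per-call token usage and cost for one run."""

    def __init__(self) -> None:
        self.input_tokens = 0
        self.output_tokens = 0
        self.cost = Decimal("0")
        self.calls = 0

    def add(self, usage: dict[str, object]) -> None:
        # Missing keys count as zero so partial usage reports still aggregate.
        self.input_tokens += int(usage.get("input_tokens", 0))
        self.output_tokens += int(usage.get("output_tokens", 0))
        # Go through str() so float inputs are not expanded to their binary value.
        self.cost += Decimal(str(usage.get("cost", "0")))
        self.calls += 1

    def snapshot(self) -> dict[str, object]:
        # Cost is serialized as a string so the snapshot stays JSON-friendly.
        return {
            "calls": self.calls,
            "input_tokens": self.input_tokens,
            "output_tokens": self.output_tokens,
            "total_tokens": self.input_tokens + self.output_tokens,
            "cost": str(self.cost),
        }
```

A worker could call `add` once per LLM response and write `snapshot()` alongside the session state at the end of the run.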