19 KiB
Agent 后端重建 Implementation Plan
For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
Goal: 在后端重建 Agent 运行时,满足队列异步、CrewAI 配置打通、AG-UI 工具中断恢复、LiteLLM 计量、以及 sessions.state_snapshot 持久化要求。
Architecture: v1/agent 仅做 API/鉴权/参数校验与 SSE 输出,core/agent 负责编排与执行。Agent 创建配置由 system_agents(数据库)+ core/config/static/crewai/*.yaml(静态模板)合并生成。run/resume 全链路通过 Celery Worker 执行,状态写入 sessions.state_snapshot。
Tech Stack: FastAPI, Celery, Redis, CrewAI, ag-ui-crewai, LiteLLM, SQLAlchemy, Alembic, pytest
Task 1: 建立配置聚合器(system_agents + static/crewai)
Files:
- Create:
backend/src/core/agent/infrastructure/config/resolver.py - Modify:
backend/src/core/config/static/crewai/agents.yaml - Modify:
backend/src/core/config/static/crewai/tasks.yaml - Create:
backend/src/core/config/static/crewai/workflow.yaml - Create:
backend/src/core/config/static/crewai/tools.yaml - Test:
backend/tests/unit/core/agent/test_config_resolver.py
Step 1: Write the failing test
def test_resolver_merges_system_agents_and_static_templates():
resolved = resolve_agent_runtime_config(...)
assert resolved.intent.llm.model_code == "deepseek-v3.2"
assert "intent" in resolved.workflow_stages
Step 2: Run test to verify it fails
Run: PYTHONPATH=backend/src uv run pytest backend/tests/unit/core/agent/test_config_resolver.py::test_resolver_merges_system_agents_and_static_templates -q
Expected: FAIL with NameError or import not found
Step 3: Write minimal implementation
def resolve_agent_runtime_config(system_agents: list[dict], static_cfg: dict) -> RuntimeConfig:
by_type = {item["agent_type"]: item for item in system_agents}
return RuntimeConfig.from_sources(by_type=by_type, static_cfg=static_cfg)
Step 4: Run test to verify it passes
Run: PYTHONPATH=backend/src uv run pytest backend/tests/unit/core/agent/test_config_resolver.py::test_resolver_merges_system_agents_and_static_templates -q
Expected: PASS
Step 5: Commit
git add backend/src/core/agent/infrastructure/config/resolver.py backend/src/core/config/static/crewai backend/tests/unit/core/agent/test_config_resolver.py
git commit -m "feat: add system_agents and static crewai config resolver"
Task 2: 统一 LLM Key 与模型配置入口
Files:
- Modify:
backend/src/core/config/settings.py - Modify:
.env.example - Create:
backend/tests/unit/core/config/test_llm_settings.py
Step 1: Write the failing test
def test_llm_keys_read_from_settings(monkeypatch):
monkeypatch.setenv("SOCIAL_LLM__PROVIDER_KEYS__DEEPSEEK", "k1")
s = Settings()
assert s.llm.provider_keys.deepseek == "k1"
Step 2: Run test to verify it fails
Run: PYTHONPATH=backend/src uv run pytest backend/tests/unit/core/config/test_llm_settings.py::test_llm_keys_read_from_settings -q
Expected: FAIL with missing llm field
Step 3: Write minimal implementation
class LLMProviderKeys(BaseModel):
deepseek: str | None = None
class LLMSettings(BaseModel):
provider_keys: LLMProviderKeys = LLMProviderKeys()
Step 4: Run test to verify it passes
Run: PYTHONPATH=backend/src uv run pytest backend/tests/unit/core/config/test_llm_settings.py::test_llm_keys_read_from_settings -q
Expected: PASS
Step 5: Commit
git add backend/src/core/config/settings.py .env.example backend/tests/unit/core/config/test_llm_settings.py
git commit -m "feat: centralize llm provider keys in settings"
Task 3: sessions 表状态快照契约落地
Files:
- Create:
backend/alembic/versions/20260304_add_sessions_state_snapshot_contract.py - Modify:
backend/src/models/agent_chat_session.py - Create:
backend/tests/unit/database/test_sessions_state_snapshot_contract.py
Step 1: Write the failing test
def test_sessions_has_state_snapshot_column(db_inspector):
columns = db_inspector.get_columns("sessions")
assert "state_snapshot" in [c["name"] for c in columns]
Step 2: Run test to verify it fails
Run: PYTHONPATH=backend/src uv run pytest backend/tests/unit/database/test_sessions_state_snapshot_contract.py::test_sessions_has_state_snapshot_column -q
Expected: FAIL when migration not applied
Step 3: Write minimal implementation
def upgrade() -> None:
op.add_column("sessions", sa.Column("state_snapshot", postgresql.JSONB, nullable=True))
Step 4: Run test to verify it passes
Run: PYTHONPATH=backend/src uv run pytest backend/tests/unit/database/test_sessions_state_snapshot_contract.py::test_sessions_has_state_snapshot_column -q
Expected: PASS
Step 5: Commit
git add backend/alembic/versions/20260304_add_sessions_state_snapshot_contract.py backend/src/models/agent_chat_session.py backend/tests/unit/database/test_sessions_state_snapshot_contract.py
git commit -m "feat(db): enforce sessions state_snapshot contract"
Task 3.1: 会话与消息持久化仓储
Files:
- Create:
backend/src/core/agent/infrastructure/persistence/session_repository.py - Create:
backend/src/core/agent/infrastructure/persistence/message_repository.py - Create:
backend/tests/integration/core/agent/test_session_message_persistence.py
Step 1: Write the failing test
def test_run_persists_user_and_assistant_messages(db_session):
run = execute_run(...)
rows = list_messages(session_id=run.session_id)
assert rows[0].role == "user"
assert rows[1].role == "assistant"
Step 2: Run test to verify it fails
Run: PYTHONPATH=backend/src uv run pytest backend/tests/integration/core/agent/test_session_message_persistence.py::test_run_persists_user_and_assistant_messages -q
Expected: FAIL
Step 3: Write minimal implementation
async def append_message(...):
session.add(AgentChatMessage(...))
async def update_session_aggregate(...):
session_obj.message_count = message_count
Step 4: Run test to verify it passes
Run: PYTHONPATH=backend/src uv run pytest backend/tests/integration/core/agent/test_session_message_persistence.py::test_run_persists_user_and_assistant_messages -q
Expected: PASS
Step 5: Commit
git add backend/src/core/agent/infrastructure/persistence backend/tests/integration/core/agent/test_session_message_persistence.py
git commit -m "feat: persist session lifecycle and messages for agent runs"
Task 4: 定义 state_snapshot 结构与并发语义
Files:
- Create:
backend/src/core/agent/domain/state_snapshot.py - Create:
backend/tests/unit/core/agent/test_state_snapshot.py
Step 1: Write the failing test
def test_pending_tool_call_snapshot_contains_correlation_fields():
snap = StateSnapshot.new(...)
pending = snap.pending_tool_calls[0]
assert pending.tool_call_id
assert pending.status == "PENDING_APPROVAL"
Step 2: Run test to verify it fails
Run: PYTHONPATH=backend/src uv run pytest backend/tests/unit/core/agent/test_state_snapshot.py::test_pending_tool_call_snapshot_contains_correlation_fields -q
Expected: FAIL
Step 3: Write minimal implementation
class PendingToolCall(BaseModel):
tool_call_id: str
tool_name: str
status: Literal["PENDING_APPROVAL", "APPROVED", "EXECUTED", "REJECTED", "EXPIRED"]
Step 4: Run test to verify it passes
Run: PYTHONPATH=backend/src uv run pytest backend/tests/unit/core/agent/test_state_snapshot.py::test_pending_tool_call_snapshot_contains_correlation_fields -q
Expected: PASS
Step 5: Commit
git add backend/src/core/agent/domain/state_snapshot.py backend/tests/unit/core/agent/test_state_snapshot.py
git commit -m "feat: define sessions state_snapshot schema for run and tool state"
Task 5: 工具路由策略(前端/后端/审批)
Files:
- Create:
backend/src/core/agent/domain/tool_policy.py - Create:
backend/tests/unit/core/agent/test_tool_policy.py
Step 1: Write the failing test
def test_frontend_tool_requires_interrupt_and_client_execution():
decision = classify_tool_call(name="ui.navigate_to", source="request.tools")
assert decision.mode == "FRONTEND_EXECUTE"
def test_backend_approval_tool_returns_interrupt_but_executes_on_backend_after_approve():
decision = classify_tool_call(name="srv.transfer_funds", requires_approval=True)
assert decision.mode == "BACKEND_APPROVAL_INTERRUPT"
Step 2: Run test to verify it fails
Run: PYTHONPATH=backend/src uv run pytest backend/tests/unit/core/agent/test_tool_policy.py -q
Expected: FAIL
Step 3: Write minimal implementation
if tool_name.startswith("ui."):
return ToolDecision(mode="FRONTEND_EXECUTE")
if requires_approval:
return ToolDecision(mode="BACKEND_APPROVAL_INTERRUPT")
return ToolDecision(mode="BACKEND_DIRECT_EXECUTE")
Step 4: Run test to verify it passes
Run: PYTHONPATH=backend/src uv run pytest backend/tests/unit/core/agent/test_tool_policy.py -q
Expected: PASS
Step 5: Commit
git add backend/src/core/agent/domain/tool_policy.py backend/tests/unit/core/agent/test_tool_policy.py
git commit -m "feat: add frontend/backend tool policy and approval routing"
Task 6: tool_call 与 tool_result 对账机制
Files:
- Create:
backend/src/core/agent/domain/tool_correlation.py - Create:
backend/tests/unit/core/agent/test_tool_correlation.py
Step 1: Write the failing test
def test_rejects_tool_result_when_tool_call_id_not_pending():
store = PendingToolStore([])
with pytest.raises(ToolCorrelationError):
store.apply_result(tool_call_id="unknown", result={"ok": True})
Step 2: Run test to verify it fails
Run: PYTHONPATH=backend/src uv run pytest backend/tests/unit/core/agent/test_tool_correlation.py::test_rejects_tool_result_when_tool_call_id_not_pending -q
Expected: FAIL
Step 3: Write minimal implementation
def apply_result(self, *, tool_call_id: str, result: dict) -> None:
pending = self._pending.get(tool_call_id)
if pending is None:
raise ToolCorrelationError("tool_call_id not pending")
pending.result = result
Step 4: Run test to verify it passes
Run: PYTHONPATH=backend/src uv run pytest backend/tests/unit/core/agent/test_tool_correlation.py::test_rejects_tool_result_when_tool_call_id_not_pending -q
Expected: PASS
Step 5: Commit
git add backend/src/core/agent/domain/tool_correlation.py backend/tests/unit/core/agent/test_tool_correlation.py
git commit -m "feat: add tool call/result correlation guard"
Task 7: Celery run/resume 异步任务
Files:
- Create:
backend/src/core/agent/infrastructure/queue/tasks.py - Create:
backend/src/core/agent/application/run_service.py - Create:
backend/src/core/agent/application/resume_service.py - Test:
backend/tests/integration/core/agent/test_queue_run_resume.py
Step 1: Write the failing test
def test_run_api_enqueues_celery_task(client):
resp = client.post("/api/v1/agent/runs", json={...})
assert resp.status_code == 202
def test_resume_updates_session_status_and_snapshot(client):
resp = client.post("/api/v1/agent/runs/r1/resume", json={...})
assert resp.status_code == 202
Step 2: Run test to verify it fails
Run: PYTHONPATH=backend/src uv run pytest backend/tests/integration/core/agent/test_queue_run_resume.py::test_run_api_enqueues_celery_task -q
Expected: FAIL
Step 3: Write minimal implementation
def enqueue_run(cmd: RunCommand) -> str:
task = run_agent_task.apply_async(args=[cmd.model_dump()])
return task.id
Step 4: Run test to verify it passes
Run: PYTHONPATH=backend/src uv run pytest backend/tests/integration/core/agent/test_queue_run_resume.py::test_run_api_enqueues_celery_task -q
Expected: PASS
Step 5: Commit
git add backend/src/core/agent/application backend/src/core/agent/infrastructure/queue backend/tests/integration/core/agent/test_queue_run_resume.py
git commit -m "feat: add celery-based run and resume tasks"
Task 8: CrewAI 运行时加载与创建
Files:
- Create:
backend/src/core/agent/infrastructure/crewai/runtime.py - Create:
backend/src/core/agent/infrastructure/crewai/factory.py - Test:
backend/tests/unit/core/agent/test_crewai_runtime.py
Step 1: Write the failing test
def test_runtime_creates_agents_tasks_from_resolved_config():
runtime = CrewAIRuntime(...)
crew = runtime.build_crew(message="hello")
assert len(crew.agents) >= 1
Step 2: Run test to verify it fails
Run: PYTHONPATH=backend/src uv run pytest backend/tests/unit/core/agent/test_crewai_runtime.py::test_runtime_creates_agents_tasks_from_resolved_config -q
Expected: FAIL
Step 3: Write minimal implementation
def build_crew(self, *, message: str) -> Crew:
agents = self._factory.build_agents(self._config)
tasks = self._factory.build_tasks(self._config, message=message)
return Crew(agents=agents, tasks=tasks)
Step 4: Run test to verify it passes
Run: PYTHONPATH=backend/src uv run pytest backend/tests/unit/core/agent/test_crewai_runtime.py::test_runtime_creates_agents_tasks_from_resolved_config -q
Expected: PASS
Step 5: Commit
git add backend/src/core/agent/infrastructure/crewai backend/tests/unit/core/agent/test_crewai_runtime.py
git commit -m "feat: create crewai runtime from resolved config"
Task 9: AG-UI 与 ag-ui-crewai 事件桥
Files:
- Create:
backend/src/core/agent/infrastructure/agui/bridge.py - Create:
backend/src/core/agent/infrastructure/agui/stream.py - Test:
backend/tests/unit/core/agent/test_agui_bridge.py
Step 1: Write the failing test
def test_agui_stream_emits_required_lifecycle():
events = to_agui_events(internal_events=[...])
assert events[0]["type"] == "RUN_STARTED"
assert events[-1]["type"] in {"RUN_FINISHED", "RUN_ERROR"}
Step 2: Run test to verify it fails
Run: PYTHONPATH=backend/src uv run pytest backend/tests/unit/core/agent/test_agui_bridge.py::test_agui_stream_emits_required_lifecycle -q
Expected: FAIL
Step 3: Write minimal implementation
def to_agui_events(internal_events: list[dict]) -> list[dict]:
return [map_event(e) for e in internal_events]
Step 4: Run test to verify it passes
Run: PYTHONPATH=backend/src uv run pytest backend/tests/unit/core/agent/test_agui_bridge.py::test_agui_stream_emits_required_lifecycle -q
Expected: PASS
Step 5: Commit
git add backend/src/core/agent/infrastructure/agui backend/tests/unit/core/agent/test_agui_bridge.py
git commit -m "feat: add ag-ui and ag-ui-crewai event bridge"
Task 10: LiteLLM 调用统计与会话聚合
Files:
- Create:
backend/src/core/agent/infrastructure/litellm/client.py - Create:
backend/src/core/agent/infrastructure/litellm/usage_tracker.py - Test:
backend/tests/unit/core/agent/test_litellm_usage.py
Step 1: Write the failing test
def test_tracker_aggregates_per_call_usage_and_cost():
t = UsageTracker()
t.add({"input_tokens": 10, "output_tokens": 5, "cost": "0.1"})
assert t.snapshot()["total_tokens"] == 15
Step 2: Run test to verify it fails
Run: PYTHONPATH=backend/src uv run pytest backend/tests/unit/core/agent/test_litellm_usage.py::test_tracker_aggregates_per_call_usage_and_cost -q
Expected: FAIL
Step 3: Write minimal implementation
def add(self, usage: dict[str, object]) -> None:
self.input_tokens += int(usage.get("input_tokens", 0))
self.output_tokens += int(usage.get("output_tokens", 0))
Step 4: Run test to verify it passes
Run: PYTHONPATH=backend/src uv run pytest backend/tests/unit/core/agent/test_litellm_usage.py::test_tracker_aggregates_per_call_usage_and_cost -q
Expected: PASS
Step 5: Commit
git add backend/src/core/agent/infrastructure/litellm backend/tests/unit/core/agent/test_litellm_usage.py
git commit -m "feat: add litellm usage and cost tracking"
Task 11: v1/agent 薄层 API + SSE 出口
Files:
- Create:
backend/src/v1/agent/router.py - Create:
backend/src/v1/agent/schemas.py - Create:
backend/src/v1/agent/dependencies.py - Create:
backend/src/v1/agent/service.py - Modify:
backend/src/v1/router.py - Test:
backend/tests/integration/v1/agent/test_routes.py
Step 1: Write the failing test
def test_run_endpoint_returns_sse_and_not_blocking(client):
resp = client.post("/api/v1/agent/runs", json={...})
assert resp.status_code == 202
Step 2: Run test to verify it fails
Run: PYTHONPATH=backend/src uv run pytest backend/tests/integration/v1/agent/test_routes.py::test_run_endpoint_returns_sse_and_not_blocking -q
Expected: FAIL
Step 3: Write minimal implementation
@router.post("/runs", status_code=202)
async def create_run(...):
task_id = service.enqueue_run(input_data)
return {"task_id": task_id}
Step 4: Run test to verify it passes
Run: PYTHONPATH=backend/src uv run pytest backend/tests/integration/v1/agent/test_routes.py::test_run_endpoint_returns_sse_and_not_blocking -q
Expected: PASS
Step 5: Commit
git add backend/src/v1/agent backend/src/v1/router.py backend/tests/integration/v1/agent/test_routes.py
git commit -m "feat: add thin v1 agent api and sse endpoints"
Task 12: 端到端验证与文档回填
Files:
- Modify:
docs/runtime/runtime-route.md - Modify:
docs/runtime/runtime-runbook.md
Step 1: Run unit tests
Run: PYTHONPATH=backend/src uv run pytest backend/tests/unit/core/agent backend/tests/unit/core/config backend/tests/unit/database -q
Expected: PASS
Step 2: Run integration tests
Run: PYTHONPATH=backend/src uv run pytest backend/tests/integration/core/agent backend/tests/integration/v1/agent -q
Expected: PASS
Step 3: Run lint and typecheck
Run: PYTHONPATH=backend/src uv run ruff check backend/src backend/tests
Expected: PASS
Run: PYTHONPATH=backend/src uv run basedpyright backend/src
Expected: PASS
Step 4: Document protocol contracts
在运行手册中补充以下固定规则:
system_agents+static/crewai配置合并优先级。sessions.state_snapshot字段结构与版本号。messages入库顺序与sessions聚合字段更新规则。- 工具调用审批与恢复时序图。
- tool_call/result 不匹配时的错误语义(
RUN_ERROR+ 可审计日志)。
Step 5: Commit
git add docs/runtime/runtime-route.md docs/runtime/runtime-runbook.md
git commit -m "docs: add new agent runtime contracts and operational guide"
Success Criteria
- Agent 创建配置由
system_agents与core/config/static/crewai合并生成。 - run/resume 仅通过 Celery Worker 执行,Web 不执行编排。
v1/agent无业务编排代码。sessions.state_snapshot承担运行态和工具审批恢复状态。- 每次 run/resume 的会话状态变更均落库到
sessions。 - 用户/助手/工具消息按
messages约束落库,seq单调递增。 - 前端工具与后端工具(审批/非审批)策略完整可测。
- tool_call 与 tool_result 具备强关联校验并可恢复/报错。
- LiteLLM 逐次计量与 run 聚合可落库。