From c07d339a5f450cadc5e212ed3ace25a48c715e16 Mon Sep 17 00:00:00 2001 From: qzl Date: Thu, 5 Mar 2026 15:43:58 +0800 Subject: [PATCH] docs(plan): add closed-loop implementation plan --- ...runtime-closed-loop-implementation-plan.md | 469 ++++++++++++++++++ 1 file changed, 469 insertions(+) create mode 100644 docs/plans/2026-03-05-agent-runtime-closed-loop-implementation-plan.md diff --git a/docs/plans/2026-03-05-agent-runtime-closed-loop-implementation-plan.md b/docs/plans/2026-03-05-agent-runtime-closed-loop-implementation-plan.md new file mode 100644 index 0000000..ea20286 --- /dev/null +++ b/docs/plans/2026-03-05-agent-runtime-closed-loop-implementation-plan.md @@ -0,0 +1,469 @@ +# Agent Runtime Closed Loop Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. + +**Goal:** Build a production-grade closed-loop agent runtime where `frontend -> FastAPI -> Celery -> run/resume service -> CrewAI -> AG-UI events -> Redis Stream -> SSE` is fully connected and verifiable. + +**Architecture:** Keep HTTP API as control-plane and worker as data-plane. The API validates auth/ownership and enqueues commands, the Celery worker executes run/resume business logic using DB-driven agent config, runtime emits normalized AG-UI events and usage/cost telemetry, all events are persisted to Redis Stream, and SSE endpoint streams from Redis with resume support (`Last-Event-ID`). + +**Tech Stack:** FastAPI, SQLAlchemy AsyncSession, Celery, Redis Streams, CrewAI, LiteLLM, Pydantic, pytest (unit/integration). + +**Confirmed Constraints (locked):** +- Persist semantics use existing `messages.role` only (`assistant|user|system|tool`), no new `message_kind` column. +- `tool_result` must be semantically complete (especially UI schema); do not store summary-only payload. +- Store full `tool_result` payload in Supabase Storage (private bucket) and persist durable object reference in DB metadata; do not rely on expiring signed URL as primary reference. +- `metadata` must be fixed and typed via Pydantic model (no free-form drift). +- Do not introduce additional business tables for this scope; keep schema minimal. +- CrewAI runtime must default to streaming mode. +- Full traceability target is final semantic reconstruction of `user/assistant/tool_result`; chunk-level replay is not required. + +**Metadata Contract (fixed, Pydantic-enforced):** +- Global required keys for all message metadata: `type`, `run_id`, `turn_id`. +- Global optional keys for all message metadata: `event_id`, `parent_message_id`, `error`. +- `type=user_input`: + - Required: `type`, `run_id`, `turn_id`. + - Optional: `input_source`, `client_ts`. +- `type=assistant_output`: + - Required: `type`, `run_id`, `turn_id`. + - Optional: `finish_reason`, `model_provider`, `cost_source`. +- `type=tool_call` (`role=assistant`): + - Required: `type`, `run_id`, `turn_id`, `tool_call_id`, `tool_name`, `tool_args`. + - Optional: `tool_schema_version`, `timeout_ms`. +- `type=tool_result` (`role=tool`): + - Required: `type`, `run_id`, `turn_id`, `tool_call_id`, `tool_name`, `storage_bucket`, `storage_path`, `payload_sha256`, `payload_bytes`, `payload_format`. + - Optional: `ui_schema_version`, `compression`, `storage_etag`, `render_hints`. +- Validation rules: + - `messages.role=tool` must use `metadata.type=tool_result`. + - `messages.role=assistant` + tool event must use `metadata.type=tool_call` or `assistant_output`. + - `tool_result` payload in DB must be reconstructable to AG-UI `TOOL_CALL_RESULT` using Storage object + metadata checksum. + +--- + +### Task 1: Add Agent Module Skeleton and Contracts + +**Files:** +- Create: `backend/src/core/agent/__init__.py` +- Create: `backend/src/core/agent/application/__init__.py` +- Create: `backend/src/core/agent/domain/__init__.py` +- Create: `backend/src/core/agent/infrastructure/events/__init__.py` +- Create: `backend/src/core/agent/infrastructure/agui/bridge.py` +- Create: `backend/src/core/agent/infrastructure/agui/stream.py` +- Test: `backend/tests/unit/core/agent/test_agui_bridge.py` + +**Step 1: Write failing tests for event normalization and SSE formatting** + +```python +def test_bridge_normalizes_event_type_to_upper_snake() -> None: + events = [{"type": "runStarted", "data": {"ok": True}}] + out = to_agui_events(events) + assert out[0]["type"] == "RUN_STARTED" + + +def test_sse_format_includes_id_event_data() -> None: + payload = to_sse_event(stream_id="1-0", event={"type": "RUN_STARTED", "data": {"a": 1}}) + assert payload.startswith("id: 1-0\nevent: RUN_STARTED\ndata: {") +``` + +**Step 2: Run tests and confirm RED** + +Run: `uv run pytest backend/tests/unit/core/agent/test_agui_bridge.py -q` +Expected: FAIL with missing module/function errors. + +**Step 3: Implement minimal bridge + stream utilities** + +```python +def to_agui_events(internal_events: list[dict[str, Any]]) -> list[dict[str, Any]]: + ... + + +def to_sse_event(stream_id: str, event: dict[str, Any]) -> str: + ... +``` + +**Step 4: Run tests and confirm GREEN** + +Run: `uv run pytest backend/tests/unit/core/agent/test_agui_bridge.py -q` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add backend/src/core/agent backend/tests/unit/core/agent/test_agui_bridge.py +git commit -m "feat(agent): add ag-ui bridge and sse serializer utilities" +``` + +### Task 2: Implement Redis Stream Event Store and Reader + +**Files:** +- Create: `backend/src/core/agent/infrastructure/events/redis_stream.py` +- Modify: `backend/src/core/config/settings.py` +- Test: `backend/tests/unit/core/agent/test_redis_stream.py` + +**Step 1: Write failing tests for append/read semantics** + +```python +def test_append_event_writes_json_payload() -> None: + ... + + +def test_read_events_respects_last_event_id() -> None: + ... +``` + +**Step 2: Run RED** + +Run: `uv run pytest backend/tests/unit/core/agent/test_redis_stream.py -q` +Expected: FAIL. + +**Step 3: Implement Redis stream adapter** + +```python +def append_event_sync(*, session_id: UUID, event: dict[str, Any]) -> str: + ... + + +async def read_events(...): + ... +``` + +**Step 4: Run GREEN** + +Run: `uv run pytest backend/tests/unit/core/agent/test_redis_stream.py -q` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add backend/src/core/agent/infrastructure/events/redis_stream.py backend/src/core/config/settings.py backend/tests/unit/core/agent/test_redis_stream.py +git commit -m "feat(agent): add redis stream event transport for run events" +``` + +### Task 3: Build CrewAI Runtime + AG-UI Event Mapping + Usage Tracking + +**Files:** +- Create: `backend/src/core/agent/infrastructure/crewai/factory.py` +- Create: `backend/src/core/agent/infrastructure/crewai/runtime.py` +- Create: `backend/src/core/agent/infrastructure/litellm/client.py` +- Create: `backend/src/core/agent/infrastructure/litellm/usage_tracker.py` +- Create: `backend/src/core/agent/infrastructure/config/resolver.py` +- Modify: `backend/src/core/config/settings.py` +- Test: `backend/tests/unit/core/agent/test_crewai_runtime.py` +- Test: `backend/tests/unit/core/agent/test_litellm_usage.py` +- Test: `backend/tests/unit/core/agent/test_config_resolver.py` + +**Step 1: Write failing runtime tests (events + cost + strict errors)** + +```python +def test_runtime_emits_text_tool_reasoning_events() -> None: + ... + + +def test_runtime_raises_if_model_or_api_key_missing() -> None: + ... + + +def test_usage_tracker_extracts_tokens_and_cost() -> None: + ... +``` + +**Step 2: Run RED** + +Run: `uv run pytest backend/tests/unit/core/agent/test_crewai_runtime.py backend/tests/unit/core/agent/test_litellm_usage.py backend/tests/unit/core/agent/test_config_resolver.py -q` +Expected: FAIL. + +**Step 3: Implement runtime and tracker** + +- Register CrewAI event handlers (`Task/LLM/Tool/Reasoning`) and map to AG-UI canonical event types. +- Default runtime to streaming mode for CrewAI execution. +- Enforce strict config behavior: no `llm_model_code` or provider key -> raise. +- Use LiteLLM cost calculator for actual cost; if cost cannot be computed, fail closed (raise), do not silently record zero. + +**Step 4: Run GREEN** + +Run: `uv run pytest backend/tests/unit/core/agent/test_crewai_runtime.py backend/tests/unit/core/agent/test_litellm_usage.py backend/tests/unit/core/agent/test_config_resolver.py -q` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add backend/src/core/agent/infrastructure backend/tests/unit/core/agent/test_crewai_runtime.py backend/tests/unit/core/agent/test_litellm_usage.py backend/tests/unit/core/agent/test_config_resolver.py backend/src/core/config/settings.py +git commit -m "feat(agent): implement crewai runtime events and litellm usage-cost auditing" +``` + +### Task 4: Implement Run/Resume Application Services (DB Config + Persistence) + +**Files:** +- Create: `backend/src/core/agent/application/run_service.py` +- Create: `backend/src/core/agent/application/resume_service.py` +- Create: `backend/src/core/agent/application/session_state_persistence.py` +- Create: `backend/src/core/agent/domain/state_snapshot.py` +- Create: `backend/src/core/agent/domain/tool_correlation.py` +- Test: `backend/tests/unit/core/agent/test_run_resume_service.py` +- Test: `backend/tests/unit/core/agent/test_state_snapshot.py` +- Test: `backend/tests/unit/core/agent/test_tool_correlation.py` + +**Step 1: Write failing tests for DB-driven runtime and aggregate updates** + +```python +async def test_run_service_loads_agent_config_from_db_and_persists_messages() -> None: + ... + + +async def test_resume_service_requires_pending_tool_call() -> None: + ... +``` + +**Step 2: Run RED** + +Run: `uv run pytest backend/tests/unit/core/agent/test_run_resume_service.py backend/tests/unit/core/agent/test_state_snapshot.py backend/tests/unit/core/agent/test_tool_correlation.py -q` +Expected: FAIL. + +**Step 3: Implement services** + +- `run_service`: read session + system agent config from DB, execute runtime, persist user/assistant messages, update session aggregates. +- `resume_service`: validate pending tool call status, enforce idempotency semantics, resume runtime, persist audit fields. +- Persist metadata audit (`tokens`, `cost`, `cost_source`, correlation ids) for every assistant message. +- Persist tool lifecycle with role-only model: + - tool call message uses `role=assistant` with fixed metadata (`type=tool_call`, `tool_call_id`, `tool_name`, arguments reference). + - tool result message uses `role=tool` with fixed metadata (`type=tool_result`, `tool_call_id`, `tool_name`, storage bucket/path, checksum, bytes, schema version). +- `tool_result` full payload (UI schema) is uploaded to Supabase Storage private bucket; DB stores durable reference and verification fields. +- Ensure DB->AG-UI `TOOL_CALL_RESULT` reconstruction is equivalent to SSE-streamed final tool result semantics. +- Enforce metadata contract by Pydantic model at write path and read path (reject malformed metadata early). + +**Step 4: Run GREEN** + +Run: `uv run pytest backend/tests/unit/core/agent/test_run_resume_service.py backend/tests/unit/core/agent/test_state_snapshot.py backend/tests/unit/core/agent/test_tool_correlation.py -q` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add backend/src/core/agent/application backend/src/core/agent/domain backend/tests/unit/core/agent/test_run_resume_service.py backend/tests/unit/core/agent/test_state_snapshot.py backend/tests/unit/core/agent/test_tool_correlation.py +git commit -m "feat(agent): add run-resume app services with db config and audit persistence" +``` + +### Task 5: Wire Celery Worker Task to Run/Resume and Publish Runtime Events + +**Files:** +- Create: `backend/src/core/agent/infrastructure/queue/tasks.py` +- Modify: `backend/src/core/celery/app.py` +- Test: `backend/tests/unit/core/agent/test_queue_tasks.py` +- Test: `backend/tests/integration/core/agent/test_queue_run_resume.py` + +**Step 1: Write failing queue tests** + +```python +def test_run_agent_task_emits_started_runtime_and_finished_events() -> None: + ... + + +def test_run_agent_task_emits_error_event_on_exception() -> None: + ... +``` + +**Step 2: Run RED** + +Run: `uv run pytest backend/tests/unit/core/agent/test_queue_tasks.py backend/tests/integration/core/agent/test_queue_run_resume.py -q` +Expected: FAIL. + +**Step 3: Implement worker task flow** + +- Decode command type (`run`/`resume`). +- Emit lifecycle events (`RUN_STARTED/RUN_RESUMED/RUN_FINISHED/RUN_ERROR`). +- Forward runtime callback events to Redis stream immediately. +- Persist session status/snapshot after completion. + +**Step 4: Run GREEN** + +Run: `uv run pytest backend/tests/unit/core/agent/test_queue_tasks.py backend/tests/integration/core/agent/test_queue_run_resume.py -q` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add backend/src/core/agent/infrastructure/queue/tasks.py backend/src/core/celery/app.py backend/tests/unit/core/agent/test_queue_tasks.py backend/tests/integration/core/agent/test_queue_run_resume.py +git commit -m "feat(agent): wire celery run-resume execution and redis event publishing" +``` + +### Task 6: Implement API Contracts (Run/Resume/SSE) + Auth/Ownership/Idempotency + +**Files:** +- Create: `backend/src/v1/agent/schemas.py` +- Create: `backend/src/v1/agent/repository.py` +- Create: `backend/src/v1/agent/service.py` +- Create: `backend/src/v1/agent/router.py` +- Create: `backend/src/v1/agent/dependencies.py` +- Modify: `backend/src/v1/router.py` +- Test: `backend/tests/unit/v1/agent/test_service.py` +- Test: `backend/tests/unit/v1/agent/test_owner_guard.py` +- Test: `backend/tests/integration/v1/agent/test_routes.py` + +**Step 1: Write failing API tests** + +```python +async def test_run_requires_auth_and_returns_202_task_id() -> None: + ... + + +async def test_stream_reads_from_last_event_id() -> None: + ... + + +def test_resume_idempotency_uses_redis_lock_and_task_key() -> None: + ... +``` + +**Step 2: Run RED** + +Run: `uv run pytest backend/tests/unit/v1/agent/test_service.py backend/tests/unit/v1/agent/test_owner_guard.py backend/tests/integration/v1/agent/test_routes.py -q` +Expected: FAIL. + +**Step 3: Implement API service/router** + +- `POST /api/v1/agent/runs` enqueue run command. +- `POST /api/v1/agent/runs/{session_id}/resume` enqueue resume command with async redis lock + dedup task key. +- `GET /api/v1/agent/runs/{session_id}/events` SSE stream from Redis with `Last-Event-ID`. +- Enforce auth and session ownership checks on all endpoints. +- Validate `tool_call_id` and message length/pattern boundaries. + +**Step 4: Run GREEN** + +Run: `uv run pytest backend/tests/unit/v1/agent/test_service.py backend/tests/unit/v1/agent/test_owner_guard.py backend/tests/integration/v1/agent/test_routes.py -q` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add backend/src/v1/agent backend/src/v1/router.py backend/tests/unit/v1/agent backend/tests/integration/v1/agent/test_routes.py +git commit -m "feat(agent): add authenticated run-resume-sse api with redis-backed idempotency" +``` + +### Task 7: Add Schema/Migration Contract for Session Snapshot + Audit Fields + +**Files:** +- Create: `backend/alembic/versions/20260305_agent_runtime_closed_loop_contract.py` +- Modify: `backend/src/models/agent_chat_session.py` +- Modify: `backend/src/models/agent_chat_message.py` +- Test: `backend/tests/unit/database/test_sessions_state_snapshot_contract.py` + +**Migration scope note:** +- Fix current schema drift: model has `sessions.state_snapshot` but migration chain does not reliably provide this column in current DB state. +- Keep schema minimal; do not add new business tables in this migration. + +**Step 1: Write failing migration contract tests** + +```python +def test_session_has_state_snapshot_and_status_contract() -> None: + ... + + +def test_message_has_token_cost_and_metadata_contract() -> None: + ... +``` + +**Step 2: Run RED** + +Run: `uv run pytest backend/tests/unit/database/test_sessions_state_snapshot_contract.py -q` +Expected: FAIL. + +**Step 3: Implement migration and model alignment** + +- Ensure `state_snapshot`, `status`, token/cost/metadata fields are present and nullable constraints are explicit. +- Add/verify indexes needed for role-based semantic reconstruction (`session_id, seq`, and targeted metadata lookups if required). +- Ensure `metadata` structure is validated by fixed Pydantic schema at application boundary. +- Add DB-level guardrails where feasible (check constraints) for role/metadata consistency without introducing new tables. +- Keep reversible downgrade path. + +**Step 4: Run GREEN** + +Run: `uv run pytest backend/tests/unit/database/test_sessions_state_snapshot_contract.py -q` +Expected: PASS. + +**Step 5: Commit** + +```bash +git add backend/alembic/versions/20260305_agent_runtime_closed_loop_contract.py backend/src/models/agent_chat_session.py backend/src/models/agent_chat_message.py backend/tests/unit/database/test_sessions_state_snapshot_contract.py +git commit -m "feat(agent): add db contract for session snapshot and usage audit fields" +``` + +### Task 8: End-to-End Closure Verification and Docs Update + +**Files:** +- Modify: `docs/runtime/runtime-route.md` +- Modify: `docs/runtime/runtime-runbook.md` +- Create: `backend/tests/integration/core/agent/test_session_message_persistence.py` + +**Step 1: Write integration test for full closure path** + +```python +async def test_closed_loop_run_flow_frontend_to_sse() -> None: + # run request -> queue command -> runtime events -> redis stream -> sse read + ... +``` + +Also verify: +- `tool_result` full UI schema is written to Supabase Storage private bucket. +- `messages.role=tool` row contains stable storage reference and checksum metadata. +- Reading from DB can reconstruct final AG-UI `TOOL_CALL_RESULT` event payload semantics. + +**Step 2: Run RED** + +Run: `uv run pytest backend/tests/integration/core/agent/test_session_message_persistence.py -q` +Expected: FAIL. + +**Step 3: Implement minimal missing glue and docs** + +- Fill any missing wiring revealed by the test. +- Document endpoint contracts, event taxonomy, and operational runbook for redis/celery troubleshooting. + +**Step 4: Run GREEN + full gate verification** + +Run: +- `PYTHONPATH=backend/src uv run python backend/src/core/runtime/cli.py migrate` +- `uv run pytest backend/tests/unit/core/agent backend/tests/unit/v1/agent backend/tests/integration/core/agent backend/tests/integration/v1/agent -q` +- `uv run ruff check backend/src backend/tests` +- `uv run basedpyright backend/src` + +Expected: +- All relevant tests PASS. +- Ruff PASS. +- basedpyright 0 errors (notes/warnings can be documented if pre-existing). + +**Step 5: Commit** + +```bash +git add docs/runtime/runtime-route.md docs/runtime/runtime-runbook.md backend/tests/integration/core/agent/test_session_message_persistence.py +git commit -m "docs(agent): document closed-loop runtime and verify end-to-end chain" +``` + +### Task 9: L2 Mandatory Review Gates + +**Files:** +- No direct code changes required; apply fixes if findings appear. + +**Step 1: Run required agents** + +- `tdd-guide` (already enforced by plan sequence) +- `refactor-cleaner` +- `code-reviewer` +- `security-reviewer` + +**Step 2: Fix all CRITICAL/HIGH findings** + +Run targeted tests after each fix. + +**Step 3: Final verification rerun** + +Run: +- `uv run pytest backend/tests/unit/core/agent backend/tests/unit/v1/agent backend/tests/integration/core/agent backend/tests/integration/v1/agent -q` +- `uv run ruff check backend/src backend/tests` +- `uv run basedpyright backend/src` + +Expected: no failing tests; no lint errors; no type errors. + +**Step 4: Final commit (if review fixes were needed)** + +```bash +git add backend/src backend/tests docs/runtime +git commit -m "fix(agent): resolve L2 review findings for closed-loop runtime" +```