# Agent Runtime Closed Loop Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Build a production-grade closed-loop agent runtime where `frontend -> FastAPI -> Celery -> run/resume service -> CrewAI -> AG-UI events -> Redis Stream -> SSE` is fully connected and verifiable.

**Architecture:** Keep HTTP API as control-plane and worker as data-plane. The API validates auth/ownership and enqueues commands, the Celery worker executes run/resume business logic using DB-driven agent config, runtime emits normalized AG-UI events and usage/cost telemetry, all events are persisted to Redis Stream, and SSE endpoint streams from Redis with resume support (`Last-Event-ID`).

**Tech Stack:** FastAPI, SQLAlchemy AsyncSession, Celery, Redis Streams, CrewAI, LiteLLM, Pydantic, pytest (unit/integration).

**Confirmed Constraints (locked):**

- Persist semantics use existing `messages.role` only (`assistant|user|system|tool`), no new `message_kind` column.
- `tool_result` must be semantically complete (especially UI schema); do not store summary-only payload.
- Store full `tool_result` payload in Supabase Storage (private bucket) and persist durable object reference in DB metadata; do not rely on expiring signed URL as primary reference.
- `metadata` must be fixed and typed via Pydantic model (no free-form drift).
- Do not introduce additional business tables for this scope; keep schema minimal.
- CrewAI runtime must default to streaming mode.
- Full traceability target is final semantic reconstruction of `user/assistant/tool_result`; chunk-level replay is not required.

**Metadata Contract (fixed, Pydantic-enforced):**

- Global required keys for all message metadata: `type`, `run_id`, `turn_id`.
- Global optional keys for all message metadata: `event_id`, `parent_message_id`, `error`.
- `type=user_input`:
  - Required: `type`, `run_id`, `turn_id`.
  - Optional: `input_source`, `client_ts`.
- `type=assistant_output`:
  - Required: `type`, `run_id`, `turn_id`.
  - Optional: `finish_reason`, `model_provider`, `cost_source`.
- `type=tool_call` (`role=assistant`):
  - Required: `type`, `run_id`, `turn_id`, `tool_call_id`, `tool_name`, `tool_args`.
  - Optional: `tool_schema_version`, `timeout_ms`.
- `type=tool_result` (`role=tool`):
  - Required: `type`, `run_id`, `turn_id`, `tool_call_id`, `tool_name`, `storage_bucket`, `storage_path`, `payload_sha256`, `payload_bytes`, `payload_format`.
  - Optional: `ui_schema_version`, `compression`, `storage_etag`, `render_hints`.
- Validation rules:
  - `messages.role=tool` must use `metadata.type=tool_result`.
  - `messages.role=assistant` + tool event must use `metadata.type=tool_call` or `assistant_output`.
  - `tool_result` payload in DB must be reconstructable to AG-UI `TOOL_CALL_RESULT` using Storage object + metadata checksum.

---

### Task 1: Add Agent Module Skeleton and Contracts

**Files:**

- Create: `backend/src/core/agent/__init__.py`
- Create: `backend/src/core/agent/application/__init__.py`
- Create: `backend/src/core/agent/domain/__init__.py`
- Create: `backend/src/core/agent/infrastructure/events/__init__.py`
- Create: `backend/src/core/agent/infrastructure/agui/bridge.py`
- Create: `backend/src/core/agent/infrastructure/agui/stream.py`
- Test: `backend/tests/unit/core/agent/test_agui_bridge.py`

**Step 1: Write failing tests for event normalization and SSE formatting**

```python
def test_bridge_normalizes_event_type_to_upper_snake() -> None:
    events = [{"type": "runStarted", "data": {"ok": True}}]
    out = to_agui_events(events)
    assert out[0]["type"] == "RUN_STARTED"


def test_sse_format_includes_id_event_data() -> None:
    payload = to_sse_event(stream_id="1-0", event={"type": "RUN_STARTED", "data": {"a": 1}})
    assert payload.startswith("id: 1-0\nevent: RUN_STARTED\ndata: {")
```

**Step 2: Run tests and confirm RED**

Run: `uv run pytest backend/tests/unit/core/agent/test_agui_bridge.py -q`
Expected: FAIL with missing
module/function errors.

**Step 3: Implement minimal bridge + stream utilities**

```python
def to_agui_events(internal_events: list[dict[str, Any]]) -> list[dict[str, Any]]:
    ...


def to_sse_event(stream_id: str, event: dict[str, Any]) -> str:
    ...
```

**Step 4: Run tests and confirm GREEN**

Run: `uv run pytest backend/tests/unit/core/agent/test_agui_bridge.py -q`
Expected: PASS.

**Step 5: Commit**

```bash
git add backend/src/core/agent backend/tests/unit/core/agent/test_agui_bridge.py
git commit -m "feat(agent): add ag-ui bridge and sse serializer utilities"
```

### Task 2: Implement Redis Stream Event Store and Reader

**Files:**

- Create: `backend/src/core/agent/infrastructure/events/redis_stream.py`
- Modify: `backend/src/core/config/settings.py`
- Test: `backend/tests/unit/core/agent/test_redis_stream.py`

**Step 1: Write failing tests for append/read semantics**

```python
def test_append_event_writes_json_payload() -> None:
    ...


def test_read_events_respects_last_event_id() -> None:
    ...
```

**Step 2: Run RED**

Run: `uv run pytest backend/tests/unit/core/agent/test_redis_stream.py -q`
Expected: FAIL.

**Step 3: Implement Redis stream adapter**

```python
def append_event_sync(*, session_id: UUID, event: dict[str, Any]) -> str:
    ...


async def read_events(...):
    ...
```

**Step 4: Run GREEN**

Run: `uv run pytest backend/tests/unit/core/agent/test_redis_stream.py -q`
Expected: PASS.
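
As a rough illustration of the append/read semantics Task 2 tests for, the sketch below shows one possible shape for the adapter. It is an assumption-laden sketch, not the planned implementation: the `agent:events:{session_id}` key layout, the single `payload` field, the injected `client` parameter, and the synchronous `read_events_after` helper are all hypothetical (the plan's reader is async). The client is assumed to behave like a redis-py client created with `decode_responses=True` under RESP2, exposing `xadd(name, fields)` and `xread(streams, count=...)`.

```python
import json
from typing import Any, Optional
from uuid import UUID


def stream_key(session_id: UUID) -> str:
    # Hypothetical key layout; the plan does not fix one.
    return f"agent:events:{session_id}"


def append_event_sync(client: Any, *, session_id: UUID, event: dict[str, Any]) -> str:
    """Append one event as a JSON payload and return the stream ID Redis assigned."""
    payload = json.dumps(event, separators=(",", ":"), sort_keys=True)
    return client.xadd(stream_key(session_id), {"payload": payload})


def read_events_after(
    client: Any,
    *,
    session_id: UUID,
    last_event_id: Optional[str],
    count: int = 100,
) -> list[tuple[str, dict[str, Any]]]:
    """Return (stream_id, event) pairs strictly newer than last_event_id."""
    # XREAD returns entries with IDs greater than the given one, so "0-0"
    # replays the stream from the beginning.
    start = last_event_id or "0-0"
    reply = client.xread({stream_key(session_id): start}, count=count)
    events: list[tuple[str, dict[str, Any]]] = []
    for _stream_name, entries in reply or []:
        for stream_id, fields in entries:
            events.append((stream_id, json.loads(fields["payload"])))
    return events
```

Because the returned stream ID doubles as the SSE `id:` field, a reconnecting client's `Last-Event-ID` header can be passed straight through as `last_event_id`.
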

**Step 5: Commit**

```bash
git add backend/src/core/agent/infrastructure/events/redis_stream.py backend/src/core/config/settings.py backend/tests/unit/core/agent/test_redis_stream.py
git commit -m "feat(agent): add redis stream event transport for run events"
```

### Task 3: Build CrewAI Runtime + AG-UI Event Mapping + Usage Tracking

**Files:**

- Create: `backend/src/core/agent/infrastructure/crewai/factory.py`
- Create: `backend/src/core/agent/infrastructure/crewai/runtime.py`
- Create: `backend/src/core/agent/infrastructure/litellm/client.py`
- Create: `backend/src/core/agent/infrastructure/litellm/usage_tracker.py`
- Create: `backend/src/core/agent/infrastructure/config/resolver.py`
- Modify: `backend/src/core/config/settings.py`
- Test: `backend/tests/unit/core/agent/test_crewai_runtime.py`
- Test: `backend/tests/unit/core/agent/test_litellm_usage.py`
- Test: `backend/tests/unit/core/agent/test_config_resolver.py`

**Step 1: Write failing runtime tests (events + cost + strict errors)**

```python
def test_runtime_emits_text_tool_reasoning_events() -> None:
    ...


def test_runtime_raises_if_model_or_api_key_missing() -> None:
    ...


def test_usage_tracker_extracts_tokens_and_cost() -> None:
    ...
```

**Step 2: Run RED**

Run: `uv run pytest backend/tests/unit/core/agent/test_crewai_runtime.py backend/tests/unit/core/agent/test_litellm_usage.py backend/tests/unit/core/agent/test_config_resolver.py -q`
Expected: FAIL.

**Step 3: Implement runtime and tracker**

- Register CrewAI event handlers (`Task/LLM/Tool/Reasoning`) and map to AG-UI canonical event types.
- Default runtime to streaming mode for CrewAI execution.
- Enforce strict config behavior: no `llm_model_code` or provider key -> raise.
- Use LiteLLM cost calculator for actual cost; if cost cannot be computed, fail closed (raise), do not silently record zero.
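
The fail-closed cost rule in Step 3 could look roughly like the sketch below. Everything named here is hypothetical (`UsageRecord`, `CostUnavailableError`, `extract_usage`, the dict-shaped `response`); the cost calculator is injected as `cost_fn` so the sketch does not depend on LiteLLM, although `litellm.completion_cost` is the likely candidate in the real tracker. Whether a literal zero returned by the calculator should also be rejected is left open here.

```python
from dataclasses import dataclass
from decimal import Decimal
from typing import Any, Callable, Optional


class CostUnavailableError(RuntimeError):
    """Raised when token usage or cost cannot be determined for a completion."""


@dataclass(frozen=True)
class UsageRecord:
    prompt_tokens: int
    completion_tokens: int
    cost_usd: Decimal
    cost_source: str


def extract_usage(
    response: dict[str, Any],
    cost_fn: Callable[[dict[str, Any]], Optional[float]],
) -> UsageRecord:
    """Extract tokens and cost, raising instead of recording a silent zero."""
    usage = response.get("usage")
    if not usage:
        raise CostUnavailableError("response carries no usage block")
    try:
        prompt_tokens = int(usage["prompt_tokens"])
        completion_tokens = int(usage["completion_tokens"])
    except (KeyError, TypeError, ValueError) as exc:
        raise CostUnavailableError("usage block is incomplete") from exc

    try:
        cost = cost_fn(response)
    except Exception as exc:
        # Any calculator failure is surfaced rather than swallowed.
        raise CostUnavailableError("cost calculator failed") from exc
    if cost is None:
        raise CostUnavailableError("cost calculator returned no value")

    # Decimal(str(...)) avoids carrying binary float noise into the audit record.
    return UsageRecord(prompt_tokens, completion_tokens, Decimal(str(cost)), "litellm")
```
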

**Step 4: Run GREEN**

Run: `uv run pytest backend/tests/unit/core/agent/test_crewai_runtime.py backend/tests/unit/core/agent/test_litellm_usage.py backend/tests/unit/core/agent/test_config_resolver.py -q`
Expected: PASS.

**Step 5: Commit**

```bash
git add backend/src/core/agent/infrastructure backend/tests/unit/core/agent/test_crewai_runtime.py backend/tests/unit/core/agent/test_litellm_usage.py backend/tests/unit/core/agent/test_config_resolver.py backend/src/core/config/settings.py
git commit -m "feat(agent): implement crewai runtime events and litellm usage-cost auditing"
```

### Task 4: Implement Run/Resume Application Services (DB Config + Persistence)

**Files:**

- Create: `backend/src/core/agent/application/run_service.py`
- Create: `backend/src/core/agent/application/resume_service.py`
- Create: `backend/src/core/agent/application/session_state_persistence.py`
- Create: `backend/src/core/agent/domain/state_snapshot.py`
- Create: `backend/src/core/agent/domain/tool_correlation.py`
- Test: `backend/tests/unit/core/agent/test_run_resume_service.py`
- Test: `backend/tests/unit/core/agent/test_state_snapshot.py`
- Test: `backend/tests/unit/core/agent/test_tool_correlation.py`

**Step 1: Write failing tests for DB-driven runtime and aggregate updates**

```python
async def test_run_service_loads_agent_config_from_db_and_persists_messages() -> None:
    ...


async def test_resume_service_requires_pending_tool_call() -> None:
    ...
```

**Step 2: Run RED**

Run: `uv run pytest backend/tests/unit/core/agent/test_run_resume_service.py backend/tests/unit/core/agent/test_state_snapshot.py backend/tests/unit/core/agent/test_tool_correlation.py -q`
Expected: FAIL.

**Step 3: Implement services**

- `run_service`: read session + system agent config from DB, execute runtime, persist user/assistant messages, update session aggregates.
- `resume_service`: validate pending tool call status, enforce idempotency semantics, resume runtime, persist audit fields.
- Persist metadata audit (`tokens`, `cost`, `cost_source`, correlation ids) for every assistant message.
- Persist tool lifecycle with role-only model:
  - tool call message uses `role=assistant` with fixed metadata (`type=tool_call`, `tool_call_id`, `tool_name`, arguments reference).
  - tool result message uses `role=tool` with fixed metadata (`type=tool_result`, `tool_call_id`, `tool_name`, storage bucket/path, checksum, bytes, schema version).
- `tool_result` full payload (UI schema) is uploaded to Supabase Storage private bucket; DB stores durable reference and verification fields.
- Ensure DB->AG-UI `TOOL_CALL_RESULT` reconstruction is equivalent to SSE-streamed final tool result semantics.
- Enforce metadata contract by Pydantic model at write path and read path (reject malformed metadata early).

**Step 4: Run GREEN**

Run: `uv run pytest backend/tests/unit/core/agent/test_run_resume_service.py backend/tests/unit/core/agent/test_state_snapshot.py backend/tests/unit/core/agent/test_tool_correlation.py -q`
Expected: PASS.

**Step 5: Commit**

```bash
git add backend/src/core/agent/application backend/src/core/agent/domain backend/tests/unit/core/agent/test_run_resume_service.py backend/tests/unit/core/agent/test_state_snapshot.py backend/tests/unit/core/agent/test_tool_correlation.py
git commit -m "feat(agent): add run-resume app services with db config and audit persistence"
```

### Task 5: Wire Celery Worker Task to Run/Resume and Publish Runtime Events

**Files:**

- Create: `backend/src/core/agent/infrastructure/queue/tasks.py`
- Modify: `backend/src/core/celery/app.py`
- Test: `backend/tests/unit/core/agent/test_queue_tasks.py`
- Test: `backend/tests/integration/core/agent/test_queue_run_resume.py`

**Step 1: Write failing queue tests**

```python
def test_run_agent_task_emits_started_runtime_and_finished_events() -> None:
    ...


def test_run_agent_task_emits_error_event_on_exception() -> None:
    ...
```

**Step 2: Run RED**

Run: `uv run pytest backend/tests/unit/core/agent/test_queue_tasks.py backend/tests/integration/core/agent/test_queue_run_resume.py -q`
Expected: FAIL.

**Step 3: Implement worker task flow**

- Decode command type (`run`/`resume`).
- Emit lifecycle events (`RUN_STARTED/RUN_RESUMED/RUN_FINISHED/RUN_ERROR`).
- Forward runtime callback events to Redis stream immediately.
- Persist session status/snapshot after completion.

**Step 4: Run GREEN**

Run: `uv run pytest backend/tests/unit/core/agent/test_queue_tasks.py backend/tests/integration/core/agent/test_queue_run_resume.py -q`
Expected: PASS.

**Step 5: Commit**

```bash
git add backend/src/core/agent/infrastructure/queue/tasks.py backend/src/core/celery/app.py backend/tests/unit/core/agent/test_queue_tasks.py backend/tests/integration/core/agent/test_queue_run_resume.py
git commit -m "feat(agent): wire celery run-resume execution and redis event publishing"
```

### Task 6: Implement API Contracts (Run/Resume/SSE) + Auth/Ownership/Idempotency

**Files:**

- Create: `backend/src/v1/agent/schemas.py`
- Create: `backend/src/v1/agent/repository.py`
- Create: `backend/src/v1/agent/service.py`
- Create: `backend/src/v1/agent/router.py`
- Create: `backend/src/v1/agent/dependencies.py`
- Modify: `backend/src/v1/router.py`
- Test: `backend/tests/unit/v1/agent/test_service.py`
- Test: `backend/tests/unit/v1/agent/test_owner_guard.py`
- Test: `backend/tests/integration/v1/agent/test_routes.py`

**Step 1: Write failing API tests**

```python
async def test_run_requires_auth_and_returns_202_task_id() -> None:
    ...


async def test_stream_reads_from_last_event_id() -> None:
    ...


def test_resume_idempotency_uses_redis_lock_and_task_key() -> None:
    ...
```

**Step 2: Run RED**

Run: `uv run pytest backend/tests/unit/v1/agent/test_service.py backend/tests/unit/v1/agent/test_owner_guard.py backend/tests/integration/v1/agent/test_routes.py -q`
Expected: FAIL.
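
The resume idempotency test in Step 1 relies on a Redis lock plus a dedup task key. One way this could work is an atomic `SET ... NX EX` claim, sketched below under stated assumptions: the `agent:resume:` key prefix, the `claim_resume` helper, and the TTL are hypothetical, the client is assumed to be redis-py-like with `decode_responses=True`, and the sketch is synchronous for brevity where the plan calls for an async lock.

```python
import hashlib
from typing import Any, Optional
from uuid import UUID


def resume_dedup_key(session_id: UUID, tool_call_id: str) -> str:
    # Hypothetical key layout: one key per (session, tool call) pair.
    digest = hashlib.sha256(f"{session_id}:{tool_call_id}".encode("utf-8")).hexdigest()
    return f"agent:resume:{digest}"


def claim_resume(
    client: Any,
    *,
    session_id: UUID,
    tool_call_id: str,
    task_id: str,
    ttl_seconds: int = 300,
) -> tuple[bool, Optional[str]]:
    """Try to claim the resume for this tool call.

    Returns (True, task_id) when this caller won the claim, otherwise
    (False, holder) where holder is the task ID stored by the earlier
    caller, or None if the key expired between the two commands.
    """
    key = resume_dedup_key(session_id, tool_call_id)
    # SET with nx=True only writes when the key is absent, so exactly one
    # concurrent caller sees a truthy result.
    if client.set(key, task_id, nx=True, ex=ttl_seconds):
        return True, task_id
    return False, client.get(key)
```

A duplicate `POST .../resume` can then return the holder's task ID instead of enqueueing a second Celery task.
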

**Step 3: Implement API service/router**

- `POST /api/v1/agent/runs` enqueue run command.
- `POST /api/v1/agent/runs/{session_id}/resume` enqueue resume command with async redis lock + dedup task key.
- `GET /api/v1/agent/runs/{session_id}/events` SSE stream from Redis with `Last-Event-ID`.
- Enforce auth and session ownership checks on all endpoints.
- Validate `tool_call_id` and message length/pattern boundaries.

**Step 4: Run GREEN**

Run: `uv run pytest backend/tests/unit/v1/agent/test_service.py backend/tests/unit/v1/agent/test_owner_guard.py backend/tests/integration/v1/agent/test_routes.py -q`
Expected: PASS.

**Step 5: Commit**

```bash
git add backend/src/v1/agent backend/src/v1/router.py backend/tests/unit/v1/agent backend/tests/integration/v1/agent/test_routes.py
git commit -m "feat(agent): add authenticated run-resume-sse api with redis-backed idempotency"
```

### Task 7: Add Schema/Migration Contract for Session Snapshot + Audit Fields

**Files:**

- Create: `backend/alembic/versions/20260305_agent_runtime_closed_loop_contract.py`
- Modify: `backend/src/models/agent_chat_session.py`
- Modify: `backend/src/models/agent_chat_message.py`
- Test: `backend/tests/unit/database/test_sessions_state_snapshot_contract.py`

**Migration scope note:**

- Fix current schema drift: model has `sessions.state_snapshot` but migration chain does not reliably provide this column in current DB state.
- Keep schema minimal; do not add new business tables in this migration.

**Step 1: Write failing migration contract tests**

```python
def test_session_has_state_snapshot_and_status_contract() -> None:
    ...


def test_message_has_token_cost_and_metadata_contract() -> None:
    ...
```

**Step 2: Run RED**

Run: `uv run pytest backend/tests/unit/database/test_sessions_state_snapshot_contract.py -q`
Expected: FAIL.

**Step 3: Implement migration and model alignment**

- Ensure `state_snapshot`, `status`, token/cost/metadata fields are present and nullable constraints are explicit.
- Add/verify indexes needed for role-based semantic reconstruction (`session_id, seq`, and targeted metadata lookups if required).
- Ensure `metadata` structure is validated by fixed Pydantic schema at application boundary.
- Add DB-level guardrails where feasible (check constraints) for role/metadata consistency without introducing new tables.
- Keep reversible downgrade path.

**Step 4: Run GREEN**

Run: `uv run pytest backend/tests/unit/database/test_sessions_state_snapshot_contract.py -q`
Expected: PASS.

**Step 5: Commit**

```bash
git add backend/alembic/versions/20260305_agent_runtime_closed_loop_contract.py backend/src/models/agent_chat_session.py backend/src/models/agent_chat_message.py backend/tests/unit/database/test_sessions_state_snapshot_contract.py
git commit -m "feat(agent): add db contract for session snapshot and usage audit fields"
```

### Task 8: End-to-End Closure Verification and Docs Update

**Files:**

- Modify: `docs/runtime/runtime-route.md`
- Modify: `docs/runtime/runtime-runbook.md`
- Create: `backend/tests/integration/core/agent/test_session_message_persistence.py`

**Step 1: Write integration test for full closure path**

```python
async def test_closed_loop_run_flow_frontend_to_sse() -> None:
    # run request -> queue command -> runtime events -> redis stream -> sse read
    ...
```

Also verify:

- `tool_result` full UI schema is written to Supabase Storage private bucket.
- `messages.role=tool` row contains stable storage reference and checksum metadata.
- Reading from DB can reconstruct final AG-UI `TOOL_CALL_RESULT` event payload semantics.

**Step 2: Run RED**

Run: `uv run pytest backend/tests/integration/core/agent/test_session_message_persistence.py -q`
Expected: FAIL.

**Step 3: Implement minimal missing glue and docs**

- Fill any missing wiring revealed by the test.
- Document endpoint contracts, event taxonomy, and operational runbook for redis/celery troubleshooting.
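
The storage-reference and checksum verification that Task 8 asks for can be sketched as below. The field names (`storage_bucket`, `storage_path`, `payload_sha256`, `payload_bytes`, `payload_format`) come from the metadata contract; the helper names, the canonical JSON encoding, and the `"json"` format value are assumptions, and the actual upload to and download from Supabase Storage is deliberately left out.

```python
import hashlib
import json
from typing import Any


def canonical_bytes(payload: dict[str, Any]) -> bytes:
    # Assumed canonical form: sorted keys and compact separators, so the same
    # payload always hashes to the same digest.
    text = json.dumps(payload, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
    return text.encode("utf-8")


def build_storage_metadata(*, payload: dict[str, Any], bucket: str, path: str) -> dict[str, Any]:
    """Compute the verification fields stored next to the durable object reference."""
    body = canonical_bytes(payload)
    return {
        "storage_bucket": bucket,
        "storage_path": path,
        "payload_sha256": hashlib.sha256(body).hexdigest(),
        "payload_bytes": len(body),
        "payload_format": "json",
    }


def verify_and_load(body: bytes, metadata: dict[str, Any]) -> dict[str, Any]:
    """Check size and checksum of a downloaded object before reconstructing the result."""
    if len(body) != metadata["payload_bytes"]:
        raise ValueError("payload size mismatch")
    if hashlib.sha256(body).hexdigest() != metadata["payload_sha256"]:
        raise ValueError("payload checksum mismatch")
    return json.loads(body)
```

The integration test can upload `canonical_bytes(payload)`, persist the metadata on the `role=tool` row, then assert that `verify_and_load` on the downloaded bytes equals the payload that was streamed over SSE.
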

**Step 4: Run GREEN + full gate verification**

Run:

- `PYTHONPATH=backend/src uv run python backend/src/core/runtime/cli.py migrate`
- `uv run pytest backend/tests/unit/core/agent backend/tests/unit/v1/agent backend/tests/integration/core/agent backend/tests/integration/v1/agent -q`
- `uv run ruff check backend/src backend/tests`
- `uv run basedpyright backend/src`

Expected:

- All relevant tests PASS.
- Ruff PASS.
- basedpyright 0 errors (notes/warnings can be documented if pre-existing).

**Step 5: Commit**

```bash
git add docs/runtime/runtime-route.md docs/runtime/runtime-runbook.md backend/tests/integration/core/agent/test_session_message_persistence.py
git commit -m "docs(agent): document closed-loop runtime and verify end-to-end chain"
```

### Task 9: L2 Mandatory Review Gates

**Files:**

- No direct code changes required; apply fixes if findings appear.

**Step 1: Run required agents**

- `tdd-guide` (already enforced by plan sequence)
- `refactor-cleaner`
- `code-reviewer`
- `security-reviewer`

**Step 2: Fix all CRITICAL/HIGH findings**

Run targeted tests after each fix.

**Step 3: Final verification rerun**

Run:

- `uv run pytest backend/tests/unit/core/agent backend/tests/unit/v1/agent backend/tests/integration/core/agent backend/tests/integration/v1/agent -q`
- `uv run ruff check backend/src backend/tests`
- `uv run basedpyright backend/src`

Expected: no failing tests; no lint errors; no type errors.

**Step 4: Final commit (if review fixes were needed)**

```bash
git add backend/src backend/tests docs/runtime
git commit -m "fix(agent): resolve L2 review findings for closed-loop runtime"
```