# Agent Interrupt/Resume Strict Refactor Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Fix interrupt/resume concurrency safety, expiry validation, and strongly typed versioning of `state_snapshot` in a single strict refactor.

**Architecture:** `state_snapshot v2` is the only valid structure. The service layer parses snapshots and performs state transitions through strongly typed models, and the resume path takes a row lock when reading the session to keep concurrent requests consistent. The router layer keeps the existing run/resume entry points and reports errors via `HTTPException`. Tests cover version validation, expiry semantics, concurrent idempotency, and state-machine transitions.

**Tech Stack:** FastAPI, SQLAlchemy AsyncSession, Pydantic v2, pytest

---

### Task 1: Add the strongly typed state_snapshot v2 model

**Files:**
- Modify: `backend/src/v1/agent/schemas.py`
- Test: `backend/tests/unit/v1/agent/test_schemas.py`

**Step 1: Write the failing test**

```python
import pytest


def test_state_snapshot_v2_model_accepts_valid_payload():
    payload = {
        "version": 2,
        "pending_tool_call": {
            "interrupt_id": "int-1",
            "tool_name": "srv.transfer_funds",
            "tool_args": {"to": "u2", "amount": 100},
            "status": "PENDING_APPROVAL",
            "expires_at": "2026-03-03T12:00:00Z",
            "decision": None,
            "result": None,
            "updated_at": "2026-03-03T11:59:00Z",
        },
        "run_context": {"thread_id": "t1", "run_id": "r1"},
    }
    model = AgentSessionSnapshot.model_validate(payload)
    assert model.version == 2


def test_state_snapshot_v2_rejects_wrong_version():
    payload = {
        "version": 1,
        "pending_tool_call": None,
        "run_context": {"thread_id": "t1", "run_id": "r1"},
    }
    # Pydantic v2's ValidationError is a ValueError subclass.
    with pytest.raises(ValueError):
        AgentSessionSnapshot.model_validate(payload)
```

**Step 2: Run test to verify it fails**

Run: `uv run pytest backend/tests/unit/v1/agent/test_schemas.py -v`
Expected: FAIL (`AgentSessionSnapshot` is undefined or validation does not behave as expected)

**Step 3: Write minimal implementation**

```python
from datetime import datetime
from enum import Enum
from typing import Any, Literal

from pydantic import BaseModel


class PendingToolStatus(str, Enum):
    PENDING_APPROVAL = "PENDING_APPROVAL"
    APPROVED_EXECUTING = "APPROVED_EXECUTING"
    EXECUTED = "EXECUTED"
    REJECTED = "REJECTED"
    EXPIRED = "EXPIRED"


class PendingToolCall(BaseModel):
    interrupt_id: str
    tool_name: str
    tool_args: dict[str, Any]
    status: PendingToolStatus
    expires_at: datetime
    decision: dict[str, Any] | None = None
    result: dict[str, Any] | None = None
    updated_at: datetime


class SnapshotRunContext(BaseModel):
    thread_id: str
    run_id: str


class AgentSessionSnapshot(BaseModel):
    # Literal[2] rejects every other version at validation time.
    version: Literal[2]
    pending_tool_call: PendingToolCall | None = None
    run_context: SnapshotRunContext
```

**Step 4: Run test to verify it passes**

Run: `uv run pytest backend/tests/unit/v1/agent/test_schemas.py -v`
Expected: PASS

**Step 5: Commit**

```bash
git add backend/src/v1/agent/schemas.py backend/tests/unit/v1/agent/test_schemas.py
git commit -m "refactor(agent): add strict v2 session snapshot schema"
```

---

### Task 2: Switch the service layer to v2 snapshot reads and writes (strictly reject legacy structures)

**Files:**
- Modify: `backend/src/v1/agent/service.py`
- Test: `backend/tests/unit/v1/agent/test_service_pending_tool_call.py`

**Step 1: Write the failing test**

```python
from datetime import datetime, timedelta, timezone

import pytest


@pytest.mark.asyncio
async def test_set_pending_tool_call_writes_v2_snapshot(service, session):
    await service.set_pending_tool_call(
        session_id=session.id,
        interrupt_id="int-1",
        tool_name="srv.transfer_funds",
        tool_args={"to": "u2", "amount": 100},
        expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
        thread_id="t1",
        run_id="r1",
    )
    snapshot = await service.get_state_snapshot(session.id)
    assert snapshot["version"] == 2
    assert snapshot["run_context"]["run_id"] == "r1"


@pytest.mark.asyncio
async def test_invalid_legacy_snapshot_is_rejected(service, session):
    # Legacy shape: no "version" and no "run_context".
    session.state_snapshot = {"pending_tool_call": {"status": "PENDING_APPROVAL"}}
    with pytest.raises(ValueError):
        await service.apply_resume_decision(
            session_id=session.id,
            interrupt_id="int-1",
            decision={"decision": "approved"},
        )
```

**Step 2: Run test to verify it fails**

Run: `uv run pytest backend/tests/unit/v1/agent/test_service_pending_tool_call.py -v`
Expected: FAIL

**Step 3: Write minimal implementation**

```python
# Arguments are elided in this plan; fill them in from the Task 1 model.
def _build_snapshot_v2(...):
    return AgentSessionSnapshot(...).model_dump(mode="json")


def _load_snapshot_v2(raw: dict[str, Any] | None) -> AgentSessionSnapshot:
    if raw is None:
        raise ValueError("state_snapshot missing")
    # Legacy or malformed payloads fail here with a ValueError subclass.
    return AgentSessionSnapshot.model_validate(raw)
```

Also switch `set_pending_tool_call`, `get_state_snapshot`, and `update_pending_tool_call_status` to read and write through the v2 model.

**Step 4: Run test to verify it passes**

Run: `uv run pytest backend/tests/unit/v1/agent/test_service_pending_tool_call.py -v`
Expected: PASS

**Step 5: Commit**

```bash
git add backend/src/v1/agent/service.py backend/tests/unit/v1/agent/test_service_pending_tool_call.py
git commit -m "refactor(agent): enforce v2 snapshot read write in service"
```

---

### Task 3: Add a row lock and concurrent idempotency to resume

**Files:**
- Modify: `backend/src/v1/agent/service.py`
- Test: `backend/tests/unit/v1/agent/test_resume_idempotency.py`

**Step 1: Write the failing test**

```python
import pytest


@pytest.mark.asyncio
async def test_apply_resume_decision_uses_locked_session_fetch(service, fake_db, session):
    await service.apply_resume_decision(
        session_id=session.id,
        interrupt_id="int-1",
        decision={"decision": "approved"},
    )
    assert fake_db.last_fetch_with_lock is True


@pytest.mark.asyncio
async def test_resume_is_idempotent(service, session):
    first = await service.apply_resume_decision(...)
    second = await service.apply_resume_decision(...)
    # Only the first call may change state; the repeat is a no-op.
    assert first.applied is True
    assert second.applied is False
```

**Step 2: Run test to verify it fails**

Run: `uv run pytest backend/tests/unit/v1/agent/test_resume_idempotency.py -v`
Expected: FAIL

**Step 3: Write minimal implementation**

```python
from uuid import UUID

from sqlalchemy import select


async def _get_session_for_update(self, session_id: UUID) -> AgentChatSession | None:
    # SELECT ... FOR UPDATE holds the row lock until the transaction ends.
    stmt = (
        select(AgentChatSession)
        .where(AgentChatSession.id == session_id)
        .with_for_update()
        .limit(1)
    )
    result = await self._session.execute(stmt)
    return result.scalar_one_or_none()
```

Change `apply_resume_decision` to read, validate, and transition state while holding the lock, so that a decision takes effect exactly once under concurrency.

**Step 4: Run test to verify it passes**

Run: `uv run pytest backend/tests/unit/v1/agent/test_resume_idempotency.py -v`
Expected: PASS

**Step 5: Commit**

```bash
git add backend/src/v1/agent/service.py backend/tests/unit/v1/agent/test_resume_idempotency.py
git commit -m "fix(agent): add row lock for resume state transition"
```

---

### Task 4: Add expires_at expiry validation (including the EXPIRED transition)

**Files:**
- Modify: `backend/src/v1/agent/service.py`
- Test: `backend/tests/unit/v1/agent/test_resume_idempotency.py`

**Step 1: Write the failing test**

```python
from datetime import datetime, timedelta, timezone

import pytest


@pytest.mark.asyncio
async def test_resume_expired_pending_returns_not_applied_and_marks_expired(service, session):
    # The pending call expired one second ago.
    await service.set_pending_tool_call(
        ...,
        expires_at=datetime.now(timezone.utc) - timedelta(seconds=1),
        thread_id="t1",
        run_id="r1",
    )
    result = await service.apply_resume_decision(
        session_id=session.id,
        interrupt_id="int-1",
        decision={"decision": "approved"},
    )
    assert result.applied is False
    snapshot = await service.get_state_snapshot(session.id)
    assert snapshot["pending_tool_call"]["status"] == "EXPIRED"
```

**Step 2: Run test to verify it fails**

Run: `uv run pytest backend/tests/unit/v1/agent/test_resume_idempotency.py -v`
Expected: FAIL

**Step 3: Write minimal implementation**

```python
# Inside apply_resume_decision, after loading the snapshot under the row lock.
now = datetime.now(timezone.utc)
if pending.expires_at < now:
    pending.status = PendingToolStatus.EXPIRED
    pending.updated_at = now
    # Persist the EXPIRED transition before reporting the outcome.
    session.state_snapshot = snapshot.model_dump(mode="json")
    return ResumeDecisionResult(applied=False, expired=True)
```

**Step 4: Run test to verify it passes**

Run: `uv run pytest backend/tests/unit/v1/agent/test_resume_idempotency.py -v`
Expected: PASS

**Step 5: Commit**

```bash
git add backend/src/v1/agent/service.py backend/tests/unit/v1/agent/test_resume_idempotency.py
git commit -m "fix(agent): enforce expires_at when applying resume decision"
```

---

### Task 5: Complete the router-layer mapping for v2 snapshot, expiry, and conflict errors

**Files:**
- Modify: `backend/src/v1/agent/router.py`
- Modify: `backend/src/v1/agent/service.py`
- Test: `backend/tests/integration/v1/agent/test_chat_routes.py`
- Test: `backend/tests/integration/v1/agent/test_interrupt_resume_flow.py`

**Step 1: Write the failing test**

```python
def test_resume_route_returns_409_on_run_id_mismatch(client):
    ...


def test_resume_route_returns_410_when_pending_expired(client):
    ...


def test_resume_route_returns_422_for_legacy_snapshot(client):
    ...
```

**Step 2: Run test to verify it fails**

Run: `uv run pytest backend/tests/integration/v1/agent/test_chat_routes.py backend/tests/integration/v1/agent/test_interrupt_resume_flow.py -v`
Expected: FAIL

**Step 3: Write minimal implementation**

In `stream_resume` or elsewhere in the route call chain, map domain errors as follows:
- Expired -> `HTTPException(410)`
- Legacy snapshot / structural error -> `HTTPException(422)`
- State conflict / duplicate consumption -> `HTTPException(409)`

**Step 4: Run test to verify it passes**

Run: `uv run pytest backend/tests/integration/v1/agent/test_chat_routes.py backend/tests/integration/v1/agent/test_interrupt_resume_flow.py -v`
Expected: PASS

**Step 5: Commit**

```bash
git add backend/src/v1/agent/router.py backend/src/v1/agent/service.py backend/tests/integration/v1/agent/test_chat_routes.py backend/tests/integration/v1/agent/test_interrupt_resume_flow.py
git commit -m "fix(agent): map resume snapshot errors to 409 410 422"
```

---

### Task 6: Update the docs and complete verification

**Files:**
- Modify: `docs/plans/2026-03-03-agent-chat-design.md`
- Modify: `docs/runtime/runtime-route.md`

**Step 1: Update docs**

- State that `state_snapshot version=2` is the only supported structure
- Spell out the resume expiry and concurrency-conflict semantics (410/409)
- Spell out the legacy snapshot rejection policy (422)

**Step 2: Run unit tests**

Run: `uv run pytest backend/tests/unit/v1/agent -v`
Expected: PASS

**Step 3: Run integration tests**

Run: `uv run pytest backend/tests/integration/v1/agent/test_chat_routes.py backend/tests/integration/v1/agent/test_interrupt_resume_flow.py -v`
Expected: PASS

**Step 4: Run static checks**

Run: `cd backend && uv run ruff check src/v1/agent`
Expected: PASS

Run: `cd backend && uv run basedpyright src/v1/agent`
Expected: PASS

**Step 5: Commit**

```bash
git add docs/plans/2026-03-03-agent-chat-design.md docs/runtime/runtime-route.md
git commit -m "docs(agent): document strict snapshot v2 and resume error semantics"
```

---

Plan complete and saved to `docs/plans/2026-03-03-interrupt-resume-fixes-implementation-plan.md`. Execution mode selected by user request: Subagent-Driven (this session), proceed task-by-task immediately.
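
**Appendix: sketch of the Task 5 status mapping**

Task 5, Step 3 describes the 410/422/409 mapping only in prose, so a minimal sketch may help when implementing it. It assumes the service reports outcomes through `ResumeDecisionResult(applied, expired)` as in Task 4 and that snapshot parsing failures surface as `ValueError` as in Task 2. The helper names `status_code_for_result` and `status_code_for_error`, and the 500 fallback, are hypothetical and not part of the plan.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ResumeDecisionResult:
    # Stand-in for the service result; only the two fields the plan uses.
    applied: bool
    expired: bool = False


def status_code_for_result(result: ResumeDecisionResult) -> Optional[int]:
    """Return the HTTP status to raise, or None when the decision was applied."""
    if result.applied:
        return None
    # Expired pending calls map to 410; any other non-applied outcome
    # (state conflict, duplicate consumption) maps to 409.
    return 410 if result.expired else 409


def status_code_for_error(exc: Exception) -> int:
    """Map snapshot parsing failures to 422 (hypothetical helper)."""
    # Pydantic v2's ValidationError subclasses ValueError, so legacy or
    # malformed snapshots land in the 422 branch. The 500 fallback is an
    # assumption for errors the plan does not classify.
    return 422 if isinstance(exc, ValueError) else 500


if __name__ == "__main__":
    print(status_code_for_result(ResumeDecisionResult(applied=False, expired=True)))
    print(status_code_for_result(ResumeDecisionResult(applied=False)))
    print(status_code_for_error(ValueError("state_snapshot missing")))
```

In the router, a non-`None` code would then be passed to `HTTPException(status_code=...)`. Keeping the mapping in small pure functions lets the three integration tests in Task 5 be backed by fast unit tests as well.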