Files
social-app/docs/plans/2026-03-03-interrupt-resume-fixes-implementation-plan.md
T

11 KiB
Raw Blame History

Agent Interrupt/Resume Strict Refactor Implementation Plan

For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

Goal: 通过严格重构一次性修复 interrupt/resume 的并发安全、过期校验和 state_snapshot 强类型版本化问题。

Architecture:state_snapshot v2 为唯一合法结构,服务层使用强类型模型解析与状态迁移,resume 路径在读取会话时加行锁保证并发一致性。路由层维持现有 run/resume 入口,错误通过 HTTPException 输出,测试覆盖版本校验、过期语义、并发幂等和状态机迁移。

Tech Stack: FastAPI, SQLAlchemy AsyncSession, Pydantic v2, pytest


Task 1: 新增 state_snapshot v2 强类型模型

Files:

  • Modify: backend/src/v1/agent/schemas.py
  • Test: backend/tests/unit/v1/agent/test_schemas.py

Step 1: Write the failing test

def test_state_snapshot_v2_model_accepts_valid_payload():
    payload = {
        "version": 2,
        "pending_tool_call": {
            "interrupt_id": "int-1",
            "tool_name": "srv.transfer_funds",
            "tool_args": {"to": "u2", "amount": 100},
            "status": "PENDING_APPROVAL",
            "expires_at": "2026-03-03T12:00:00Z",
            "decision": None,
            "result": None,
            "updated_at": "2026-03-03T11:59:00Z",
        },
        "run_context": {"thread_id": "t1", "run_id": "r1"},
    }
    model = AgentSessionSnapshot.model_validate(payload)
    assert model.version == 2


def test_state_snapshot_v2_rejects_wrong_version():
    payload = {
        "version": 1,
        "pending_tool_call": None,
        "run_context": {"thread_id": "t1", "run_id": "r1"},
    }
    with pytest.raises(ValueError):
        AgentSessionSnapshot.model_validate(payload)

Step 2: Run test to verify it fails

Run: uv run pytest backend/tests/unit/v1/agent/test_schemas.py -v Expected: FAILAgentSessionSnapshot 未定义或校验不符合预期)

Step 3: Write minimal implementation

class PendingToolStatus(str, Enum):
    PENDING_APPROVAL = "PENDING_APPROVAL"
    APPROVED_EXECUTING = "APPROVED_EXECUTING"
    EXECUTED = "EXECUTED"
    REJECTED = "REJECTED"
    EXPIRED = "EXPIRED"


class PendingToolCall(BaseModel):
    interrupt_id: str
    tool_name: str
    tool_args: dict[str, Any]
    status: PendingToolStatus
    expires_at: datetime
    decision: dict[str, Any] | None = None
    result: dict[str, Any] | None = None
    updated_at: datetime


class SnapshotRunContext(BaseModel):
    thread_id: str
    run_id: str


class AgentSessionSnapshot(BaseModel):
    version: Literal[2]
    pending_tool_call: PendingToolCall | None = None
    run_context: SnapshotRunContext

Step 4: Run test to verify it passes

Run: uv run pytest backend/tests/unit/v1/agent/test_schemas.py -v Expected: PASS

Step 5: Commit

git add backend/src/v1/agent/schemas.py backend/tests/unit/v1/agent/test_schemas.py
git commit -m "refactor(agent): add strict v2 session snapshot schema"

Task 2: service 层改为 v2 快照读写(严格拒绝旧结构)

Files:

  • Modify: backend/src/v1/agent/service.py
  • Test: backend/tests/unit/v1/agent/test_service_pending_tool_call.py

Step 1: Write the failing test

@pytest.mark.asyncio
async def test_set_pending_tool_call_writes_v2_snapshot(service, session):
    await service.set_pending_tool_call(
        session_id=session.id,
        interrupt_id="int-1",
        tool_name="srv.transfer_funds",
        tool_args={"to": "u2", "amount": 100},
        expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
        thread_id="t1",
        run_id="r1",
    )
    snapshot = await service.get_state_snapshot(session.id)
    assert snapshot["version"] == 2
    assert snapshot["run_context"]["run_id"] == "r1"


@pytest.mark.asyncio
async def test_invalid_legacy_snapshot_is_rejected(service, session):
    session.state_snapshot = {"pending_tool_call": {"status": "PENDING_APPROVAL"}}
    with pytest.raises(ValueError):
        await service.apply_resume_decision(
            session_id=session.id,
            interrupt_id="int-1",
            decision={"decision": "approved"},
        )

Step 2: Run test to verify it fails

Run: uv run pytest backend/tests/unit/v1/agent/test_service_pending_tool_call.py -v Expected: FAIL

Step 3: Write minimal implementation

def _build_snapshot_v2(...):
    return AgentSessionSnapshot(...).model_dump(mode="json")


def _load_snapshot_v2(raw: dict[str, Any] | None) -> AgentSessionSnapshot:
    if raw is None:
        raise ValueError("state_snapshot missing")
    return AgentSessionSnapshot.model_validate(raw)

并将 set_pending_tool_call/get_state_snapshot/update_pending_tool_call_status 全部改成 v2 模型读写。

Step 4: Run test to verify it passes

Run: uv run pytest backend/tests/unit/v1/agent/test_service_pending_tool_call.py -v Expected: PASS

Step 5: Commit

git add backend/src/v1/agent/service.py backend/tests/unit/v1/agent/test_service_pending_tool_call.py
git commit -m "refactor(agent): enforce v2 snapshot read write in service"

Task 3: 增加 resume 行锁与并发幂等

Files:

  • Modify: backend/src/v1/agent/service.py
  • Test: backend/tests/unit/v1/agent/test_resume_idempotency.py

Step 1: Write the failing test

@pytest.mark.asyncio
async def test_apply_resume_decision_uses_locked_session_fetch(service, fake_db, session):
    await service.apply_resume_decision(
        session_id=session.id,
        interrupt_id="int-1",
        decision={"decision": "approved"},
    )
    assert fake_db.last_fetch_with_lock is True


@pytest.mark.asyncio
async def test_resume_is_idempotent(service, session):
    first = await service.apply_resume_decision(...)
    second = await service.apply_resume_decision(...)
    assert first.applied is True
    assert second.applied is False

Step 2: Run test to verify it fails

Run: uv run pytest backend/tests/unit/v1/agent/test_resume_idempotency.py -v Expected: FAIL

Step 3: Write minimal implementation

async def _get_session_for_update(self, session_id: UUID) -> AgentChatSession | None:
    stmt = (
        select(AgentChatSession)
        .where(AgentChatSession.id == session_id)
        .with_for_update()
        .limit(1)
    )
    result = await self._session.execute(stmt)
    return result.scalar_one_or_none()

apply_resume_decision 改为锁内读取、校验、状态迁移,保证并发下单次生效。

Step 4: Run test to verify it passes

Run: uv run pytest backend/tests/unit/v1/agent/test_resume_idempotency.py -v Expected: PASS

Step 5: Commit

git add backend/src/v1/agent/service.py backend/tests/unit/v1/agent/test_resume_idempotency.py
git commit -m "fix(agent): add row lock for resume state transition"

Task 4: 增加 expires_at 过期校验(含 EXPIRED 迁移)

Files:

  • Modify: backend/src/v1/agent/service.py
  • Test: backend/tests/unit/v1/agent/test_resume_idempotency.py

Step 1: Write the failing test

@pytest.mark.asyncio
async def test_resume_expired_pending_returns_not_applied_and_marks_expired(service, session):
    await service.set_pending_tool_call(..., expires_at=datetime.now(timezone.utc) - timedelta(seconds=1), thread_id="t1", run_id="r1")
    result = await service.apply_resume_decision(
        session_id=session.id,
        interrupt_id="int-1",
        decision={"decision": "approved"},
    )
    assert result.applied is False
    snapshot = await service.get_state_snapshot(session.id)
    assert snapshot["pending_tool_call"]["status"] == "EXPIRED"

Step 2: Run test to verify it fails

Run: uv run pytest backend/tests/unit/v1/agent/test_resume_idempotency.py -v Expected: FAIL

Step 3: Write minimal implementation

if pending.expires_at < datetime.now(timezone.utc):
    pending.status = PendingToolStatus.EXPIRED
    pending.updated_at = datetime.now(timezone.utc)
    session.state_snapshot = snapshot.model_dump(mode="json")
    return ResumeDecisionResult(applied=False, expired=True)

Step 4: Run test to verify it passes

Run: uv run pytest backend/tests/unit/v1/agent/test_resume_idempotency.py -v Expected: PASS

Step 5: Commit

git add backend/src/v1/agent/service.py backend/tests/unit/v1/agent/test_resume_idempotency.py
git commit -m "fix(agent): enforce expires_at when applying resume decision"

Task 5: 路由层补齐 v2 快照与过期/冲突错误映射

Files:

  • Modify: backend/src/v1/agent/router.py
  • Modify: backend/src/v1/agent/service.py
  • Test: backend/tests/integration/v1/agent/test_chat_routes.py
  • Test: backend/tests/integration/v1/agent/test_interrupt_resume_flow.py

Step 1: Write the failing test

def test_resume_route_returns_409_on_run_id_mismatch(client):
    ...


def test_resume_route_returns_410_when_pending_expired(client):
    ...


def test_resume_route_returns_422_for_legacy_snapshot(client):
    ...

Step 2: Run test to verify it fails

Run: uv run pytest backend/tests/integration/v1/agent/test_chat_routes.py backend/tests/integration/v1/agent/test_interrupt_resume_flow.py -v Expected: FAIL

Step 3: Write minimal implementation

stream_resume 或路由调用链里将领域错误映射为:

  • 过期 -> HTTPException(410)
  • 旧快照/结构错误 -> HTTPException(422)
  • 状态冲突/重复消费 -> HTTPException(409)

Step 4: Run test to verify it passes

Run: uv run pytest backend/tests/integration/v1/agent/test_chat_routes.py backend/tests/integration/v1/agent/test_interrupt_resume_flow.py -v Expected: PASS

Step 5: Commit

git add backend/src/v1/agent/router.py backend/src/v1/agent/service.py backend/tests/integration/v1/agent/test_chat_routes.py backend/tests/integration/v1/agent/test_interrupt_resume_flow.py
git commit -m "fix(agent): map resume snapshot errors to 409 410 422"

Task 6: 更新文档并完成验证

Files:

  • Modify: docs/plans/2026-03-03-agent-chat-design.md
  • Modify: docs/runtime/runtime-route.md

Step 1: Update docs

  • 明确 state_snapshot version=2 为唯一支持结构
  • 明确 resume 过期与并发冲突语义(410/409)
  • 明确旧快照拒绝策略(422

Step 2: Run unit tests

Run: uv run pytest backend/tests/unit/v1/agent -v Expected: PASS

Step 3: Run integration tests

Run: uv run pytest backend/tests/integration/v1/agent/test_chat_routes.py backend/tests/integration/v1/agent/test_interrupt_resume_flow.py -v Expected: PASS

Step 4: Run static checks

Run: cd backend && uv run ruff check src/v1/agent Expected: PASS

Run: cd backend && uv run basedpyright src/v1/agent Expected: PASS

Step 5: Commit

git add docs/plans/2026-03-03-agent-chat-design.md docs/runtime/runtime-route.md
git commit -m "docs(agent): document strict snapshot v2 and resume error semantics"

Plan complete and saved to docs/plans/2026-03-03-interrupt-resume-fixes-implementation-plan.md.

Execution mode selected by user request: Subagent-Driven (this session), proceed task-by-task immediately.