# Agent Run Cancel (Failed Semantics) Implementation Plan > **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. **Goal:** 为 `/api/v1/agent/runs` 增加可中断能力,在用户触发 cancel 后真正停止运行中的 agent 流程,并以 `RUN_ERROR(code=RUN_CANCELED)` 结束,最终将 session 状态落为 `failed`。 **Architecture:** 使用“协作取消 + 主任务中断”方案:API 层写入 Redis cancel 信号,runtime 在 worker 进程内并行 watcher 监听信号,命中后先调用 active agent 的 `interrupt()` 做优雅收尾,再 `cancel()` 当前 run 主任务做硬兜底。终态统一通过 `RUN_ERROR` 事件落库,复用现有 `FAILED` 会话语义,避免数据库枚举迁移。 **Tech Stack:** FastAPI, TaskIQ, Redis, AgentScope, SQLAlchemy, Pytest, Ruff, BasedPyright --- ### Task 1: 先更新协议文档(接口与事件语义) **Files:** - Modify: `docs/protocols/agent/api-endpoints.md` - Modify: `docs/protocols/agent/sse-events.md` **Step 1: 在 API 文档新增 cancel 端点契约** 在 `api-endpoints.md` 的端点清单添加: ```md | POST | `/runs/{thread_id}/cancel` | 请求取消指定 run | ``` 并新增章节说明: - 请求参数:`thread_id` + `runId`(建议 query) - 返回:`202 Accepted` + `accepted: true` - 语义:仅表示“取消请求已接收”,不保证已即时终止 **Step 2: 在 SSE 文档补充取消终态语义** 在 `sse-events.md` 的 `RUN_ERROR` 章节补充: ```json { "type": "RUN_ERROR", "threadId": "...", "runId": "...", "message": "run canceled by user", "code": "RUN_CANCELED" } ``` 并明确: - `RUN_CANCELED` 是用户主动中断,不是系统异常 - 本阶段仍复用 session `failed`(向后兼容) **Step 3: 文档自检** 检查文档是否同时覆盖: - HTTP 行为 - SSE 终态事件 - 兼容策略(不引入新 session 状态) **Step 4: 提交文档变更** ```bash git add docs/protocols/agent/api-endpoints.md docs/protocols/agent/sse-events.md git commit -m "docs: define agent run cancel API and RUN_CANCELED error semantics" ``` ### Task 2: 打通 API 层到队列层的 cancel 信号写入 **Files:** - Modify: `backend/src/v1/agent/schemas.py` - Modify: `backend/src/v1/agent/dependencies.py` - Modify: `backend/src/v1/agent/service.py` - Modify: `backend/src/v1/agent/router.py` - Test: `backend/tests/unit/v1/agent/test_service.py` - Test: `backend/tests/integration/v1/agent/test_routes.py` **Step 1: 增加 cancel 接口响应 schema** 在 `v1/agent/schemas.py` 增加: ```python class CancelRunResponse(BaseModel): model_config = ConfigDict(populate_by_name=True, serialize_by_alias=True) thread_id: str = Field(alias="threadId") run_id: str = Field(alias="runId") accepted: bool ``` **Step 2: 扩展队列协议接口** 在 `QueueClientLike` 增加: ```python async def request_cancel(self, *, thread_id: str, run_id: str, requested_by: str) -> None: ... ``` **Step 3: 在 `TaskiqQueueClient` 实现 request_cancel** 在 `v1/agent/dependencies.py` 中新增: - cancel key 规范:`agent:cancel:{thread_id}:{run_id}` - `SET key value EX ` 写入取消信号 - `value` 可写 json 字符串(包含 user_id/timestamp) **Step 4: 在 service 层新增 cancel_run** 在 `v1/agent/service.py` 增加方法: - 校验 session owner(复用 `get_session_owner + ensure_session_owner`) - 调用 `self._queue.request_cancel(...)` - 返回 `accepted` 结果 DTO **Step 5: 在 router 新增 cancel 路由** 在 `v1/agent/router.py` 新增: ```python @router.post("/runs/{thread_id}/cancel", response_model=CancelRunResponse, status_code=202) async def cancel_run(...): ... ``` 约束: - `runId` 必填(建议 query) - 非 owner 返回 403 - 参数非法返回 422 **Step 6: 写 service 单测(先红)** 在 `test_service.py` 添加: - owner 可发起 cancel,`queue.request_cancel` 被调用 - 非 owner cancel 返回 403 **Step 7: 运行单测确认失败** Run: `uv run pytest backend/tests/unit/v1/agent/test_service.py -k cancel -q` Expected: 至少 1 个测试失败(新逻辑尚未实现) **Step 8: 实现最小代码使测试通过** 按 Step 1-5 完成实现,避免额外重构。 **Step 9: 运行测试验证通过** Run: `uv run pytest backend/tests/unit/v1/agent/test_service.py -k cancel -q` Expected: PASS **Step 10: 增加路由集成测试** 在 `test_routes.py` 增加: - `POST /api/v1/agent/runs/{thread_id}/cancel?runId=...` 返回 202 - 响应字段别名正确(`threadId/runId/accepted`) **Step 11: 运行路由测试** Run: `uv run pytest backend/tests/integration/v1/agent/test_routes.py -k cancel -q` Expected: PASS **Step 12: 提交 API 层变更** ```bash git add backend/src/v1/agent/schemas.py backend/src/v1/agent/dependencies.py backend/src/v1/agent/service.py backend/src/v1/agent/router.py backend/tests/unit/v1/agent/test_service.py backend/tests/integration/v1/agent/test_routes.py git commit -m "feat: add agent run cancel endpoint and Redis cancel signal" ``` ### Task 3: runtime runner 植入取消 watcher 与优雅中断 **Files:** - Modify: `backend/src/core/agentscope/runtime/runner.py` - Test: `backend/tests/unit/core/agentscope/runtime/test_runner.py` **Step 1: 在 runner.execute 增加 cancel_checker 参数** 更新 `execute()` 签名: ```python cancel_checker: Callable[[], Awaitable[bool]] | None = None ``` 并保持默认 `None` 向后兼容。 **Step 2: 增加 active agent 引用与锁** 在 `AgentScopeRunner.__init__` 增加: - `self._active_agent: JsonReActAgent | None = None` - `self._active_agent_lock = asyncio.Lock()` **Step 3: 在 `_run_worker_stage` 设置 active agent 生命周期** 在 `agent.reply_json(...)` 前后包裹: - before: 记录 `self._active_agent = agent` - finally: 清理引用 **Step 4: 新增 `_watch_cancel_signal` 协程** 行为: - 循环调用 `cancel_checker()` - 命中后先尝试 `await active_agent.interrupt()` - 再 `run_task.cancel("run canceled by user")` - 间隔 `await asyncio.sleep(0.2)` **Step 5: 在 execute 启停 watcher** - `run_task = asyncio.current_task()` - 如果有 `cancel_checker`,`create_task(_watch_cancel_signal(...))` - `finally` 中停止 watcher 并 `await` 回收 **Step 6: 补 stage 边界取消 gate(关键)** 在 router 结束后、worker 开始前检查一次 `cancel_checker()`: - 为 true 时抛 `asyncio.CancelledError` 目的:防止“router 已结束但仍进入 worker”。 **Step 7: 写 runner 单测(先红)** 新增测试用例: - cancel 信号触发后,`execute` 抛出 `CancelledError` - worker 未被继续执行(或中途被中断) **Step 8: 运行 runner 测试** Run: `uv run pytest backend/tests/unit/core/agentscope/runtime/test_runner.py -k cancel -q` Expected: PASS **Step 9: 提交 runner 变更** ```bash git add backend/src/core/agentscope/runtime/runner.py backend/tests/unit/core/agentscope/runtime/test_runner.py git commit -m "feat: add cooperative cancellation watcher to agentscope runner" ``` ### Task 4: orchestrator 与 task worker 处理 CancelledError 终态 **Files:** - Modify: `backend/src/core/agentscope/runtime/orchestrator.py` - Modify: `backend/src/core/agentscope/runtime/tasks.py` - Test: `backend/tests/unit/core/agentscope/runtime/test_orchestrator.py` - Test: `backend/tests/unit/core/agentscope/runtime/test_tasks.py` **Step 1: orchestrator 单独捕获 CancelledError** 在 `orchestrator.run()` 添加: ```python except asyncio.CancelledError: await self._pipeline.emit(... RUN_ERROR code="RUN_CANCELED" ...) raise ``` 保留现有 `except Exception` 处理系统错误。 **Step 2: task 层构造 cancel_checker 并注入 runtime.run** 在 `tasks.py`: - 构造 key:`agent:cancel:{thread_id}:{run_id}` - 定义 `async def cancel_checker() -> bool: return bool(await redis.exists(key))` - 调用 `runtime.run(..., cancel_checker=cancel_checker)` **Step 3: task 层补资源清理** 在 `run_agentscope_task` 的 `finally`: - 删除 cancel key 或缩短 TTL - 记录日志(仅必要字段) **Step 4: 写 orchestrator 单测(先红)** 验证: - 收到 `CancelledError` 时发 `RUN_ERROR` 且 `code == "RUN_CANCELED"` **Step 5: 写 tasks 单测(先红)** 验证: - runtime 收到的 `cancel_checker` 可用 - key 命中时上抛 `CancelledError` 路径成立 **Step 6: 运行测试** Run: `uv run pytest backend/tests/unit/core/agentscope/runtime/test_orchestrator.py backend/tests/unit/core/agentscope/runtime/test_tasks.py -k cancel -q` Expected: PASS **Step 7: 提交 runtime 编排层变更** ```bash git add backend/src/core/agentscope/runtime/orchestrator.py backend/src/core/agentscope/runtime/tasks.py backend/tests/unit/core/agentscope/runtime/test_orchestrator.py backend/tests/unit/core/agentscope/runtime/test_tasks.py git commit -m "fix: emit RUN_CANCELED error when run task is interrupted" ``` ### Task 5: 事件流与持久化一致性回归 **Files:** - Modify: `backend/tests/unit/core/agentscope/events/test_store.py` - Modify: `backend/tests/unit/core/agentscope/events/test_agui_codec.py` - Modify: `backend/tests/integration/v1/agent/test_sse_flow_live.py` **Step 1: 补 event store 行为测试** 新增断言: - 当 `RUN_ERROR` 且 `code=RUN_CANCELED` 时,session 状态依然为 `FAILED` **Step 2: 补 codec 测试** 新增断言: - `RUN_ERROR` 的 `code` 字段能正确透传到 wire event **Step 3: 补 SSE 集成测试** 场景: - 触发 `/runs` - 触发 `/runs/{thread_id}/cancel` - SSE 最终出现 `RUN_ERROR(code=RUN_CANCELED)` **Step 4: 运行事件相关测试** Run: `uv run pytest backend/tests/unit/core/agentscope/events/test_store.py backend/tests/unit/core/agentscope/events/test_agui_codec.py backend/tests/integration/v1/agent/test_sse_flow_live.py -k "cancel or run_error" -q` Expected: PASS **Step 5: 提交事件层变更** ```bash git add backend/tests/unit/core/agentscope/events/test_store.py backend/tests/unit/core/agentscope/events/test_agui_codec.py backend/tests/integration/v1/agent/test_sse_flow_live.py git commit -m "test: cover RUN_CANCELED propagation across store codec and SSE" ``` ### Task 6: 全量验证与发布前检查 **Files:** - Modify: `docs/protocols/agent/api-endpoints.md`(如需补充最终字段) - Modify: `docs/protocols/agent/sse-events.md`(如需补充最终字段) **Step 1: 运行受影响单元测试集合** Run: ```bash uv run pytest backend/tests/unit/v1/agent/test_service.py backend/tests/unit/core/agentscope/runtime/test_runner.py backend/tests/unit/core/agentscope/runtime/test_orchestrator.py backend/tests/unit/core/agentscope/runtime/test_tasks.py backend/tests/unit/core/agentscope/events/test_store.py backend/tests/unit/core/agentscope/events/test_agui_codec.py -q ``` Expected: PASS **Step 2: 运行受影响集成测试集合** Run: ```bash uv run pytest backend/tests/integration/v1/agent/test_routes.py backend/tests/integration/v1/agent/test_sse_flow_live.py -q ``` Expected: PASS **Step 3: 运行静态检查** Run: ```bash uv run ruff check backend/src backend/tests uv run basedpyright ``` Expected: PASS(无新增 lint/type 错误) **Step 4: 手工验证路径** 手工流程: - 发起 `/runs` - 立刻调用 `/runs/{thread_id}/cancel?runId=...` - 观察 SSE:应以 `RUN_ERROR(code=RUN_CANCELED)` 结束 - 检查 session:`status=failed` **Step 5: 最终提交** ```bash git add docs/protocols/agent/api-endpoints.md docs/protocols/agent/sse-events.md backend/src/v1/agent/*.py backend/src/core/agentscope/runtime/*.py backend/tests/unit/core/agentscope/runtime/*.py backend/tests/unit/core/agentscope/events/*.py backend/tests/unit/v1/agent/test_service.py backend/tests/integration/v1/agent/test_routes.py backend/tests/integration/v1/agent/test_sse_flow_live.py git commit -m "feat: support run cancellation with RUN_CANCELED failed semantics" ``` --- ## 风险与回滚 - 风险 1:cancel key 误命中导致误中断 - 缓解:key 粒度使用 `thread_id + run_id`,并设置 TTL - 风险 2:中断时出现重复终态事件 - 缓解:在 orchestrator 保证 CancelledError 只走 `RUN_ERROR` 分支,避免继续发 `RUN_FINISHED` - 风险 3:高并发下 Redis 轮询压力上升 - 缓解:轮询间隔 200ms,后续按并发量评估改为 pub/sub 回滚策略: - 回滚 `router/service/dependencies` cancel 新接口 - 回滚 `runner/orchestrator/tasks` cancel 注入逻辑 - 保持原 `POST /runs` 与 SSE 流程不变