# Agent Run Cancel (Failed Semantics) Implementation Plan
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
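**Goal:** Make `/api/v1/agent/runs` interruptible: once the user triggers cancel, actually stop the running agent flow, end it with `RUN_ERROR(code=RUN_CANCELED)`, and persist the final session status as `failed`.
**Architecture:** Use a "cooperative cancellation + main-task interruption" approach. The API layer writes a cancel signal to Redis; inside the worker process, the runtime runs a parallel watcher that listens for that signal. On a hit, the watcher first calls the active agent's `interrupt()` for a graceful wind-down, then calls `cancel()` on the current run's main task as a hard fallback. The terminal state is always persisted through a `RUN_ERROR` event, which reuses the existing `FAILED` session semantics and avoids a database enum migration.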
**Tech Stack:** FastAPI, TaskIQ, Redis, AgentScope, SQLAlchemy, Flutter, Pytest, Ruff, BasedPyright
---
### Task 1: Update the protocol docs first (endpoint and event semantics)
**Files:**
- Modify: `docs/protocols/agent/api-endpoints.md`
- Modify: `docs/protocols/agent/sse-events.md`
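**Step 1: Add the cancel endpoint contract to the API doc**
Add to the endpoint list in `api-endpoints.md`:
```md
| POST | `/runs/{thread_id}/cancel` | Request cancellation of the specified run |
```
Also add a section that specifies:
- Request parameters: `thread_id` + `runId` (query parameter recommended)
- Response: `202 Accepted` + `accepted: true`
- Semantics: only indicates that the cancel request was received; it does not guarantee the run has already terminated
**Step 2: Add cancel terminal-state semantics to the SSE doc**
Add to the `RUN_ERROR` section of `sse-events.md`: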
```json
{
  "type": "RUN_ERROR",
  "threadId": "...",
  "runId": "...",
  "message": "run canceled by user",
  "code": "RUN_CANCELED"
}
```
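And state explicitly:
- `RUN_CANCELED` is a user-initiated interruption, not a system exception
- This phase still reuses the session status `failed` (backward compatible)
**Step 3: Self-review the docs**
Check that the docs cover all of:
- HTTP behavior
- SSE terminal event
- Compatibility strategy (no new session status is introduced)
**Step 4: Commit the doc changes**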
```bash
git add docs/protocols/agent/api-endpoints.md docs/protocols/agent/sse-events.md
git commit -m "docs: define agent run cancel API and RUN_CANCELED error semantics"
```
### Task 2: Wire the cancel signal write from the API layer through to the queue layer
**Files:**
- Modify: `backend/src/v1/agent/schemas.py`
- Modify: `backend/src/v1/agent/dependencies.py`
- Modify: `backend/src/v1/agent/service.py`
- Modify: `backend/src/v1/agent/router.py`
- Test: `backend/tests/unit/v1/agent/test_service.py`
- Test: `backend/tests/integration/v1/agent/test_routes.py`
**Step 1: Add the response schema for the cancel endpoint**
Add to `v1/agent/schemas.py`:
```python
class CancelRunResponse(BaseModel):
    model_config = ConfigDict(populate_by_name=True, serialize_by_alias=True)

    thread_id: str = Field(alias="threadId")
    run_id: str = Field(alias="runId")
    accepted: bool
```
**Step 2: Extend the queue protocol interface**
Add to `QueueClientLike`:
```python
async def request_cancel(self, *, thread_id: str, run_id: str, requested_by: str) -> None: ...
```
**Step 3: Implement request_cancel in `TaskiqQueueClient`**
Add to `v1/agent/dependencies.py`:
- Cancel key convention: `agent:cancel:{thread_id}:{run_id}`
- Write the cancel signal with `SET key value EX <ttl>`
- `value` may be a JSON string (containing user_id/timestamp)
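The write described in Step 3 can be sketched as follows. This is a minimal illustration, not the project's implementation: `build_cancel_key` and `CANCEL_KEY_TTL_SECONDS` are hypothetical names, the TTL value is an assumption (the plan does not fix one), and `redis` is assumed to be any async client exposing `set(key, value, ex=...)`, as `redis.asyncio.Redis` does.

```python
import json
import time
from typing import Any

# Assumed TTL; the plan only says "set a TTL" without naming a value.
CANCEL_KEY_TTL_SECONDS = 300


def build_cancel_key(thread_id: str, run_id: str) -> str:
    """Key format taken from the plan: agent:cancel:{thread_id}:{run_id}."""
    return f"agent:cancel:{thread_id}:{run_id}"


async def request_cancel(
    redis: Any, *, thread_id: str, run_id: str, requested_by: str
) -> None:
    """Write the cancel signal with an expiry (SET key value EX ttl)."""
    payload = json.dumps({"user_id": requested_by, "timestamp": time.time()})
    await redis.set(
        build_cancel_key(thread_id, run_id), payload, ex=CANCEL_KEY_TTL_SECONDS
    )
```

Keeping the key builder in one function lets the task layer in Task 4 derive the same key instead of repeating the format string.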
**Step 4: Add cancel_run to the service layer**
Add a method to `v1/agent/service.py` that:
- Verifies the session owner (reusing `get_session_owner + ensure_session_owner`)
- Calls `self._queue.request_cancel(...)`
- Returns a result DTO carrying `accepted`
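A rough sketch of the service method from Step 4 is shown here. The real signatures of `get_session_owner` and `ensure_session_owner` are not given in this plan, so the sketch stands in an injected `get_session_owner` callable and an inline ownership check; `NotSessionOwnerError` and `CancelRunResult` are hypothetical names as well.

```python
from dataclasses import dataclass
from typing import Awaitable, Callable, Protocol


class QueueClientLike(Protocol):
    async def request_cancel(
        self, *, thread_id: str, run_id: str, requested_by: str
    ) -> None: ...


class NotSessionOwnerError(Exception):
    """Hypothetical error; the router would map it to HTTP 403."""


@dataclass(frozen=True)
class CancelRunResult:
    thread_id: str
    run_id: str
    accepted: bool


class AgentService:
    def __init__(
        self,
        queue: QueueClientLike,
        get_session_owner: Callable[[str], Awaitable[str]],
    ) -> None:
        self._queue = queue
        self._get_session_owner = get_session_owner

    async def cancel_run(
        self, *, thread_id: str, run_id: str, user_id: str
    ) -> CancelRunResult:
        # Ownership check comes first so a non-owner never writes a signal.
        owner_id = await self._get_session_owner(thread_id)
        if owner_id != user_id:
            raise NotSessionOwnerError(thread_id)
        await self._queue.request_cancel(
            thread_id=thread_id, run_id=run_id, requested_by=user_id
        )
        # "accepted" only means the request was recorded, not that the run stopped.
        return CancelRunResult(thread_id=thread_id, run_id=run_id, accepted=True)
```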
**Step 5: Add the cancel route to the router**
Add to `v1/agent/router.py`:
```python
@router.post("/runs/{thread_id}/cancel", response_model=CancelRunResponse, status_code=202)
async def cancel_run(...):
    ...
```
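Constraints:
- `runId` is required (query parameter recommended)
- A non-owner gets 403
- Invalid parameters get 422
**Step 6: Write the service unit tests (red first)**
Add to `test_service.py`:
- The owner can initiate cancel, and `queue.request_cancel` is called
- A cancel from a non-owner returns 403
**Step 7: Run the unit tests and confirm they fail**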
Run: `uv run pytest backend/tests/unit/v1/agent/test_service.py -k cancel -q`
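Expected: at least 1 test fails (the new logic is not implemented yet)
**Step 8: Implement the minimum code to make the tests pass**
Complete the implementation per Steps 1-5 and avoid any extra refactoring.
**Step 9: Run the tests and verify they pass**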
Run: `uv run pytest backend/tests/unit/v1/agent/test_service.py -k cancel -q`
Expected: PASS
**Step 10: Add route integration tests**
Add to `test_routes.py`:
- `POST /api/v1/agent/runs/{thread_id}/cancel?runId=...` returns 202
- Response field aliases are correct (`threadId/runId/accepted`)
**Step 11: Run the route tests**
Run: `uv run pytest backend/tests/integration/v1/agent/test_routes.py -k cancel -q`
Expected: PASS
**Step 12: Commit the API layer changes**
```bash
git add backend/src/v1/agent/schemas.py backend/src/v1/agent/dependencies.py backend/src/v1/agent/service.py backend/src/v1/agent/router.py backend/tests/unit/v1/agent/test_service.py backend/tests/integration/v1/agent/test_routes.py
git commit -m "feat: add agent run cancel endpoint and Redis cancel signal"
```
### Task 3: Add the cancel watcher and graceful interruption to the runtime runner
**Files:**
- Modify: `backend/src/core/agentscope/runtime/runner.py`
- Test: `backend/tests/unit/core/agentscope/runtime/test_runner.py`
**Step 1: Add a cancel_checker parameter to runner.execute**
Update the `execute()` signature:
```python
cancel_checker: Callable[[], Awaitable[bool]] | None = None
```
Keep the default `None` for backward compatibility.
**Step 2: Add an active agent reference and lock**
Add to `AgentScopeRunner.__init__`:
- `self._active_agent: JsonReActAgent | None = None`
- `self._active_agent_lock = asyncio.Lock()`
**Step 3: Manage the active agent lifecycle in `_run_worker_stage`**
Wrap the `agent.reply_json(...)` call:
- before: set `self._active_agent = agent`
- finally: clear the reference
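The before/finally wrapping from Steps 2 and 3 could look roughly like this. `ActiveAgentTracker` and `run_with_tracking` are hypothetical names used only for illustration; in the plan the same fields live directly on `AgentScopeRunner`, and the agent is treated here as any object with an async `reply_json` method.

```python
import asyncio
from typing import Any, Optional


class ActiveAgentTracker:
    """Holds a reference to the agent that is currently replying."""

    def __init__(self) -> None:
        self._active_agent: Optional[Any] = None
        self._active_agent_lock = asyncio.Lock()

    @property
    def active_agent(self) -> Optional[Any]:
        return self._active_agent

    async def run_with_tracking(self, agent: Any, *args: Any, **kwargs: Any) -> Any:
        # before: publish the agent so a watcher can reach its interrupt().
        async with self._active_agent_lock:
            self._active_agent = agent
        try:
            return await agent.reply_json(*args, **kwargs)
        finally:
            # finally: clear the reference on success, error, and cancellation.
            async with self._active_agent_lock:
                self._active_agent = None
```

Clearing the reference in `finally` matters because a stale reference would let a later cancel request interrupt an agent that no longer belongs to the run.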
**Step 4: Add the `_watch_cancel_signal` coroutine**
Behavior:
- Call `cancel_checker()` in a loop
- On a hit, first try `await active_agent.interrupt()`
- Then call `run_task.cancel("run canceled by user")`
- Between polls, `await asyncio.sleep(0.2)`
**Step 5: Start and stop the watcher in execute**
- `run_task = asyncio.current_task()`
- If `cancel_checker` is provided, start the watcher with `create_task(_watch_cancel_signal(...))`
- In `finally`, stop the watcher and `await` it to reclaim the task
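Steps 4 and 5 can be sketched together with plain `asyncio`. This is an illustration under assumptions: the functions are written as free functions rather than `AgentScopeRunner` methods, `work` stands in for the router and worker stages, and `get_active_agent` is a hypothetical accessor for the tracked agent, which only needs an async `interrupt()` method.

```python
import asyncio
import contextlib
from typing import Any, Awaitable, Callable, Optional

CancelChecker = Callable[[], Awaitable[bool]]


async def watch_cancel_signal(
    run_task: "asyncio.Task[Any]",
    cancel_checker: CancelChecker,
    get_active_agent: Callable[[], Any],
    poll_interval: float = 0.2,
) -> None:
    """Poll for the signal; on a hit, interrupt the agent, then cancel the run."""
    while not await cancel_checker():
        await asyncio.sleep(poll_interval)
    agent = get_active_agent()
    if agent is not None:
        # Graceful path first; a failing interrupt must not block the hard cancel.
        with contextlib.suppress(Exception):
            await agent.interrupt()
    run_task.cancel("run canceled by user")


async def execute(
    work: Callable[[], Awaitable[Any]],
    cancel_checker: Optional[CancelChecker] = None,
    get_active_agent: Callable[[], Any] = lambda: None,
) -> Any:
    """Run `work` while an optional watcher task listens for cancellation."""
    run_task = asyncio.current_task()
    assert run_task is not None, "execute() must run inside a task"
    watcher: Optional["asyncio.Task[None]"] = None
    if cancel_checker is not None:
        watcher = asyncio.create_task(
            watch_cancel_signal(run_task, cancel_checker, get_active_agent)
        )
    try:
        return await work()
    finally:
        if watcher is not None:
            # Stop the watcher and wait for it so no task is left dangling.
            watcher.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await watcher
```

With a 0.2 s poll interval, the worst-case delay between the signal being written and the run task being cancelled is roughly one interval plus the time `interrupt()` takes.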
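**Step 6: Add a cancel gate at the stage boundary (critical)**
After the router stage ends and before the worker stage starts, check `cancel_checker()` once:
- If it returns true, raise `asyncio.CancelledError`
Purpose: prevent the case where the router has finished but the run still enters the worker.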
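A minimal sketch of the stage-boundary gate from Step 6 follows. `raise_if_cancel_requested` and `run_stages` are hypothetical names, and the two stages are passed in as plain async callables rather than the runner's real stage methods.

```python
import asyncio
from typing import Awaitable, Callable, Optional

CancelChecker = Callable[[], Awaitable[bool]]


async def raise_if_cancel_requested(cancel_checker: Optional[CancelChecker]) -> None:
    """Gate placed between stages: abort before the next stage starts."""
    if cancel_checker is not None and await cancel_checker():
        raise asyncio.CancelledError("run canceled by user")


async def run_stages(
    router_stage: Callable[[], Awaitable[None]],
    worker_stage: Callable[[], Awaitable[None]],
    cancel_checker: Optional[CancelChecker] = None,
) -> None:
    await router_stage()
    # Without this check, a cancel that arrives during routing would only be
    # noticed on the watcher's next poll, after the worker has already begun.
    await raise_if_cancel_requested(cancel_checker)
    await worker_stage()
```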
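**Step 7: Write the runner unit tests (red first)**
Add test cases:
- After the cancel signal fires, `execute` raises `CancelledError`
- The worker does not continue to run (or is interrupted midway)
**Step 8: Run the runner tests**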
Run: `uv run pytest backend/tests/unit/core/agentscope/runtime/test_runner.py -k cancel -q`
Expected: PASS
**Step 9: Commit the runner changes**
```bash
git add backend/src/core/agentscope/runtime/runner.py backend/tests/unit/core/agentscope/runtime/test_runner.py
git commit -m "feat: add cooperative cancellation watcher to agentscope runner"
```
### Task 4: Handle the CancelledError terminal state in the orchestrator and task worker
**Files:**
- Modify: `backend/src/core/agentscope/runtime/orchestrator.py`
- Modify: `backend/src/core/agentscope/runtime/tasks.py`
- Test: `backend/tests/unit/core/agentscope/runtime/test_orchestrator.py`
- Test: `backend/tests/unit/core/agentscope/runtime/test_tasks.py`
**Step 1: Catch CancelledError separately in the orchestrator**
Add to `orchestrator.run()`:
```python
except asyncio.CancelledError:
    await self._pipeline.emit(... RUN_ERROR code="RUN_CANCELED" ...)
    raise
```
Keep the existing `except Exception` for handling system errors.
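The two exception branches can be sketched in a self-contained form as below. This is an assumption-laden illustration: `run_with_cancel_handling` is a hypothetical free function, `emit` stands in for `self._pipeline.emit` (whose real signature this plan does not show), and the `except Exception` body is only a placeholder for the existing system-error handler.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

Emit = Callable[[Dict[str, Any]], Awaitable[None]]


async def run_with_cancel_handling(
    execute: Callable[[], Awaitable[Any]],
    emit: Emit,
    *,
    thread_id: str,
    run_id: str,
) -> Any:
    try:
        return await execute()
    except asyncio.CancelledError:
        # User-initiated stop: emit the terminal event, then re-raise so the
        # task still ends as cancelled and no RUN_FINISHED follows.
        await emit(
            {
                "type": "RUN_ERROR",
                "threadId": thread_id,
                "runId": run_id,
                "message": "run canceled by user",
                "code": "RUN_CANCELED",
            }
        )
        raise
    except Exception as exc:
        # Placeholder for the existing system-error branch.
        await emit(
            {
                "type": "RUN_ERROR",
                "threadId": thread_id,
                "runId": run_id,
                "message": str(exc),
            }
        )
        raise
```

`asyncio.CancelledError` derives from `BaseException`, so the `except Exception` branch never sees it; the dedicated branch is what makes the cancel path emit its own code.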
**Step 2: Build cancel_checker in the task layer and inject it into runtime.run**
In `tasks.py`:
- Build the key: `agent:cancel:{thread_id}:{run_id}`
- Define `async def cancel_checker() -> bool: return bool(await redis.exists(key))`
- Call `runtime.run(..., cancel_checker=cancel_checker)`
**Step 3: Add resource cleanup in the task layer**
In the `finally` of `run_agentscope_task`:
- Delete the cancel key or shorten its TTL
- Log the outcome (necessary fields only)
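Steps 2 and 3 of this task can be sketched together as follows. The sketch is hedged: `run_with_cancel_key` is a hypothetical wrapper rather than the real `run_agentscope_task`, `run` stands in for `runtime.run` and is assumed to accept the checker as its only argument, and `redis` is assumed to be an async client with `exists` and `delete`, as `redis.asyncio.Redis` provides. It takes the "delete the key" option rather than shortening the TTL.

```python
import logging
from typing import Any, Awaitable, Callable

logger = logging.getLogger(__name__)

CancelChecker = Callable[[], Awaitable[bool]]


async def run_with_cancel_key(
    redis: Any,
    run: Callable[[CancelChecker], Awaitable[Any]],
    *,
    thread_id: str,
    run_id: str,
) -> Any:
    key = f"agent:cancel:{thread_id}:{run_id}"

    async def cancel_checker() -> bool:
        # EXISTS returns the number of matching keys; any non-zero count is a hit.
        return bool(await redis.exists(key))

    try:
        return await run(cancel_checker)
    finally:
        # Remove the signal so it cannot linger until the TTL expires.
        await redis.delete(key)
        logger.info("agent run finished: thread_id=%s run_id=%s", thread_id, run_id)
```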
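**Step 4: Write the orchestrator unit tests (red first)**
Verify:
- On `CancelledError`, a `RUN_ERROR` is emitted with `code == "RUN_CANCELED"`
**Step 5: Write the tasks unit tests (red first)**
Verify:
- The `cancel_checker` received by the runtime is usable
- When the key is present, the path that propagates `CancelledError` upward holds
**Step 6: Run the tests**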
Run: `uv run pytest backend/tests/unit/core/agentscope/runtime/test_orchestrator.py backend/tests/unit/core/agentscope/runtime/test_tasks.py -k cancel -q`
Expected: PASS
**Step 7: Commit the runtime orchestration layer changes**
```bash
git add backend/src/core/agentscope/runtime/orchestrator.py backend/src/core/agentscope/runtime/tasks.py backend/tests/unit/core/agentscope/runtime/test_orchestrator.py backend/tests/unit/core/agentscope/runtime/test_tasks.py
git commit -m "fix: emit RUN_CANCELED error when run task is interrupted"
```
### Task 5: Regression coverage for event stream and persistence consistency
**Files:**
- Modify: `backend/tests/unit/core/agentscope/events/test_store.py`
- Modify: `backend/tests/unit/core/agentscope/events/test_agui_codec.py`
- Modify: `backend/tests/integration/v1/agent/test_sse_flow_live.py`
**Step 1: Add event store behavior tests**
New assertion:
- When `RUN_ERROR` carries `code=RUN_CANCELED`, the session status is still `FAILED`
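The rule this assertion protects can be written down as a small mapping. The sketch below is illustrative only and does not reflect the real event store: `SessionStatus`, `session_status_after`, and `is_user_cancel` are hypothetical names, and the `COMPLETED` member is an assumption, since this plan only names the `FAILED` status.

```python
from enum import Enum
from typing import Any, Mapping, Optional


class SessionStatus(str, Enum):
    COMPLETED = "completed"  # assumed name; not stated in this plan
    FAILED = "failed"


def session_status_after(event: Mapping[str, Any]) -> Optional[SessionStatus]:
    """Return the status a terminal event leads to, or None if not terminal."""
    if event.get("type") == "RUN_FINISHED":
        return SessionStatus.COMPLETED
    if event.get("type") == "RUN_ERROR":
        # Every RUN_ERROR maps to FAILED, whatever its code, so no new
        # session status or enum migration is needed for cancellation.
        return SessionStatus.FAILED
    return None


def is_user_cancel(event: Mapping[str, Any]) -> bool:
    """Distinguish a user-initiated stop from a system failure."""
    return event.get("type") == "RUN_ERROR" and event.get("code") == "RUN_CANCELED"
```

A caller that needs to treat cancellation differently, such as the frontend in Task 7, relies on `code` rather than on the session status.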
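**Step 2: Add codec tests**
New assertion:
- The `code` field of `RUN_ERROR` is passed through to the wire event unchanged
**Step 3: Add an SSE integration test**
Scenario:
- Trigger `/runs`
- Trigger `/runs/{thread_id}/cancel`
- The SSE stream ends with `RUN_ERROR(code=RUN_CANCELED)`
**Step 4: Run the event-related tests**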
Run: `uv run pytest backend/tests/unit/core/agentscope/events/test_store.py backend/tests/unit/core/agentscope/events/test_agui_codec.py backend/tests/integration/v1/agent/test_sse_flow_live.py -k "cancel or run_error" -q`
Expected: PASS
**Step 5: Commit the event layer changes**
```bash
git add backend/tests/unit/core/agentscope/events/test_store.py backend/tests/unit/core/agentscope/events/test_agui_codec.py backend/tests/integration/v1/agent/test_sse_flow_live.py
git commit -m "test: cover RUN_CANCELED propagation across store codec and SSE"
```
### Task 6: Full verification and pre-release checks
**Files:**
- Modify: `docs/protocols/agent/api-endpoints.md` (if final fields need to be added)
- Modify: `docs/protocols/agent/sse-events.md` (if final fields need to be added)
**Step 1: Run the affected unit test set**
Run:
```bash
uv run pytest backend/tests/unit/v1/agent/test_service.py backend/tests/unit/core/agentscope/runtime/test_runner.py backend/tests/unit/core/agentscope/runtime/test_orchestrator.py backend/tests/unit/core/agentscope/runtime/test_tasks.py backend/tests/unit/core/agentscope/events/test_store.py backend/tests/unit/core/agentscope/events/test_agui_codec.py -q
```
Expected: PASS
**Step 2: Run the affected integration test set**
Run:
```bash
uv run pytest backend/tests/integration/v1/agent/test_routes.py backend/tests/integration/v1/agent/test_sse_flow_live.py -q
```
Expected: PASS
**Step 3: Run static checks**
Run:
```bash
uv run ruff check backend/src backend/tests
uv run basedpyright
```
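Expected: PASS (no new lint/type errors)
**Step 4: Manual verification path**
Manual flow:
- Start a run via `/runs`
- Immediately call `/runs/{thread_id}/cancel?runId=...`
- Watch the SSE stream: it should end with `RUN_ERROR(code=RUN_CANCELED)`
- Check the session: `status=failed`
**Step 5: Final commit**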
```bash
git add docs/protocols/agent/api-endpoints.md docs/protocols/agent/sse-events.md backend/src/v1/agent/*.py backend/src/core/agentscope/runtime/*.py backend/tests/unit/core/agentscope/runtime/*.py backend/tests/unit/core/agentscope/events/*.py backend/tests/unit/v1/agent/test_service.py backend/tests/integration/v1/agent/test_routes.py backend/tests/integration/v1/agent/test_sse_flow_live.py
git commit -m "feat: support run cancellation with RUN_CANCELED failed semantics"
```
---
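## Risks and Rollback
- Risk 1: a cancel key matches the wrong run and interrupts it by mistake
  - Mitigation: scope the key to `thread_id + run_id` and set a TTL
- Risk 2: duplicate terminal events are emitted on interruption
  - Mitigation: in the orchestrator, make sure CancelledError only takes the `RUN_ERROR` branch and never goes on to emit `RUN_FINISHED`
- Risk 3: Redis polling load rises under high concurrency
  - Mitigation: poll every 200ms; later, evaluate switching to pub/sub based on concurrency
Rollback strategy:
- Roll back the new cancel endpoint in `router/service/dependencies`
- Roll back the cancel injection logic in `runner/orchestrator/tasks`
- Leave the original `POST /runs` and SSE flow unchanged
### Task 7: Frontend integration of the cancel API (the "stop generating" button shown after sending goes through real backend cancellation)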
**Files:**
- Modify: `apps/lib/features/chat/data/services/ag_ui_service.dart`
- Modify: `apps/lib/features/chat/presentation/bloc/chat_bloc.dart`
- Modify: `apps/lib/features/chat/data/models/ag_ui_event.dart`
- Modify: `apps/lib/features/home/ui/screens/home_screen_interactions.dart`
- Test: `apps/test/features/chat/data/services/ag_ui_service_test.dart`
- Test: `apps/test/features/chat/presentation/chat_bloc_attachment_sync_test.dart`
**Step 1: Track the identifiers of the current run in AgUiService**
Add fields to `AgUiService`:
- `_activeThreadIdForRun: String?`
- `_activeRunId: String?`
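Set both fields once `sendMessage` has successfully received the `/runs` response; clear them after the terminal event (`RUN_FINISHED` / `RUN_ERROR`) of the target run arrives.
**Step 2: Upgrade cancelCurrentRun from "only drop the SSE" to "call the backend cancel first, then close the local stream"**
Change `AgUiService.cancelCurrentRun()` to:
1. If `_activeThreadIdForRun` or `_activeRunId` is null: fall back to the current behavior (only close the SSE)
2. Otherwise, first call: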
```text
POST /api/v1/agent/runs/{threadId}/cancel?runId={runId}
```
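3. After the request succeeds, run `_cancelActiveSseSubscription()` (to stop holding the local connection)
4. Whether or not the backend takes effect immediately, clear the local active-run fields to prevent duplicate cancels
Note: this step is what actually connects the stop button shown after sending a message to the backend cancel capability.
**Step 3: Refine error semantics (friendly frontend display)**
When `chat_bloc.dart` handles `RunErrorEvent`:
- If `errorEvent.code == 'RUN_CANCELED'`, do not present the message as a failure (leave it empty or show "已停止生成", i.e. "Generation stopped")
- Still run `_resetRunState` and finalize the tool cards to keep the UI consistent
**Step 4: Keep the existing button entry point; do not change the interaction path**
`_onStopGenerating -> _chatBloc.cancelCurrentRun()` in `home_screen_interactions.dart` is already the correct entry point; keep reusing it.
Only adjust the toast copy strategy:
- Request sent: `已请求停止` ("Stop requested")
- `RUN_ERROR(code=RUN_CANCELED)` received: final state `已停止生成` ("Generation stopped")
**Step 5: Write the AgUiService tests (red first)**
Add to `ag_ui_service_test.dart`:
- `cancelCurrentRun` calls the new endpoint `/api/v1/agent/runs/{threadId}/cancel`
- The query parameters include `runId`
- The current SSE subscription is closed after the call
**Step 6: Write the ChatBloc tests (red first)**
Add to `chat_bloc_attachment_sync_test.dart`:
- After receiving `RunErrorEvent(message: 'run canceled by user', code: 'RUN_CANCELED')`:
  - `isWaitingFirstToken/isStreaming/isCancelling` are all reset
  - The generic failure message is not shown (or the canceled-state message is shown; assert according to the final copy strategy)
**Step 7: Run the Flutter tests**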
Run:
```bash
flutter test apps/test/features/chat/data/services/ag_ui_service_test.dart apps/test/features/chat/presentation/chat_bloc_attachment_sync_test.dart
```
Expected: PASS
**Step 8: Commit the frontend integration**
```bash
git add apps/lib/features/chat/data/services/ag_ui_service.dart apps/lib/features/chat/presentation/bloc/chat_bloc.dart apps/lib/features/chat/data/models/ag_ui_event.dart apps/lib/features/home/ui/screens/home_screen_interactions.dart apps/test/features/chat/data/services/ag_ui_service_test.dart apps/test/features/chat/presentation/chat_bloc_attachment_sync_test.dart
git commit -m "feat: wire stop-generating button to backend run cancel API"
```