social-app/docs/plans/2026-03-25-agent-run-cancel-failed.md
2026-03-25 18:33:25 +08:00


Agent Run Cancel (Failed Semantics) Implementation Plan

For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

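Goal: Add interruption support to /api/v1/agent/runs so that, once the user triggers cancel, the running agent flow actually stops and ends with RUN_ERROR(code=RUN_CANCELED), leaving the session in the failed state.

Architecture: Use a "cooperative cancellation + main-task interruption" scheme. The API layer writes a cancel signal to Redis. Inside the worker process, the runtime runs a parallel watcher that polls for the signal. When the signal is seen, the watcher first calls interrupt() on the active agent for a graceful wrap-up, then calls cancel() on the current run's main task as a hard fallback. The terminal state is always persisted through a RUN_ERROR event, which reuses the existing FAILED session semantics and avoids a database enum migration.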

Tech Stack: FastAPI, TaskIQ, Redis, AgentScope, SQLAlchemy, Flutter, Pytest, Ruff, BasedPyright


Task 1: Update the protocol docs first (endpoint and event semantics)

Files:

  • Modify: docs/protocols/agent/api-endpoints.md
  • Modify: docs/protocols/agent/sse-events.md

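Step 1: Add the cancel endpoint contract to the API doc

Add to the endpoint list in api-endpoints.md:

| POST | `/runs/{thread_id}/cancel` | Request cancellation of the specified run |

Then add a section describing:

  • Request parameters: thread_id + runId (query parameter recommended)
  • Response: 202 Accepted + accepted: true
  • Semantics: only means "the cancel request was received"; it does not guarantee that the run has already stopped

Step 2: Add the cancellation terminal-state semantics to the SSE doc

Add to the RUN_ERROR section of sse-events.md: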

{
  "type": "RUN_ERROR",
  "threadId": "...",
  "runId": "...",
  "message": "run canceled by user",
  "code": "RUN_CANCELED"
}

And state explicitly:

  • RUN_CANCELED is a user-initiated interruption, not a system fault
  • This phase still reuses the session failed state (backward compatible)
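
As a rough illustration of the event semantics above, the following sketch builds the cancel terminal event and tells a user cancel apart from a system error. The helper names build_run_canceled_event and is_user_cancel are hypothetical and not part of the existing codebase; only the field names and values come from the JSON example in this plan.

```python
from typing import Any

RUN_CANCELED = "RUN_CANCELED"


def build_run_canceled_event(thread_id: str, run_id: str) -> dict[str, Any]:
    """Build the terminal RUN_ERROR payload for a user-initiated cancel."""
    return {
        "type": "RUN_ERROR",
        "threadId": thread_id,
        "runId": run_id,
        "message": "run canceled by user",
        "code": RUN_CANCELED,
    }


def is_user_cancel(event: dict[str, Any]) -> bool:
    """True only for RUN_ERROR events whose code marks a user cancel."""
    return event.get("type") == "RUN_ERROR" and event.get("code") == RUN_CANCELED
```

A consumer could use is_user_cancel to decide whether to show a failure message or a neutral "stopped" state, while persistence still treats both cases as failed.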

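Step 3: Self-review the docs

Check that the docs cover all of:

  • HTTP behavior
  • SSE terminal event
  • Compatibility strategy (no new session status is introduced)

Step 4: Commit the doc changes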

git add docs/protocols/agent/api-endpoints.md docs/protocols/agent/sse-events.md
git commit -m "docs: define agent run cancel API and RUN_CANCELED error semantics"

Task 2: Wire the cancel signal write from the API layer through to the queue layer

Files:

  • Modify: backend/src/v1/agent/schemas.py
  • Modify: backend/src/v1/agent/dependencies.py
  • Modify: backend/src/v1/agent/service.py
  • Modify: backend/src/v1/agent/router.py
  • Test: backend/tests/unit/v1/agent/test_service.py
  • Test: backend/tests/integration/v1/agent/test_routes.py

Step 1: Add the response schema for the cancel endpoint

Add to v1/agent/schemas.py:

class CancelRunResponse(BaseModel):
    model_config = ConfigDict(populate_by_name=True, serialize_by_alias=True)
    thread_id: str = Field(alias="threadId")
    run_id: str = Field(alias="runId")
    accepted: bool

Step 2: Extend the queue protocol interface

Add to QueueClientLike:

async def request_cancel(self, *, thread_id: str, run_id: str, requested_by: str) -> None: ...

Step 3: Implement request_cancel in TaskiqQueueClient

Add to v1/agent/dependencies.py:

  • Cancel key convention: agent:cancel:{thread_id}:{run_id}
  • Write the cancel signal with SET key value EX <ttl>
  • The value can be a JSON string (containing user_id/timestamp)
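
The signal write described above might look roughly like the sketch below. It assumes an async Redis client whose set(name, value, ex=...) matches redis-py's asyncio interface. The TTL of 300 seconds, the RedisLike protocol, and the InMemoryRedis test double are illustrative assumptions, since the plan does not fix a TTL or a concrete client.

```python
import asyncio
import json
import time
from typing import Optional, Protocol

CANCEL_TTL_SECONDS = 300  # assumed value; the plan only says "EX <ttl>"


class RedisLike(Protocol):
    """Minimal slice of an async Redis client used by this sketch."""

    async def set(self, name: str, value: str, ex: Optional[int] = None) -> object: ...


def cancel_key(thread_id: str, run_id: str) -> str:
    # Key convention from the plan: agent:cancel:{thread_id}:{run_id}
    return f"agent:cancel:{thread_id}:{run_id}"


async def request_cancel(
    redis: RedisLike, *, thread_id: str, run_id: str, requested_by: str
) -> None:
    """Write the cancel signal with an expiry so stale keys clean themselves up."""
    payload = json.dumps({"user_id": requested_by, "timestamp": time.time()})
    await redis.set(cancel_key(thread_id, run_id), payload, ex=CANCEL_TTL_SECONDS)


class InMemoryRedis:
    """Hypothetical test double standing in for a real Redis client."""

    def __init__(self) -> None:
        self.data: dict[str, tuple[str, Optional[int]]] = {}

    async def set(self, name: str, value: str, ex: Optional[int] = None) -> object:
        self.data[name] = (value, ex)
        return True
```

Keeping the key builder in one function means the task layer can derive the same key when it polls for the signal.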

Step 4: Add cancel_run to the service layer

Add a method to v1/agent/service.py that:

  • Verifies the session owner (reusing get_session_owner + ensure_session_owner)
  • Calls self._queue.request_cancel(...)
  • Returns the accepted result DTO
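
A minimal sketch of the service method outlined above follows. The signatures of get_session_owner and ensure_session_owner, the ForbiddenError type, the CancelRunResult dataclass, and the FakeQueue double are assumptions made for illustration; the real service and its owner lookup may differ.

```python
import asyncio
from dataclasses import dataclass


class ForbiddenError(Exception):
    """Hypothetical error that the HTTP layer would map to 403."""


@dataclass
class CancelRunResult:
    thread_id: str
    run_id: str
    accepted: bool


class FakeQueue:
    """Test double that records request_cancel calls."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, str, str]] = []

    async def request_cancel(self, *, thread_id: str, run_id: str, requested_by: str) -> None:
        self.calls.append((thread_id, run_id, requested_by))


class AgentService:
    def __init__(self, queue: FakeQueue, owners: dict[str, str]) -> None:
        self._queue = queue
        self._owners = owners  # thread_id -> owner id, standing in for the DB lookup

    async def get_session_owner(self, thread_id: str) -> str:
        return self._owners[thread_id]

    def ensure_session_owner(self, owner_id: str, user_id: str) -> None:
        if owner_id != user_id:
            raise ForbiddenError("not the session owner")

    async def cancel_run(self, *, thread_id: str, run_id: str, user_id: str) -> CancelRunResult:
        # The owner check runs first so a non-owner never writes a cancel signal.
        owner_id = await self.get_session_owner(thread_id)
        self.ensure_session_owner(owner_id, user_id)
        await self._queue.request_cancel(
            thread_id=thread_id, run_id=run_id, requested_by=user_id
        )
        return CancelRunResult(thread_id=thread_id, run_id=run_id, accepted=True)
```

The two unit tests planned in Step 6 map directly onto this shape: one asserts that the queue was called for the owner, the other that a non-owner raises before the queue is touched.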

Step 5: Add the cancel route to the router

Add to v1/agent/router.py:

@router.post("/runs/{thread_id}/cancel", response_model=CancelRunResponse, status_code=202)
async def cancel_run(...):
    ...

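Constraints:

  • runId is required (query parameter recommended)
  • Non-owners get 403
  • Invalid parameters get 422

Step 6: Write the service unit tests (red first)

Add to test_service.py:

  • The owner can issue a cancel, and queue.request_cancel is called
  • A cancel from a non-owner returns 403

Step 7: Run the unit tests and confirm they fail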

Run: uv run pytest backend/tests/unit/v1/agent/test_service.py -k cancel -q

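Expected: at least 1 test fails (the new logic is not implemented yet)

Step 8: Implement the minimal code to make the tests pass

Complete the implementation per Steps 1-5 and avoid any extra refactoring.

Step 9: Run the tests and verify they pass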

Run: uv run pytest backend/tests/unit/v1/agent/test_service.py -k cancel -q

Expected: PASS

Step 10: Add the route integration tests

Add to test_routes.py:

  • POST /api/v1/agent/runs/{thread_id}/cancel?runId=... returns 202
  • The response field aliases are correct (threadId/runId/accepted)

Step 11: Run the route tests

Run: uv run pytest backend/tests/integration/v1/agent/test_routes.py -k cancel -q

Expected: PASS

Step 12: Commit the API layer changes

git add backend/src/v1/agent/schemas.py backend/src/v1/agent/dependencies.py backend/src/v1/agent/service.py backend/src/v1/agent/router.py backend/tests/unit/v1/agent/test_service.py backend/tests/integration/v1/agent/test_routes.py
git commit -m "feat: add agent run cancel endpoint and Redis cancel signal"

Task 3: Add the cancel watcher and graceful interruption to the runtime runner

Files:

  • Modify: backend/src/core/agentscope/runtime/runner.py
  • Test: backend/tests/unit/core/agentscope/runtime/test_runner.py

Step 1: Add a cancel_checker parameter to runner.execute

Update the execute() signature:

cancel_checker: Callable[[], Awaitable[bool]] | None = None

Keep the default of None for backward compatibility.

Step 2: Add an active-agent reference and lock

Add to AgentScopeRunner.__init__:

  • self._active_agent: JsonReActAgent | None = None
  • self._active_agent_lock = asyncio.Lock()

Step 3: Manage the active-agent lifecycle in _run_worker_stage

Wrap the agent.reply_json(...) call:

  • before: record self._active_agent = agent
  • finally: clear the reference

Step 4: Add the _watch_cancel_signal coroutine

Behavior:

  • Call cancel_checker() in a loop
  • On a hit, first try await active_agent.interrupt()
  • Then call run_task.cancel("run canceled by user")
  • Sleep between polls with await asyncio.sleep(0.2)
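
The watcher behavior listed above can be sketched as follows. This is an illustration under assumptions, not the runner's actual code: the get_active_agent callback and the DemoAgent class are hypothetical stand-ins for the locked self._active_agent reference, and errors from interrupt() are swallowed here on the assumption that the hard cancel should still run.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def watch_cancel_signal(
    cancel_checker: Callable[[], Awaitable[bool]],
    run_task: "asyncio.Task[Any]",
    get_active_agent: Callable[[], Optional[Any]],
    poll_interval: float = 0.2,
) -> None:
    """Poll for the cancel signal; on a hit, interrupt gracefully, then hard-cancel."""
    while not run_task.done():
        if await cancel_checker():
            agent = get_active_agent()
            if agent is not None:
                try:
                    await agent.interrupt()  # graceful wrap-up first
                except Exception:
                    pass  # best effort only; the hard cancel below still runs
            run_task.cancel("run canceled by user")  # hard fallback
            return
        await asyncio.sleep(poll_interval)


class DemoAgent:
    """Hypothetical agent exposing only the interrupt() hook."""

    def __init__(self) -> None:
        self.interrupted = False

    async def interrupt(self) -> None:
        self.interrupted = True


async def demo() -> tuple[bool, bool]:
    agent = DemoAgent()
    flag = {"cancel": False}

    async def checker() -> bool:
        return flag["cancel"]

    async def long_run() -> None:
        await asyncio.sleep(10)

    run_task = asyncio.create_task(long_run())
    watcher = asyncio.create_task(
        watch_cancel_signal(checker, run_task, lambda: agent, poll_interval=0.01)
    )
    await asyncio.sleep(0.03)
    flag["cancel"] = True  # simulate the Redis key appearing
    await watcher
    try:
        await run_task
    except asyncio.CancelledError:
        pass
    return run_task.cancelled(), agent.interrupted
```

Calling interrupt() before cancel() gives the agent a chance to finish cleanly, while the cancel ensures the run stops even if the agent ignores the interrupt.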

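Step 5: Start and stop the watcher in execute

  • run_task = asyncio.current_task()
  • If a cancel_checker is provided, create_task(_watch_cancel_signal(...))
  • In finally, stop the watcher and await it so it is reaped

Step 6: Add a cancel gate at the stage boundary (critical)

After the router stage ends and before the worker stage starts, check cancel_checker() once:

  • If it returns true, raise asyncio.CancelledError

Purpose: prevent the case where the router has finished but the run still enters the worker.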
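
To make Steps 5 and 6 concrete, here is a simplified sketch of an execute function that starts the watcher, applies the stage-boundary gate, and reaps the watcher in finally. The router_stage and worker_stage callables and the reduced _poll_and_cancel watcher (hard cancel only, no interrupt()) are assumptions for illustration; the real runner's stages and signature will differ.

```python
import asyncio
import contextlib
from typing import Any, Awaitable, Callable, Optional

CancelChecker = Callable[[], Awaitable[bool]]


async def _poll_and_cancel(
    checker: CancelChecker, run_task: "asyncio.Task[Any]", interval: float
) -> None:
    # Reduced watcher for this sketch: hard cancel only.
    while not await checker():
        await asyncio.sleep(interval)
    run_task.cancel("run canceled by user")


async def execute(
    router_stage: Callable[[], Awaitable[None]],
    worker_stage: Callable[[], Awaitable[None]],
    cancel_checker: Optional[CancelChecker] = None,
    poll_interval: float = 0.2,
) -> None:
    run_task = asyncio.current_task()
    watcher: Optional["asyncio.Task[None]"] = None
    if cancel_checker is not None and run_task is not None:
        watcher = asyncio.create_task(
            _poll_and_cancel(cancel_checker, run_task, poll_interval)
        )
    try:
        await router_stage()
        # Stage-boundary gate: never enter the worker once cancel was requested.
        if cancel_checker is not None and await cancel_checker():
            raise asyncio.CancelledError("run canceled by user")
        await worker_stage()
    finally:
        if watcher is not None:
            watcher.cancel()  # stop the watcher, then await it so it is reaped
            with contextlib.suppress(asyncio.CancelledError):
                await watcher


async def demo() -> list[str]:
    calls: list[str] = []
    state = {"cancel": False}

    async def checker() -> bool:
        return state["cancel"]

    async def router_stage() -> None:
        calls.append("router")
        state["cancel"] = True  # the cancel arrives just as the router ends

    async def worker_stage() -> None:
        calls.append("worker")

    try:
        await execute(router_stage, worker_stage, checker, poll_interval=0.5)
    except asyncio.CancelledError:
        calls.append("cancelled")
    return calls
```

In the demo the watcher has not polled yet when the router finishes, so only the gate prevents the worker from starting. That is the gap the gate is meant to close.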

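Step 7: Write the runner unit tests (red first)

Add test cases:

  • After the cancel signal fires, execute raises CancelledError
  • The worker does not continue executing (or is interrupted partway)

Step 8: Run the runner tests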

Run: uv run pytest backend/tests/unit/core/agentscope/runtime/test_runner.py -k cancel -q

Expected: PASS

Step 9: Commit the runner changes

git add backend/src/core/agentscope/runtime/runner.py backend/tests/unit/core/agentscope/runtime/test_runner.py
git commit -m "feat: add cooperative cancellation watcher to agentscope runner"

Task 4: Handle the CancelledError terminal state in the orchestrator and task worker

Files:

  • Modify: backend/src/core/agentscope/runtime/orchestrator.py
  • Modify: backend/src/core/agentscope/runtime/tasks.py
  • Test: backend/tests/unit/core/agentscope/runtime/test_orchestrator.py
  • Test: backend/tests/unit/core/agentscope/runtime/test_tasks.py

Step 1: Catch CancelledError separately in the orchestrator

Add to orchestrator.run():

except asyncio.CancelledError:
    await self._pipeline.emit(... RUN_ERROR code="RUN_CANCELED" ...)
    raise

Keep the existing except Exception handler for system errors.
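
The branch ordering above matters because asyncio.CancelledError derives from BaseException, so an except Exception handler alone would never see it. The sketch below shows one possible shape. RecordingPipeline, the event dict layout passed to emit, the INTERNAL_ERROR code, and re-raising in the generic branch are all assumptions for illustration rather than the project's actual pipeline API.

```python
import asyncio
from typing import Any, Awaitable, Callable


class RecordingPipeline:
    """Stand-in for the real event pipeline; it only records emitted events."""

    def __init__(self) -> None:
        self.events: list[dict[str, Any]] = []

    async def emit(self, event: dict[str, Any]) -> None:
        self.events.append(event)


class Orchestrator:
    def __init__(self, pipeline: RecordingPipeline) -> None:
        self._pipeline = pipeline

    async def run(
        self, thread_id: str, run_id: str, body: Callable[[], Awaitable[None]]
    ) -> None:
        base = {"threadId": thread_id, "runId": run_id}
        try:
            await body()
            await self._pipeline.emit({"type": "RUN_FINISHED", **base})
        except asyncio.CancelledError:
            # User cancel: emit the terminal event, then re-raise so the task
            # still ends as cancelled and RUN_FINISHED is never emitted.
            await self._pipeline.emit(
                {"type": "RUN_ERROR", **base,
                 "message": "run canceled by user", "code": "RUN_CANCELED"}
            )
            raise
        except Exception as exc:
            # System error path; the code value here is a placeholder.
            await self._pipeline.emit(
                {"type": "RUN_ERROR", **base,
                 "message": str(exc), "code": "INTERNAL_ERROR"}
            )
            raise


async def demo() -> list[str]:
    pipeline = RecordingPipeline()

    async def canceled_body() -> None:
        raise asyncio.CancelledError("run canceled by user")

    try:
        await Orchestrator(pipeline).run("t1", "r1", canceled_body)
    except asyncio.CancelledError:
        pass
    return [f"{e['type']}:{e.get('code')}" for e in pipeline.events]
```

Because the cancel branch re-raises, exactly one terminal event is recorded for a canceled run.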

Step 2: Build cancel_checker in the task layer and inject it into runtime.run

In tasks.py:

  • Build the key agent:cancel:{thread_id}:{run_id}
  • Define async def cancel_checker() -> bool: return bool(await redis.exists(key))
  • Call runtime.run(..., cancel_checker=cancel_checker)

Step 3: Add resource cleanup in the task layer

In the finally block of run_agentscope_task:

  • Delete the cancel key or shorten its TTL
  • Log (necessary fields only)
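
Steps 2 and 3 could be combined roughly as in the sketch below. It assumes a Redis client with async exists() and delete() methods, as in redis-py's asyncio client. FakeRedis, make_cancel_checker, and the simplified run_agentscope_task signature are hypothetical, and the cleanup here deletes the key rather than shortening its TTL.

```python
import asyncio
import logging
from typing import Any, Awaitable, Callable

logger = logging.getLogger(__name__)


class FakeRedis:
    """In-memory stand-in exposing only exists() and delete()."""

    def __init__(self) -> None:
        self.keys: set[str] = set()

    async def exists(self, key: str) -> int:
        return 1 if key in self.keys else 0

    async def delete(self, key: str) -> int:
        existed = key in self.keys
        self.keys.discard(key)
        return 1 if existed else 0


def make_cancel_checker(
    redis: Any, thread_id: str, run_id: str
) -> tuple[str, Callable[[], Awaitable[bool]]]:
    key = f"agent:cancel:{thread_id}:{run_id}"

    async def cancel_checker() -> bool:
        return bool(await redis.exists(key))

    return key, cancel_checker


async def run_agentscope_task(
    redis: Any, runtime_run: Callable[..., Awaitable[None]], *, thread_id: str, run_id: str
) -> None:
    key, cancel_checker = make_cancel_checker(redis, thread_id, run_id)
    try:
        await runtime_run(cancel_checker=cancel_checker)
    finally:
        # Remove the signal so it cannot affect anything else that reuses the key.
        await redis.delete(key)
        logger.info("agent run ended thread_id=%s run_id=%s", thread_id, run_id)


async def demo() -> tuple[bool, bool]:
    redis = FakeRedis()
    redis.keys.add("agent:cancel:t1:r1")
    seen: dict[str, bool] = {}

    async def runtime_run(*, cancel_checker: Callable[[], Awaitable[bool]]) -> None:
        seen["hit"] = await cancel_checker()

    await run_agentscope_task(redis, runtime_run, thread_id="t1", run_id="r1")
    return seen["hit"], "agent:cancel:t1:r1" in redis.keys
```

The log line carries only the two identifiers, in line with the "necessary fields only" note above.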

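Step 4: Write the orchestrator unit tests (red first)

Verify:

  • On CancelledError, a RUN_ERROR is emitted with code == "RUN_CANCELED"

Step 5: Write the tasks unit tests (red first)

Verify:

  • The cancel_checker received by the runtime is usable
  • When the key is present, the path that propagates CancelledError upward holds

Step 6: Run the tests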

Run: uv run pytest backend/tests/unit/core/agentscope/runtime/test_orchestrator.py backend/tests/unit/core/agentscope/runtime/test_tasks.py -k cancel -q

Expected: PASS

Step 7: Commit the runtime orchestration layer changes

git add backend/src/core/agentscope/runtime/orchestrator.py backend/src/core/agentscope/runtime/tasks.py backend/tests/unit/core/agentscope/runtime/test_orchestrator.py backend/tests/unit/core/agentscope/runtime/test_tasks.py
git commit -m "fix: emit RUN_CANCELED error when run task is interrupted"

Task 5: Regression coverage for event stream and persistence consistency

Files:

  • Modify: backend/tests/unit/core/agentscope/events/test_store.py
  • Modify: backend/tests/unit/core/agentscope/events/test_agui_codec.py
  • Modify: backend/tests/integration/v1/agent/test_sse_flow_live.py

Step 1: Add event store behavior tests

New assertion:

  • When RUN_ERROR carries code=RUN_CANCELED, the session status is still FAILED
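
The invariant in this assertion can be illustrated with a small status-mapping sketch. The SessionStatus enum values other than failed, and the status_after_event helper, are hypothetical; the real event store derives session status in its own way.

```python
from enum import Enum
from typing import Any, Optional


class SessionStatus(str, Enum):
    # Only "failed" is confirmed by the plan; the other members are placeholders.
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


def status_after_event(event: dict[str, Any]) -> Optional[SessionStatus]:
    """Map a terminal event to a session status; None for non-terminal events."""
    if event.get("type") == "RUN_FINISHED":
        return SessionStatus.COMPLETED
    if event.get("type") == "RUN_ERROR":
        # The code field is deliberately ignored: RUN_CANCELED still means FAILED.
        return SessionStatus.FAILED
    return None
```

A regression test would then assert that a RUN_ERROR with code RUN_CANCELED and a RUN_ERROR with any other code produce the same status.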

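Step 2: Add codec tests

New assertion:

  • The code field of RUN_ERROR is passed through correctly to the wire event

Step 3: Add SSE integration tests

Scenario:

  • Trigger /runs
  • Trigger /runs/{thread_id}/cancel
  • The SSE stream finally emits RUN_ERROR(code=RUN_CANCELED)

Step 4: Run the event-related tests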

Run: uv run pytest backend/tests/unit/core/agentscope/events/test_store.py backend/tests/unit/core/agentscope/events/test_agui_codec.py backend/tests/integration/v1/agent/test_sse_flow_live.py -k "cancel or run_error" -q

Expected: PASS

Step 5: Commit the event layer changes

git add backend/tests/unit/core/agentscope/events/test_store.py backend/tests/unit/core/agentscope/events/test_agui_codec.py backend/tests/integration/v1/agent/test_sse_flow_live.py
git commit -m "test: cover RUN_CANCELED propagation across store codec and SSE"

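Task 6: Full verification and pre-release checks

Files:

  • Modify: docs/protocols/agent/api-endpoints.md (if the final fields need to be added)
  • Modify: docs/protocols/agent/sse-events.md (if the final fields need to be added)

Step 1: Run the affected unit test set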

Run:

uv run pytest backend/tests/unit/v1/agent/test_service.py backend/tests/unit/core/agentscope/runtime/test_runner.py backend/tests/unit/core/agentscope/runtime/test_orchestrator.py backend/tests/unit/core/agentscope/runtime/test_tasks.py backend/tests/unit/core/agentscope/events/test_store.py backend/tests/unit/core/agentscope/events/test_agui_codec.py -q

Expected: PASS

Step 2: Run the affected integration test set

Run:

uv run pytest backend/tests/integration/v1/agent/test_routes.py backend/tests/integration/v1/agent/test_sse_flow_live.py -q

Expected: PASS

Step 3: Run static checks

Run:

uv run ruff check backend/src backend/tests
uv run basedpyright

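Expected: PASS (no new lint/type errors)

Step 4: Manual verification path

Manual flow:

  • Start a run via /runs
  • Immediately call /runs/{thread_id}/cancel?runId=...
  • Watch the SSE stream: it should end with RUN_ERROR(code=RUN_CANCELED)
  • Check that the session has status=failed

Step 5: Final commit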

git add docs/protocols/agent/api-endpoints.md docs/protocols/agent/sse-events.md backend/src/v1/agent/*.py backend/src/core/agentscope/runtime/*.py backend/tests/unit/core/agentscope/runtime/*.py backend/tests/unit/core/agentscope/events/*.py backend/tests/unit/v1/agent/test_service.py backend/tests/integration/v1/agent/test_routes.py backend/tests/integration/v1/agent/test_sse_flow_live.py
git commit -m "feat: support run cancellation with RUN_CANCELED failed semantics"

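Risks and Rollback

  • Risk 1: a cancel key matches the wrong run and causes an unintended interruption
    • Mitigation: scope the key to thread_id + run_id, and set a TTL
  • Risk 2: duplicate terminal events are emitted on interruption
    • Mitigation: in the orchestrator, ensure CancelledError only takes the RUN_ERROR branch and never goes on to emit RUN_FINISHED
  • Risk 3: Redis polling load rises under high concurrency
    • Mitigation: poll every 200ms, and later evaluate a switch to pub/sub based on concurrency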
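
For Risk 3, a back-of-the-envelope estimate helps decide when pub/sub becomes worthwhile. The sketch below assumes one EXISTS call per poll per active run and ignores the extra stage-boundary check. The function name is hypothetical.

```python
def redis_polls_per_second(concurrent_runs: int, poll_interval_s: float = 0.2) -> float:
    """Estimate steady-state EXISTS calls per second across all active runs."""
    if poll_interval_s <= 0:
        raise ValueError("poll_interval_s must be positive")
    return concurrent_runs / poll_interval_s
```

At the planned 200ms interval each run issues about 5 checks per second, so 1,000 concurrent runs would generate roughly 5,000 EXISTS calls per second.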

Rollback strategy:

  • Roll back the new cancel endpoint in router/service/dependencies
  • Roll back the cancel injection logic in runner/orchestrator/tasks
  • Leave the original POST /runs and SSE flow unchanged

Task 7: Integrate the cancel API on the frontend (after sending, the "stop generating" button performs a real backend cancel)

Files:

  • Modify: apps/lib/features/chat/data/services/ag_ui_service.dart
  • Modify: apps/lib/features/chat/presentation/bloc/chat_bloc.dart
  • Modify: apps/lib/features/chat/data/models/ag_ui_event.dart
  • Modify: apps/lib/features/home/ui/screens/home_screen_interactions.dart
  • Test: apps/test/features/chat/data/services/ag_ui_service_test.dart
  • Test: apps/test/features/chat/presentation/chat_bloc_attachment_sync_test.dart

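Step 1: Track the current run identifiers in AgUiService

Add fields to AgUiService:

  • _activeThreadIdForRun: String?
  • _activeRunId: String?

Set both fields once sendMessage has successfully received the /runs response; clear them after receiving the terminal event (RUN_FINISHED / RUN_ERROR) for the target run.

Step 2: Upgrade cancelCurrentRun from "only close the SSE" to "call the backend cancel first, then close the local stream"

Change AgUiService.cancelCurrentRun() to:

  1. If _activeThreadIdForRun / _activeRunId is null: fall back to the current behavior (only close the SSE)
  2. Otherwise, first call:
POST /api/v1/agent/runs/{threadId}/cancel?runId={runId}
  3. After the request succeeds, run _cancelActiveSseSubscription() (to avoid holding the local connection open)
  4. Whether or not the backend takes effect immediately, clear the local active-run fields to prevent duplicate cancels

Note: this step is what actually connects the stop button shown after sending a message to the backend cancel capability.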

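Step 3: Refine the error semantics (friendlier frontend display)

When chat_bloc.dart handles RunErrorEvent:

  • If errorEvent.code == 'RUN_CANCELED', do not show the error text as a failure message (leave it empty or show 已停止生成, "Generation stopped")
  • Still run _resetRunState and finalize the tool cards to keep the UI consistent

Step 4: Keep the existing button entry point; do not change the interaction path

The _onStopGenerating -> _chatBloc.cancelCurrentRun() path in home_screen_interactions.dart is already the correct entry point; keep reusing it.

Only adjust the toast copy:

  • Request sent: 已请求停止 ("Stop requested")
  • RUN_ERROR(code=RUN_CANCELED) received: final state 已停止生成 ("Generation stopped")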

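Step 5: Write the AgUiService tests (red first)

Add to ag_ui_service_test.dart:

  • cancelCurrentRun calls the new endpoint /api/v1/agent/runs/{threadId}/cancel
  • The query parameters include runId
  • The current SSE subscription is closed after the call

Step 6: Write the ChatBloc tests (red first)

Add to chat_bloc_attachment_sync_test.dart:

  • After receiving RunErrorEvent(message: 'run canceled by user', code: 'RUN_CANCELED'):
    • isWaitingFirstToken/isStreaming/isCancelling are all reset
    • The generic failure text is not shown (or the canceled-state text is shown; assert according to your final copy strategy)

Step 7: Run the Flutter tests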

Run:

flutter test apps/test/features/chat/data/services/ag_ui_service_test.dart apps/test/features/chat/presentation/chat_bloc_attachment_sync_test.dart

Expected: PASS

Step 8: Commit the frontend integration

git add apps/lib/features/chat/data/services/ag_ui_service.dart apps/lib/features/chat/presentation/bloc/chat_bloc.dart apps/lib/features/chat/data/models/ag_ui_event.dart apps/lib/features/home/ui/screens/home_screen_interactions.dart apps/test/features/chat/data/services/ag_ui_service_test.dart apps/test/features/chat/presentation/chat_bloc_attachment_sync_test.dart
git commit -m "feat: wire stop-generating button to backend run cancel API"