feat(agent): add interrupt-aware tool dispatcher

This commit is contained in:
qzl
2026-03-03 15:44:41 +08:00
parent dedd23fdf9
commit 3a64410641
2 changed files with 108 additions and 0 deletions
+54
View File
@@ -0,0 +1,54 @@
from __future__ import annotations
from typing import Any
from pydantic import BaseModel
class InterruptResult(BaseModel):
interrupt_type: str
tool_name: str
tool_args: dict[str, Any]
class BackendExecutionResult(BaseModel):
tool_name: str
tool_args: dict[str, Any]
result: Any | None = None
class ToolDispatcher:
def dispatch(
self, tool: dict[str, Any]
) -> InterruptResult | BackendExecutionResult:
return dispatch_tool_call(tool)
def dispatch_tool_call(
tool: dict[str, Any],
) -> InterruptResult | BackendExecutionResult:
name = tool["name"]
target = tool["execution_target"]
args = tool.get("args", {})
if target == "frontend":
return InterruptResult(
interrupt_type="tool_execution",
tool_name=name,
tool_args=args,
)
if target == "backend":
requires_approval = tool.get("requires_approval", False)
if requires_approval:
return InterruptResult(
interrupt_type="approval_required",
tool_name=name,
tool_args=args,
)
return BackendExecutionResult(
tool_name=name,
tool_args=args,
)
raise ValueError(f"Unknown execution_target: {target}")