feat: 实现 AgentScope tool call context,支持 runtime 上下文续接

This commit is contained in:
qzl
2026-03-17 14:12:44 +08:00
parent d6cc5a0dae
commit 3bf7640000
15 changed files with 547 additions and 243 deletions
+5 -3
View File
@@ -251,10 +251,12 @@ Agent loop functionality MUST follow the AG-UI protocol. **Use the `ag-ui` skill
Custom tool `ToolAgentOutput` MUST follow these rules:
- Use field name `result` only. Do not introduce or keep `result_summary` compatibility aliases.
- `metadata.tool_agent_output` is the canonical source for runtime observation and history replay.
- `tool_call_args` stores input snapshot only; avoid mixing execution output into `tool_call_args`.
- `result` stores output facts only; do not repeat input parameters already present in `tool_call_args`.
- `result` is for downstream agent reasoning and tool chaining, not for end-user presentation.
- Prefer compact structural facts over prose: include identifiers and execution-critical facts (`id`, `status`, `count`, `page`, operation outcome, missing required args).
- For list/read tools, include multiple candidate records when needed (at least top matches) with stable identifiers.
- For write tools, always include affected resource identifiers in `result`.
- For list/read tools, include multiple candidate records when needed (at least top matches) with stable identifiers and scheduling-critical fields.
- For write tools, include per-item operation outcomes and affected resource identifiers in `result`.
- Keep `result` concise, deterministic, and machine-oriented; avoid decorative wording and UI-style formatting.
## Multi-Agent Orchestration (AgentScope Framework)
@@ -109,7 +109,7 @@ class PipelineStageEmitter:
"role": "tool",
"stage": self._stage,
"tool_name": tool_output.tool_name,
"tool_call_id": tool_output.tool_call_id,
"tool_call_id": tool_call_id,
"tool_call_args": tool_output.tool_call_args,
"status": tool_output.status.value,
"result": tool_output.result,
+49 -5
View File
@@ -1,6 +1,7 @@
from __future__ import annotations
import base64
import json
from typing import Any, cast
from uuid import UUID
@@ -18,10 +19,13 @@ from core.config.settings import config
from core.db.session import AsyncSessionLocal
from core.logging import get_logger
from core.taskiq.app import bulk_broker, critical_broker, default_broker
from schemas.messages.chat_message import (
AgentChatMessageMetadata,
extract_user_message_attachments,
)
from schemas.user import UserContext
from services.base.redis import get_or_init_redis_client
from services.base.supabase import supabase_service
from schemas.messages.chat_message import extract_user_message_attachments
from v1.agent.dependencies import get_agent_service
from v1.users.dependencies import get_user_service
@@ -29,6 +33,33 @@ logger = get_logger("core.agentscope.runtime.tasks")
_MAX_CONTEXT_ATTACHMENTS = 3
def _serialize_tool_agent_output(
*,
metadata: AgentChatMessageMetadata | dict[str, object] | None,
) -> str | None:
if metadata is None:
return None
try:
resolved_metadata = (
metadata
if isinstance(metadata, AgentChatMessageMetadata)
else AgentChatMessageMetadata.model_validate(metadata)
)
except Exception:
return None
tool_agent_output = resolved_metadata.tool_agent_output
if tool_agent_output is None:
return None
return json.dumps(
tool_agent_output.model_dump(mode="json", exclude_none=True),
ensure_ascii=True,
separators=(",", ":"),
)
def _load_runtime() -> type[Any]:
return AgentScopeRuntimeOrchestrator
@@ -53,16 +84,25 @@ async def _build_recent_context_messages(
if not result:
return []
raw_messages: list[dict[str, Any]] = result.get("messages") or []
raw_messages: list[dict[str, object]] = result.get("messages") or []
if not raw_messages:
return []
converted: list[Msg] = []
for msg in raw_messages:
role = msg.get("role")
content = msg.get("content", "")
metadata = msg.get("metadata")
role_raw = msg.get("role")
role = role_raw if isinstance(role_raw, str) else "user"
content_raw = msg.get("content", "")
content: str = content_raw if isinstance(content_raw, str) else ""
metadata_raw = msg.get("metadata")
metadata: AgentChatMessageMetadata | dict[str, object] | None
if isinstance(metadata_raw, AgentChatMessageMetadata):
metadata = metadata_raw
elif isinstance(metadata_raw, dict):
metadata = metadata_raw
else:
metadata = None
if role == "user" and metadata:
image_blocks: list[dict[str, Any]] = []
@@ -105,6 +145,10 @@ async def _build_recent_context_messages(
if role == "tool":
role = "assistant"
tool_content = _serialize_tool_agent_output(metadata=metadata)
if not tool_content:
continue
content = tool_content
converted.append(
Msg(
@@ -18,6 +18,7 @@ from core.agentscope.tools.utils.calendar_ui import (
calendar_error_output,
dump_tool_output,
)
from core.agentscope.tools.tool_call_context import get_current_tool_call_id
from schemas.agent.runtime_models import ErrorInfo, ToolAgentOutput, ToolStatus
from v1.schedule_items.schemas import (
ScheduleItemCreateRequest,
@@ -75,9 +76,28 @@ def _format_event_brief(event_items: list[dict[str, Any]], limit: int = 3) -> st
event_id = str(item.get("id") or "")
title = str(item.get("title") or "")
start_at = str(item.get("startAt") or "")
end_at = str(item.get("endAt") or "")
timezone = str(item.get("timezone") or "")
status = str(item.get("status") or "")
description = str(item.get("description") or "")
location = str(item.get("location") or "")
reminder_minutes = item.get("reminderMinutes")
color = str(item.get("color") or "")
source_type = str(item.get("sourceType") or "")
updated_at = str(item.get("updatedAt") or "")
permission = item.get("permission")
is_owner = item.get("isOwner")
if not event_id:
continue
briefs.append(f"{{id={event_id},title={title},startAt={start_at}}}")
briefs.append(
"{"
f"id={event_id},title={title},startAt={start_at},endAt={end_at},"
f"timezone={timezone},status={status},description={description},"
f"location={location},reminderMinutes={reminder_minutes},color={color},"
f"sourceType={source_type},updatedAt={updated_at},permission={permission},"
f"isOwner={is_owner}"
"}"
)
return ",".join(briefs)
@@ -129,18 +149,17 @@ async def calendar_read(
)
total_pages = (total + page_size - 1) // page_size if total else 0
event_items = [schedule_event_to_dict(item) for item in items]
query_value = (query or "").strip() or "*"
event_brief = _format_event_brief(event_items)
summary = (
f"status=success query={query_value} total={total} page={page}/"
f"{total_pages or 1} returned={len(event_items)}"
f"status=success total={total} total_pages={total_pages or 1} "
f"returned={len(event_items)} has_next={str(page < (total_pages or 1)).lower()}"
)
if event_brief:
summary = f"{summary} items=[{event_brief}]"
return dump_tool_output(
ToolAgentOutput(
tool_name=tool_name,
tool_call_id=f"{tool_name}-call",
tool_call_id=get_current_tool_call_id(tool_name=tool_name),
tool_call_args=tool_call_args,
status=ToolStatus.SUCCESS,
result=summary,
@@ -359,11 +378,8 @@ async def calendar_write(
success_count += 1
result_items.append(
{
"index": idx,
"operation": operation,
"status": "success",
"eventId": str(created.id),
"message": f"日程「{created.title}」已创建",
}
)
success_event_ids.append(str(created.id))
@@ -397,6 +413,7 @@ async def calendar_write(
color=cast(str | None, color),
reminder_minutes=cast(int | None, reminder_minutes),
)
changed_fields = sorted(update_data.keys())
updated = await service.update(
parsed_event_id,
ScheduleItemUpdateRequest.model_validate(update_data),
@@ -404,11 +421,9 @@ async def calendar_write(
success_count += 1
result_items.append(
{
"index": idx,
"operation": operation,
"status": "success",
"eventId": str(updated.id),
"message": f"日程「{updated.title}」已更新",
"changedFields": changed_fields,
}
)
success_event_ids.append(str(updated.id))
@@ -421,11 +436,8 @@ async def calendar_write(
success_count += 1
result_items.append(
{
"index": idx,
"operation": operation,
"status": "success",
"eventId": event_id,
"message": f"日程 {event_id} 已删除",
}
)
success_event_ids.append(event_id)
@@ -435,8 +447,6 @@ async def calendar_write(
failed_count += 1
result_items.append(
{
"index": idx,
"operation": operation,
"status": "failure",
"eventId": event_id,
"code": code,
@@ -447,21 +457,30 @@ async def calendar_write(
if failed_count == 0:
final_status = ToolStatus.SUCCESS
summary = (
f"status=success batch={batch_size} success={success_count} "
f"failed={failed_count} ids=[{','.join(success_event_ids)}]"
f"status=success success={success_count} failed={failed_count} "
f"ids=[{','.join(success_event_ids)}]"
)
elif success_count == 0:
final_status = ToolStatus.FAILURE
summary = (
f"status=failure batch={batch_size} success={success_count} "
f"failed={failed_count}"
)
summary = f"status=failure success={success_count} failed={failed_count}"
else:
final_status = ToolStatus.PARTIAL
summary = (
f"status=partial batch={batch_size} success={success_count} "
f"failed={failed_count} ids=[{','.join(success_event_ids)}]"
f"status=partial success={success_count} failed={failed_count} "
f"ids=[{','.join(success_event_ids)}]"
)
compact_items = ",".join(
[
"{"
f"status={item.get('status')},"
f"eventId={item.get('eventId')},code={item.get('code')},"
f"changedFields={item.get('changedFields')}"
"}"
for item in result_items
]
)
if compact_items:
summary = f"{summary} items=[{compact_items}]"
error_info: ErrorInfo | None = None
if final_status == ToolStatus.FAILURE:
@@ -477,7 +496,11 @@ async def calendar_write(
code=str(
first_failure.get("code") if first_failure else "BATCH_FAILED"
),
message=str(first_failure.get("message") if first_failure else summary),
message=str(
first_failure.get("message")
if first_failure and first_failure.get("message")
else summary
),
retryable=False,
details={"results": result_items},
)
@@ -489,7 +512,7 @@ async def calendar_write(
return dump_tool_output(
ToolAgentOutput(
tool_name=tool_name,
tool_call_id=f"{tool_name}-call",
tool_call_id=get_current_tool_call_id(tool_name=tool_name),
tool_call_args=tool_call_args,
status=final_status,
result=summary,
@@ -597,11 +620,13 @@ async def calendar_share(
retryable=False,
)
summary = f"status=success event_id={event_id} invited_count={len(invited)}"
summary = (
f"status=success invited_count={len(invited)} invited=[{','.join(invited)}]"
)
return dump_tool_output(
ToolAgentOutput(
tool_name=tool_name,
tool_call_id=f"{tool_name}-call",
tool_call_id=get_current_tool_call_id(tool_name=tool_name),
tool_call_args=tool_call_args,
status=ToolStatus.SUCCESS,
result=summary,
@@ -7,6 +7,7 @@ from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from agentscope.tool import ToolResponse
from core.agentscope.tools.tool_call_context import get_current_tool_call_id
from core.agentscope.tools.utils import (
find_auth_email_by_user_id,
list_auth_users,
@@ -33,7 +34,7 @@ def _lookup_error_output(
) -> ToolResponse:
output = build_error_output(
tool_name="user_lookup",
tool_call_id="user_lookup-call",
tool_call_id=get_current_tool_call_id(tool_name="user_lookup"),
code=code,
message=message,
retryable=retryable,
@@ -148,7 +149,7 @@ async def user_lookup(
return _dump_tool_output(
ToolAgentOutput(
tool_name="user_lookup",
tool_call_id="user_lookup-call",
tool_call_id=get_current_tool_call_id(tool_name="user_lookup"),
tool_call_args=tool_call_args,
status=ToolStatus.SUCCESS,
result=summary,
@@ -0,0 +1,24 @@
from __future__ import annotations
from contextvars import ContextVar, Token
from uuid import uuid4
_CURRENT_TOOL_CALL_ID: ContextVar[str | None] = ContextVar(
"current_tool_call_id",
default=None,
)
def set_current_tool_call_id(tool_call_id: str | None) -> Token[str | None]:
return _CURRENT_TOOL_CALL_ID.set(tool_call_id)
def reset_current_tool_call_id(token: Token[str | None]) -> None:
_CURRENT_TOOL_CALL_ID.reset(token)
def get_current_tool_call_id(*, tool_name: str) -> str:
current = _CURRENT_TOOL_CALL_ID.get()
if isinstance(current, str) and current.strip():
return current.strip()
return f"{tool_name}-call-{uuid4().hex}"
@@ -1,7 +1,12 @@
from __future__ import annotations
from typing import Any, AsyncGenerator, Callable
from uuid import uuid4
from core.agentscope.tools.tool_call_context import (
reset_current_tool_call_id,
set_current_tool_call_id,
)
from core.agentscope.tools.utils.tool_response_builder import (
build_error_response,
)
@@ -17,6 +22,7 @@ def register_tool_middlewares(
| None = None,
) -> None:
effective_config = config_by_name or meta_by_name or TOOL_CONFIGS
toolkit.register_middleware(create_tool_call_context_middleware())
toolkit.register_middleware(
create_approval_middleware(
config_by_name=effective_config,
@@ -25,12 +31,40 @@ def register_tool_middlewares(
)
def create_tool_call_context_middleware() -> Callable[..., AsyncGenerator[Any, None]]:
async def tool_call_context_middleware(
kwargs: dict[str, Any],
next_handler: Callable[..., Any],
) -> AsyncGenerator[Any, None]:
tool_call = kwargs.get("tool_call")
tool_call_id: str | None = None
if isinstance(tool_call, dict):
raw_id = tool_call.get("id")
if isinstance(raw_id, str) and raw_id.strip():
tool_call_id = raw_id.strip()
token = set_current_tool_call_id(tool_call_id)
try:
async for response in await next_handler(**kwargs):
yield response
finally:
reset_current_tool_call_id(token)
return tool_call_context_middleware
def create_approval_middleware(
*,
config_by_name: dict[str, ToolConfig],
approval_resolver: Callable[[str, dict[str, Any], ToolConfig], str | None]
| None = None,
) -> Callable[..., AsyncGenerator[Any, None]]:
def _resolve_tool_call_id(tool_call: dict[str, Any]) -> str:
raw_tool_call_id = tool_call.get("id")
if isinstance(raw_tool_call_id, str) and raw_tool_call_id.strip():
return raw_tool_call_id.strip()
return f"tool-call-{uuid4().hex}"
async def approval_middleware(
kwargs: dict[str, Any],
next_handler: Callable[..., Any],
@@ -74,7 +108,7 @@ def create_approval_middleware(
if decision == "rejected":
content = build_error_response(
tool_name=tool_name,
tool_call_id=tool_call.get("id", "unknown"),
tool_call_id=_resolve_tool_call_id(tool_call),
code="TOOL_REJECTED",
message=f"工具 {tool_name} 的调用已被审核拒绝",
retryable=False,
@@ -88,7 +122,7 @@ def create_approval_middleware(
pending_response = build_error_response(
tool_name=tool_name,
tool_call_id=tool_call.get("id", "unknown"),
tool_call_id=_resolve_tool_call_id(tool_call),
code="TOOL_PENDING_APPROVAL",
message=f"工具 {tool_name} 需要审核批准",
retryable=True,
@@ -3,6 +3,7 @@ from __future__ import annotations
from typing import Any
from agentscope.tool import ToolResponse
from core.agentscope.tools.tool_call_context import get_current_tool_call_id
from core.agentscope.tools.utils.tool_response_builder import (
build_error_output,
build_tool_response,
@@ -24,7 +25,7 @@ def calendar_error_output(
) -> ToolResponse:
output = build_error_output(
tool_name=tool_name,
tool_call_id=f"{tool_name}-call",
tool_call_id=get_current_tool_call_id(tool_name=tool_name),
code=code,
message=message,
retryable=retryable,
@@ -0,0 +1,67 @@
from __future__ import annotations
import json
from dataclasses import dataclass
from typing import Any, cast
import pytest
from agentscope.message import Msg
from core.agentscope.runtime.stage_emitter import PipelineStageEmitter
@dataclass
class _FakePipeline:
events: list[dict[str, Any]]
async def emit(self, *, session_id: str, event: dict[str, Any]) -> str:
del session_id
self.events.append(event)
return "ok"
@pytest.mark.asyncio
async def test_tool_result_event_uses_runtime_tool_call_id() -> None:
pipeline = _FakePipeline(events=[])
emitter = PipelineStageEmitter(
pipeline=pipeline,
session_id="thread-1",
run_id="run-1",
stage="worker",
emit_text_events=False,
emit_tool_events=True,
)
tool_output = {
"tool_name": "calendar_read",
"tool_call_id": "calendar_read-call",
"tool_call_args": {"query": "demo"},
"status": "success",
"result": "status=success total=1 returned=1",
}
msg = Msg(
name="worker",
role="assistant",
content=cast(
Any,
[
{
"type": "tool_use",
"id": "runtime-call-123",
"name": "calendar_read",
"input": {"query": "demo"},
},
{
"type": "tool_result",
"id": "runtime-call-123",
"output": [{"type": "text", "text": json.dumps(tool_output)}],
},
],
),
)
await emitter.handle_print(msg=msg, last=True)
result_events = [e for e in pipeline.events if e.get("type") == "TOOL_CALL_RESULT"]
assert len(result_events) == 1
assert result_events[0]["tool_call_id"] == "runtime-call-123"
@@ -6,6 +6,7 @@ from uuid import uuid4
import pytest
import core.agentscope.runtime.tasks as tasks_module
from schemas.agent import ToolStatus
from schemas.user import UserContext, parse_profile_settings
@@ -231,3 +232,88 @@ async def test_build_recent_context_messages_includes_all_user_attachments(
assert content[0]["type"] == "text"
assert content[1]["type"] == "image"
assert content[2]["type"] == "image"
@pytest.mark.asyncio
async def test_build_recent_context_messages_uses_tool_metadata_output(
monkeypatch: pytest.MonkeyPatch,
) -> None:
class _FakeAgentService:
async def load_agent_input_messages(
self,
*,
thread_id: str,
) -> dict[str, object] | None:
del thread_id
return {
"messages": [
{
"role": "tool",
"content": "legacy-content",
"metadata": {
"run_id": "run-1",
"tool_agent_output": {
"tool_name": "calendar_read",
"tool_call_id": "tool-call-1",
"tool_call_args": {
"query": "team sync",
"page": 1,
"page_size": 20,
},
"status": ToolStatus.SUCCESS.value,
"result": "status=success total=1 returned=1",
},
},
}
]
}
monkeypatch.setattr(
tasks_module, "get_agent_service", lambda session: _FakeAgentService()
)
messages = await tasks_module._build_recent_context_messages(
session=object(),
thread_id=str(uuid4()),
)
assert len(messages) == 1
assert messages[0].role == "assistant"
assert messages[0].content == (
'{"tool_name":"calendar_read","tool_call_id":"tool-call-1",'
'"tool_call_args":{"query":"team sync","page":1,"page_size":20},'
'"status":"success","result":"status=success total=1 returned=1"}'
)
@pytest.mark.asyncio
async def test_build_recent_context_messages_skips_tool_without_metadata_output(
monkeypatch: pytest.MonkeyPatch,
) -> None:
class _FakeAgentService:
async def load_agent_input_messages(
self,
*,
thread_id: str,
) -> dict[str, object] | None:
del thread_id
return {
"messages": [
{
"role": "tool",
"content": "legacy-content",
"metadata": {"run_id": "run-1"},
}
]
}
monkeypatch.setattr(
tasks_module, "get_agent_service", lambda session: _FakeAgentService()
)
messages = await tasks_module._build_recent_context_messages(
session=object(),
thread_id=str(uuid4()),
)
assert messages == []
@@ -156,6 +156,7 @@ async def test_calendar_write_create_normalizes_to_utc(
assert payload["status"] == "success"
assert payload["result"].startswith("status=success")
assert "items=[{status=success,eventId=" in payload["result"]
assert fake_service.created_id in payload["result"]
assert fake_service.created_request is not None
request = fake_service.created_request
@@ -207,6 +208,9 @@ async def test_calendar_read_returns_structured_result_with_ids(
assert payload["status"] == "success"
assert payload["result"].startswith("status=success")
assert "query=会议" in payload["result"]
assert "total=1" in payload["result"]
assert "timezone=Asia/Shanghai" in payload["result"]
assert "description=今天下午五点的会议" in payload["result"]
assert "status=" in payload["result"]
assert fake_service.created_id in payload["result"]
assert fake_service.list_calls == [{"page": 1, "page_size": 20, "query": "会议"}]
+8 -2
View File
@@ -12,7 +12,13 @@ services:
volumes:
- redis_data:/data
healthcheck:
test: ["CMD", "sh", "-c", "if [ -n \"$$REDIS_PASSWORD\" ]; then redis-cli -a \"$$REDIS_PASSWORD\" ping; else redis-cli ping; fi"]
test:
[
"CMD",
"sh",
"-c",
'if [ -n "$$REDIS_PASSWORD" ]; then redis-cli -a "$$REDIS_PASSWORD" ping; else redis-cli ping; fi',
]
interval: 5s
timeout: 3s
retries: 10
@@ -35,7 +41,7 @@ services:
condition: service_healthy
litellm:
image: ${SOCIAL_LITELLM_IMAGE:-ghcr.io/berriai/litellm@sha256:b959a1816fa454a14d2842242d0fa1cd0d39f96fc94d3a1f4e1de4e48e2398c6}
image: ghcr.io/berriai/litellm@sha256:b959a1816fa454a14d2842242d0fa1cd0d39f96fc94d3a1f4e1de4e48e2398c6
container_name: social-prod-litellm
restart: unless-stopped
env_file:
+1 -1
View File
@@ -122,7 +122,7 @@ Base URL: `/api/v1/agent`
}
```
tool 消息在存储层用于运行时上下文续接,不在 `/history` 对外返回。
tool 消息在存储层用于运行时上下文续接,不在 `/history` 对外返回。续接时以 `metadata.tool_agent_output` 作为主信源(`content` 为轻量摘要)。
### 说明
+6
View File
@@ -165,6 +165,12 @@ data: <json>
说明:`TOOL_CALL_RESULT` 不再携带 `ui_schema`。tool 结果通过 `result` 字段提供紧凑、结构化、可执行的信息(优先包含 id/status/count 等关键事实),用于 agent 后续推理与工具编排。
补充约束:
- `tool_call_id` 必须与同次调用的 `TOOL_CALL_START/ARGS/END.toolCallId` 一致,并在每次工具调用中保持唯一。
- `tool_call_args` 仅表示输入参数快照。
- `result` 仅表示执行输出事实,不重复 `tool_call_args` 已包含的输入参数。
### 3.4 文本完成事件
#### `TEXT_MESSAGE_END`
+5 -1
View File
@@ -98,7 +98,11 @@ data: <json>
- events:在 runtime 发送事件前编译,字段名为 `ui_schema`
- history:在历史转换时编译,字段名为 `ui_schema`
tool 结果不再走 UI 编译链路:`TOOL_CALL_RESULT` 提供 `result`,并在持久化时写入 message `content`
tool 结果不再走 UI 编译链路:`TOOL_CALL_RESULT` 提供 `tool_call_args` + `result` 组合
- `metadata.tool_agent_output` 是 tool 消息的完整信源(用于 runtime observation 与 history replay)。
- `message.content` 保持轻量摘要(当前等于 `result`)。
- `tool_call_args` 记录输入参数,`result` 记录输出事实,二者不做冗余重复。
### 5.3 当前命名差异(实现现状)