feat: 实现 AgentScope ReAct Runner 两阶段执行并重构事件处理

This commit is contained in:
zl-q
2026-03-16 09:01:01 +08:00
parent 072c09d99d
commit dcceb48d84
51 changed files with 5015 additions and 5663 deletions
@@ -1,9 +1,9 @@
from __future__ import annotations
from typing import Any
from uuid import UUID
import pytest
from ag_ui.core import RunAgentInput
from core.agentscope.runtime.orchestrator import AgentScopeRuntimeOrchestrator
from schemas.user import UserContext, parse_profile_settings
@@ -19,36 +19,9 @@ class _FakePipeline:
class _FakeRunner:
def __init__(self) -> None:
self.last_user_input: str | list[dict[str, Any]] | None = None
async def run_router_then_worker(
self,
*,
session,
user_context,
user_input,
router_toolkit,
worker_toolkit,
extra_context=None,
) -> dict[str, Any]:
del session, user_context, router_toolkit, worker_toolkit, extra_context
self.last_user_input = user_input
return {
"worker": {
"status": "success",
"answer": "done",
"key_points": [],
"result_type": "summary",
"suggested_actions": [],
"error": None,
"response_metadata": {
"model": "qwen3.5-flash",
"inputTokens": 10,
"outputTokens": 5,
},
}
}
async def execute(self, **kwargs: object) -> dict[str, Any]:
del kwargs
return {"worker": {"answer": "done"}}
def _user_context() -> UserContext:
@@ -56,34 +29,17 @@ def _user_context() -> UserContext:
id="00000000-0000-0000-0000-000000000001",
username="alice",
email="alice@example.com",
avatar_url=None,
bio=None,
settings=parse_profile_settings(None),
)
def _run_command_with_binary() -> Any:
from ag_ui.core import RunAgentInput
def _run_input() -> RunAgentInput:
return RunAgentInput.model_validate(
{
"threadId": "00000000-0000-0000-0000-000000000010",
"runId": "run-1",
"state": {},
"messages": [
{
"id": "u1",
"role": "user",
"content": [
{"type": "text", "text": "看这张图"},
{
"type": "binary",
"mimeType": "image/png",
"url": "https://example.com/signed.png",
},
],
}
],
"messages": [{"id": "u1", "role": "user", "content": "hello"}],
"tools": [],
"context": [],
"forwardedProps": {},
@@ -92,71 +48,18 @@ def _run_command_with_binary() -> Any:
@pytest.mark.asyncio
async def test_orchestrator_maps_binary_to_model_image_url(
monkeypatch: pytest.MonkeyPatch,
) -> None:
async def test_orchestrator_emits_run_lifecycle_events() -> None:
pipeline = _FakePipeline()
runner = _FakeRunner()
monkeypatch.setattr(
"core.agentscope.runtime.orchestrator.build_stage_toolkit",
lambda **_: None,
)
orchestrator = AgentScopeRuntimeOrchestrator(pipeline=pipeline, runner=runner)
await orchestrator.run(
thread_id="00000000-0000-0000-0000-000000000010",
run_id="run-1",
context_messages=[
{
"role": "user",
"content": [
{"type": "text", "text": "看这张图"},
{
"type": "image",
"source": {
"type": "url",
"url": "https://example.com/signed.png",
},
},
],
}
],
owner_id=UUID("00000000-0000-0000-0000-000000000001"),
user_context=_user_context(),
session=None,
orchestrator = AgentScopeRuntimeOrchestrator(
pipeline=pipeline, runner=_FakeRunner()
)
assert isinstance(runner.last_user_input, list)
assert runner.last_user_input[0]["type"] == "text"
assert runner.last_user_input[1]["type"] == "image_url"
assert (
runner.last_user_input[1]["image_url"]["url"]
== "https://example.com/signed.png"
)
@pytest.mark.asyncio
async def test_orchestrator_emits_worker_output_on_text_end(
monkeypatch: pytest.MonkeyPatch,
) -> None:
pipeline = _FakePipeline()
runner = _FakeRunner()
monkeypatch.setattr(
"core.agentscope.runtime.orchestrator.build_stage_toolkit",
lambda **_: None,
)
orchestrator = AgentScopeRuntimeOrchestrator(pipeline=pipeline, runner=runner)
await orchestrator.run(
thread_id="00000000-0000-0000-0000-000000000010",
run_id="run-1",
result = await orchestrator.run(
run_input=_run_input(),
context_messages=[],
owner_id=UUID("00000000-0000-0000-0000-000000000001"),
user_context=_user_context(),
session=None,
)
emitted = [item["event"] for item in pipeline.events]
text_end = next(item for item in emitted if item.get("type") == "text.end")
assert text_end["data"]["workerAgentOutput"]["answer"] == "done"
assert any(item.get("type") == "run.finished" for item in emitted)
assert result["worker"]["answer"] == "done"
event_types = [item["event"]["type"] for item in pipeline.events]
assert event_types == ["run.started", "run.finished"]
@@ -1,223 +1,201 @@
from __future__ import annotations
import json
from types import SimpleNamespace
import pytest
from ag_ui.core import RunAgentInput
from agentscope.message import Msg
from core.agentscope.schemas.system_agent_config import SystemAgentLLMConfig
from core.agentscope.runtime.config_loader import RuntimeStageConfig
from core.agentscope.runtime.react_runner import (
AgentScopeReActRunner,
_chat_response_text,
_merge_stage_response_metadata,
_parse_json_text,
_to_litellm_model,
StageExecutionResult,
SystemAgentRuntimeConfig,
)
from schemas.agent.runtime_models import (
RouterAgentOutput,
UiMode,
WorkerAgentOutputRich,
)
from schemas.agent.system_agent import AgentType, SystemAgentLLMConfig
from schemas.user.context import UserContext, parse_profile_settings
def _stage_config() -> RuntimeStageConfig:
return RuntimeStageConfig(
stage="intent",
model_code="qwen3.5-flash",
provider_name="dashscope",
llm_config=SystemAgentLLMConfig(
temperature=0.1, max_tokens=128, timeout_seconds=30
),
class _FakePipeline:
def __init__(self) -> None:
self.events: list[dict[str, object]] = []
async def emit(self, *, session_id: str, event: dict[str, object]) -> str:
self.events.append({"session_id": session_id, "event": event})
return "1-0"
class _FakeSessionCtx:
def __init__(self, session: object) -> None:
self._session = session
async def __aenter__(self) -> object:
return self._session
async def __aexit__(self, exc_type, exc, tb) -> None:
del exc_type, exc, tb
def _user_context() -> UserContext:
return UserContext(
id="00000000-0000-0000-0000-000000000001",
username="alice",
email="alice@example.com",
settings=parse_profile_settings(None),
)
def test_to_litellm_model_keeps_prefixed_model() -> None:
assert (
_to_litellm_model(provider_name="dashscope", model_code="openai/gpt-4o")
== "openai/gpt-4o"
)
def test_to_litellm_model_uses_plain_model_name_when_unprefixed() -> None:
assert (
_to_litellm_model(provider_name="dashscope", model_code="qwen3.5-flash")
== "qwen3.5-flash"
)
def test_parse_json_text_supports_fenced_json() -> None:
parsed = _parse_json_text('```json\n{"route":"DIRECT_RESPONSE"}\n```')
assert parsed["route"] == "DIRECT_RESPONSE"
def test_parse_json_text_rejects_non_json() -> None:
with pytest.raises(json.JSONDecodeError):
_parse_json_text("not-json")
def test_chat_response_text_falls_back_to_choice_message_content() -> None:
response = SimpleNamespace(
content=None,
choices=[
{
"message": {
"content": '{"assistant_text":"fallback","response_metadata":{}}'
}
}
],
)
assert (
_chat_response_text(response)
== '{"assistant_text":"fallback","response_metadata":{}}'
)
@pytest.mark.asyncio
async def test_run_json_stage_wraps_json_decode_error(
monkeypatch: pytest.MonkeyPatch,
) -> None:
pytest.importorskip("agentscope")
import agentscope.agent as agent_module
import agentscope.formatter as formatter_module
import agentscope.memory as memory_module
class _FakeAgent:
def __init__(self, **kwargs: object) -> None:
del kwargs
async def __call__(self, _msg: object) -> object:
return SimpleNamespace(get_text_content=lambda: "not-json")
monkeypatch.setattr(agent_module, "ReActAgent", _FakeAgent)
monkeypatch.setattr(formatter_module, "OpenAIChatFormatter", lambda: object())
monkeypatch.setattr(memory_module, "InMemoryMemory", lambda: object())
runner = AgentScopeReActRunner()
monkeypatch.setattr(runner, "_build_model", lambda **_: object())
with pytest.raises(RuntimeError, match="agent output format invalid"):
await runner.run_json_stage(
stage_config=_stage_config(),
agent_name="intent-agent",
system_prompt="sys",
user_prompt="user",
toolkit=None,
)
@pytest.mark.asyncio
async def test_run_json_stage_wraps_runtime_error(
monkeypatch: pytest.MonkeyPatch,
) -> None:
pytest.importorskip("agentscope")
import agentscope.agent as agent_module
import agentscope.formatter as formatter_module
import agentscope.memory as memory_module
class _FakeAgent:
def __init__(self, **kwargs: object) -> None:
del kwargs
async def __call__(self, _msg: object) -> object:
raise ValueError("boom")
monkeypatch.setattr(agent_module, "ReActAgent", _FakeAgent)
monkeypatch.setattr(formatter_module, "OpenAIChatFormatter", lambda: object())
monkeypatch.setattr(memory_module, "InMemoryMemory", lambda: object())
runner = AgentScopeReActRunner()
monkeypatch.setattr(runner, "_build_model", lambda **_: object())
with pytest.raises(RuntimeError, match="agent execution failed"):
await runner.run_json_stage(
stage_config=_stage_config(),
agent_name="intent-agent",
system_prompt="sys",
user_prompt="user",
toolkit=None,
)
@pytest.mark.asyncio
async def test_run_json_stage_report_merges_usage_metadata(
monkeypatch: pytest.MonkeyPatch,
) -> None:
class _FakeLiteLLMService:
def run_completion_with_cost(self, **kwargs: object) -> object:
del kwargs
return SimpleNamespace(
response={
"model": "dashscope/qwen3.5-flash",
"choices": [
{
"message": {
"content": '{"assistant_text":"ok","response_metadata":{}}'
}
}
],
def _run_input() -> RunAgentInput:
return RunAgentInput.model_validate(
{
"threadId": "00000000-0000-0000-0000-000000000010",
"runId": "run-1",
"state": {},
"messages": [{"id": "u1", "role": "user", "content": "hello"}],
"tools": [
{
"name": "calendar.read",
"description": "read",
"parameters": {"type": "object"},
},
usage=SimpleNamespace(
prompt_tokens=9,
completion_tokens=4,
cost=0.006,
),
)
{
"name": "calendar-write",
"description": "write",
"parameters": {"type": "object"},
},
],
"context": [],
"forwardedProps": {},
}
)
def _router_output(*, ui_mode: UiMode) -> RouterAgentOutput:
return RouterAgentOutput.model_validate(
{
"normalized_task_input": {
"user_text": "hello",
"multimodal_summary": [],
},
"key_entities": [],
"constraints": [],
"task_typing": {"primary": "knowledge", "secondary": []},
"execution_mode": "onestep",
"result_typing": {"primary": "direct_answer", "secondary": []},
"ui": {
"ui_mode": ui_mode.value,
"ui_decision_reason": "need structure"
if ui_mode == UiMode.RICH
else "plain text",
},
}
)
@pytest.mark.asyncio
async def test_execute_uses_router_ui_mode_to_select_worker_output_model(
monkeypatch: pytest.MonkeyPatch,
) -> None:
runner = AgentScopeReActRunner()
pipeline = _FakePipeline()
worker_model_holder: dict[str, type[object]] = {}
class _CommitSession:
async def commit(self) -> None:
return None
monkeypatch.setattr(
"core.agentscope.runtime.react_runner.AsyncSessionLocal",
lambda: _FakeSessionCtx(_CommitSession()),
)
monkeypatch.setattr(
runner,
"_build_litellm_service",
lambda: _FakeLiteLLMService(),
"_build_toolkits",
lambda **kwargs: ("router-toolkit", "worker-toolkit"),
)
report_stage = RuntimeStageConfig(
stage="report",
model_code="qwen3.5-flash",
provider_name="dashscope",
llm_config=SystemAgentLLMConfig(
temperature=0.1,
max_tokens=128,
timeout_seconds=30,
),
)
payload = await runner.run_json_stage(
stage_config=report_stage,
agent_name="report-agent",
system_prompt="sys",
user_prompt="user",
toolkit=None,
)
metadata = payload["response_metadata"]
assert metadata["model"] == "dashscope/qwen3.5-flash"
assert metadata["inputTokens"] == 9
assert metadata["outputTokens"] == 4
assert metadata["cost"] == 0.006
assert isinstance(metadata["latencyMs"], int)
assert metadata["latencyMs"] >= 0
def test_merge_stage_response_metadata_estimates_cost_from_pricing(
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.setattr(
"core.agentscope.runtime.react_runner._estimate_cost_by_pricing",
lambda **kwargs: 0.0025,
)
payload = _merge_stage_response_metadata(
payload={"route": "DIRECT_RESPONSE", "response_metadata": {}},
stage_config=_stage_config(),
response=SimpleNamespace(
usage=SimpleNamespace(
prompt_tokens=12,
completion_tokens=8,
async def _load_system_agent_config(**kwargs):
return SystemAgentRuntimeConfig(
agent_type=kwargs["agent_type"],
model_code="qwen3.5-flash"
if kwargs["agent_type"] == AgentType.ROUTER
else "deepseek-chat",
llm_config=SystemAgentLLMConfig(
temperature=0.1, max_tokens=256, timeout_seconds=30
),
model="qwen3.5-flash",
),
latency_ms=50,
system_prompt="system",
user_prompt="user",
assistant_text='{"route":"DIRECT_RESPONSE"}',
)
monkeypatch.setattr(runner, "_load_system_agent_config", _load_system_agent_config)
async def _run_router_stage(**kwargs):
return StageExecutionResult(
message=Msg(name="router", content="", role="assistant"),
payload=_router_output(ui_mode=UiMode.RICH).model_dump(mode="json"),
response_metadata={
"model": "qwen3.5-flash",
"inputTokens": 12,
"outputTokens": 6,
"cost": 0.001,
"latencyMs": 50,
},
)
monkeypatch.setattr(runner, "_run_router_stage", _run_router_stage)
async def _persist_router_message(**kwargs) -> None:
assert kwargs["model_code"] == "qwen3.5-flash"
monkeypatch.setattr(runner, "_persist_router_message", _persist_router_message)
async def _run_worker_stage(**kwargs):
worker_model_holder["model"] = kwargs["worker_output_model"]
return StageExecutionResult(
message=Msg(name="worker", content="done", role="assistant"),
payload=WorkerAgentOutputRich.model_validate(
{
"status": "success",
"answer": "done",
"key_points": [],
"result_type": "direct_answer",
"suggested_actions": [],
"error": None,
"ui_hints": None,
}
).model_dump(mode="json", exclude_none=True),
response_metadata={
"model": "deepseek-chat",
"inputTokens": 8,
"outputTokens": 4,
"cost": 0.002,
"latencyMs": 40,
},
)
monkeypatch.setattr(runner, "_run_worker_stage", _run_worker_stage)
result = await runner.execute(
user_context=_user_context(),
context_messages=[],
pipeline=pipeline,
run_input=_run_input(),
)
metadata = payload["response_metadata"]
assert metadata["inputTokens"] == 12
assert metadata["outputTokens"] == 8
assert metadata["cost"] == 0.0025
assert worker_model_holder["model"].__name__ == "WorkerAgentOutputRich"
event_types = []
for item in pipeline.events:
event = item.get("event")
if isinstance(event, dict):
event_types.append(event.get("type"))
assert event_types == ["step.start", "step.finish", "step.start", "step.finish"]
assert result["router"]["ui"]["ui_mode"] == "rich"
assert result["worker"]["answer"] == "done"
def test_extract_tool_names_normalizes_client_tool_names() -> None:
runner = AgentScopeReActRunner()
names = runner._extract_tool_names(_run_input())
assert names == {"calendar_read", "calendar_write"}
@@ -6,15 +6,17 @@ from uuid import uuid4
import pytest
import core.agentscope.runtime.tasks as tasks_module
from schemas.user import UserContext, parse_profile_settings
def _run_input_payload() -> dict[str, Any]:
return {
"threadId": str(uuid4()),
"runId": "run-1",
"state": {},
"messages": [],
"tools": [],
"context": {},
"context": [],
"forwardedProps": {},
}
@@ -27,6 +29,16 @@ class _FakeSessionCtx:
del exc_type, exc, tb
async def _fake_user_context(**kwargs: object) -> UserContext:
del kwargs
return UserContext(
id=str(uuid4()),
username="alice",
email="alice@example.com",
settings=parse_profile_settings(None),
)
@pytest.mark.asyncio
async def test_run_agentscope_task_calls_runtime_run(
monkeypatch: pytest.MonkeyPatch,
@@ -56,6 +68,7 @@ async def test_run_agentscope_task_calls_runtime_run(
_fake_get_redis_client,
)
monkeypatch.setattr(tasks_module, "AsyncSessionLocal", lambda: _FakeSessionCtx())
monkeypatch.setattr(tasks_module, "_build_user_context", _fake_user_context)
monkeypatch.setattr(
tasks_module,
"_build_recent_context_messages",
@@ -85,9 +98,12 @@ async def test_run_agentscope_task_includes_recent_context_messages(
del kwargs
async def run(self, **kwargs: object) -> object:
command = kwargs.get("command")
if command is not None:
raw_messages = getattr(command, "messages", [])
raw_context_messages = kwargs.get("context_messages")
raw_run_input = kwargs.get("run_input")
if isinstance(raw_context_messages, list):
captured_messages.extend(raw_context_messages)
if raw_run_input is not None:
raw_messages = getattr(raw_run_input, "messages", [])
if isinstance(raw_messages, list):
captured_messages.extend(raw_messages)
return object()
@@ -110,6 +126,7 @@ async def test_run_agentscope_task_includes_recent_context_messages(
_fake_get_redis_client,
)
monkeypatch.setattr(tasks_module, "AsyncSessionLocal", lambda: _FakeSessionCtx())
monkeypatch.setattr(tasks_module, "_build_user_context", _fake_user_context)
monkeypatch.setattr(
tasks_module,
"_build_recent_context_messages",
@@ -133,7 +150,7 @@ async def test_run_agentscope_task_includes_recent_context_messages(
assert len(captured_messages) == 2
assert captured_messages[0]["id"] == "ctx-1"
assert captured_messages[1]["id"] == "u1"
assert getattr(captured_messages[1], "id", None) == "u1"
@pytest.mark.asyncio