refactor(backend): 重构 agentscope 运行时模块

This commit is contained in:
zl-q
2026-03-19 00:52:05 +08:00
parent 9219e8d047
commit f709023b6d
7 changed files with 261 additions and 255 deletions
+47 -156
View File
@@ -10,27 +10,20 @@ from agentscope.formatter import OpenAIChatFormatter
from agentscope.memory import InMemoryMemory
from agentscope.message import Msg
from agentscope.model import OpenAIChatModel
from core.agentscope.prompts.agent_prompt import build_worker_contract_prompt
from core.agentscope.prompts.system_prompt import build_system_prompt
from core.agentscope.runtime.json_react_agent import JsonReActAgent
from core.agentscope.runtime.model_tracking import TrackingChatModel
from core.agentscope.runtime.router_persistence import persist_router_message
from core.agentscope.runtime.stage_emitter import PipelineStageEmitter
from core.agentscope.runtime.tool_selection_registry import TOOL_SELECTION_REGISTRY
from core.agentscope.tools.toolkit import build_stage_toolkit
from core.agentscope.utils import (
finalize_json_response,
patch_agentscope_json_repair_compat,
)
from core.agentscope.utils import patch_agentscope_json_repair_compat
from core.config.settings import config
from core.db.session import AsyncSessionLocal
from core.logging import get_logger
from models.llm import Llm
from models.llm_factory import LlmFactory
from models.system_agents import SystemAgents
from schemas.agent.runtime_models import (
RouterAgentOutput,
WorkerAgentOutputLite,
resolve_worker_output_model,
AgentOutput,
)
from schemas.agent.forwarded_props import (
ClientTimeContext,
@@ -45,8 +38,6 @@ from sqlalchemy.ext.asyncio import AsyncSession
if TYPE_CHECKING:
from core.agentscope.runtime.orchestrator import PipelineLike
logger = get_logger("core.agentscope.runtime.runner")
@dataclass(frozen=True)
class SystemAgentRuntimeConfig:
@@ -76,110 +67,68 @@ class AgentScopeRunner:
context_messages: list[Msg],
pipeline: PipelineLike,
run_input: RunAgentInput,
system_agent_mode: str,
) -> dict[str, Any]:
owner_id = UUID(user_context.id)
runtime_client_time = self._resolve_runtime_client_time(run_input=run_input)
stage_agent_type = self._resolve_stage_agent_type(system_agent_mode)
async with AsyncSessionLocal() as session:
worker_toolkit = self._build_worker_toolkit(
session=session, owner_id=owner_id
)
router_config, worker_config = await self._load_stage_configs(
session=session
)
router_output = await self._execute_router_step(
stage_config = await self._load_stage_config(
session=session,
pipeline=pipeline,
run_input=run_input,
user_context=user_context,
context_messages=context_messages,
stage_config=router_config,
runtime_client_time=runtime_client_time,
agent_type=stage_agent_type,
)
stage_toolkit = self._build_stage_toolkit(
session=session,
owner_id=owner_id,
stage_config=stage_config,
)
worker_output = await self._execute_worker_step(
pipeline=pipeline,
run_input=run_input,
user_context=user_context,
router_output=router_output,
toolkit=worker_toolkit,
stage_config=worker_config,
context_messages=context_messages,
toolkit=stage_toolkit,
stage_config=stage_config,
runtime_client_time=runtime_client_time,
)
return {
"router": router_output.model_dump(mode="json", exclude_none=True),
"worker": worker_output.model_dump(mode="json", exclude_none=True),
}
def _build_worker_toolkit(
def _build_stage_toolkit(
self,
*,
session: AsyncSession,
owner_id: UUID,
stage_config: SystemAgentRuntimeConfig,
) -> Any:
enabled_tool_names = TOOL_SELECTION_REGISTRY.resolve(stage_config=stage_config)
return build_stage_toolkit(
agent_type=AgentType.WORKER,
agent_type=stage_config.agent_type,
session=session,
owner_id=owner_id,
enabled_tool_names=enabled_tool_names,
)
async def _load_stage_configs(
@staticmethod
def _resolve_stage_agent_type(system_agent_mode: str) -> AgentType:
mode = system_agent_mode.strip().lower() if system_agent_mode else "worker"
if mode == AgentType.MEMORY.value:
return AgentType.MEMORY
return AgentType.WORKER
async def _load_stage_config(
self,
*,
session: AsyncSession,
) -> tuple[SystemAgentRuntimeConfig, SystemAgentRuntimeConfig]:
router_config = await self._load_system_agent_config(
agent_type: AgentType,
) -> SystemAgentRuntimeConfig:
return await self._load_system_agent_config(
session=session,
agent_type=AgentType.ROUTER,
agent_type=agent_type,
)
worker_config = await self._load_system_agent_config(
session=session,
agent_type=AgentType.WORKER,
)
return router_config, worker_config
async def _execute_router_step(
self,
*,
session: AsyncSession,
pipeline: PipelineLike,
run_input: RunAgentInput,
user_context: UserContext,
context_messages: list[Msg],
stage_config: SystemAgentRuntimeConfig,
runtime_client_time: ClientTimeContext | None,
) -> RouterAgentOutput:
await self._emit_step_event(
pipeline=pipeline,
run_input=run_input,
step_name="router",
event_type="STEP_STARTED",
)
router_result = await self._run_router_stage(
user_context=user_context,
context_messages=context_messages,
run_input=run_input,
stage_config=stage_config,
runtime_client_time=runtime_client_time,
)
router_output = RouterAgentOutput.model_validate(router_result.payload)
await persist_router_message(
session=session,
thread_id=run_input.thread_id,
run_id=run_input.run_id,
model_code=stage_config.model_code,
router_output=router_output,
response_metadata=router_result.response_metadata,
)
await session.commit()
await self._emit_step_event(
pipeline=pipeline,
run_input=run_input,
step_name="router",
event_type="STEP_FINISHED",
)
return router_output
async def _execute_worker_step(
self,
@@ -187,21 +136,22 @@ class AgentScopeRunner:
pipeline: PipelineLike,
run_input: RunAgentInput,
user_context: UserContext,
router_output: RouterAgentOutput,
context_messages: list[Msg],
toolkit: Any,
stage_config: SystemAgentRuntimeConfig,
runtime_client_time: ClientTimeContext | None,
) -> WorkerAgentOutputLite:
worker_output_model = resolve_worker_output_model(router_output.ui.ui_mode)
) -> AgentOutput:
step_name = stage_config.agent_type.value
worker_output_model = AgentOutput
await self._emit_step_event(
pipeline=pipeline,
run_input=run_input,
step_name="worker",
step_name=step_name,
event_type="STEP_STARTED",
)
worker_result = await self._run_worker_stage(
user_context=user_context,
router_output=router_output,
context_messages=context_messages,
toolkit=toolkit,
run_input=run_input,
stage_config=stage_config,
@@ -213,7 +163,7 @@ class AgentScopeRunner:
await self._emit_step_event(
pipeline=pipeline,
run_input=run_input,
step_name="worker",
step_name=step_name,
event_type="STEP_FINISHED",
)
return worker_output
@@ -261,78 +211,33 @@ class AgentScopeRunner:
raise RuntimeError(f"provider api key missing for factory: {factory_name}")
return api_key
async def _run_router_stage(
self,
*,
user_context: UserContext,
context_messages: list[Msg],
run_input: RunAgentInput,
stage_config: SystemAgentRuntimeConfig,
runtime_client_time: ClientTimeContext | None,
) -> StageExecutionResult:
tracking_model = self._build_model(stage_config=stage_config)
system_prompt = build_system_prompt(
agent_type=AgentType.ROUTER,
user_context=user_context,
now_utc=datetime.now(timezone.utc),
runtime_client_time=runtime_client_time,
tools=None,
)
response, payload = await finalize_json_response(
model=tracking_model,
formatter=OpenAIChatFormatter(),
base_messages=[Msg("system", system_prompt, "system"), *context_messages],
output_model=RouterAgentOutput,
retries=0,
)
response_msg = Msg(
name="router",
role="assistant",
content=list(getattr(response, "content", [])),
metadata=payload,
)
logger.info(
"router_reply_received",
run_id=run_input.run_id,
thread_id=run_input.thread_id,
message_id=str(response_msg.id),
)
return StageExecutionResult(
message=response_msg,
payload=payload,
response_metadata=self._litellm_service.build_usage_metadata(
model=stage_config.model_code,
usage_summary=tracking_model.usage_summary(),
),
)
async def _run_worker_stage(
self,
*,
user_context: UserContext,
router_output: RouterAgentOutput,
context_messages: list[Msg],
toolkit: Any,
run_input: RunAgentInput,
stage_config: SystemAgentRuntimeConfig,
worker_output_model: type[WorkerAgentOutputLite],
worker_output_model: type[AgentOutput],
pipeline: PipelineLike,
runtime_client_time: ClientTimeContext | None,
) -> StageExecutionResult:
worker_input = self._build_worker_input_messages(router_output=router_output)
worker_input = list(context_messages)
tracking_model = self._build_model(stage_config=stage_config)
emitter = PipelineStageEmitter(
pipeline=pipeline,
session_id=run_input.thread_id,
run_id=run_input.run_id,
stage="worker",
stage=stage_config.agent_type.value,
emit_text_events=True,
emit_tool_events=True,
)
agent = self._build_agent(
agent_name="worker",
agent_name=stage_config.agent_type.value,
system_prompt=build_system_prompt(
agent_type=AgentType.WORKER,
agent_type=stage_config.agent_type,
llm_config=stage_config.llm_config,
user_context=user_context,
now_utc=datetime.now(timezone.utc),
runtime_client_time=runtime_client_time,
@@ -360,19 +265,6 @@ class AgentScopeRunner:
response_metadata=response_metadata,
)
def _build_worker_input_messages(
self,
*,
router_output: RouterAgentOutput,
) -> list[Msg]:
return [
Msg(
name="router",
role="user",
content=build_worker_contract_prompt(router_output=router_output),
)
]
def _build_model(
self, *, stage_config: SystemAgentRuntimeConfig
) -> TrackingChatModel:
@@ -381,8 +273,7 @@ class AgentScopeRunner:
"max_tokens": stage_config.llm_config.max_tokens,
"timeout": stage_config.llm_config.timeout_seconds,
}
if stage_config.agent_type == AgentType.ROUTER:
generate_kwargs["extra_body"] = {"enable_thinking": False}
generate_kwargs["extra_body"] = {"enable_thinking": False}
model = OpenAIChatModel(
model_name=stage_config.model_code,