from __future__ import annotations from typing import Any from uuid import UUID from core.agentscope.events import ( AgentScopeAgUiCodec, AgentScopeEventPipeline, RedisStreamBus, SqlAlchemyEventStore, ) from core.agentscope.schemas.user_context import ( UserAgentContext, parse_profile_settings, ) from core.agentscope.schemas.agent_runtime import ResumeCommand, RunCommand from core.config.settings import config from core.db.session import AsyncSessionLocal from core.logging import get_logger from core.taskiq.app import bulk_broker, critical_broker, default_broker from services.base.redis import get_or_init_redis_client logger = get_logger("core.agentscope.runtime.tasks") AgentRouteRuntime: type[Any] | None = None AgentScopeRuntimeOrchestrator: type[Any] | None = None def _load_runtime_types() -> tuple[type[Any], type[Any]]: global AgentRouteRuntime, AgentScopeRuntimeOrchestrator if AgentRouteRuntime is None: from core.agentscope.runtime.agent_route_runtime import ( AgentRouteRuntime as _ARR, ) AgentRouteRuntime = _ARR if AgentScopeRuntimeOrchestrator is None: from core.agentscope.runtime.orchestrator import ( AgentScopeRuntimeOrchestrator as _ASRO, ) AgentScopeRuntimeOrchestrator = _ASRO return AgentRouteRuntime, AgentScopeRuntimeOrchestrator def _build_user_context(*, owner_id: UUID, run_input: RunCommand) -> UserAgentContext: forwarded = ( run_input.forwarded_props if isinstance(run_input.forwarded_props, dict) else {} ) username = str(forwarded.get("username", "user")).strip() or "user" bio_value = forwarded.get("bio") bio = str(bio_value).strip() if isinstance(bio_value, str) else None profile_settings = forwarded.get("profileSettings") settings_raw = profile_settings if isinstance(profile_settings, dict) else None return UserAgentContext( user_id=owner_id, username=username, bio=bio, settings=parse_profile_settings(settings_raw), ) def _extract_user_token( *, command: dict[str, Any], run_input: RunCommand ) -> str | None: raw_token = command.get("user_token") if isinstance(raw_token, str) and raw_token.strip(): return raw_token.strip() forwarded = ( run_input.forwarded_props if isinstance(run_input.forwarded_props, dict) else {} ) for key in ("accessToken", "userToken", "token"): value = forwarded.get(key) if isinstance(value, str) and value.strip(): return value.strip() return None async def run_agentscope_task(command: dict[str, Any]) -> dict[str, object]: command_type = str(command.get("command", "run")).strip().lower() raw_run_input = command.get("run_input") raw_owner_id = command.get("owner_id") if not isinstance(raw_run_input, dict): raise ValueError("run_input is required") if not isinstance(raw_owner_id, str) or not raw_owner_id.strip(): raise ValueError("owner_id is required") owner_id = UUID(raw_owner_id) if command_type not in {"run", "resume"}: raise ValueError("invalid command type") route_runtime_type, orchestrator_type = _load_runtime_types() parsed_run_input = ( ResumeCommand.model_validate(raw_run_input) if command_type == "resume" else RunCommand.model_validate(raw_run_input) ) user_context = _build_user_context(owner_id=owner_id, run_input=parsed_run_input) user_token = _extract_user_token(command=command, run_input=parsed_run_input) or "" redis_client = await get_or_init_redis_client() bus = RedisStreamBus( client=redis_client, stream_prefix=config.agent_runtime.redis_stream_prefix, read_count=config.agent_runtime.redis_stream_read_count, block_ms=config.agent_runtime.redis_stream_block_ms, ) pipeline = AgentScopeEventPipeline( codec=AgentScopeAgUiCodec(), store=SqlAlchemyEventStore(session_factory=AsyncSessionLocal), bus=bus, ) runtime = route_runtime_type( orchestrator=orchestrator_type(), pipeline=pipeline, ) async with AsyncSessionLocal() as session: if command_type == "resume": await runtime.resume( command=parsed_run_input, owner_id=owner_id, user_token=user_token, user_context=user_context, session=session, ) elif command_type == "run": await runtime.run( command=parsed_run_input, owner_id=owner_id, user_token=user_token, user_context=user_context, session=session, ) logger.info( "agentscope runtime task completed", command_type=command_type, thread_id=parsed_run_input.thread_id, run_id=parsed_run_input.run_id, ) return { "thread_id": parsed_run_input.thread_id, "run_id": parsed_run_input.run_id, "status": "completed", } @default_broker.task(task_name="tasks.agentscope.run_command") async def run_command_task(command: dict[str, Any]) -> dict[str, object]: return await run_agentscope_task(command) @critical_broker.task(task_name="tasks.agentscope.run_command.critical") async def run_command_task_critical(command: dict[str, Any]) -> dict[str, object]: return await run_agentscope_task(command) @bulk_broker.task(task_name="tasks.agentscope.run_command.bulk") async def run_command_task_bulk(command: dict[str, Any]) -> dict[str, object]: return await run_agentscope_task(command)