Files
social-app/backend/src/core/agentscope/runtime/tasks.py
T

139 lines
4.9 KiB
Python
Raw Normal View History

from __future__ import annotations
from typing import Any
from uuid import UUID
from core.agent.domain.user_context import UserAgentContext, parse_profile_settings
from core.agentscope.events import (
AgentScopeAgUiCodec,
AgentScopeEventPipeline,
NullEventStore,
RedisStreamBus,
)
from core.agentscope.runtime import AgentRouteRuntime, AgentScopeRuntimeOrchestrator
from core.agentscope.schemas.agent_runtime import ResumeCommand, RunCommand
from core.config.settings import config
from core.db.session import AsyncSessionLocal
from core.logging import get_logger
from core.taskiq.app import bulk_broker, critical_broker, default_broker
from services.base.redis import get_or_init_redis_client
# Module-level logger shared by every task entry point in this file.
logger = get_logger("core.agentscope.runtime.tasks")
def _build_user_context(*, owner_id: UUID, run_input: RunCommand) -> UserAgentContext:
    """Build the per-user agent context from a command's forwarded properties.

    Reads ``username``, ``bio`` and ``profileSettings`` from
    ``run_input.forwarded_props``. A ``forwarded_props`` value that is not a
    dict is treated as empty.

    Args:
        owner_id: Identifier stored as ``user_id`` on the resulting context.
        run_input: Parsed command whose ``forwarded_props`` carry the
            profile data.

    Returns:
        A ``UserAgentContext`` with:
        * ``username`` -- the stripped forwarded value, or ``"user"`` when it
          is missing, null, or blank;
        * ``bio`` -- the stripped forwarded string, or ``None`` when the
          value is not a string;
        * ``settings`` -- ``parse_profile_settings`` applied to the forwarded
          ``profileSettings`` dict (``None`` is passed when it is not a dict).
    """
    props = run_input.forwarded_props
    forwarded = props if isinstance(props, dict) else {}

    # ``dict.get(key, default)`` only applies the default when the key is
    # absent, so an explicit null used to be stringified into the literal
    # username "None". Treat null exactly like a missing key instead.
    raw_username = forwarded.get("username")
    username = ("" if raw_username is None else str(raw_username)).strip() or "user"

    bio_value = forwarded.get("bio")
    bio = bio_value.strip() if isinstance(bio_value, str) else None

    profile_settings = forwarded.get("profileSettings")
    settings_raw = profile_settings if isinstance(profile_settings, dict) else None

    return UserAgentContext(
        user_id=owner_id,
        username=username,
        bio=bio,
        settings=parse_profile_settings(settings_raw),
    )
def _extract_user_token(
    *, command: dict[str, Any], run_input: RunCommand
) -> str | None:
    """Return the first usable user token found for this command, if any.

    Lookup order:
    1. ``command["user_token"]``;
    2. ``accessToken``, ``userToken``, ``token`` (in that order) inside
       ``run_input.forwarded_props`` when that attribute is a dict.

    A candidate counts only when it is a string that is non-empty after
    stripping; the stripped value is what gets returned.

    Args:
        command: Raw task payload.
        run_input: Parsed command carrying ``forwarded_props``.

    Returns:
        The stripped token, or ``None`` when no candidate qualifies.
    """

    def _normalized(candidate: object) -> str | None:
        # Non-strings and blank strings are both "no token".
        if not isinstance(candidate, str):
            return None
        return candidate.strip() or None

    explicit = _normalized(command.get("user_token"))
    if explicit is not None:
        return explicit

    # Only consult the forwarded props once the explicit token is ruled out.
    props = run_input.forwarded_props
    if not isinstance(props, dict):
        return None

    fallbacks = (
        _normalized(props.get(name))
        for name in ("accessToken", "userToken", "token")
    )
    return next((found for found in fallbacks if found is not None), None)
async def run_agentscope_task(command: dict[str, Any]) -> dict[str, object]:
    """Execute one AgentScope runtime command (``run`` or ``resume``).

    The payload is validated first, then an ``AgentRouteRuntime`` is assembled
    (Redis-backed ``RedisStreamBus``, ``NullEventStore``,
    ``AgentScopeAgUiCodec``) and the command is dispatched inside a fresh
    ``AsyncSessionLocal`` session.

    Args:
        command: Task payload. Keys read here:
            * ``command`` -- ``"run"`` (default) or ``"resume"``; compared
              after stripping and lower-casing;
            * ``run_input`` -- dict validated into ``RunCommand`` or
              ``ResumeCommand``;
            * ``owner_id`` -- UUID string of the owning user;
            * ``user_token`` -- optional, see ``_extract_user_token``.

    Returns:
        ``{"thread_id": ..., "run_id": ..., "status": "completed"}`` taken
        from the parsed command that was executed.

    Raises:
        ValueError: If ``run_input`` is not a dict, ``owner_id`` is missing,
            blank or not a valid UUID, or the command type is neither
            ``"run"`` nor ``"resume"``. Whatever ``model_validate`` raises
            for a malformed ``run_input`` propagates unchanged.
    """
    command_type = str(command.get("command", "run")).strip().lower()
    raw_run_input = command.get("run_input")
    raw_owner_id = command.get("owner_id")

    if not isinstance(raw_run_input, dict):
        raise ValueError("run_input is required")
    if not isinstance(raw_owner_id, str) or not raw_owner_id.strip():
        raise ValueError("owner_id is required")

    # The presence check above strips whitespace, so parsing must too;
    # otherwise a padded id passes the check and then fails UUID parsing.
    owner_id = UUID(raw_owner_id.strip())

    # Fail fast: reject unknown command types before parsing the payload,
    # initialising Redis, or opening a database session.
    if command_type not in ("run", "resume"):
        raise ValueError("invalid command type")

    is_resume = command_type == "resume"
    parsed_run_input = (
        ResumeCommand.model_validate(raw_run_input)
        if is_resume
        else RunCommand.model_validate(raw_run_input)
    )

    user_context = _build_user_context(owner_id=owner_id, run_input=parsed_run_input)
    # The runtime is always handed a string; "" stands for "no token found".
    user_token = _extract_user_token(command=command, run_input=parsed_run_input) or ""

    redis_client = await get_or_init_redis_client()
    bus = RedisStreamBus(
        client=redis_client,
        stream_prefix=config.agent_runtime.redis_stream_prefix,
        read_count=config.agent_runtime.redis_stream_read_count,
        block_ms=config.agent_runtime.redis_stream_block_ms,
    )
    pipeline = AgentScopeEventPipeline(
        codec=AgentScopeAgUiCodec(),
        store=NullEventStore(),
        bus=bus,
    )
    runtime = AgentRouteRuntime(
        orchestrator=AgentScopeRuntimeOrchestrator(),
        pipeline=pipeline,
    )

    async with AsyncSessionLocal() as session:
        # Reuse the already-validated command instead of validating the raw
        # payload a second time, so the ids logged and returned below belong
        # to the exact object that was executed.
        if is_resume:
            await runtime.resume(
                command=parsed_run_input,
                owner_id=owner_id,
                user_token=user_token,
                user_context=user_context,
                session=session,
            )
        else:
            await runtime.run(
                command=parsed_run_input,
                owner_id=owner_id,
                user_token=user_token,
                user_context=user_context,
                session=session,
            )

    logger.info(
        "agentscope runtime task completed",
        command_type=command_type,
        thread_id=parsed_run_input.thread_id,
        run_id=parsed_run_input.run_id,
    )
    return {
        "thread_id": parsed_run_input.thread_id,
        "run_id": parsed_run_input.run_id,
        "status": "completed",
    }
@default_broker.task(task_name="tasks.agentscope.run_command")
async def run_command_task(command: dict[str, Any]) -> dict[str, object]:
    """Task ``tasks.agentscope.run_command`` registered on ``default_broker``.

    Delegates to ``run_agentscope_task`` and returns its result unchanged.
    """
    outcome = await run_agentscope_task(command)
    return outcome
@critical_broker.task(task_name="tasks.agentscope.run_command.critical")
async def run_command_task_critical(command: dict[str, Any]) -> dict[str, object]:
    """Task ``tasks.agentscope.run_command.critical`` on ``critical_broker``.

    Delegates to ``run_agentscope_task`` and returns its result unchanged.
    """
    outcome = await run_agentscope_task(command)
    return outcome
@bulk_broker.task(task_name="tasks.agentscope.run_command.bulk")
async def run_command_task_bulk(command: dict[str, Any]) -> dict[str, object]:
    """Task ``tasks.agentscope.run_command.bulk`` registered on ``bulk_broker``.

    Delegates to ``run_agentscope_task`` and returns its result unchanged.
    """
    outcome = await run_agentscope_task(command)
    return outcome