Files
social-app/backend/src/core/agentscope/runtime/tasks.py
T

217 lines
7.3 KiB
Python
Raw Normal View History

from __future__ import annotations
from datetime import datetime, timedelta, timezone
from typing import Any
from uuid import UUID
from ag_ui.core import RunAgentInput
from sqlalchemy import select
from core.agentscope.events import (
AgentScopeAgUiCodec,
AgentScopeEventPipeline,
RedisStreamBus,
SqlAlchemyEventStore,
)
from core.agentscope.schemas.agui_input import (
extract_latest_tool_result,
parse_run_input,
)
from core.config.settings import config
from core.db.session import AsyncSessionLocal
from core.logging import get_logger
from core.taskiq.app import bulk_broker, critical_broker, default_broker
from core.agentscope.tools.tool_result_storage import create_tool_result_storage
from models.agent_chat_message import AgentChatMessage, AgentChatMessageRole
from schemas.user import UserContext, parse_profile_settings
from services.base.redis import get_or_init_redis_client
logger = get_logger("core.agentscope.runtime.tasks")
AgentScopeRuntimeOrchestrator: type[Any] | None = None
def _load_runtime_type() -> type[Any]:
global AgentScopeRuntimeOrchestrator
if AgentScopeRuntimeOrchestrator is None:
from core.agentscope.runtime.orchestrator import (
AgentScopeRuntimeOrchestrator as _ASRO,
)
AgentScopeRuntimeOrchestrator = _ASRO
runtime_type = AgentScopeRuntimeOrchestrator
if runtime_type is None:
raise RuntimeError("failed to load AgentScopeRuntimeOrchestrator")
return runtime_type
def _build_user_context(*, owner_id: UUID, run_input: RunAgentInput) -> UserContext:
forwarded = (
run_input.forwarded_props if isinstance(run_input.forwarded_props, dict) else {}
)
username = str(forwarded.get("username", "user")).strip() or "user"
bio_value = forwarded.get("bio")
bio = str(bio_value).strip() if isinstance(bio_value, str) else None
email_value = forwarded.get("email")
email = str(email_value).strip() if isinstance(email_value, str) else None
avatar_value = forwarded.get("avatarUrl")
avatar_url = str(avatar_value).strip() if isinstance(avatar_value, str) else None
profile_settings = forwarded.get("profileSettings")
settings_raw = profile_settings if isinstance(profile_settings, dict) else None
return UserContext(
id=str(owner_id),
username=username,
email=email,
avatar_url=avatar_url,
bio=bio,
settings=parse_profile_settings(settings_raw),
)
async def _build_recent_context_messages(
*,
session: Any,
thread_id: str,
current_run_id: str,
max_messages: int = 20,
) -> list[dict[str, Any]]:
try:
session_uuid = UUID(thread_id)
except ValueError:
return []
utc_now = datetime.now(timezone.utc)
start_of_today = utc_now.replace(hour=0, minute=0, second=0, microsecond=0)
start_of_yesterday = start_of_today - timedelta(days=1)
stmt = (
select(AgentChatMessage)
.where(AgentChatMessage.session_id == session_uuid)
.where(AgentChatMessage.deleted_at.is_(None))
.where(AgentChatMessage.created_at >= start_of_yesterday)
.order_by(AgentChatMessage.seq.asc())
)
rows = (await session.execute(stmt)).scalars().all()
normalized: list[dict[str, Any]] = []
for row in rows:
metadata = row.metadata_json if isinstance(row.metadata_json, dict) else {}
if metadata.get("run_id") == current_run_id:
continue
role = (
row.role.value
if isinstance(row.role, AgentChatMessageRole)
else str(row.role)
)
if role not in {"user", "assistant"}:
continue
normalized.append(
{
"id": str(row.id),
"role": role,
"content": row.content,
}
)
if len(normalized) <= max_messages:
return normalized
return normalized[-max_messages:]
async def run_agentscope_task(command: dict[str, Any]) -> dict[str, object]:
command_type = str(command.get("command", "run")).strip().lower()
raw_run_input = command.get("run_input")
raw_owner_id = command.get("owner_id")
if not isinstance(raw_run_input, dict):
raise ValueError("run_input is required")
if not isinstance(raw_owner_id, str) or not raw_owner_id.strip():
raise ValueError("owner_id is required")
owner_id = UUID(raw_owner_id)
if command_type not in {"run", "resume"}:
raise ValueError("invalid command type")
orchestrator_type = _load_runtime_type()
parsed_run_input = parse_run_input(raw_run_input)
if command_type == "resume":
extract_latest_tool_result(parsed_run_input)
user_context = _build_user_context(owner_id=owner_id, run_input=parsed_run_input)
redis_client = await get_or_init_redis_client()
bus = RedisStreamBus(
client=redis_client,
stream_prefix=config.agent_runtime.redis_stream_prefix,
read_count=config.agent_runtime.redis_stream_read_count,
block_ms=config.agent_runtime.redis_stream_block_ms,
)
pipeline = AgentScopeEventPipeline(
codec=AgentScopeAgUiCodec(),
store=SqlAlchemyEventStore(
session_factory=AsyncSessionLocal,
tool_result_storage=create_tool_result_storage(),
tool_result_bucket=config.storage.bucket,
),
bus=bus,
)
runtime = orchestrator_type(
pipeline=pipeline,
)
async with AsyncSessionLocal() as session:
context_messages = await _build_recent_context_messages(
session=session,
thread_id=parsed_run_input.thread_id,
current_run_id=parsed_run_input.run_id,
)
if context_messages:
parsed_run_input = parsed_run_input.model_copy(
update={
"messages": [
*context_messages,
*parsed_run_input.messages,
]
}
)
if command_type == "resume":
await runtime.resume(
command=parsed_run_input,
owner_id=owner_id,
user_context=user_context,
session=session,
)
elif command_type == "run":
await runtime.run(
command=parsed_run_input,
owner_id=owner_id,
user_context=user_context,
session=session,
)
logger.info(
"agentscope runtime task completed",
command_type=command_type,
thread_id=parsed_run_input.thread_id,
run_id=parsed_run_input.run_id,
)
return {
"thread_id": parsed_run_input.thread_id,
"run_id": parsed_run_input.run_id,
"status": "completed",
}
@default_broker.task(task_name="tasks.agentscope.run_command")
async def run_command_task(command: dict[str, Any]) -> dict[str, object]:
return await run_agentscope_task(command)
@critical_broker.task(task_name="tasks.agentscope.run_command.critical")
async def run_command_task_critical(command: dict[str, Any]) -> dict[str, object]:
return await run_agentscope_task(command)
@bulk_broker.task(task_name="tasks.agentscope.run_command.bulk")
async def run_command_task_bulk(command: dict[str, Any]) -> dict[str, object]:
return await run_agentscope_task(command)