2026-03-11 17:16:11 +08:00
|
|
|
from __future__ import annotations
|
|
|
|
|
|
2026-03-12 00:18:45 +08:00
|
|
|
from datetime import datetime, timedelta, timezone
|
2026-03-11 17:16:11 +08:00
|
|
|
from typing import Any
|
|
|
|
|
from uuid import UUID
|
|
|
|
|
|
2026-03-12 00:18:45 +08:00
|
|
|
from sqlalchemy import select
|
|
|
|
|
|
2026-03-11 17:16:11 +08:00
|
|
|
from core.agentscope.events import (
|
|
|
|
|
AgentScopeAgUiCodec,
|
|
|
|
|
AgentScopeEventPipeline,
|
|
|
|
|
RedisStreamBus,
|
2026-03-11 20:51:56 +08:00
|
|
|
SqlAlchemyEventStore,
|
|
|
|
|
)
|
|
|
|
|
from core.agentscope.schemas.user_context import (
|
|
|
|
|
UserAgentContext,
|
|
|
|
|
parse_profile_settings,
|
2026-03-11 17:16:11 +08:00
|
|
|
)
|
|
|
|
|
from core.agentscope.schemas.agent_runtime import ResumeCommand, RunCommand
|
|
|
|
|
from core.config.settings import config
|
|
|
|
|
from core.db.session import AsyncSessionLocal
|
|
|
|
|
from core.logging import get_logger
|
|
|
|
|
from core.taskiq.app import bulk_broker, critical_broker, default_broker
|
2026-03-12 00:18:45 +08:00
|
|
|
from models.agent_chat_message import AgentChatMessage, AgentChatMessageRole
|
2026-03-11 17:16:11 +08:00
|
|
|
from services.base.redis import get_or_init_redis_client
|
|
|
|
|
|
|
|
|
|
# Logger used by the task entry points in this module.
logger = get_logger("core.agentscope.runtime.tasks")

# Slots for the runtime classes. Both start as None and are filled in by
# _load_runtime_types() the first time it runs; a value already present here
# is kept as-is by that function.
AgentRouteRuntime: type[Any] | None = None
AgentScopeRuntimeOrchestrator: type[Any] | None = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _load_runtime_types() -> tuple[type[Any], type[Any]]:
    """Return the route-runtime and orchestrator classes, importing on demand.

    The module-level names ``AgentRouteRuntime`` and
    ``AgentScopeRuntimeOrchestrator`` act as slots. Any slot that is still
    ``None`` is filled by importing the class from its runtime module; a slot
    that already holds a value is returned unchanged.

    Returns:
        The pair ``(AgentRouteRuntime, AgentScopeRuntimeOrchestrator)``.
    """
    global AgentRouteRuntime, AgentScopeRuntimeOrchestrator

    # NOTE(review): the imports are deferred to call time, presumably to avoid
    # a circular import at module load -- confirm before hoisting them.
    if AgentRouteRuntime is None:
        from core.agentscope.runtime.agent_route_runtime import (
            AgentRouteRuntime as route_runtime_cls,
        )

        AgentRouteRuntime = route_runtime_cls

    if AgentScopeRuntimeOrchestrator is None:
        from core.agentscope.runtime.orchestrator import (
            AgentScopeRuntimeOrchestrator as orchestrator_cls,
        )

        AgentScopeRuntimeOrchestrator = orchestrator_cls

    return AgentRouteRuntime, AgentScopeRuntimeOrchestrator
|
|
|
|
|
|
2026-03-11 17:16:11 +08:00
|
|
|
|
|
|
|
|
def _build_user_context(*, owner_id: UUID, run_input: RunCommand) -> UserAgentContext:
    """Build the ``UserAgentContext`` for a run from its forwarded properties.

    Args:
        owner_id: Identifier stored as ``user_id`` on the resulting context.
        run_input: Parsed command; only ``forwarded_props`` is read. When it
            is not a dict it is treated as empty.

    Returns:
        A ``UserAgentContext`` populated as follows:

        * ``username`` -- the forwarded ``username`` converted to ``str`` and
          stripped; falls back to ``"user"`` when missing, ``None`` or blank.
        * ``bio`` -- the stripped forwarded ``bio`` when it is a string,
          otherwise ``None``.
        * ``settings`` -- ``parse_profile_settings`` applied to the forwarded
          ``profileSettings`` dict, or to ``None`` when that value is not a
          dict.
    """
    props = run_input.forwarded_props
    forwarded = props if isinstance(props, dict) else {}

    # An explicit null username must fall back to the default; calling str()
    # on it would produce the literal user name "None".
    raw_username = forwarded.get("username")
    username = "" if raw_username is None else str(raw_username).strip()

    bio_value = forwarded.get("bio")
    bio = bio_value.strip() if isinstance(bio_value, str) else None

    profile_settings = forwarded.get("profileSettings")
    settings_raw = profile_settings if isinstance(profile_settings, dict) else None

    return UserAgentContext(
        user_id=owner_id,
        username=username or "user",
        bio=bio,
        settings=parse_profile_settings(settings_raw),
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _extract_user_token(
    *, command: dict[str, Any], run_input: RunCommand
) -> str | None:
    """Find the user token for a command, preferring the explicit one.

    Lookup order:

    1. ``command["user_token"]``
    2. ``run_input.forwarded_props`` keys ``accessToken``, ``userToken`` and
       ``token``, in that order (only when ``forwarded_props`` is a dict).

    A candidate is accepted only if it is a string that is non-empty after
    stripping whitespace.

    Returns:
        The stripped token, or ``None`` when no candidate qualifies.
    """

    def _usable(candidate: Any) -> str | None:
        # Only strings count; whitespace-only strings are treated as absent.
        if not isinstance(candidate, str):
            return None
        return candidate.strip() or None

    explicit = _usable(command.get("user_token"))
    if explicit is not None:
        return explicit

    props = run_input.forwarded_props
    if not isinstance(props, dict):
        return None

    for prop_name in ("accessToken", "userToken", "token"):
        found = _usable(props.get(prop_name))
        if found is not None:
            return found
    return None
|
|
|
|
|
|
|
|
|
|
|
2026-03-12 00:18:45 +08:00
|
|
|
async def _build_recent_context_messages(
    *,
    session: Any,
    thread_id: str,
    current_run_id: str,
    max_messages: int = 20,
) -> list[dict[str, Any]]:
    """Collect recent user/assistant messages of a thread as context.

    The query selects ``AgentChatMessage`` rows whose ``session_id`` equals
    ``thread_id`` (parsed as a UUID), whose ``deleted_at`` is NULL and whose
    ``created_at`` is at or after 00:00 UTC of the previous day, ordered by
    ``seq`` ascending. Rows are then filtered in Python:

    * rows whose ``metadata_json["run_id"]`` equals ``current_run_id`` are
      skipped;
    * only rows with role ``"user"`` or ``"assistant"`` are kept.

    Args:
        session: Object exposing an awaitable ``execute(stmt)``.
        thread_id: Thread identifier; must be a UUID string.
        current_run_id: Run id whose own messages are excluded.
        max_messages: Upper bound on the number of returned messages; the
            most recent ones are kept.

    Returns:
        A list of ``{"id", "role", "content"}`` dicts in ``seq`` order. The
        list is empty when ``thread_id`` is not a valid UUID or when
        ``max_messages`` is not positive.
    """
    # Guard before slicing: ``items[-0:]`` is the whole list, so a limit of 0
    # would otherwise return every message instead of none.
    if max_messages <= 0:
        return []

    try:
        session_uuid = UUID(thread_id)
    except ValueError:
        return []

    utc_now = datetime.now(timezone.utc)
    start_of_today = utc_now.replace(hour=0, minute=0, second=0, microsecond=0)
    start_of_yesterday = start_of_today - timedelta(days=1)

    stmt = (
        select(AgentChatMessage)
        .where(AgentChatMessage.session_id == session_uuid)
        .where(AgentChatMessage.deleted_at.is_(None))
        .where(AgentChatMessage.created_at >= start_of_yesterday)
        .order_by(AgentChatMessage.seq.asc())
    )
    rows = (await session.execute(stmt)).scalars().all()

    normalized: list[dict[str, Any]] = []
    for row in rows:
        metadata = row.metadata_json if isinstance(row.metadata_json, dict) else {}
        if metadata.get("run_id") == current_run_id:
            continue

        role = (
            row.role.value
            if isinstance(row.role, AgentChatMessageRole)
            else str(row.role)
        )
        if role not in {"user", "assistant"}:
            continue

        normalized.append(
            {
                "id": str(row.id),
                "role": role,
                "content": row.content,
            }
        )

    # A tail slice already yields the full list when it is shorter than the
    # limit, so no separate length check is needed.
    return normalized[-max_messages:]
|
|
|
|
|
|
|
|
|
|
|
2026-03-11 17:16:11 +08:00
|
|
|
async def run_agentscope_task(command: dict[str, Any]) -> dict[str, object]:
    """Execute one AgentScope ``run`` or ``resume`` command.

    Args:
        command: Task payload. Keys read here:

            * ``command`` -- ``"run"`` (the default) or ``"resume"``;
              compared after stripping and lower-casing.
            * ``run_input`` -- dict validated into ``RunCommand`` or, for a
              resume, ``ResumeCommand``.
            * ``owner_id`` -- UUID string; surrounding whitespace is ignored.
            * ``user_token`` -- optional, consumed by
              ``_extract_user_token``.

    Returns:
        ``{"thread_id": ..., "run_id": ..., "status": "completed"}`` taken
        from the parsed run input.

    Raises:
        ValueError: ``run_input`` is not a dict, ``owner_id`` is missing,
            blank or not a valid UUID, or the command type is neither
            ``"run"`` nor ``"resume"``.

    Errors raised by ``model_validate`` or by the runtime itself propagate
    unchanged.
    """
    command_type = str(command.get("command", "run")).strip().lower()
    raw_run_input = command.get("run_input")
    raw_owner_id = command.get("owner_id")

    if not isinstance(raw_run_input, dict):
        raise ValueError("run_input is required")
    if not isinstance(raw_owner_id, str) or not raw_owner_id.strip():
        raise ValueError("owner_id is required")

    # Parse the same stripped value that the presence check accepted; UUID()
    # itself rejects surrounding whitespace.
    owner_id = UUID(raw_owner_id.strip())
    if command_type not in {"run", "resume"}:
        raise ValueError("invalid command type")
    is_resume = command_type == "resume"

    route_runtime_type, orchestrator_type = _load_runtime_types()
    parsed_run_input = (
        ResumeCommand.model_validate(raw_run_input)
        if is_resume
        else RunCommand.model_validate(raw_run_input)
    )
    user_context = _build_user_context(owner_id=owner_id, run_input=parsed_run_input)
    # The runtime receives an empty string when no token could be found.
    user_token = _extract_user_token(command=command, run_input=parsed_run_input) or ""

    redis_client = await get_or_init_redis_client()
    bus = RedisStreamBus(
        client=redis_client,
        stream_prefix=config.agent_runtime.redis_stream_prefix,
        read_count=config.agent_runtime.redis_stream_read_count,
        block_ms=config.agent_runtime.redis_stream_block_ms,
    )
    pipeline = AgentScopeEventPipeline(
        codec=AgentScopeAgUiCodec(),
        store=SqlAlchemyEventStore(session_factory=AsyncSessionLocal),
        bus=bus,
    )
    runtime = route_runtime_type(
        orchestrator=orchestrator_type(),
        pipeline=pipeline,
    )

    async with AsyncSessionLocal() as session:
        if is_resume:
            await runtime.resume(
                command=parsed_run_input,
                owner_id=owner_id,
                user_token=user_token,
                user_context=user_context,
                session=session,
            )
        else:
            # Fresh runs get the stored recent history of the thread
            # prepended to the messages supplied with the command.
            context_messages = await _build_recent_context_messages(
                session=session,
                thread_id=parsed_run_input.thread_id,
                current_run_id=parsed_run_input.run_id,
            )
            parsed_run_input = parsed_run_input.model_copy(
                update={
                    "messages": [
                        *context_messages,
                        *parsed_run_input.messages,
                    ]
                }
            )
            await runtime.run(
                command=parsed_run_input,
                owner_id=owner_id,
                user_token=user_token,
                user_context=user_context,
                session=session,
            )

    logger.info(
        "agentscope runtime task completed",
        command_type=command_type,
        thread_id=parsed_run_input.thread_id,
        run_id=parsed_run_input.run_id,
    )
    return {
        "thread_id": parsed_run_input.thread_id,
        "run_id": parsed_run_input.run_id,
        "status": "completed",
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@default_broker.task(task_name="tasks.agentscope.run_command")
async def run_command_task(command: dict[str, Any]) -> dict[str, object]:
    """Task registered on ``default_broker``; delegates to ``run_agentscope_task``."""
    outcome = await run_agentscope_task(command)
    return outcome
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@critical_broker.task(task_name="tasks.agentscope.run_command.critical")
async def run_command_task_critical(command: dict[str, Any]) -> dict[str, object]:
    """Task registered on ``critical_broker``; delegates to ``run_agentscope_task``."""
    outcome = await run_agentscope_task(command)
    return outcome
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@bulk_broker.task(task_name="tasks.agentscope.run_command.bulk")
async def run_command_task_bulk(command: dict[str, Any]) -> dict[str, object]:
    """Task registered on ``bulk_broker``; delegates to ``run_agentscope_task``."""
    outcome = await run_agentscope_task(command)
    return outcome
|