2026-03-05 15:34:37 +08:00
|
|
|
from __future__ import annotations
|
|
|
|
|
|
2026-03-06 17:28:17 +08:00
|
|
|
import asyncio
|
|
|
|
|
from typing import Any
|
2026-03-05 15:34:37 +08:00
|
|
|
|
|
|
|
|
from fastapi import Depends
|
2026-03-06 17:28:17 +08:00
|
|
|
from redis.asyncio import Redis
|
2026-03-05 15:34:37 +08:00
|
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
|
|
2026-03-11 17:16:11 +08:00
|
|
|
from core.agentscope.events import RedisStreamBus
|
|
|
|
|
from core.agentscope.runtime.tasks import (
|
2026-03-06 17:28:17 +08:00
|
|
|
run_command_task,
|
|
|
|
|
run_command_task_bulk,
|
|
|
|
|
run_command_task_critical,
|
|
|
|
|
)
|
2026-03-11 20:51:56 +08:00
|
|
|
from core.agentscope.tools.tool_result_storage import (
|
2026-03-11 17:16:11 +08:00
|
|
|
create_tool_result_storage,
|
|
|
|
|
)
|
2026-03-05 15:34:37 +08:00
|
|
|
from core.config.settings import config
|
|
|
|
|
from core.db import get_db
|
2026-03-06 17:28:17 +08:00
|
|
|
from services.base.redis import get_or_init_redis_client
|
2026-03-05 15:34:37 +08:00
|
|
|
from v1.agent.repository import AgentRepository
|
2026-03-11 21:06:02 +08:00
|
|
|
from v1.agent.attachment_storage import create_attachment_storage
|
2026-03-05 15:34:37 +08:00
|
|
|
from v1.agent.service import AgentService
|
|
|
|
|
|
2026-03-06 17:28:17 +08:00
|
|
|
DEDUP_WAIT_RETRIES = 20
|
|
|
|
|
DEDUP_WAIT_SECONDS = 0.05
|
|
|
|
|
DEDUP_LOCK_SECONDS = 300
|
|
|
|
|
DEDUP_INFLIGHT_MARKER = "__inflight__"
|
2026-03-05 15:34:37 +08:00
|
|
|
|
2026-03-06 17:28:17 +08:00
|
|
|
|
|
|
|
|
class TaskiqQueueClient:
|
2026-03-05 15:34:37 +08:00
|
|
|
def __init__(self) -> None:
|
2026-03-06 17:28:17 +08:00
|
|
|
self._redis: Redis | None = None
|
|
|
|
|
|
|
|
|
|
async def _get_redis(self) -> Redis:
|
|
|
|
|
if self._redis is None:
|
|
|
|
|
self._redis = await get_or_init_redis_client()
|
|
|
|
|
return self._redis
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def _select_queue_task(command: dict[str, object]) -> Any:
|
|
|
|
|
queue = str(command.get("queue", "default")).strip().lower()
|
|
|
|
|
if queue == "critical":
|
|
|
|
|
return run_command_task_critical
|
|
|
|
|
if queue == "bulk":
|
|
|
|
|
return run_command_task_bulk
|
|
|
|
|
return run_command_task
|
2026-03-05 15:34:37 +08:00
|
|
|
|
|
|
|
|
async def enqueue(
|
|
|
|
|
self, *, command: dict[str, object], dedup_key: str | None
|
|
|
|
|
) -> str:
|
2026-03-06 17:28:17 +08:00
|
|
|
redis_client = await self._get_redis()
|
2026-03-05 15:34:37 +08:00
|
|
|
redis_key = None
|
|
|
|
|
if dedup_key:
|
|
|
|
|
redis_key = f"agent:dedup:{dedup_key}"
|
2026-03-06 17:28:17 +08:00
|
|
|
locked = await redis_client.set(
|
|
|
|
|
redis_key,
|
|
|
|
|
DEDUP_INFLIGHT_MARKER,
|
|
|
|
|
nx=True,
|
|
|
|
|
ex=DEDUP_LOCK_SECONDS,
|
|
|
|
|
)
|
2026-03-05 15:34:37 +08:00
|
|
|
if not locked:
|
2026-03-06 17:28:17 +08:00
|
|
|
for _ in range(DEDUP_WAIT_RETRIES):
|
|
|
|
|
existing = await redis_client.get(redis_key)
|
|
|
|
|
if existing and existing != DEDUP_INFLIGHT_MARKER:
|
|
|
|
|
return existing
|
|
|
|
|
await asyncio.sleep(DEDUP_WAIT_SECONDS)
|
|
|
|
|
raise RuntimeError("duplicate request is still in progress")
|
2026-03-05 15:34:37 +08:00
|
|
|
|
|
|
|
|
payload = dict(command)
|
2026-03-06 17:28:17 +08:00
|
|
|
queue_task = self._select_queue_task(payload)
|
|
|
|
|
try:
|
|
|
|
|
result = await queue_task.kiq(payload)
|
|
|
|
|
task_id = str(result.task_id)
|
|
|
|
|
if redis_key is not None:
|
|
|
|
|
await redis_client.set(redis_key, task_id, ex=DEDUP_LOCK_SECONDS)
|
|
|
|
|
return task_id
|
|
|
|
|
except Exception:
|
|
|
|
|
if redis_key is not None:
|
|
|
|
|
await redis_client.delete(redis_key)
|
|
|
|
|
raise
|
2026-03-05 15:34:37 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
class RedisEventStream:
|
|
|
|
|
def __init__(self) -> None:
|
2026-03-11 17:16:11 +08:00
|
|
|
self._bus: RedisStreamBus | None = None
|
2026-03-06 17:28:17 +08:00
|
|
|
|
2026-03-11 17:16:11 +08:00
|
|
|
async def _get_bus(self) -> RedisStreamBus:
|
|
|
|
|
if self._bus is None:
|
2026-03-06 17:28:17 +08:00
|
|
|
client = await get_or_init_redis_client()
|
2026-03-11 17:16:11 +08:00
|
|
|
self._bus = RedisStreamBus(
|
2026-03-06 17:28:17 +08:00
|
|
|
client=client,
|
|
|
|
|
stream_prefix=config.agent_runtime.redis_stream_prefix,
|
|
|
|
|
read_count=config.agent_runtime.redis_stream_read_count,
|
|
|
|
|
block_ms=config.agent_runtime.redis_stream_block_ms,
|
|
|
|
|
)
|
2026-03-11 17:16:11 +08:00
|
|
|
return self._bus
|
2026-03-05 15:34:37 +08:00
|
|
|
|
|
|
|
|
async def read(
|
|
|
|
|
self,
|
|
|
|
|
*,
|
|
|
|
|
session_id: str,
|
|
|
|
|
last_event_id: str | None,
|
|
|
|
|
) -> list[dict[str, Any]]:
|
2026-03-11 17:16:11 +08:00
|
|
|
bus = await self._get_bus()
|
|
|
|
|
rows = await bus.read(session_id=session_id, last_event_id=last_event_id)
|
|
|
|
|
return [{**row, "cursor": row.get("id")} for row in rows]
|
2026-03-05 15:34:37 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_agent_service(session: AsyncSession = Depends(get_db)) -> AgentService:
|
2026-03-08 17:07:09 +08:00
|
|
|
tool_result_storage = create_tool_result_storage()
|
2026-03-11 21:06:02 +08:00
|
|
|
attachment_storage = create_attachment_storage()
|
2026-03-05 15:34:37 +08:00
|
|
|
return AgentService(
|
2026-03-08 17:07:09 +08:00
|
|
|
repository=AgentRepository(session, tool_result_storage=tool_result_storage),
|
2026-03-06 17:28:17 +08:00
|
|
|
queue=TaskiqQueueClient(),
|
2026-03-05 15:34:37 +08:00
|
|
|
stream=RedisEventStream(),
|
2026-03-11 21:06:02 +08:00
|
|
|
attachment_storage=attachment_storage,
|
2026-03-05 15:34:37 +08:00
|
|
|
)
|