2026-03-05 15:34:37 +08:00
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
from dataclasses import dataclass
|
2026-03-07 17:30:20 +08:00
|
|
|
from datetime import date
|
2026-03-05 15:34:37 +08:00
|
|
|
from typing import Protocol
|
|
|
|
|
|
2026-03-07 17:30:20 +08:00
|
|
|
from ag_ui.core import StateSnapshotEvent
|
|
|
|
|
from ag_ui.core import RunAgentInput
|
2026-03-05 15:34:37 +08:00
|
|
|
from fastapi import HTTPException
|
2026-03-07 17:30:20 +08:00
|
|
|
from sqlalchemy.exc import IntegrityError
|
2026-03-05 15:34:37 +08:00
|
|
|
|
|
|
|
|
from core.auth.models import CurrentUser
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
|
|
|
class TaskAccepted:
|
|
|
|
|
task_id: str
|
2026-03-07 17:30:20 +08:00
|
|
|
thread_id: str
|
|
|
|
|
run_id: str
|
2026-03-05 15:34:37 +08:00
|
|
|
created: bool
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AgentRepositoryLike(Protocol):
|
|
|
|
|
async def get_session_owner(self, *, session_id: str) -> str: ...
|
|
|
|
|
|
2026-03-07 17:30:20 +08:00
|
|
|
async def create_session_for_user(
|
|
|
|
|
self, *, user_id: str, session_id: str | None = None
|
|
|
|
|
) -> str: ...
|
2026-03-05 15:34:37 +08:00
|
|
|
|
|
|
|
|
async def commit(self) -> None: ...
|
|
|
|
|
|
|
|
|
|
async def rollback(self) -> None: ...
|
|
|
|
|
|
2026-03-07 17:30:20 +08:00
|
|
|
async def get_history_day(
|
|
|
|
|
self, *, session_id: str, before: date | None
|
|
|
|
|
) -> dict[str, object] | None: ...
|
|
|
|
|
|
|
|
|
|
async def get_latest_session_id_for_user(self, *, user_id: str) -> str | None: ...
|
|
|
|
|
|
2026-03-05 15:34:37 +08:00
|
|
|
|
|
|
|
|
class QueueClientLike(Protocol):
|
|
|
|
|
async def enqueue(
|
|
|
|
|
self, *, command: dict[str, object], dedup_key: str | None
|
|
|
|
|
) -> str: ...
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class EventStreamLike(Protocol):
|
|
|
|
|
async def read(
|
|
|
|
|
self,
|
|
|
|
|
*,
|
|
|
|
|
session_id: str,
|
|
|
|
|
last_event_id: str | None,
|
|
|
|
|
) -> list[dict[str, object]]: ...
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def ensure_session_owner(*, owner_id: str, current_user: CurrentUser) -> None:
|
|
|
|
|
if owner_id != str(current_user.id):
|
|
|
|
|
raise HTTPException(status_code=403, detail="Forbidden")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AgentService:
|
|
|
|
|
def __init__(
|
|
|
|
|
self,
|
|
|
|
|
*,
|
|
|
|
|
repository: AgentRepositoryLike,
|
|
|
|
|
queue: QueueClientLike,
|
|
|
|
|
stream: EventStreamLike,
|
|
|
|
|
) -> None:
|
|
|
|
|
self._repository = repository
|
|
|
|
|
self._queue = queue
|
|
|
|
|
self._stream = stream
|
|
|
|
|
|
|
|
|
|
async def enqueue_run(
|
|
|
|
|
self,
|
|
|
|
|
*,
|
2026-03-07 17:30:20 +08:00
|
|
|
run_input: RunAgentInput,
|
2026-03-05 15:34:37 +08:00
|
|
|
current_user: CurrentUser,
|
|
|
|
|
) -> TaskAccepted:
|
|
|
|
|
created = False
|
2026-03-07 17:30:20 +08:00
|
|
|
thread_id = run_input.thread_id
|
|
|
|
|
run_id = run_input.run_id
|
|
|
|
|
try:
|
|
|
|
|
owner = await self._repository.get_session_owner(session_id=thread_id)
|
|
|
|
|
except HTTPException as exc:
|
|
|
|
|
if exc.status_code != 404:
|
|
|
|
|
raise
|
|
|
|
|
try:
|
|
|
|
|
await self._repository.create_session_for_user(
|
|
|
|
|
user_id=str(current_user.id),
|
|
|
|
|
session_id=thread_id,
|
|
|
|
|
)
|
|
|
|
|
await self._repository.commit()
|
|
|
|
|
created = True
|
|
|
|
|
except IntegrityError:
|
|
|
|
|
await self._repository.rollback()
|
|
|
|
|
owner = await self._repository.get_session_owner(session_id=thread_id)
|
|
|
|
|
ensure_session_owner(owner_id=owner, current_user=current_user)
|
2026-03-05 15:34:37 +08:00
|
|
|
else:
|
|
|
|
|
ensure_session_owner(owner_id=owner, current_user=current_user)
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
task_id = await self._queue.enqueue(
|
|
|
|
|
command={
|
|
|
|
|
"command": "run",
|
2026-03-07 17:30:20 +08:00
|
|
|
"run_input": run_input.model_dump(mode="json", by_alias=True),
|
2026-03-05 15:34:37 +08:00
|
|
|
},
|
|
|
|
|
dedup_key=None,
|
|
|
|
|
)
|
|
|
|
|
except Exception: # noqa: BLE001
|
|
|
|
|
raise
|
|
|
|
|
return TaskAccepted(
|
2026-03-07 17:30:20 +08:00
|
|
|
task_id=task_id,
|
|
|
|
|
thread_id=thread_id,
|
|
|
|
|
run_id=run_id,
|
|
|
|
|
created=created,
|
2026-03-05 15:34:37 +08:00
|
|
|
)
|
|
|
|
|
|
|
|
|
|
async def enqueue_resume(
|
|
|
|
|
self,
|
|
|
|
|
*,
|
2026-03-07 17:30:20 +08:00
|
|
|
thread_id: str,
|
|
|
|
|
run_input: RunAgentInput,
|
2026-03-05 15:34:37 +08:00
|
|
|
current_user: CurrentUser,
|
|
|
|
|
) -> TaskAccepted:
|
2026-03-07 17:30:20 +08:00
|
|
|
owner = await self._repository.get_session_owner(session_id=thread_id)
|
2026-03-05 15:34:37 +08:00
|
|
|
ensure_session_owner(owner_id=owner, current_user=current_user)
|
|
|
|
|
|
2026-03-07 17:30:20 +08:00
|
|
|
dedup_key = f"resume:{thread_id}:{run_input.run_id}"
|
2026-03-05 15:34:37 +08:00
|
|
|
task_id = await self._queue.enqueue(
|
|
|
|
|
command={
|
|
|
|
|
"command": "resume",
|
2026-03-07 17:30:20 +08:00
|
|
|
"run_input": run_input.model_dump(mode="json", by_alias=True),
|
2026-03-05 15:34:37 +08:00
|
|
|
},
|
|
|
|
|
dedup_key=dedup_key,
|
|
|
|
|
)
|
|
|
|
|
|
2026-03-07 17:30:20 +08:00
|
|
|
return TaskAccepted(
|
|
|
|
|
task_id=task_id,
|
|
|
|
|
thread_id=thread_id,
|
|
|
|
|
run_id=run_input.run_id,
|
|
|
|
|
created=False,
|
|
|
|
|
)
|
2026-03-05 15:34:37 +08:00
|
|
|
|
|
|
|
|
async def stream_events(
|
|
|
|
|
self,
|
|
|
|
|
*,
|
2026-03-07 17:30:20 +08:00
|
|
|
thread_id: str,
|
2026-03-05 15:34:37 +08:00
|
|
|
last_event_id: str | None,
|
|
|
|
|
current_user: CurrentUser,
|
|
|
|
|
) -> list[dict[str, object]]:
|
2026-03-07 17:30:20 +08:00
|
|
|
owner = await self._repository.get_session_owner(session_id=thread_id)
|
2026-03-05 15:34:37 +08:00
|
|
|
ensure_session_owner(owner_id=owner, current_user=current_user)
|
|
|
|
|
return await self._stream.read(
|
2026-03-07 17:30:20 +08:00
|
|
|
session_id=thread_id,
|
2026-03-05 15:34:37 +08:00
|
|
|
last_event_id=last_event_id,
|
|
|
|
|
)
|
2026-03-07 17:30:20 +08:00
|
|
|
|
|
|
|
|
async def get_history_snapshot(
|
|
|
|
|
self,
|
|
|
|
|
*,
|
|
|
|
|
thread_id: str,
|
|
|
|
|
before: date | None,
|
|
|
|
|
current_user: CurrentUser,
|
|
|
|
|
) -> dict[str, object]:
|
|
|
|
|
owner = await self._repository.get_session_owner(session_id=thread_id)
|
|
|
|
|
ensure_session_owner(owner_id=owner, current_user=current_user)
|
|
|
|
|
day_payload = await self._repository.get_history_day(
|
|
|
|
|
session_id=thread_id,
|
|
|
|
|
before=before,
|
|
|
|
|
)
|
|
|
|
|
snapshot = {
|
|
|
|
|
"scope": "history_day",
|
|
|
|
|
"threadId": thread_id,
|
|
|
|
|
"day": day_payload["day"] if day_payload else None,
|
|
|
|
|
"hasMore": day_payload["hasMore"] if day_payload else False,
|
|
|
|
|
"messages": day_payload["messages"] if day_payload else [],
|
|
|
|
|
}
|
|
|
|
|
event = StateSnapshotEvent(snapshot=snapshot).model_dump(
|
|
|
|
|
mode="json",
|
|
|
|
|
by_alias=True,
|
|
|
|
|
exclude_none=True,
|
|
|
|
|
)
|
|
|
|
|
event["threadId"] = thread_id
|
|
|
|
|
return event
|
|
|
|
|
|
|
|
|
|
async def get_user_history_snapshot(
|
|
|
|
|
self,
|
|
|
|
|
*,
|
|
|
|
|
current_user: CurrentUser,
|
|
|
|
|
thread_id: str | None,
|
|
|
|
|
before: date | None,
|
|
|
|
|
) -> dict[str, object]:
|
|
|
|
|
target_thread_id = thread_id
|
|
|
|
|
if target_thread_id is None:
|
|
|
|
|
target_thread_id = await self._repository.get_latest_session_id_for_user(
|
|
|
|
|
user_id=str(current_user.id)
|
|
|
|
|
)
|
|
|
|
|
if target_thread_id is None:
|
|
|
|
|
return StateSnapshotEvent(
|
|
|
|
|
snapshot={
|
|
|
|
|
"scope": "history_day",
|
|
|
|
|
"threadId": None,
|
|
|
|
|
"day": None,
|
|
|
|
|
"hasMore": False,
|
|
|
|
|
"messages": [],
|
|
|
|
|
}
|
|
|
|
|
).model_dump(mode="json", by_alias=True, exclude_none=True)
|
|
|
|
|
return await self.get_history_snapshot(
|
|
|
|
|
thread_id=target_thread_id,
|
|
|
|
|
before=before,
|
|
|
|
|
current_user=current_user,
|
|
|
|
|
)
|