refactor(backend): update API routes and service layer
- Update agent router/service/repository with new endpoints - Update auth routes with phone-based authentication - Update users service with new phone lookup - Update schedule_items with new schemas - Update message schemas with visibility support - Update settings with new automation scheduler config - Update CLI with new commands - Update tests to match new API contracts
This commit is contained in:
@@ -47,6 +47,7 @@ logger = get_logger("v1.agent.router")
|
||||
_LAST_EVENT_ID_RE = re.compile(r"^\d+-\d+$")
|
||||
_MAX_SSE_CONNECTIONS_PER_USER = 3
|
||||
_SSE_SLOT_TTL_SECONDS = 15 * 60
|
||||
_TERMINAL_RUN_EVENT_TYPES = {"RUN_FINISHED", "RUN_ERROR"}
|
||||
_MAX_TRANSCRIBE_AUDIO_BYTES = 10 * 1024 * 1024
|
||||
_TRANSCRIBE_READ_CHUNK_BYTES = 1024 * 1024
|
||||
_MULTIPART_OVERHEAD_BYTES = 64 * 1024
|
||||
@@ -72,8 +73,14 @@ async def _acquire_sse_slot(*, user_id: str) -> bool:
|
||||
count = await redis.incr(key)
|
||||
if count == 1:
|
||||
await redis.expire(key, _SSE_SLOT_TTL_SECONDS)
|
||||
else:
|
||||
ttl = await redis.ttl(key)
|
||||
if int(ttl) < 0:
|
||||
await redis.expire(key, _SSE_SLOT_TTL_SECONDS)
|
||||
if int(count) > _MAX_SSE_CONNECTIONS_PER_USER:
|
||||
await redis.decr(key)
|
||||
after_decr = await redis.decr(key)
|
||||
if int(after_decr) <= 0:
|
||||
await redis.delete(key)
|
||||
return False
|
||||
return True
|
||||
except Exception as exc: # noqa: BLE001
|
||||
@@ -82,7 +89,7 @@ async def _acquire_sse_slot(*, user_id: str) -> bool:
|
||||
user_id=user_id,
|
||||
reason=str(exc),
|
||||
)
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
async def _release_sse_slot(*, user_id: str) -> None:
|
||||
@@ -92,10 +99,21 @@ async def _release_sse_slot(*, user_id: str) -> None:
|
||||
count = await redis.decr(key)
|
||||
if int(count) <= 0:
|
||||
await redis.delete(key)
|
||||
return None
|
||||
ttl = await redis.ttl(key)
|
||||
if int(ttl) < 0:
|
||||
await redis.expire(key, _SSE_SLOT_TTL_SECONDS)
|
||||
except Exception: # noqa: BLE001
|
||||
return None
|
||||
|
||||
|
||||
def _is_terminal_run_event(event: dict[str, object]) -> bool:
|
||||
raw_event_type = event.get("type")
|
||||
return (
|
||||
isinstance(raw_event_type, str) and raw_event_type in _TERMINAL_RUN_EVENT_TYPES
|
||||
)
|
||||
|
||||
|
||||
@router.post(
|
||||
"/runs", response_model=TaskAcceptedResponse, status_code=status.HTTP_202_ACCEPTED
|
||||
)
|
||||
@@ -145,8 +163,13 @@ async def stream_events(
|
||||
async def _event_iter() -> AsyncIterator[str]:
|
||||
cursor = last_event_id
|
||||
idle_polls = 0
|
||||
terminal_event_reached = False
|
||||
try:
|
||||
while not await request.is_disconnected() and idle_polls < idle_limit:
|
||||
while (
|
||||
not terminal_event_reached
|
||||
and not await request.is_disconnected()
|
||||
and idle_polls < idle_limit
|
||||
):
|
||||
try:
|
||||
rows = await service.stream_events(
|
||||
thread_id=thread_id,
|
||||
@@ -181,6 +204,9 @@ async def stream_events(
|
||||
continue
|
||||
cursor = row_id
|
||||
yield to_sse_event(row_id, event)
|
||||
if _is_terminal_run_event(event):
|
||||
terminal_event_reached = True
|
||||
break
|
||||
|
||||
finally:
|
||||
await _release_sse_slot(user_id=str(current_user.id))
|
||||
|
||||
Reference in New Issue
Block a user