"""Live integration test for the agent run API.

The test logs in, starts an agent run, reads the run's server-sent events,
and then checks that the chat session and its messages were written to the
database. It talks to a real deployment and is skipped unless
``AGENT_LIVE_INTEGRATION=1`` is set.
"""

from __future__ import annotations

import os
from uuid import UUID, uuid4

import httpx
import pytest
from sqlalchemy import select

from core.db.session import AsyncSessionLocal
from models.agent_chat_message import AgentChatMessage
from models.agent_chat_session import AgentChatSession

BASE_URL = os.getenv("AGENT_LIVE_BASE_URL", "http://localhost:5775")

# Event names the test treats as the end of a run. Reading stops at the first
# one so the test never depends on the server closing the connection.
_TERMINAL_EVENTS = frozenset({"RUN_FINISHED", "RUN_ERROR"})


async def _live_access_token(client: httpx.AsyncClient) -> str:
    """Log in with the configured live credentials and return the access token.

    Credentials are read from ``AGENT_LIVE_EMAIL`` and ``AGENT_LIVE_PASSWORD``.

    Args:
        client: HTTP client used to call the login endpoint.

    Returns:
        The non-empty ``access_token`` string from the login response.

    Raises:
        pytest.fail.Exception: if either credential variable is missing.
        AssertionError: if the login does not return HTTP 200 or the response
            carries no usable token.
    """
    email = os.getenv("AGENT_LIVE_EMAIL")
    password = os.getenv("AGENT_LIVE_PASSWORD")
    if not email or not password:
        pytest.fail(
            "AGENT_LIVE_INTEGRATION=1 requires AGENT_LIVE_EMAIL and AGENT_LIVE_PASSWORD"
        )

    response = await client.post(
        f"{BASE_URL}/api/v1/auth/sessions",
        json={"email": email, "password": password},
    )

    # Flatten and cap the body so a failing login yields a one-line,
    # bounded-length assertion message.
    response_text = response.text.strip().replace("\n", " ")
    truncated_text = response_text[:200]
    if len(response_text) > 200:
        truncated_text += "..."
    assert response.status_code == 200, (
        f"live login failed: status={response.status_code}, response={truncated_text!r}"
    )

    token = response.json().get("access_token")
    assert isinstance(token, str) and token
    return token


async def _collect_event_names(sse_resp: httpx.Response) -> list[str]:
    """Return the SSE ``event:`` names read up to the first terminal event.

    Reading stops as soon as ``RUN_FINISHED`` or ``RUN_ERROR`` is seen. If the
    stream ends without either, every name read so far is returned and the
    caller's assertions decide the outcome.

    Args:
        sse_resp: An open streaming response whose body is an event stream.

    Returns:
        Event names in arrival order, including the terminal one if seen.
    """
    names: list[str] = []
    async for line in sse_resp.aiter_lines():
        if not line.startswith("event:"):
            continue
        name = line.split(":", 1)[1].strip()
        names.append(name)
        if name in _TERMINAL_EVENTS:
            # Do not wait for the server to close the connection: a stream
            # that stays open after the run ends would otherwise stall here.
            break
    return names


@pytest.mark.asyncio
@pytest.mark.live
async def test_agent_sse_closed_loop_live() -> None:
    """Start a run, follow its event stream, and verify the persisted rows.

    Steps:
      1. Skip unless ``AGENT_LIVE_INTEGRATION=1``.
      2. Log in and POST a new run with a fresh thread id (expects 202).
      3. Stream the run's events and require ``RUN_STARTED`` plus either
         ``RUN_FINISHED`` or ``RUN_ERROR``.
      4. Load the chat session and its messages from the database and check
         that at least one message exists and the counters are non-negative.
    """
    if os.getenv("AGENT_LIVE_INTEGRATION") != "1":
        pytest.skip("set AGENT_LIVE_INTEGRATION=1 to run live integration test")

    async with httpx.AsyncClient(timeout=30.0) as client:
        token = await _live_access_token(client)
        headers = {"Authorization": f"Bearer {token}"}

        run_resp = await client.post(
            f"{BASE_URL}/api/v1/agent/runs",
            headers=headers,
            json={
                "threadId": str(uuid4()),
                "runId": "run-live-1",
                "state": {},
                "messages": [
                    {"id": "u1", "role": "user", "content": "请用一句话介绍你自己"}
                ],
                "tools": [],
                "context": [],
                "forwardedProps": {},
            },
        )
        assert run_resp.status_code == 202
        accepted = run_resp.json()
        # Use the thread id echoed by the server for all follow-up lookups.
        thread_id = str(accepted["threadId"])
        assert thread_id

        events_url = f"{BASE_URL}/api/v1/agent/runs/{thread_id}/events"
        async with client.stream(
            "GET", events_url, headers=headers, timeout=20.0
        ) as sse_resp:
            assert sse_resp.status_code == 200
            assert sse_resp.headers.get("content-type", "").startswith(
                "text/event-stream"
            )
            event_names = await _collect_event_names(sse_resp)

        assert "RUN_STARTED" in event_names
        assert "RUN_FINISHED" in event_names or "RUN_ERROR" in event_names

    async with AsyncSessionLocal() as session:
        session_row = await session.get(AgentChatSession, UUID(thread_id))
        assert session_row is not None
        assert session_row.message_count >= 1
        assert session_row.total_tokens >= 0
        assert session_row.total_cost >= 0

        rows = await session.execute(
            select(AgentChatMessage).where(
                AgentChatMessage.session_id == UUID(thread_id)
            )
        )
        assert len(rows.scalars().all()) >= 1