from __future__ import annotations import base64 import os from pathlib import Path from uuid import UUID, uuid4 import httpx import pytest from sqlalchemy import select from core.db.session import AsyncSessionLocal from models.agent_chat_message import AgentChatMessage from models.agent_chat_session import AgentChatSession BASE_URL = os.getenv("AGENT_LIVE_BASE_URL", "http://localhost:5775") FIXTURE_IMAGE_PATH = ( Path(__file__).resolve().parents[3] / "fixtures" / "images" / "calendar_text_cn.png" ) async def _live_access_token(client: httpx.AsyncClient) -> str: email = os.getenv("AGENT_LIVE_EMAIL") password = os.getenv("AGENT_LIVE_PASSWORD") if not email or not password: pytest.fail( "AGENT_LIVE_INTEGRATION=1 requires AGENT_LIVE_EMAIL and AGENT_LIVE_PASSWORD" ) response = await client.post( f"{BASE_URL}/api/v1/auth/sessions", json={"email": email, "password": password}, ) response_text = response.text.strip().replace("\n", " ") truncated_text = response_text[:200] if len(response_text) > 200: truncated_text += "..." assert response.status_code == 200, ( f"live login failed: status={response.status_code}, response={truncated_text!r}" ) token = response.json().get("access_token") assert isinstance(token, str) and token return token @pytest.mark.asyncio @pytest.mark.live async def test_agent_sse_closed_loop_live() -> None: if os.getenv("AGENT_LIVE_INTEGRATION") != "1": pytest.skip("set AGENT_LIVE_INTEGRATION=1 to run live integration test") async with httpx.AsyncClient(timeout=30.0) as client: token = await _live_access_token(client) headers = {"Authorization": f"Bearer {token}"} run_resp = await client.post( f"{BASE_URL}/api/v1/agent/runs", headers=headers, json={ "threadId": str(uuid4()), "runId": "run-live-1", "state": {}, "messages": [ {"id": "u1", "role": "user", "content": "请用一句话介绍你自己"} ], "tools": [], "context": [], "forwardedProps": {}, }, ) assert run_resp.status_code == 202 accepted = run_resp.json() thread_id = str(accepted["threadId"]) assert thread_id events_url = f"{BASE_URL}/api/v1/agent/runs/{thread_id}/events" event_names: list[str] = [] async with client.stream( "GET", events_url, headers=headers, timeout=20.0 ) as sse_resp: assert sse_resp.status_code == 200 assert sse_resp.headers.get("content-type", "").startswith( "text/event-stream" ) async for line in sse_resp.aiter_lines(): if line.startswith("event:"): event_names.append(line.split(":", 1)[1].strip()) assert "RUN_STARTED" in event_names assert "RUN_FINISHED" in event_names or "RUN_ERROR" in event_names async with AsyncSessionLocal() as session: session_row = await session.get(AgentChatSession, UUID(thread_id)) assert session_row is not None assert session_row.message_count >= 1 assert session_row.total_tokens >= 0 assert session_row.total_cost >= 0 rows = await session.execute( select(AgentChatMessage).where( AgentChatMessage.session_id == UUID(thread_id) ) ) assert len(list(rows.scalars().all())) >= 1 @pytest.mark.asyncio @pytest.mark.live async def test_agent_runs_events_history_live_with_image_input() -> None: if os.getenv("AGENT_LIVE_INTEGRATION") != "1": pytest.skip("set AGENT_LIVE_INTEGRATION=1 to run live integration test") image_data = base64.b64encode(FIXTURE_IMAGE_PATH.read_bytes()).decode("ascii") async with httpx.AsyncClient(timeout=30.0) as client: token = await _live_access_token(client) headers = {"Authorization": f"Bearer {token}"} thread_id = str(uuid4()) run_resp = await client.post( f"{BASE_URL}/api/v1/agent/runs", headers=headers, json={ "threadId": thread_id, "runId": "run-live-image-1", "state": {}, "messages": [ { "id": "u1", "role": "user", "content": [ {"type": "text", "text": "请描述图片里的内容"}, { "type": "binary", "data": image_data, "mimeType": "image/png", }, ], } ], "tools": [], "context": [], "forwardedProps": {}, }, ) assert run_resp.status_code == 202 events_url = f"{BASE_URL}/api/v1/agent/runs/{thread_id}/events" event_names: list[str] = [] async with client.stream( "GET", events_url, headers=headers, timeout=90.0 ) as sse_resp: assert sse_resp.status_code == 200 assert sse_resp.headers.get("content-type", "").startswith( "text/event-stream" ) async for line in sse_resp.aiter_lines(): if line.startswith("event:"): event_name = line.split(":", 1)[1].strip() event_names.append(event_name) if event_name in {"RUN_FINISHED", "RUN_ERROR"}: break assert "RUN_STARTED" in event_names assert "RUN_FINISHED" in event_names or "RUN_ERROR" in event_names history_resp = await client.get( f"{BASE_URL}/api/v1/agent/history", headers=headers, params={"threadId": thread_id}, ) assert history_resp.status_code == 200 history = history_resp.json() assert history.get("type") == "STATE_SNAPSHOT" snapshot = history.get("snapshot", {}) assert snapshot.get("scope") == "history_day" messages = snapshot.get("messages", []) user_messages = [ item for item in messages if isinstance(item, dict) and item.get("role") == "user" ] assert user_messages metadata = user_messages[0].get("metadata") assert isinstance(metadata, dict) user_attachment = metadata.get("user_message_attachments") assert isinstance(user_attachment, dict) assert isinstance(user_attachment.get("path"), str) async with AsyncSessionLocal() as session: session_row = await session.get(AgentChatSession, UUID(thread_id)) assert session_row is not None assert session_row.message_count >= 1 assert session_row.total_tokens >= 0 assert session_row.total_cost >= 0 rows = await session.execute( select(AgentChatMessage).where( AgentChatMessage.session_id == UUID(thread_id) ) ) all_messages = list(rows.scalars().all()) assert all_messages user_rows = [ row for row in all_messages if ( getattr(row.role, "value", row.role) == "user" or str(getattr(row.role, "value", row.role)) == "user" ) ] assert user_rows metadata = user_rows[0].metadata_json or {} user_attachment = metadata.get("user_message_attachments") assert isinstance(user_attachment, dict) assert isinstance(user_attachment.get("path"), str)