from __future__ import annotations import json import socket import threading import time from uuid import UUID from playwright.sync_api import sync_playwright import uvicorn from app import app from v1.agent_chat.dependencies import get_agent_chat_service from v1.agent_chat.schemas import ( AgentChatEvent, AgentChatRunRequest, AgentChatRunResponse, ) from v1.agent_chat.service import AgentChatService class FakeE2EAgentChatService(AgentChatService): def __init__(self) -> None: return None async def run(self, payload: AgentChatRunRequest) -> AgentChatRunResponse: session_id = payload.session_id or UUID("00000000-0000-0000-0000-000000000001") return AgentChatRunResponse( session_id=session_id, output=payload.message, events=[ AgentChatEvent(type="run.started", run_id=str(session_id)), AgentChatEvent( type="message.delta", message_id="m1", delta=payload.message ), AgentChatEvent( type="run.completed", run_id=str(session_id), output=payload.message ), ], ) def _find_free_port() -> int: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.bind(("127.0.0.1", 0)) return sock.getsockname()[1] def _wait_for_port(host: str, port: int, timeout: float = 5.0) -> None: deadline = time.time() + timeout while time.time() < deadline: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: if sock.connect_ex((host, port)) == 0: return time.sleep(0.05) raise RuntimeError("Server did not start in time") def _start_server(host: str, port: int): config = uvicorn.Config(app, host=host, port=port, log_level="info") server = uvicorn.Server(config) thread = threading.Thread(target=server.run, daemon=True) thread.start() _wait_for_port(host, port) return server, thread def test_agent_chat_flow_e2e() -> None: app.dependency_overrides[get_agent_chat_service] = lambda: FakeE2EAgentChatService() host = "127.0.0.1" port = _find_free_port() server, thread = _start_server(host, port) try: with sync_playwright() as playwright: request_context = playwright.request.new_context( base_url=f"http://{host}:{port}" ) try: response = request_context.post( "/api/v1/agent-chat", data=json.dumps({"message": "hello"}), headers={"Content-Type": "application/json"}, ) assert response.status == 200 body = response.json() assert body["output"] == "hello" assert [event["type"] for event in body["events"]] == [ "run.started", "message.delta", "run.completed", ] finally: request_context.dispose() finally: app.dependency_overrides = {} server.should_exit = True thread.join(timeout=5)