from __future__ import annotations import json import socket import threading import time from pathlib import Path from fastapi import FastAPI from playwright.sync_api import sync_playwright import uvicorn from core.config.settings import Settings from core.logging.config import configure_logging from core.logging.middleware import ( RequestContextMiddleware, register_exception_handlers, ) def _read_json_lines(path: Path) -> list[dict[str, object]]: return [json.loads(line) for line in path.read_text().splitlines() if line.strip()] def _find_free_port() -> int: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.bind(("127.0.0.1", 0)) return sock.getsockname()[1] def _wait_for_port(host: str, port: int, timeout: float = 5.0) -> None: deadline = time.time() + timeout while time.time() < deadline: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: if sock.connect_ex((host, port)) == 0: return time.sleep(0.05) raise RuntimeError("Server did not start in time") def _start_server(app: FastAPI, host: str, port: int): config = uvicorn.Config(app, host=host, port=port, log_level="info") server = uvicorn.Server(config) thread = threading.Thread(target=server.run, daemon=True) thread.start() _wait_for_port(host, port) return server, thread def test_e2e_error_logging(tmp_path: Path) -> None: settings = Settings() runtime = settings.runtime.model_copy( update={ "log_dir": str(tmp_path), "log_error_dir": str(tmp_path / "errors"), "log_rotation": "size", "log_rotation_max_bytes": 2048, } ) configure_logging(settings.model_copy(update={"runtime": runtime})) app = FastAPI() app.add_middleware(RequestContextMiddleware) # type: ignore[arg-type] register_exception_handlers(app) @app.get("/boom") async def boom() -> dict[str, str]: raise RuntimeError("boom") host = "127.0.0.1" port = _find_free_port() server, thread = _start_server(app, host, port) try: with sync_playwright() as playwright: request_context = playwright.request.new_context( base_url=f"http://{host}:{port}" ) response = request_context.get( "/boom", headers={"X-Request-ID": "e2e-5000"}, ) assert response.status == 500 request_context.dispose() finally: server.should_exit = True thread.join(timeout=5) error_entries = _read_json_lines(Path(tmp_path) / "errors" / "error.log") entry = next( item for item in error_entries if item.get("message") == "Unhandled exception" ) assert entry["request_id"] == "e2e-5000" exception = str(entry["exception"]) assert "Traceback" in exception