"""End-to-end test of the v1 auth HTTP endpoints.

The test serves ``app`` with uvicorn on a background thread, swaps the
``get_auth_service`` dependency for a canned fake, and drives the
signup / login / refresh / logout endpoints over real HTTP using
Playwright's request context.
"""

from __future__ import annotations

import json
import socket
import threading
import time

from playwright.sync_api import sync_playwright
import uvicorn

from app import app
from v1.auth.dependencies import get_auth_service
from v1.auth.schemas import (
    AuthTokenResponse,
    AuthUser,
    LoginRequest,
    RefreshRequest,
    SignupRequest,
)
from v1.auth.service import AuthService


class FakeE2EAuthService(AuthService):
    """Auth service stand-in that returns fixed tokens for a fixed user.

    Each operation hands back a distinct token pair (``access-1`` for signup,
    ``access-2`` for login, ``access-3`` for refresh) so the test can tell
    which method served a given response. Request contents are ignored.
    """

    def __init__(self) -> None:
        # NOTE(review): AuthService.__init__ is intentionally not invoked here,
        # presumably to avoid building the real service's dependencies --
        # confirm against the base class.
        self._user = AuthUser(id="user-1", email="user@example.com")

    async def signup(self, request: SignupRequest) -> AuthTokenResponse:
        """Return the canned signup token pair, ignoring *request*."""
        return AuthTokenResponse(
            access_token="access-1",
            refresh_token="refresh-1",
            expires_in=3600,
            token_type="bearer",
            user=self._user,
        )

    async def login(self, request: LoginRequest) -> AuthTokenResponse:
        """Return the canned login token pair, ignoring *request*."""
        return AuthTokenResponse(
            access_token="access-2",
            refresh_token="refresh-2",
            expires_in=3600,
            token_type="bearer",
            user=self._user,
        )

    async def refresh(self, request: RefreshRequest) -> AuthTokenResponse:
        """Return the canned refresh token pair, ignoring *request*."""
        return AuthTokenResponse(
            access_token="access-3",
            refresh_token="refresh-3",
            expires_in=3600,
            token_type="bearer",
            user=self._user,
        )

    async def logout(self, refresh_token: str | None) -> None:
        """Accept any refresh token and do nothing."""
        return None


def _find_free_port() -> int:
    """Return a TCP port on 127.0.0.1 that was free at the time of the call.

    The probe socket is closed before returning, so another process could in
    principle grab the port before the server binds it.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # Port 0 asks the OS to pick any unused port.
        sock.bind(("127.0.0.1", 0))
        return sock.getsockname()[1]


def _wait_for_port(host: str, port: int, timeout: float = 5.0) -> None:
    """Block until a TCP connection to ``host:port`` succeeds.

    Args:
        host: Address to connect to.
        port: TCP port to connect to.
        timeout: Maximum number of seconds to keep retrying.

    Raises:
        RuntimeError: If no connection succeeds within *timeout* seconds.
    """
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            # Bound each attempt so one slow connect cannot overrun the
            # deadline; the floor keeps the socket out of non-blocking mode.
            sock.settimeout(max(remaining, 0.05))
            if sock.connect_ex((host, port)) == 0:
                return
        time.sleep(0.05)
    raise RuntimeError("Server did not start in time")


def _start_server(host: str, port: int) -> tuple[uvicorn.Server, threading.Thread]:
    """Run ``app`` under uvicorn on a daemon thread and wait until it listens.

    Args:
        host: Interface to bind.
        port: TCP port to bind.

    Returns:
        The ``(server, thread)`` pair. To stop the server, set
        ``server.should_exit = True`` and join the thread.

    Raises:
        RuntimeError: If the server is not accepting connections in time.
    """
    config = uvicorn.Config(app, host=host, port=port, log_level="info")
    server = uvicorn.Server(config)
    thread = threading.Thread(target=server.run, daemon=True)
    thread.start()
    try:
        _wait_for_port(host, port)
    except BaseException:
        # The caller never receives a handle on failure, so shut the server
        # down here instead of leaving it running in the background.
        server.should_exit = True
        thread.join(timeout=5)
        raise
    return server, thread


def _post_json(request_context, path: str, payload: dict[str, str]):
    """POST *payload* as a JSON body to *path* and return the response."""
    return request_context.post(
        path,
        data=json.dumps(payload),
        headers={"Content-Type": "application/json"},
    )


def test_auth_flow_e2e() -> None:
    """Exercise signup, login, refresh and logout over HTTP against the fake."""
    host = "127.0.0.1"
    port = _find_free_port()

    app.dependency_overrides[get_auth_service] = lambda: FakeE2EAuthService()
    try:
        # Started inside the try so a startup failure still removes the
        # override below.
        server, thread = _start_server(host, port)
        try:
            with sync_playwright() as playwright:
                request_context = playwright.request.new_context(
                    base_url=f"http://{host}:{port}"
                )
                try:
                    signup = _post_json(
                        request_context,
                        "/api/v1/auth/signup",
                        {
                            "username": "demo",
                            "email": "user@example.com",
                            "password": "secret123",
                        },
                    )
                    assert signup.status == 200
                    assert signup.json()["access_token"] == "access-1"

                    login = _post_json(
                        request_context,
                        "/api/v1/auth/login",
                        {"email": "user@example.com", "password": "secret123"},
                    )
                    assert login.status == 200
                    assert login.json()["access_token"] == "access-2"

                    refresh = _post_json(
                        request_context,
                        "/api/v1/auth/refresh",
                        {"refresh_token": "refresh-2"},
                    )
                    assert refresh.status == 200
                    assert refresh.json()["access_token"] == "access-3"

                    logout = _post_json(
                        request_context,
                        "/api/v1/auth/logout",
                        {"refresh_token": "refresh-3"},
                    )
                    assert logout.status == 204
                finally:
                    request_context.dispose()
        finally:
            server.should_exit = True
            thread.join(timeout=5)
    finally:
        # Remove only this test's override; other overrides registered on the
        # app are left untouched.
        app.dependency_overrides.pop(get_auth_service, None)