from __future__ import annotations from typing import Callable import pytest from fastapi import HTTPException from fastapi.testclient import TestClient from app import app from v1.auth.dependencies import get_auth_service from v1.auth.rate_limit import reset_rate_limit_state from v1.auth.schemas import ( AuthUser, PasswordResetConfirmRequest, PasswordResetRequest, SessionCreateRequest, SessionRefreshRequest, SessionResponse, UserByEmailResponse, VerificationCreateRequest, VerificationCreateResponse, VerificationResendRequest, VerificationVerifyRequest, ) from v1.auth.service import AuthService @pytest.fixture(autouse=True) def reset_auth_rate_limit_state() -> None: reset_rate_limit_state() class FakeAuthService(AuthService): def __init__(self, token_response: SessionResponse) -> None: self._token_response = token_response async def create_verification( self, request: VerificationCreateRequest ) -> VerificationCreateResponse: if request.email == "exists@example.com": raise HTTPException(status_code=422, detail="Invalid signup request") return VerificationCreateResponse(email=request.email) async def verify_verification( self, request: VerificationVerifyRequest ) -> SessionResponse: if request.token == "000000": raise HTTPException(status_code=401, detail="Invalid verification code") return self._token_response async def resend_verification(self, request: VerificationResendRequest) -> None: return None async def create_session(self, request: SessionCreateRequest) -> SessionResponse: raise HTTPException(status_code=401, detail="Invalid credentials") async def refresh_session(self, request: SessionRefreshRequest) -> SessionResponse: raise HTTPException(status_code=401, detail="Invalid refresh token") async def delete_session(self, refresh_token: str | None) -> None: return None async def get_user_by_email(self, email: str) -> UserByEmailResponse: if email == "missing@example.com": raise HTTPException(status_code=404, detail="User not found") return UserByEmailResponse( id="user-1", email=email, created_at="2026-02-24T00:00:00Z", email_confirmed_at=None, ) async def request_password_reset(self, request: PasswordResetRequest) -> None: return None async def confirm_password_reset( self, request: PasswordResetConfirmRequest ) -> None: if request.token == "000000": raise HTTPException( status_code=401, detail="Invalid or expired verification code" ) return None def _override_auth_service(service: AuthService) -> Callable[[], AuthService]: def _get_service() -> AuthService: return service return _get_service def test_signup_start_returns_pending_response() -> None: user = AuthUser(id="user-1", email="user@example.com") token_response = SessionResponse( access_token="access", refresh_token="refresh", expires_in=3600, token_type="bearer", user=user, ) app.dependency_overrides[get_auth_service] = _override_auth_service( FakeAuthService(token_response) ) client = TestClient(app) try: response = client.post( "/api/v1/auth/verifications", json={ "username": "demo", "email": "user@example.com", "password": "secret123", }, ) assert response.status_code == 202 assert response.json() == {"email": "user@example.com"} finally: app.dependency_overrides = {} def test_signup_verify_returns_token_response() -> None: user = AuthUser(id="user-1", email="user@example.com") token_response = SessionResponse( access_token="access", refresh_token="refresh", expires_in=3600, token_type="bearer", user=user, ) app.dependency_overrides[get_auth_service] = _override_auth_service( FakeAuthService(token_response) ) client = TestClient(app) try: response = client.post( "/api/v1/auth/verify", json={"email": "user@example.com", "token": "123456"}, ) assert response.status_code == 200 body = response.json() assert body["access_token"] == "access" assert body["refresh_token"] == "refresh" assert body["user"]["email"] == "user@example.com" finally: app.dependency_overrides = {} def test_signup_resend_returns_generic_message() -> None: user = AuthUser(id="user-1", email="user@example.com") token_response = SessionResponse( access_token="access", refresh_token="refresh", expires_in=3600, token_type="bearer", user=user, ) app.dependency_overrides[get_auth_service] = _override_auth_service( FakeAuthService(token_response) ) client = TestClient(app) try: response = client.post( "/api/v1/auth/resend", json={"type": "recovery", "email": "user@example.com"}, ) assert response.status_code == 204 assert response.content == b"" finally: app.dependency_overrides = {} def test_signup_verify_invalid_token_returns_problem_details() -> None: user = AuthUser(id="user-1", email="user@example.com") token_response = SessionResponse( access_token="access", refresh_token="refresh", expires_in=3600, token_type="bearer", user=user, ) app.dependency_overrides[get_auth_service] = _override_auth_service( FakeAuthService(token_response) ) client = TestClient(app) try: response = client.post( "/api/v1/auth/verify", json={"email": "user@example.com", "token": "000000"}, ) assert response.status_code == 401 assert response.headers["content-type"].startswith("application/problem+json") body = response.json() assert body["title"] == "Unauthorized" assert body["status"] == 401 assert body["detail"] == "Invalid verification code" finally: app.dependency_overrides = {} def test_signup_start_existing_email_returns_problem_details() -> None: user = AuthUser(id="user-1", email="user@example.com") token_response = SessionResponse( access_token="access", refresh_token="refresh", expires_in=3600, token_type="bearer", user=user, ) app.dependency_overrides[get_auth_service] = _override_auth_service( FakeAuthService(token_response) ) client = TestClient(app) try: response = client.post( "/api/v1/auth/verifications", json={ "username": "demo", "email": "exists@example.com", "password": "secret123", }, ) assert response.status_code == 422 assert response.headers["content-type"].startswith("application/problem+json") body = response.json() assert body["title"] == "Unprocessable Entity" assert body["status"] == 422 assert body["detail"] == "Invalid signup request" finally: app.dependency_overrides = {} def test_signup_verify_rate_limited_after_too_many_attempts() -> None: user = AuthUser(id="user-1", email="user@example.com") token_response = SessionResponse( access_token="access", refresh_token="refresh", expires_in=3600, token_type="bearer", user=user, ) app.dependency_overrides[get_auth_service] = _override_auth_service( FakeAuthService(token_response) ) client = TestClient(app) try: for _ in range(10): ok = client.post( "/api/v1/auth/verify", json={"email": "user@example.com", "token": "123456"}, ) assert ok.status_code == 200 blocked = client.post( "/api/v1/auth/verify", json={"email": "user@example.com", "token": "123456"}, ) assert blocked.status_code == 429 assert blocked.headers["content-type"].startswith("application/problem+json") finally: app.dependency_overrides = {} def test_signup_resend_rate_limited_after_too_many_attempts() -> None: user = AuthUser(id="user-1", email="user@example.com") token_response = SessionResponse( access_token="access", refresh_token="refresh", expires_in=3600, token_type="bearer", user=user, ) app.dependency_overrides[get_auth_service] = _override_auth_service( FakeAuthService(token_response) ) client = TestClient(app) try: for _ in range(5): ok = client.post( "/api/v1/auth/resend", json={"email": "user@example.com"}, ) assert ok.status_code == 204 blocked = client.post( "/api/v1/auth/resend", json={"email": "user@example.com"}, ) assert blocked.status_code == 429 assert blocked.headers["content-type"].startswith("application/problem+json") finally: app.dependency_overrides = {} def test_signup_start_rate_limited_after_too_many_attempts() -> None: user = AuthUser(id="user-1", email="user@example.com") token_response = SessionResponse( access_token="access", refresh_token="refresh", expires_in=3600, token_type="bearer", user=user, ) app.dependency_overrides[get_auth_service] = _override_auth_service( FakeAuthService(token_response) ) client = TestClient(app) try: for _ in range(5): ok = client.post( "/api/v1/auth/verifications", json={ "username": "demo", "email": "user@example.com", "password": "secret123", }, ) assert ok.status_code == 202 blocked = client.post( "/api/v1/auth/verifications", json={ "username": "demo", "email": "user@example.com", "password": "secret123", }, ) assert blocked.status_code == 429 assert blocked.headers["content-type"].startswith("application/problem+json") finally: app.dependency_overrides = {} def test_login_invalid_returns_problem_details() -> None: user = AuthUser(id="user-1", email="user@example.com") token_response = SessionResponse( access_token="access", refresh_token="refresh", expires_in=3600, token_type="bearer", user=user, ) app.dependency_overrides[get_auth_service] = _override_auth_service( FakeAuthService(token_response) ) client = TestClient(app) try: response = client.post( "/api/v1/auth/sessions", json={"email": "user@example.com", "password": "wrongpw"}, ) assert response.status_code == 401 assert response.headers["content-type"].startswith("application/problem+json") body = response.json() assert body["title"] == "Unauthorized" assert body["status"] == 401 assert body["detail"] == "Invalid credentials" finally: app.dependency_overrides = {} def test_refresh_invalid_returns_problem_details() -> None: user = AuthUser(id="user-1", email="user@example.com") token_response = SessionResponse( access_token="access", refresh_token="refresh", expires_in=3600, token_type="bearer", user=user, ) app.dependency_overrides[get_auth_service] = _override_auth_service( FakeAuthService(token_response) ) client = TestClient(app) try: response = client.post( "/api/v1/auth/sessions/refresh", json={"refresh_token": "invalid"}, ) assert response.status_code == 401 assert response.headers["content-type"].startswith("application/problem+json") body = response.json() assert body["title"] == "Unauthorized" assert body["status"] == 401 assert body["detail"] == "Invalid refresh token" finally: app.dependency_overrides = {} def test_logout_returns_no_content() -> None: user = AuthUser(id="user-1", email="user@example.com") token_response = SessionResponse( access_token="access", refresh_token="refresh", expires_in=3600, token_type="bearer", user=user, ) app.dependency_overrides[get_auth_service] = _override_auth_service( FakeAuthService(token_response) ) client = TestClient(app) try: response = client.request( "DELETE", "/api/v1/auth/sessions", json={"refresh_token": "refresh"}, ) assert response.status_code == 204 assert response.content == b"" finally: app.dependency_overrides = {} def test_login_rate_limited_after_too_many_attempts() -> None: user = AuthUser(id="user-1", email="user@example.com") token_response = SessionResponse( access_token="access", refresh_token="refresh", expires_in=3600, token_type="bearer", user=user, ) app.dependency_overrides[get_auth_service] = _override_auth_service( FakeAuthService(token_response) ) client = TestClient(app) try: for _ in range(10): blocked = client.post( "/api/v1/auth/sessions", json={"email": "user@example.com", "password": "wrongpw"}, ) assert blocked.status_code == 401 blocked = client.post( "/api/v1/auth/sessions", json={"email": "user@example.com", "password": "wrongpw"}, ) assert blocked.status_code == 429 assert blocked.headers["content-type"].startswith("application/problem+json") body = blocked.json() assert body["detail"] == "Too many requests" finally: app.dependency_overrides = {} def test_refresh_rate_limited_after_too_many_attempts() -> None: user = AuthUser(id="user-1", email="user@example.com") token_response = SessionResponse( access_token="access", refresh_token="refresh", expires_in=3600, token_type="bearer", user=user, ) app.dependency_overrides[get_auth_service] = _override_auth_service( FakeAuthService(token_response) ) client = TestClient(app) try: for _ in range(10): blocked = client.post( "/api/v1/auth/sessions/refresh", json={"refresh_token": "invalid"}, ) assert blocked.status_code == 401 blocked = client.post( "/api/v1/auth/sessions/refresh", json={"refresh_token": "invalid"}, ) assert blocked.status_code == 429 assert blocked.headers["content-type"].startswith("application/problem+json") body = blocked.json() assert body["detail"] == "Too many requests" finally: app.dependency_overrides = {} def test_refresh_rate_limit_not_bypassed_by_changing_refresh_token() -> None: user = AuthUser(id="user-1", email="user@example.com") token_response = SessionResponse( access_token="access", refresh_token="refresh", expires_in=3600, token_type="bearer", user=user, ) app.dependency_overrides[get_auth_service] = _override_auth_service( FakeAuthService(token_response) ) client = TestClient(app) try: for index in range(10): blocked = client.post( "/api/v1/auth/sessions/refresh", json={"refresh_token": f"invalid-{index}"}, ) assert blocked.status_code == 401 blocked = client.post( "/api/v1/auth/sessions/refresh", json={"refresh_token": "invalid-extra"}, ) assert blocked.status_code == 429 finally: app.dependency_overrides = {} def test_logout_rate_limited_after_too_many_attempts() -> None: user = AuthUser(id="user-1", email="user@example.com") token_response = SessionResponse( access_token="access", refresh_token="refresh", expires_in=3600, token_type="bearer", user=user, ) app.dependency_overrides[get_auth_service] = _override_auth_service( FakeAuthService(token_response) ) client = TestClient(app) try: for _ in range(10): ok = client.request( "DELETE", "/api/v1/auth/sessions", json={"refresh_token": "refresh"}, ) assert ok.status_code == 204 blocked = client.request( "DELETE", "/api/v1/auth/sessions", json={"refresh_token": "refresh"}, ) assert blocked.status_code == 429 assert blocked.headers["content-type"].startswith("application/problem+json") body = blocked.json() assert body["detail"] == "Too many requests" finally: app.dependency_overrides = {} def test_logout_rate_limit_not_bypassed_by_changing_refresh_token() -> None: user = AuthUser(id="user-1", email="user@example.com") token_response = SessionResponse( access_token="access", refresh_token="refresh", expires_in=3600, token_type="bearer", user=user, ) app.dependency_overrides[get_auth_service] = _override_auth_service( FakeAuthService(token_response) ) client = TestClient(app) try: for index in range(10): ok = client.request( "DELETE", "/api/v1/auth/sessions", json={"refresh_token": f"refresh-{index}"}, ) assert ok.status_code == 204 blocked = client.request( "DELETE", "/api/v1/auth/sessions", json={"refresh_token": "refresh-extra"}, ) assert blocked.status_code == 429 finally: app.dependency_overrides = {} def test_signup_start_validation_error_returns_problem_details() -> None: user = AuthUser(id="user-1", email="user@example.com") token_response = SessionResponse( access_token="access", refresh_token="refresh", expires_in=3600, token_type="bearer", user=user, ) app.dependency_overrides[get_auth_service] = _override_auth_service( FakeAuthService(token_response) ) client = TestClient(app) try: response = client.post("/api/v1/auth/verifications", json={}) assert response.status_code == 422 assert response.headers["content-type"].startswith("application/problem+json") body = response.json() assert body["title"] == "Unprocessable Entity" assert body["status"] == 422 assert body["detail"] == "Invalid request" finally: app.dependency_overrides = {} def test_signup_start_missing_username_returns_problem_details() -> None: user = AuthUser(id="user-1", email="user@example.com") token_response = SessionResponse( access_token="access", refresh_token="refresh", expires_in=3600, token_type="bearer", user=user, ) app.dependency_overrides[get_auth_service] = _override_auth_service( FakeAuthService(token_response) ) client = TestClient(app) try: response = client.post( "/api/v1/auth/verifications", json={"email": "user@example.com", "password": "secret123"}, ) assert response.status_code == 422 assert response.headers["content-type"].startswith("application/problem+json") body = response.json() assert body["title"] == "Unprocessable Entity" assert body["status"] == 422 assert body["detail"] == "Invalid request" finally: app.dependency_overrides = {} def test_password_reset_request_returns_204() -> None: user = AuthUser(id="user-1", email="user@example.com") token_response = SessionResponse( access_token="access", refresh_token="refresh", expires_in=3600, token_type="bearer", user=user, ) app.dependency_overrides[get_auth_service] = _override_auth_service( FakeAuthService(token_response) ) client = TestClient(app) try: response = client.post( "/api/v1/auth/resend", json={"email": "user@example.com"}, ) assert response.status_code == 204 finally: app.dependency_overrides = {} def test_password_reset_confirm_returns_204() -> None: user = AuthUser(id="user-1", email="user@example.com") token_response = SessionResponse( access_token="access", refresh_token="refresh", expires_in=3600, token_type="bearer", user=user, ) app.dependency_overrides[get_auth_service] = _override_auth_service( FakeAuthService(token_response) ) client = TestClient(app) try: response = client.post( "/api/v1/auth/verify", json={ "type": "recovery", "email": "user@example.com", "token": "123456", "new_password": "newpassword123", }, ) assert response.status_code == 204 finally: app.dependency_overrides = {} def test_password_reset_confirm_invalid_token_returns_401() -> None: user = AuthUser(id="user-1", email="user@example.com") token_response = SessionResponse( access_token="access", refresh_token="refresh", expires_in=3600, token_type="bearer", user=user, ) app.dependency_overrides[get_auth_service] = _override_auth_service( FakeAuthService(token_response) ) client = TestClient(app) try: response = client.post( "/api/v1/auth/verify", json={ "type": "recovery", "email": "user@example.com", "token": "000000", "new_password": "newpassword123", }, ) assert response.status_code == 401 assert response.headers["content-type"].startswith("application/problem+json") body = response.json() assert body["title"] == "Unauthorized" assert body["status"] == 401 finally: app.dependency_overrides = {} def test_password_reset_confirm_weak_password_returns_422() -> None: user = AuthUser(id="user-1", email="user@example.com") token_response = SessionResponse( access_token="access", refresh_token="refresh", expires_in=3600, token_type="bearer", user=user, ) app.dependency_overrides[get_auth_service] = _override_auth_service( FakeAuthService(token_response) ) client = TestClient(app) try: response = client.post( "/api/v1/auth/verify", json={ "type": "recovery", "email": "user@example.com", "token": "123456", "new_password": "123", }, ) assert response.status_code == 422 assert response.headers["content-type"].startswith("application/problem+json") finally: app.dependency_overrides = {} class TestInviteCodeSignup: def test_signup_with_valid_invite_code_returns_202(self) -> None: user = AuthUser(id="user-1", email="user@example.com") token_response = SessionResponse( access_token="access", refresh_token="refresh", expires_in=3600, token_type="bearer", user=user, ) app.dependency_overrides[get_auth_service] = _override_auth_service( FakeAuthService(token_response) ) client = TestClient(app) try: response = client.post( "/api/v1/auth/verifications", json={ "username": "demo", "email": "user@example.com", "password": "secret123", "invite_code": "A2B3", }, ) assert response.status_code == 202 assert response.json() == {"email": "user@example.com"} finally: app.dependency_overrides = {} def test_signup_with_invalid_invite_code_length_returns_202(self) -> None: user = AuthUser(id="user-1", email="user@example.com") token_response = SessionResponse( access_token="access", refresh_token="refresh", expires_in=3600, token_type="bearer", user=user, ) app.dependency_overrides[get_auth_service] = _override_auth_service( FakeAuthService(token_response) ) client = TestClient(app) try: response = client.post( "/api/v1/auth/verifications", json={ "username": "demo", "email": "user@example.com", "password": "secret123", "invite_code": "ABC123", }, ) assert response.status_code == 202 assert response.json() == {"email": "user@example.com"} finally: app.dependency_overrides = {} def test_signup_with_invalid_invite_code_chars_returns_202(self) -> None: user = AuthUser(id="user-1", email="user@example.com") token_response = SessionResponse( access_token="access", refresh_token="refresh", expires_in=3600, token_type="bearer", user=user, ) app.dependency_overrides[get_auth_service] = _override_auth_service( FakeAuthService(token_response) ) client = TestClient(app) try: response = client.post( "/api/v1/auth/verifications", json={ "username": "demo", "email": "user@example.com", "password": "secret123", "invite_code": "ABCD1234", }, ) assert response.status_code == 202 assert response.json() == {"email": "user@example.com"} finally: app.dependency_overrides = {}