Files
social-app/backend/tests/integration/test_auth_routes.py
T

851 lines
26 KiB
Python
Raw Normal View History

from __future__ import annotations
from typing import Callable
import pytest
from fastapi import HTTPException
from fastapi.testclient import TestClient
from app import app
from v1.auth.dependencies import get_auth_service
from v1.auth.rate_limit import reset_rate_limit_state
from v1.auth.schemas import (
AuthUser,
PasswordResetConfirmRequest,
PasswordResetRequest,
SessionCreateRequest,
SessionRefreshRequest,
SessionResponse,
UserByEmailResponse,
VerificationCreateRequest,
VerificationCreateResponse,
VerificationResendRequest,
VerificationVerifyRequest,
)
from v1.auth.service import AuthService
@pytest.fixture(autouse=True)
def reset_auth_rate_limit_state() -> None:
reset_rate_limit_state()
class FakeAuthService(AuthService):
def __init__(self, token_response: SessionResponse) -> None:
self._token_response = token_response
async def create_verification(
self, request: VerificationCreateRequest
) -> VerificationCreateResponse:
if request.email == "exists@example.com":
raise HTTPException(status_code=422, detail="Invalid signup request")
return VerificationCreateResponse(email=request.email)
async def verify_verification(
self, request: VerificationVerifyRequest
) -> SessionResponse:
if request.token == "000000":
raise HTTPException(status_code=401, detail="Invalid verification code")
return self._token_response
async def resend_verification(self, request: VerificationResendRequest) -> None:
return None
async def create_session(self, request: SessionCreateRequest) -> SessionResponse:
raise HTTPException(status_code=401, detail="Invalid credentials")
async def refresh_session(self, request: SessionRefreshRequest) -> SessionResponse:
raise HTTPException(status_code=401, detail="Invalid refresh token")
async def delete_session(self, refresh_token: str | None) -> None:
return None
async def get_user_by_email(self, email: str) -> UserByEmailResponse:
if email == "missing@example.com":
raise HTTPException(status_code=404, detail="User not found")
return UserByEmailResponse(
id="user-1",
email=email,
created_at="2026-02-24T00:00:00Z",
email_confirmed_at=None,
)
async def request_password_reset(self, request: PasswordResetRequest) -> None:
return None
async def confirm_password_reset(
self, request: PasswordResetConfirmRequest
) -> None:
if request.token == "000000":
raise HTTPException(
status_code=401, detail="Invalid or expired verification code"
)
return None
def _override_auth_service(service: AuthService) -> Callable[[], AuthService]:
def _get_service() -> AuthService:
return service
return _get_service
def test_signup_start_returns_pending_response() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
response = client.post(
"/api/v1/auth/verifications",
json={
"username": "demo",
"email": "user@example.com",
"password": "secret123",
},
)
assert response.status_code == 202
assert response.json() == {"email": "user@example.com"}
finally:
app.dependency_overrides = {}
def test_signup_verify_returns_token_response() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
response = client.post(
"/api/v1/auth/verify",
json={"email": "user@example.com", "token": "123456"},
)
assert response.status_code == 200
body = response.json()
assert body["access_token"] == "access"
assert body["refresh_token"] == "refresh"
assert body["user"]["email"] == "user@example.com"
finally:
app.dependency_overrides = {}
def test_signup_resend_returns_generic_message() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
response = client.post(
"/api/v1/auth/resend",
json={"type": "recovery", "email": "user@example.com"},
)
assert response.status_code == 204
assert response.content == b""
finally:
app.dependency_overrides = {}
def test_signup_verify_invalid_token_returns_problem_details() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
response = client.post(
"/api/v1/auth/verify",
json={"email": "user@example.com", "token": "000000"},
)
assert response.status_code == 401
assert response.headers["content-type"].startswith("application/problem+json")
body = response.json()
assert body["title"] == "Unauthorized"
assert body["status"] == 401
assert body["detail"] == "Invalid verification code"
finally:
app.dependency_overrides = {}
def test_signup_start_existing_email_returns_problem_details() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
response = client.post(
"/api/v1/auth/verifications",
json={
"username": "demo",
"email": "exists@example.com",
"password": "secret123",
},
)
assert response.status_code == 422
assert response.headers["content-type"].startswith("application/problem+json")
body = response.json()
assert body["title"] == "Unprocessable Entity"
assert body["status"] == 422
assert body["detail"] == "Invalid signup request"
finally:
app.dependency_overrides = {}
def test_signup_verify_rate_limited_after_too_many_attempts() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
for _ in range(10):
ok = client.post(
"/api/v1/auth/verify",
json={"email": "user@example.com", "token": "123456"},
)
assert ok.status_code == 200
blocked = client.post(
"/api/v1/auth/verify",
json={"email": "user@example.com", "token": "123456"},
)
assert blocked.status_code == 429
assert blocked.headers["content-type"].startswith("application/problem+json")
finally:
app.dependency_overrides = {}
def test_signup_resend_rate_limited_after_too_many_attempts() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
for _ in range(5):
ok = client.post(
"/api/v1/auth/resend",
json={"email": "user@example.com"},
)
assert ok.status_code == 204
blocked = client.post(
"/api/v1/auth/resend",
json={"email": "user@example.com"},
)
assert blocked.status_code == 429
assert blocked.headers["content-type"].startswith("application/problem+json")
finally:
app.dependency_overrides = {}
def test_signup_start_rate_limited_after_too_many_attempts() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
for _ in range(5):
ok = client.post(
"/api/v1/auth/verifications",
json={
"username": "demo",
"email": "user@example.com",
"password": "secret123",
},
)
assert ok.status_code == 202
blocked = client.post(
"/api/v1/auth/verifications",
json={
"username": "demo",
"email": "user@example.com",
"password": "secret123",
},
)
assert blocked.status_code == 429
assert blocked.headers["content-type"].startswith("application/problem+json")
finally:
app.dependency_overrides = {}
def test_login_invalid_returns_problem_details() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
response = client.post(
"/api/v1/auth/sessions",
json={"email": "user@example.com", "password": "wrongpw"},
)
assert response.status_code == 401
assert response.headers["content-type"].startswith("application/problem+json")
body = response.json()
assert body["title"] == "Unauthorized"
assert body["status"] == 401
assert body["detail"] == "Invalid credentials"
finally:
app.dependency_overrides = {}
def test_refresh_invalid_returns_problem_details() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
response = client.post(
"/api/v1/auth/sessions/refresh",
json={"refresh_token": "invalid"},
)
assert response.status_code == 401
assert response.headers["content-type"].startswith("application/problem+json")
body = response.json()
assert body["title"] == "Unauthorized"
assert body["status"] == 401
assert body["detail"] == "Invalid refresh token"
finally:
app.dependency_overrides = {}
def test_logout_returns_no_content() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
response = client.request(
"DELETE",
"/api/v1/auth/sessions",
json={"refresh_token": "refresh"},
)
assert response.status_code == 204
assert response.content == b""
finally:
app.dependency_overrides = {}
def test_login_rate_limited_after_too_many_attempts() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
for _ in range(10):
blocked = client.post(
"/api/v1/auth/sessions",
json={"email": "user@example.com", "password": "wrongpw"},
)
assert blocked.status_code == 401
blocked = client.post(
"/api/v1/auth/sessions",
json={"email": "user@example.com", "password": "wrongpw"},
)
assert blocked.status_code == 429
assert blocked.headers["content-type"].startswith("application/problem+json")
body = blocked.json()
assert body["detail"] == "Too many requests"
finally:
app.dependency_overrides = {}
def test_refresh_rate_limited_after_too_many_attempts() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
for _ in range(10):
blocked = client.post(
"/api/v1/auth/sessions/refresh",
json={"refresh_token": "invalid"},
)
assert blocked.status_code == 401
blocked = client.post(
"/api/v1/auth/sessions/refresh",
json={"refresh_token": "invalid"},
)
assert blocked.status_code == 429
assert blocked.headers["content-type"].startswith("application/problem+json")
body = blocked.json()
assert body["detail"] == "Too many requests"
finally:
app.dependency_overrides = {}
def test_refresh_rate_limit_not_bypassed_by_changing_refresh_token() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
for index in range(10):
blocked = client.post(
"/api/v1/auth/sessions/refresh",
json={"refresh_token": f"invalid-{index}"},
)
assert blocked.status_code == 401
blocked = client.post(
"/api/v1/auth/sessions/refresh",
json={"refresh_token": "invalid-extra"},
)
assert blocked.status_code == 429
finally:
app.dependency_overrides = {}
def test_logout_rate_limited_after_too_many_attempts() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
for _ in range(10):
ok = client.request(
"DELETE",
"/api/v1/auth/sessions",
json={"refresh_token": "refresh"},
)
assert ok.status_code == 204
blocked = client.request(
"DELETE",
"/api/v1/auth/sessions",
json={"refresh_token": "refresh"},
)
assert blocked.status_code == 429
assert blocked.headers["content-type"].startswith("application/problem+json")
body = blocked.json()
assert body["detail"] == "Too many requests"
finally:
app.dependency_overrides = {}
def test_logout_rate_limit_not_bypassed_by_changing_refresh_token() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
for index in range(10):
ok = client.request(
"DELETE",
"/api/v1/auth/sessions",
json={"refresh_token": f"refresh-{index}"},
)
assert ok.status_code == 204
blocked = client.request(
"DELETE",
"/api/v1/auth/sessions",
json={"refresh_token": "refresh-extra"},
)
assert blocked.status_code == 429
finally:
app.dependency_overrides = {}
def test_signup_start_validation_error_returns_problem_details() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
response = client.post("/api/v1/auth/verifications", json={})
assert response.status_code == 422
assert response.headers["content-type"].startswith("application/problem+json")
body = response.json()
assert body["title"] == "Unprocessable Entity"
assert body["status"] == 422
assert body["detail"] == "Invalid request"
finally:
app.dependency_overrides = {}
def test_signup_start_missing_username_returns_problem_details() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
response = client.post(
"/api/v1/auth/verifications",
json={"email": "user@example.com", "password": "secret123"},
)
assert response.status_code == 422
assert response.headers["content-type"].startswith("application/problem+json")
body = response.json()
assert body["title"] == "Unprocessable Entity"
assert body["status"] == 422
assert body["detail"] == "Invalid request"
finally:
app.dependency_overrides = {}
def test_password_reset_request_returns_204() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
response = client.post(
"/api/v1/auth/resend",
json={"email": "user@example.com"},
)
assert response.status_code == 204
finally:
app.dependency_overrides = {}
def test_password_reset_confirm_returns_204() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
response = client.post(
"/api/v1/auth/verify",
json={
"type": "recovery",
"email": "user@example.com",
"token": "123456",
"new_password": "newpassword123",
},
)
assert response.status_code == 204
finally:
app.dependency_overrides = {}
def test_password_reset_confirm_invalid_token_returns_401() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
response = client.post(
"/api/v1/auth/verify",
json={
"type": "recovery",
"email": "user@example.com",
"token": "000000",
"new_password": "newpassword123",
},
)
assert response.status_code == 401
assert response.headers["content-type"].startswith("application/problem+json")
body = response.json()
assert body["title"] == "Unauthorized"
assert body["status"] == 401
finally:
app.dependency_overrides = {}
def test_password_reset_confirm_weak_password_returns_422() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
response = client.post(
"/api/v1/auth/verify",
json={
"type": "recovery",
"email": "user@example.com",
"token": "123456",
"new_password": "123",
},
)
assert response.status_code == 422
assert response.headers["content-type"].startswith("application/problem+json")
finally:
app.dependency_overrides = {}
class TestInviteCodeSignup:
def test_signup_with_valid_invite_code_returns_202(self) -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
response = client.post(
"/api/v1/auth/verifications",
json={
"username": "demo",
"email": "user@example.com",
"password": "secret123",
"invite_code": "A2B3",
},
)
assert response.status_code == 202
assert response.json() == {"email": "user@example.com"}
finally:
app.dependency_overrides = {}
def test_signup_with_invalid_invite_code_length_returns_202(self) -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
response = client.post(
"/api/v1/auth/verifications",
json={
"username": "demo",
"email": "user@example.com",
"password": "secret123",
"invite_code": "ABC123",
},
)
assert response.status_code == 202
assert response.json() == {"email": "user@example.com"}
finally:
app.dependency_overrides = {}
def test_signup_with_invalid_invite_code_chars_returns_202(self) -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
response = client.post(
"/api/v1/auth/verifications",
json={
"username": "demo",
"email": "user@example.com",
"password": "secret123",
"invite_code": "ABCD1234",
},
)
assert response.status_code == 202
assert response.json() == {"email": "user@example.com"}
finally:
app.dependency_overrides = {}