669 lines
21 KiB
Python
669 lines
21 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Callable
|
|
from uuid import UUID
|
|
|
|
import pytest
|
|
from fastapi import HTTPException
|
|
from fastapi.testclient import TestClient
|
|
|
|
from app import app
|
|
from core.auth.models import CurrentUser
|
|
from v1.auth.dependencies import get_auth_service
|
|
from v1.users.dependencies import get_current_user
|
|
from v1.auth.rate_limit import reset_rate_limit_state
|
|
from v1.auth.schemas import (
|
|
AuthUser,
|
|
SessionCreateRequest,
|
|
SessionRefreshRequest,
|
|
SessionResponse,
|
|
UserByEmailResponse,
|
|
VerificationCreateRequest,
|
|
VerificationCreateResponse,
|
|
VerificationResendRequest,
|
|
VerificationVerifyRequest,
|
|
)
|
|
from v1.auth.service import AuthService
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def reset_auth_rate_limit_state() -> None:
    """Call ``reset_rate_limit_state()`` before every test in this module.

    The fixture is autouse, so tests do not need to request it by name.
    """
    reset_rate_limit_state()
|
|
|
|
|
|
class FakeAuthService(AuthService):
    """Canned ``AuthService`` replacement used by the route tests.

    Every method either returns a fixed value or raises a fixed
    ``HTTPException``. The parent initializer is not called; only the
    session response passed to the constructor is stored.
    """

    def __init__(self, token_response: SessionResponse) -> None:
        """Keep the session payload that ``verify_verification`` returns."""
        self._token_response = token_response

    async def create_verification(
        self, request: VerificationCreateRequest
    ) -> VerificationCreateResponse:
        """Echo the requested email, or raise 422 for ``exists@example.com``."""
        already_registered = request.email == "exists@example.com"
        if not already_registered:
            return VerificationCreateResponse(email=request.email)
        raise HTTPException(status_code=422, detail="Invalid signup request")

    async def verify_verification(
        self, request: VerificationVerifyRequest
    ) -> SessionResponse:
        """Return the stored session, or raise 401 for the token ``000000``."""
        rejected_code = request.token == "000000"
        if not rejected_code:
            return self._token_response
        raise HTTPException(status_code=401, detail="Invalid verification code")

    async def resend_verification(self, request: VerificationResendRequest) -> None:
        """Accept any resend request and do nothing."""
        return

    async def create_session(self, request: SessionCreateRequest) -> SessionResponse:
        """Always reject the login attempt with a 401."""
        raise HTTPException(status_code=401, detail="Invalid credentials")

    async def refresh_session(self, request: SessionRefreshRequest) -> SessionResponse:
        """Always reject the refresh attempt with a 401."""
        raise HTTPException(status_code=401, detail="Invalid refresh token")

    async def delete_session(self, refresh_token: str | None) -> None:
        """Accept any logout request and do nothing."""
        return

    async def get_user_by_email(self, email: str) -> UserByEmailResponse:
        """Return a fixed user for ``email``, or raise 404 for the missing one."""
        if email == "missing@example.com":
            raise HTTPException(status_code=404, detail="User not found")
        found = UserByEmailResponse(
            id="user-1",
            email=email,
            created_at="2026-02-24T00:00:00Z",
            email_confirmed_at=None,
        )
        return found
|
|
|
|
|
|
def _override_auth_service(service: AuthService) -> Callable[[], AuthService]:
    """Wrap ``service`` in a zero-argument callable that returns it.

    The result is meant to be stored in ``app.dependency_overrides`` so the
    same ``service`` instance is handed out on every call.
    """
    return lambda: service
|
|
|
|
|
|
def test_signup_start_returns_pending_response() -> None:
    """A complete signup payload is answered with 202 and the same email."""
    fake_service = FakeAuthService(
        SessionResponse(
            access_token="access",
            refresh_token="refresh",
            expires_in=3600,
            token_type="bearer",
            user=AuthUser(id="user-1", email="user@example.com"),
        )
    )
    app.dependency_overrides[get_auth_service] = _override_auth_service(fake_service)

    signup_payload = {
        "username": "demo",
        "email": "user@example.com",
        "password": "secret123",
    }
    http = TestClient(app)
    try:
        reply = http.post("/api/v1/auth/verifications", json=signup_payload)
        assert reply.status_code == 202
        assert reply.json()["email"] == "user@example.com"
    finally:
        # Reset the override mapping whether or not the assertions passed.
        app.dependency_overrides = {}
|
|
|
|
|
|
def test_signup_verify_returns_token_response() -> None:
    """A valid verification code is answered with 200 and the session tokens."""
    fake_service = FakeAuthService(
        SessionResponse(
            access_token="access",
            refresh_token="refresh",
            expires_in=3600,
            token_type="bearer",
            user=AuthUser(id="user-1", email="user@example.com"),
        )
    )
    app.dependency_overrides[get_auth_service] = _override_auth_service(fake_service)

    verify_payload = {"email": "user@example.com", "token": "123456"}
    http = TestClient(app)
    try:
        reply = http.post("/api/v1/auth/verifications/verify", json=verify_payload)
        assert reply.status_code == 200
        session = reply.json()
        assert session["access_token"] == "access"
        assert session["refresh_token"] == "refresh"
        assert session["user"]["email"] == "user@example.com"
    finally:
        # Reset the override mapping whether or not the assertions passed.
        app.dependency_overrides = {}
|
|
|
|
|
|
def test_signup_resend_returns_generic_message() -> None:
    """A resend request is answered with 204 and an empty body.

    NOTE(review): the test name mentions a generic message, but the
    assertions only check for an empty 204 response.
    """
    fake_service = FakeAuthService(
        SessionResponse(
            access_token="access",
            refresh_token="refresh",
            expires_in=3600,
            token_type="bearer",
            user=AuthUser(id="user-1", email="user@example.com"),
        )
    )
    app.dependency_overrides[get_auth_service] = _override_auth_service(fake_service)

    http = TestClient(app)
    try:
        reply = http.post(
            "/api/v1/auth/verifications/resend",
            json={"email": "user@example.com"},
        )
        assert reply.status_code == 204
        assert reply.content == b""
    finally:
        # Reset the override mapping whether or not the assertions passed.
        app.dependency_overrides = {}
|
|
|
|
|
|
def test_signup_verify_invalid_token_returns_problem_details() -> None:
    """The rejected code ``000000`` yields a 401 problem+json response."""
    fake_service = FakeAuthService(
        SessionResponse(
            access_token="access",
            refresh_token="refresh",
            expires_in=3600,
            token_type="bearer",
            user=AuthUser(id="user-1", email="user@example.com"),
        )
    )
    app.dependency_overrides[get_auth_service] = _override_auth_service(fake_service)

    verify_payload = {"email": "user@example.com", "token": "000000"}
    http = TestClient(app)
    try:
        reply = http.post("/api/v1/auth/verifications/verify", json=verify_payload)
        assert reply.status_code == 401
        content_type = reply.headers["content-type"]
        assert content_type.startswith("application/problem+json")
        problem = reply.json()
        assert problem["title"] == "Unauthorized"
        assert problem["status"] == 401
        assert problem["detail"] == "Invalid verification code"
    finally:
        # Reset the override mapping whether or not the assertions passed.
        app.dependency_overrides = {}
|
|
|
|
|
|
def test_signup_start_existing_email_returns_problem_details() -> None:
    """Signing up with ``exists@example.com`` yields a 422 problem+json response."""
    fake_service = FakeAuthService(
        SessionResponse(
            access_token="access",
            refresh_token="refresh",
            expires_in=3600,
            token_type="bearer",
            user=AuthUser(id="user-1", email="user@example.com"),
        )
    )
    app.dependency_overrides[get_auth_service] = _override_auth_service(fake_service)

    signup_payload = {
        "username": "demo",
        "email": "exists@example.com",
        "password": "secret123",
    }
    http = TestClient(app)
    try:
        reply = http.post("/api/v1/auth/verifications", json=signup_payload)
        assert reply.status_code == 422
        content_type = reply.headers["content-type"]
        assert content_type.startswith("application/problem+json")
        problem = reply.json()
        # NOTE(review): the title is produced by the app's error handling, which
        # is not shown here; if it is derived from http.HTTPStatus, the 422
        # phrase is "Unprocessable Entity" before Python 3.13 - confirm.
        assert problem["title"] == "Unprocessable Content"
        assert problem["status"] == 422
        assert problem["detail"] == "Invalid signup request"
    finally:
        # Reset the override mapping whether or not the assertions passed.
        app.dependency_overrides = {}
|
|
|
|
|
|
def test_signup_verify_rate_limited_after_too_many_attempts() -> None:
    """Ten verify calls succeed; the eleventh is answered with a 429 problem."""
    fake_service = FakeAuthService(
        SessionResponse(
            access_token="access",
            refresh_token="refresh",
            expires_in=3600,
            token_type="bearer",
            user=AuthUser(id="user-1", email="user@example.com"),
        )
    )
    app.dependency_overrides[get_auth_service] = _override_auth_service(fake_service)

    endpoint = "/api/v1/auth/verifications/verify"
    verify_payload = {"email": "user@example.com", "token": "123456"}
    http = TestClient(app)
    try:
        for _attempt in range(10):
            allowed = http.post(endpoint, json=verify_payload)
            assert allowed.status_code == 200

        throttled = http.post(endpoint, json=verify_payload)
        assert throttled.status_code == 429
        content_type = throttled.headers["content-type"]
        assert content_type.startswith("application/problem+json")
    finally:
        # Reset the override mapping whether or not the assertions passed.
        app.dependency_overrides = {}
|
|
|
|
|
|
def test_signup_resend_rate_limited_after_too_many_attempts() -> None:
    """Five resend calls succeed; the sixth is answered with a 429 problem."""
    fake_service = FakeAuthService(
        SessionResponse(
            access_token="access",
            refresh_token="refresh",
            expires_in=3600,
            token_type="bearer",
            user=AuthUser(id="user-1", email="user@example.com"),
        )
    )
    app.dependency_overrides[get_auth_service] = _override_auth_service(fake_service)

    endpoint = "/api/v1/auth/verifications/resend"
    resend_payload = {"email": "user@example.com"}
    http = TestClient(app)
    try:
        for _attempt in range(5):
            allowed = http.post(endpoint, json=resend_payload)
            assert allowed.status_code == 204

        throttled = http.post(endpoint, json=resend_payload)
        assert throttled.status_code == 429
        content_type = throttled.headers["content-type"]
        assert content_type.startswith("application/problem+json")
    finally:
        # Reset the override mapping whether or not the assertions passed.
        app.dependency_overrides = {}
|
|
|
|
|
|
def test_signup_start_rate_limited_after_too_many_attempts() -> None:
    """Five signup calls succeed; the sixth is answered with a 429 problem."""
    fake_service = FakeAuthService(
        SessionResponse(
            access_token="access",
            refresh_token="refresh",
            expires_in=3600,
            token_type="bearer",
            user=AuthUser(id="user-1", email="user@example.com"),
        )
    )
    app.dependency_overrides[get_auth_service] = _override_auth_service(fake_service)

    endpoint = "/api/v1/auth/verifications"
    signup_payload = {
        "username": "demo",
        "email": "user@example.com",
        "password": "secret123",
    }
    http = TestClient(app)
    try:
        for _attempt in range(5):
            allowed = http.post(endpoint, json=signup_payload)
            assert allowed.status_code == 202

        throttled = http.post(endpoint, json=signup_payload)
        assert throttled.status_code == 429
        content_type = throttled.headers["content-type"]
        assert content_type.startswith("application/problem+json")
    finally:
        # Reset the override mapping whether or not the assertions passed.
        app.dependency_overrides = {}
|
|
|
|
|
|
def test_login_invalid_returns_problem_details() -> None:
    """A rejected login is reported as a 401 problem+json response."""
    fake_service = FakeAuthService(
        SessionResponse(
            access_token="access",
            refresh_token="refresh",
            expires_in=3600,
            token_type="bearer",
            user=AuthUser(id="user-1", email="user@example.com"),
        )
    )
    app.dependency_overrides[get_auth_service] = _override_auth_service(fake_service)

    credentials = {"email": "user@example.com", "password": "wrongpw"}
    http = TestClient(app)
    try:
        reply = http.post("/api/v1/auth/sessions", json=credentials)
        assert reply.status_code == 401
        content_type = reply.headers["content-type"]
        assert content_type.startswith("application/problem+json")
        problem = reply.json()
        assert problem["title"] == "Unauthorized"
        assert problem["status"] == 401
        assert problem["detail"] == "Invalid credentials"
    finally:
        # Reset the override mapping whether or not the assertions passed.
        app.dependency_overrides = {}
|
|
|
|
|
|
def test_refresh_invalid_returns_problem_details() -> None:
    """A rejected refresh token is reported as a 401 problem+json response."""
    fake_service = FakeAuthService(
        SessionResponse(
            access_token="access",
            refresh_token="refresh",
            expires_in=3600,
            token_type="bearer",
            user=AuthUser(id="user-1", email="user@example.com"),
        )
    )
    app.dependency_overrides[get_auth_service] = _override_auth_service(fake_service)

    http = TestClient(app)
    try:
        reply = http.post(
            "/api/v1/auth/sessions/refresh",
            json={"refresh_token": "invalid"},
        )
        assert reply.status_code == 401
        content_type = reply.headers["content-type"]
        assert content_type.startswith("application/problem+json")
        problem = reply.json()
        assert problem["title"] == "Unauthorized"
        assert problem["status"] == 401
        assert problem["detail"] == "Invalid refresh token"
    finally:
        # Reset the override mapping whether or not the assertions passed.
        app.dependency_overrides = {}
|
|
|
|
|
|
def test_logout_returns_no_content() -> None:
    """DELETE on the sessions route is answered with 204 and an empty body."""
    fake_service = FakeAuthService(
        SessionResponse(
            access_token="access",
            refresh_token="refresh",
            expires_in=3600,
            token_type="bearer",
            user=AuthUser(id="user-1", email="user@example.com"),
        )
    )
    app.dependency_overrides[get_auth_service] = _override_auth_service(fake_service)

    http = TestClient(app)
    try:
        # The generic request() call is used so the DELETE can carry a JSON body.
        reply = http.request(
            "DELETE",
            "/api/v1/auth/sessions",
            json={"refresh_token": "refresh"},
        )
        assert reply.status_code == 204
        assert reply.content == b""
    finally:
        # Reset the override mapping whether or not the assertions passed.
        app.dependency_overrides = {}
|
|
|
|
|
|
def test_login_rate_limited_after_too_many_attempts() -> None:
    """Ten rejected logins return 401; the eleventh is answered with 429."""
    fake_service = FakeAuthService(
        SessionResponse(
            access_token="access",
            refresh_token="refresh",
            expires_in=3600,
            token_type="bearer",
            user=AuthUser(id="user-1", email="user@example.com"),
        )
    )
    app.dependency_overrides[get_auth_service] = _override_auth_service(fake_service)

    endpoint = "/api/v1/auth/sessions"
    credentials = {"email": "user@example.com", "password": "wrongpw"}
    http = TestClient(app)
    try:
        for _attempt in range(10):
            rejected = http.post(endpoint, json=credentials)
            assert rejected.status_code == 401

        throttled = http.post(endpoint, json=credentials)
        assert throttled.status_code == 429
        content_type = throttled.headers["content-type"]
        assert content_type.startswith("application/problem+json")
        problem = throttled.json()
        assert problem["detail"] == "Too many requests"
    finally:
        # Reset the override mapping whether or not the assertions passed.
        app.dependency_overrides = {}
|
|
|
|
|
|
def test_refresh_rate_limited_after_too_many_attempts() -> None:
    """Ten rejected refreshes return 401; the eleventh is answered with 429."""
    fake_service = FakeAuthService(
        SessionResponse(
            access_token="access",
            refresh_token="refresh",
            expires_in=3600,
            token_type="bearer",
            user=AuthUser(id="user-1", email="user@example.com"),
        )
    )
    app.dependency_overrides[get_auth_service] = _override_auth_service(fake_service)

    endpoint = "/api/v1/auth/sessions/refresh"
    refresh_payload = {"refresh_token": "invalid"}
    http = TestClient(app)
    try:
        for _attempt in range(10):
            rejected = http.post(endpoint, json=refresh_payload)
            assert rejected.status_code == 401

        throttled = http.post(endpoint, json=refresh_payload)
        assert throttled.status_code == 429
        content_type = throttled.headers["content-type"]
        assert content_type.startswith("application/problem+json")
        problem = throttled.json()
        assert problem["detail"] == "Too many requests"
    finally:
        # Reset the override mapping whether or not the assertions passed.
        app.dependency_overrides = {}
|
|
|
|
|
|
def test_logout_rate_limited_after_too_many_attempts() -> None:
    """Ten logouts return 204; the eleventh is answered with a 429 problem."""
    fake_service = FakeAuthService(
        SessionResponse(
            access_token="access",
            refresh_token="refresh",
            expires_in=3600,
            token_type="bearer",
            user=AuthUser(id="user-1", email="user@example.com"),
        )
    )
    app.dependency_overrides[get_auth_service] = _override_auth_service(fake_service)

    endpoint = "/api/v1/auth/sessions"
    logout_payload = {"refresh_token": "refresh"}
    http = TestClient(app)
    try:
        for _attempt in range(10):
            allowed = http.request("DELETE", endpoint, json=logout_payload)
            assert allowed.status_code == 204

        throttled = http.request("DELETE", endpoint, json=logout_payload)
        assert throttled.status_code == 429
        content_type = throttled.headers["content-type"]
        assert content_type.startswith("application/problem+json")
        problem = throttled.json()
        assert problem["detail"] == "Too many requests"
    finally:
        # Reset the override mapping whether or not the assertions passed.
        app.dependency_overrides = {}
|
|
|
|
|
|
def test_signup_start_validation_error_returns_problem_details() -> None:
    """An empty JSON object as signup payload yields a 422 problem+json response."""
    fake_service = FakeAuthService(
        SessionResponse(
            access_token="access",
            refresh_token="refresh",
            expires_in=3600,
            token_type="bearer",
            user=AuthUser(id="user-1", email="user@example.com"),
        )
    )
    app.dependency_overrides[get_auth_service] = _override_auth_service(fake_service)

    http = TestClient(app)
    try:
        reply = http.post("/api/v1/auth/verifications", json={})
        assert reply.status_code == 422
        content_type = reply.headers["content-type"]
        assert content_type.startswith("application/problem+json")
        problem = reply.json()
        # NOTE(review): the title is produced by the app's error handling, which
        # is not shown here; if it is derived from http.HTTPStatus, the 422
        # phrase is "Unprocessable Entity" before Python 3.13 - confirm.
        assert problem["title"] == "Unprocessable Content"
        assert problem["status"] == 422
        assert problem["detail"] == "Invalid request"
    finally:
        # Reset the override mapping whether or not the assertions passed.
        app.dependency_overrides = {}
|
|
|
|
|
|
def test_signup_start_missing_username_returns_problem_details() -> None:
    """A signup payload without ``username`` yields a 422 problem+json response."""
    fake_service = FakeAuthService(
        SessionResponse(
            access_token="access",
            refresh_token="refresh",
            expires_in=3600,
            token_type="bearer",
            user=AuthUser(id="user-1", email="user@example.com"),
        )
    )
    app.dependency_overrides[get_auth_service] = _override_auth_service(fake_service)

    incomplete_payload = {"email": "user@example.com", "password": "secret123"}
    http = TestClient(app)
    try:
        reply = http.post("/api/v1/auth/verifications", json=incomplete_payload)
        assert reply.status_code == 422
        content_type = reply.headers["content-type"]
        assert content_type.startswith("application/problem+json")
        problem = reply.json()
        # NOTE(review): the title is produced by the app's error handling, which
        # is not shown here; if it is derived from http.HTTPStatus, the 422
        # phrase is "Unprocessable Entity" before Python 3.13 - confirm.
        assert problem["title"] == "Unprocessable Content"
        assert problem["status"] == 422
        assert problem["detail"] == "Invalid request"
    finally:
        # Reset the override mapping whether or not the assertions passed.
        app.dependency_overrides = {}
|
|
|
|
|
|
def test_get_user_by_email_returns_user() -> None:
    """Looking up the caller's own email returns 200 with the canned user."""
    fake_service = FakeAuthService(
        SessionResponse(
            access_token="access",
            refresh_token="refresh",
            expires_in=3600,
            token_type="bearer",
            user=AuthUser(id="user-1", email="user@example.com"),
        )
    )
    caller_id = UUID("00000000-0000-0000-0000-000000000001")
    app.dependency_overrides[get_auth_service] = _override_auth_service(fake_service)
    # The caller's email matches the email being queried below.
    app.dependency_overrides[get_current_user] = lambda: CurrentUser(
        id=caller_id,
        email="user@example.com",
    )

    http = TestClient(app)
    try:
        reply = http.get("/api/v1/auth/users", params={"email": "user@example.com"})
        assert reply.status_code == 200
        found = reply.json()
        assert found["email"] == "user@example.com"
        assert found["id"] == "user-1"
    finally:
        # Reset the override mapping whether or not the assertions passed.
        app.dependency_overrides = {}
|
|
|
|
|
|
def test_get_user_by_email_not_found_returns_problem_details() -> None:
    """Looking up ``missing@example.com`` yields a 404 problem+json response."""
    fake_service = FakeAuthService(
        SessionResponse(
            access_token="access",
            refresh_token="refresh",
            expires_in=3600,
            token_type="bearer",
            user=AuthUser(id="user-1", email="user@example.com"),
        )
    )
    caller_id = UUID("00000000-0000-0000-0000-000000000001")
    app.dependency_overrides[get_auth_service] = _override_auth_service(fake_service)
    # The caller's email matches the email being queried below.
    app.dependency_overrides[get_current_user] = lambda: CurrentUser(
        id=caller_id,
        email="missing@example.com",
    )

    http = TestClient(app)
    try:
        reply = http.get(
            "/api/v1/auth/users",
            params={"email": "missing@example.com"},
        )
        assert reply.status_code == 404
        content_type = reply.headers["content-type"]
        assert content_type.startswith("application/problem+json")
        problem = reply.json()
        assert problem["title"] == "Not Found"
        assert problem["status"] == 404
        assert problem["detail"] == "User not found"
    finally:
        # Reset the override mapping whether or not the assertions passed.
        app.dependency_overrides = {}
|
|
|
|
|
|
def test_get_user_by_email_forbidden_when_querying_other_user() -> None:
    """Querying an email other than the caller's yields a 403 problem response."""
    fake_service = FakeAuthService(
        SessionResponse(
            access_token="access",
            refresh_token="refresh",
            expires_in=3600,
            token_type="bearer",
            user=AuthUser(id="user-1", email="user@example.com"),
        )
    )
    caller_id = UUID("00000000-0000-0000-0000-000000000001")
    app.dependency_overrides[get_auth_service] = _override_auth_service(fake_service)
    # The caller's email differs from the email being queried below.
    app.dependency_overrides[get_current_user] = lambda: CurrentUser(
        id=caller_id,
        email="self@example.com",
    )

    http = TestClient(app)
    try:
        reply = http.get(
            "/api/v1/auth/users",
            params={"email": "target@example.com"},
        )
        assert reply.status_code == 403
        content_type = reply.headers["content-type"]
        assert content_type.startswith("application/problem+json")
        problem = reply.json()
        assert problem["title"] == "Forbidden"
        assert problem["status"] == 403
        assert problem["detail"] == "Forbidden"
    finally:
        # Reset the override mapping whether or not the assertions passed.
        app.dependency_overrides = {}
|