Files
social-app/backend/tests/integration/test_auth_routes.py
T
zl-q ec33bb0cee refactor: 统一认证端点并删除冗余 profile 模块
- 合并 auth 端点: /verifications/verify → /verify, /verifications/resend → /resend
- 整合密码重置到 /verify 端点 (type=recovery)
- 移除未使用的 /auth/users 端点
- 添加 redirect URL 白名单验证 (site_url + additional_redirect_urls)
- 限流改用 Redis + IP 标识,替代内存锁
- 删除 v1/profile 死代码模块
- 更新前端 auth_api 适配新端点
- 添加 supabase site_url 和 additional_redirect_urls 配置
2026-03-07 14:55:00 +08:00

858 lines
27 KiB
Python

from __future__ import annotations
from typing import Callable
from uuid import UUID
import pytest
from fastapi import HTTPException
from fastapi.testclient import TestClient
from app import app
from core.auth.models import CurrentUser
from v1.auth.dependencies import get_auth_service
from v1.users.dependencies import get_current_user
from v1.auth.rate_limit import reset_rate_limit_state
from v1.auth.schemas import (
AuthUser,
PasswordResetConfirmRequest,
PasswordResetRequest,
SessionCreateRequest,
SessionRefreshRequest,
SessionResponse,
UserByEmailResponse,
VerificationCreateRequest,
VerificationCreateResponse,
VerificationResendRequest,
VerificationVerifyRequest,
)
from v1.auth.service import AuthService
@pytest.fixture(autouse=True)
def reset_auth_rate_limit_state() -> None:
reset_rate_limit_state()
class FakeAuthService(AuthService):
def __init__(self, token_response: SessionResponse) -> None:
self._token_response = token_response
async def create_verification(
self, request: VerificationCreateRequest
) -> VerificationCreateResponse:
if request.email == "exists@example.com":
raise HTTPException(status_code=422, detail="Invalid signup request")
return VerificationCreateResponse(email=request.email)
async def verify_verification(
self, request: VerificationVerifyRequest
) -> SessionResponse:
if request.token == "000000":
raise HTTPException(status_code=401, detail="Invalid verification code")
return self._token_response
async def resend_verification(self, request: VerificationResendRequest) -> None:
return None
async def create_session(self, request: SessionCreateRequest) -> SessionResponse:
raise HTTPException(status_code=401, detail="Invalid credentials")
async def refresh_session(self, request: SessionRefreshRequest) -> SessionResponse:
raise HTTPException(status_code=401, detail="Invalid refresh token")
async def delete_session(self, refresh_token: str | None) -> None:
return None
async def get_user_by_email(self, email: str) -> UserByEmailResponse:
if email == "missing@example.com":
raise HTTPException(status_code=404, detail="User not found")
return UserByEmailResponse(
id="user-1",
email=email,
created_at="2026-02-24T00:00:00Z",
email_confirmed_at=None,
)
async def request_password_reset(self, request: PasswordResetRequest) -> None:
return None
async def confirm_password_reset(
self, request: PasswordResetConfirmRequest
) -> None:
if request.token == "000000":
raise HTTPException(
status_code=401, detail="Invalid or expired verification code"
)
return None
def _override_auth_service(service: AuthService) -> Callable[[], AuthService]:
def _get_service() -> AuthService:
return service
return _get_service
def test_signup_start_returns_pending_response() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
response = client.post(
"/api/v1/auth/verifications",
json={
"username": "demo",
"email": "user@example.com",
"password": "secret123",
},
)
assert response.status_code == 202
assert response.json() == {"email": "user@example.com"}
finally:
app.dependency_overrides = {}
def test_signup_verify_returns_token_response() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
response = client.post(
"/api/v1/auth/verify",
json={"email": "user@example.com", "token": "123456"},
)
assert response.status_code == 200
body = response.json()
assert body["access_token"] == "access"
assert body["refresh_token"] == "refresh"
assert body["user"]["email"] == "user@example.com"
finally:
app.dependency_overrides = {}
def test_signup_resend_returns_generic_message() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
response = client.post(
"/api/v1/auth/resend",
json={"type": "recovery", "email": "user@example.com"},
)
assert response.status_code == 204
assert response.content == b""
finally:
app.dependency_overrides = {}
def test_signup_verify_invalid_token_returns_problem_details() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
response = client.post(
"/api/v1/auth/verify",
json={"email": "user@example.com", "token": "000000"},
)
assert response.status_code == 401
assert response.headers["content-type"].startswith("application/problem+json")
body = response.json()
assert body["title"] == "Unauthorized"
assert body["status"] == 401
assert body["detail"] == "Invalid verification code"
finally:
app.dependency_overrides = {}
def test_signup_start_existing_email_returns_problem_details() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
response = client.post(
"/api/v1/auth/verifications",
json={
"username": "demo",
"email": "exists@example.com",
"password": "secret123",
},
)
assert response.status_code == 422
assert response.headers["content-type"].startswith("application/problem+json")
body = response.json()
assert body["title"] == "Unprocessable Entity"
assert body["status"] == 422
assert body["detail"] == "Invalid signup request"
finally:
app.dependency_overrides = {}
def test_signup_verify_rate_limited_after_too_many_attempts() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
for _ in range(10):
ok = client.post(
"/api/v1/auth/verify",
json={"email": "user@example.com", "token": "123456"},
)
assert ok.status_code == 200
blocked = client.post(
"/api/v1/auth/verify",
json={"email": "user@example.com", "token": "123456"},
)
assert blocked.status_code == 429
assert blocked.headers["content-type"].startswith("application/problem+json")
finally:
app.dependency_overrides = {}
def test_signup_resend_rate_limited_after_too_many_attempts() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
for _ in range(5):
ok = client.post(
"/api/v1/auth/resend",
json={"email": "user@example.com"},
)
assert ok.status_code == 204
blocked = client.post(
"/api/v1/auth/resend",
json={"email": "user@example.com"},
)
assert blocked.status_code == 429
assert blocked.headers["content-type"].startswith("application/problem+json")
finally:
app.dependency_overrides = {}
def test_signup_start_rate_limited_after_too_many_attempts() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
for _ in range(5):
ok = client.post(
"/api/v1/auth/verifications",
json={
"username": "demo",
"email": "user@example.com",
"password": "secret123",
},
)
assert ok.status_code == 202
blocked = client.post(
"/api/v1/auth/verifications",
json={
"username": "demo",
"email": "user@example.com",
"password": "secret123",
},
)
assert blocked.status_code == 429
assert blocked.headers["content-type"].startswith("application/problem+json")
finally:
app.dependency_overrides = {}
def test_login_invalid_returns_problem_details() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
response = client.post(
"/api/v1/auth/sessions",
json={"email": "user@example.com", "password": "wrongpw"},
)
assert response.status_code == 401
assert response.headers["content-type"].startswith("application/problem+json")
body = response.json()
assert body["title"] == "Unauthorized"
assert body["status"] == 401
assert body["detail"] == "Invalid credentials"
finally:
app.dependency_overrides = {}
def test_refresh_invalid_returns_problem_details() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
response = client.post(
"/api/v1/auth/sessions/refresh",
json={"refresh_token": "invalid"},
)
assert response.status_code == 401
assert response.headers["content-type"].startswith("application/problem+json")
body = response.json()
assert body["title"] == "Unauthorized"
assert body["status"] == 401
assert body["detail"] == "Invalid refresh token"
finally:
app.dependency_overrides = {}
def test_logout_returns_no_content() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
response = client.request(
"DELETE",
"/api/v1/auth/sessions",
json={"refresh_token": "refresh"},
)
assert response.status_code == 204
assert response.content == b""
finally:
app.dependency_overrides = {}
def test_login_rate_limited_after_too_many_attempts() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
for _ in range(10):
blocked = client.post(
"/api/v1/auth/sessions",
json={"email": "user@example.com", "password": "wrongpw"},
)
assert blocked.status_code == 401
blocked = client.post(
"/api/v1/auth/sessions",
json={"email": "user@example.com", "password": "wrongpw"},
)
assert blocked.status_code == 429
assert blocked.headers["content-type"].startswith("application/problem+json")
body = blocked.json()
assert body["detail"] == "Too many requests"
finally:
app.dependency_overrides = {}
def test_refresh_rate_limited_after_too_many_attempts() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
for _ in range(10):
blocked = client.post(
"/api/v1/auth/sessions/refresh",
json={"refresh_token": "invalid"},
)
assert blocked.status_code == 401
blocked = client.post(
"/api/v1/auth/sessions/refresh",
json={"refresh_token": "invalid"},
)
assert blocked.status_code == 429
assert blocked.headers["content-type"].startswith("application/problem+json")
body = blocked.json()
assert body["detail"] == "Too many requests"
finally:
app.dependency_overrides = {}
def test_refresh_rate_limit_not_bypassed_by_changing_refresh_token() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
for index in range(10):
blocked = client.post(
"/api/v1/auth/sessions/refresh",
json={"refresh_token": f"invalid-{index}"},
)
assert blocked.status_code == 401
blocked = client.post(
"/api/v1/auth/sessions/refresh",
json={"refresh_token": "invalid-extra"},
)
assert blocked.status_code == 429
finally:
app.dependency_overrides = {}
def test_logout_rate_limited_after_too_many_attempts() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
for _ in range(10):
ok = client.request(
"DELETE",
"/api/v1/auth/sessions",
json={"refresh_token": "refresh"},
)
assert ok.status_code == 204
blocked = client.request(
"DELETE",
"/api/v1/auth/sessions",
json={"refresh_token": "refresh"},
)
assert blocked.status_code == 429
assert blocked.headers["content-type"].startswith("application/problem+json")
body = blocked.json()
assert body["detail"] == "Too many requests"
finally:
app.dependency_overrides = {}
def test_logout_rate_limit_not_bypassed_by_changing_refresh_token() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
for index in range(10):
ok = client.request(
"DELETE",
"/api/v1/auth/sessions",
json={"refresh_token": f"refresh-{index}"},
)
assert ok.status_code == 204
blocked = client.request(
"DELETE",
"/api/v1/auth/sessions",
json={"refresh_token": "refresh-extra"},
)
assert blocked.status_code == 429
finally:
app.dependency_overrides = {}
def test_signup_start_validation_error_returns_problem_details() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
response = client.post("/api/v1/auth/verifications", json={})
assert response.status_code == 422
assert response.headers["content-type"].startswith("application/problem+json")
body = response.json()
assert body["title"] == "Unprocessable Entity"
assert body["status"] == 422
assert body["detail"] == "Invalid request"
finally:
app.dependency_overrides = {}
def test_signup_start_missing_username_returns_problem_details() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
response = client.post(
"/api/v1/auth/verifications",
json={"email": "user@example.com", "password": "secret123"},
)
assert response.status_code == 422
assert response.headers["content-type"].startswith("application/problem+json")
body = response.json()
assert body["title"] == "Unprocessable Entity"
assert body["status"] == 422
assert body["detail"] == "Invalid request"
finally:
app.dependency_overrides = {}
def test_password_reset_request_returns_204() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
response = client.post(
"/api/v1/auth/resend",
json={"email": "user@example.com"},
)
assert response.status_code == 204
finally:
app.dependency_overrides = {}
def test_password_reset_confirm_returns_204() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
response = client.post(
"/api/v1/auth/verify",
json={
"type": "recovery",
"email": "user@example.com",
"token": "123456",
"new_password": "newpassword123",
},
)
assert response.status_code == 204
finally:
app.dependency_overrides = {}
def test_password_reset_confirm_invalid_token_returns_401() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
response = client.post(
"/api/v1/auth/verify",
json={
"type": "recovery",
"email": "user@example.com",
"token": "000000",
"new_password": "newpassword123",
},
)
assert response.status_code == 401
assert response.headers["content-type"].startswith("application/problem+json")
body = response.json()
assert body["title"] == "Unauthorized"
assert body["status"] == 401
finally:
app.dependency_overrides = {}
def test_password_reset_confirm_weak_password_returns_422() -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
response = client.post(
"/api/v1/auth/verify",
json={
"type": "recovery",
"email": "user@example.com",
"token": "123456",
"new_password": "123",
},
)
assert response.status_code == 422
assert response.headers["content-type"].startswith("application/problem+json")
finally:
app.dependency_overrides = {}
class TestInviteCodeSignup:
def test_signup_with_valid_invite_code_returns_202(self) -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
response = client.post(
"/api/v1/auth/verifications",
json={
"username": "demo",
"email": "user@example.com",
"password": "secret123",
"invite_code": "A2B3C4D5",
},
)
assert response.status_code == 202
assert response.json() == {"email": "user@example.com"}
finally:
app.dependency_overrides = {}
def test_signup_with_invalid_invite_code_length_returns_422(self) -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
response = client.post(
"/api/v1/auth/verifications",
json={
"username": "demo",
"email": "user@example.com",
"password": "secret123",
"invite_code": "ABC123",
},
)
assert response.status_code == 422
assert response.headers["content-type"].startswith(
"application/problem+json"
)
finally:
app.dependency_overrides = {}
def test_signup_with_invalid_invite_code_chars_returns_422(self) -> None:
user = AuthUser(id="user-1", email="user@example.com")
token_response = SessionResponse(
access_token="access",
refresh_token="refresh",
expires_in=3600,
token_type="bearer",
user=user,
)
app.dependency_overrides[get_auth_service] = _override_auth_service(
FakeAuthService(token_response)
)
client = TestClient(app)
try:
response = client.post(
"/api/v1/auth/verifications",
json={
"username": "demo",
"email": "user@example.com",
"password": "secret123",
"invite_code": "ABCD1234",
},
)
assert response.status_code == 422
assert response.headers["content-type"].startswith(
"application/problem+json"
)
finally:
app.dependency_overrides = {}