286 lines
9.2 KiB
Python
286 lines
9.2 KiB
Python
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import time
|
||
|
|
from typing import Any
|
||
|
|
from uuid import UUID
|
||
|
|
|
||
|
|
import jwt
|
||
|
|
import pytest
|
||
|
|
from fastapi import HTTPException
|
||
|
|
|
||
|
|
from core.auth.models import CurrentUser
|
||
|
|
from v1.profile.dependencies import get_current_user
|
||
|
|
|
||
|
|
|
||
|
|
class TestGetCurrentUser:
|
||
|
|
"""Tests for JWT validation in get_current_user dependency."""
|
||
|
|
|
||
|
|
@pytest.fixture
|
||
|
|
def jwt_secret(self) -> str:
|
||
|
|
return "super-secret-jwt-token-with-at-least-32-characters"
|
||
|
|
|
||
|
|
@pytest.fixture
|
||
|
|
def valid_user_id(self) -> str:
|
||
|
|
return "00000000-0000-0000-0000-000000000123"
|
||
|
|
|
||
|
|
@pytest.fixture
|
||
|
|
def valid_payload(self, valid_user_id: str) -> dict[str, Any]:
|
||
|
|
"""Valid JWT payload with all required claims."""
|
||
|
|
now = int(time.time())
|
||
|
|
return {
|
||
|
|
"sub": valid_user_id,
|
||
|
|
"aud": "authenticated",
|
||
|
|
"iss": "http://localhost:8001/auth/v1",
|
||
|
|
"exp": now + 3600, # 1 hour from now
|
||
|
|
"iat": now,
|
||
|
|
}
|
||
|
|
|
||
|
|
def _create_token(self, payload: dict[str, Any], secret: str) -> str:
|
||
|
|
return jwt.encode(payload, secret, algorithm="HS256")
|
||
|
|
|
||
|
|
def test_valid_token_returns_current_user(
|
||
|
|
self,
|
||
|
|
jwt_secret: str,
|
||
|
|
valid_payload: dict[str, Any],
|
||
|
|
valid_user_id: str,
|
||
|
|
monkeypatch: pytest.MonkeyPatch,
|
||
|
|
) -> None:
|
||
|
|
"""Valid JWT with correct aud/iss/exp should return CurrentUser."""
|
||
|
|
monkeypatch.setattr(
|
||
|
|
"v1.profile.dependencies.config.supabase.jwt_secret", jwt_secret
|
||
|
|
)
|
||
|
|
monkeypatch.setattr(
|
||
|
|
"v1.profile.dependencies.config.supabase.public_scheme",
|
||
|
|
"http",
|
||
|
|
)
|
||
|
|
monkeypatch.setattr(
|
||
|
|
"v1.profile.dependencies.config.supabase.public_host",
|
||
|
|
"localhost",
|
||
|
|
)
|
||
|
|
monkeypatch.setattr(
|
||
|
|
"v1.profile.dependencies.config.supabase.kong_http_port",
|
||
|
|
8001,
|
||
|
|
)
|
||
|
|
|
||
|
|
token = self._create_token(valid_payload, jwt_secret)
|
||
|
|
authorization = f"Bearer {token}"
|
||
|
|
|
||
|
|
result = get_current_user(authorization=authorization)
|
||
|
|
|
||
|
|
assert isinstance(result, CurrentUser)
|
||
|
|
assert result.id == UUID(valid_user_id)
|
||
|
|
|
||
|
|
def test_missing_authorization_raises_401(self) -> None:
|
||
|
|
"""Missing Authorization header should raise 401."""
|
||
|
|
with pytest.raises(HTTPException) as exc_info:
|
||
|
|
get_current_user(authorization=None)
|
||
|
|
|
||
|
|
assert exc_info.value.status_code == 401
|
||
|
|
assert exc_info.value.detail == "Unauthorized"
|
||
|
|
|
||
|
|
def test_invalid_scheme_raises_401(self) -> None:
|
||
|
|
"""Non-Bearer scheme should raise 401."""
|
||
|
|
with pytest.raises(HTTPException) as exc_info:
|
||
|
|
get_current_user(authorization="Basic dXNlcjpwYXNz")
|
||
|
|
|
||
|
|
assert exc_info.value.status_code == 401
|
||
|
|
|
||
|
|
def test_expired_token_raises_401(
|
||
|
|
self,
|
||
|
|
jwt_secret: str,
|
||
|
|
valid_payload: dict[str, Any],
|
||
|
|
monkeypatch: pytest.MonkeyPatch,
|
||
|
|
) -> None:
|
||
|
|
"""Expired JWT should raise 401."""
|
||
|
|
monkeypatch.setattr(
|
||
|
|
"v1.profile.dependencies.config.supabase.jwt_secret", jwt_secret
|
||
|
|
)
|
||
|
|
monkeypatch.setattr(
|
||
|
|
"v1.profile.dependencies.config.supabase.public_scheme",
|
||
|
|
"http",
|
||
|
|
)
|
||
|
|
monkeypatch.setattr(
|
||
|
|
"v1.profile.dependencies.config.supabase.public_host",
|
||
|
|
"localhost",
|
||
|
|
)
|
||
|
|
monkeypatch.setattr(
|
||
|
|
"v1.profile.dependencies.config.supabase.kong_http_port",
|
||
|
|
8001,
|
||
|
|
)
|
||
|
|
|
||
|
|
valid_payload["exp"] = int(time.time()) - 3600 # 1 hour ago
|
||
|
|
token = self._create_token(valid_payload, jwt_secret)
|
||
|
|
|
||
|
|
with pytest.raises(HTTPException) as exc_info:
|
||
|
|
get_current_user(authorization=f"Bearer {token}")
|
||
|
|
|
||
|
|
assert exc_info.value.status_code == 401
|
||
|
|
|
||
|
|
def test_invalid_audience_raises_401(
|
||
|
|
self,
|
||
|
|
jwt_secret: str,
|
||
|
|
valid_payload: dict[str, Any],
|
||
|
|
monkeypatch: pytest.MonkeyPatch,
|
||
|
|
) -> None:
|
||
|
|
"""JWT with wrong audience should raise 401."""
|
||
|
|
monkeypatch.setattr(
|
||
|
|
"v1.profile.dependencies.config.supabase.jwt_secret", jwt_secret
|
||
|
|
)
|
||
|
|
monkeypatch.setattr(
|
||
|
|
"v1.profile.dependencies.config.supabase.public_scheme",
|
||
|
|
"http",
|
||
|
|
)
|
||
|
|
monkeypatch.setattr(
|
||
|
|
"v1.profile.dependencies.config.supabase.public_host",
|
||
|
|
"localhost",
|
||
|
|
)
|
||
|
|
monkeypatch.setattr(
|
||
|
|
"v1.profile.dependencies.config.supabase.kong_http_port",
|
||
|
|
8001,
|
||
|
|
)
|
||
|
|
|
||
|
|
valid_payload["aud"] = "wrong-audience"
|
||
|
|
token = self._create_token(valid_payload, jwt_secret)
|
||
|
|
|
||
|
|
with pytest.raises(HTTPException) as exc_info:
|
||
|
|
get_current_user(authorization=f"Bearer {token}")
|
||
|
|
|
||
|
|
assert exc_info.value.status_code == 401
|
||
|
|
|
||
|
|
def test_invalid_issuer_raises_401(
|
||
|
|
self,
|
||
|
|
jwt_secret: str,
|
||
|
|
valid_payload: dict[str, Any],
|
||
|
|
monkeypatch: pytest.MonkeyPatch,
|
||
|
|
) -> None:
|
||
|
|
"""JWT with wrong issuer should raise 401."""
|
||
|
|
monkeypatch.setattr(
|
||
|
|
"v1.profile.dependencies.config.supabase.jwt_secret", jwt_secret
|
||
|
|
)
|
||
|
|
monkeypatch.setattr(
|
||
|
|
"v1.profile.dependencies.config.supabase.public_scheme",
|
||
|
|
"http",
|
||
|
|
)
|
||
|
|
monkeypatch.setattr(
|
||
|
|
"v1.profile.dependencies.config.supabase.public_host",
|
||
|
|
"localhost",
|
||
|
|
)
|
||
|
|
monkeypatch.setattr(
|
||
|
|
"v1.profile.dependencies.config.supabase.kong_http_port",
|
||
|
|
8001,
|
||
|
|
)
|
||
|
|
|
||
|
|
valid_payload["iss"] = "http://malicious-site.com/auth/v1"
|
||
|
|
token = self._create_token(valid_payload, jwt_secret)
|
||
|
|
|
||
|
|
with pytest.raises(HTTPException) as exc_info:
|
||
|
|
get_current_user(authorization=f"Bearer {token}")
|
||
|
|
|
||
|
|
assert exc_info.value.status_code == 401
|
||
|
|
|
||
|
|
def test_missing_subject_raises_401(
|
||
|
|
self,
|
||
|
|
jwt_secret: str,
|
||
|
|
valid_payload: dict[str, Any],
|
||
|
|
monkeypatch: pytest.MonkeyPatch,
|
||
|
|
) -> None:
|
||
|
|
"""JWT without 'sub' claim should raise 401."""
|
||
|
|
monkeypatch.setattr(
|
||
|
|
"v1.profile.dependencies.config.supabase.jwt_secret", jwt_secret
|
||
|
|
)
|
||
|
|
monkeypatch.setattr(
|
||
|
|
"v1.profile.dependencies.config.supabase.public_scheme",
|
||
|
|
"http",
|
||
|
|
)
|
||
|
|
monkeypatch.setattr(
|
||
|
|
"v1.profile.dependencies.config.supabase.public_host",
|
||
|
|
"localhost",
|
||
|
|
)
|
||
|
|
monkeypatch.setattr(
|
||
|
|
"v1.profile.dependencies.config.supabase.kong_http_port",
|
||
|
|
8001,
|
||
|
|
)
|
||
|
|
|
||
|
|
del valid_payload["sub"]
|
||
|
|
token = self._create_token(valid_payload, jwt_secret)
|
||
|
|
|
||
|
|
with pytest.raises(HTTPException) as exc_info:
|
||
|
|
get_current_user(authorization=f"Bearer {token}")
|
||
|
|
|
||
|
|
assert exc_info.value.status_code == 401
|
||
|
|
|
||
|
|
def test_wrong_secret_raises_401(
|
||
|
|
self,
|
||
|
|
jwt_secret: str,
|
||
|
|
valid_payload: dict[str, Any],
|
||
|
|
monkeypatch: pytest.MonkeyPatch,
|
||
|
|
) -> None:
|
||
|
|
"""JWT signed with wrong secret should raise 401."""
|
||
|
|
monkeypatch.setattr(
|
||
|
|
"v1.profile.dependencies.config.supabase.jwt_secret", jwt_secret
|
||
|
|
)
|
||
|
|
monkeypatch.setattr(
|
||
|
|
"v1.profile.dependencies.config.supabase.public_scheme",
|
||
|
|
"http",
|
||
|
|
)
|
||
|
|
monkeypatch.setattr(
|
||
|
|
"v1.profile.dependencies.config.supabase.public_host",
|
||
|
|
"localhost",
|
||
|
|
)
|
||
|
|
monkeypatch.setattr(
|
||
|
|
"v1.profile.dependencies.config.supabase.kong_http_port",
|
||
|
|
8001,
|
||
|
|
)
|
||
|
|
|
||
|
|
token = self._create_token(
|
||
|
|
valid_payload, "wrong-secret-key-that-is-long-enough"
|
||
|
|
)
|
||
|
|
|
||
|
|
with pytest.raises(HTTPException) as exc_info:
|
||
|
|
get_current_user(authorization=f"Bearer {token}")
|
||
|
|
|
||
|
|
assert exc_info.value.status_code == 401
|
||
|
|
|
||
|
|
def test_jwt_secret_not_configured_raises_503(
|
||
|
|
self, valid_payload: dict[str, Any], monkeypatch: pytest.MonkeyPatch
|
||
|
|
) -> None:
|
||
|
|
"""Missing JWT secret in config should raise 503."""
|
||
|
|
monkeypatch.setattr("v1.profile.dependencies.config.supabase.jwt_secret", None)
|
||
|
|
|
||
|
|
with pytest.raises(HTTPException) as exc_info:
|
||
|
|
get_current_user(authorization="Bearer some-token")
|
||
|
|
|
||
|
|
assert exc_info.value.status_code == 503
|
||
|
|
assert exc_info.value.detail == "JWT secret not configured"
|
||
|
|
|
||
|
|
def test_invalid_uuid_in_subject_raises_401(
|
||
|
|
self,
|
||
|
|
jwt_secret: str,
|
||
|
|
valid_payload: dict[str, Any],
|
||
|
|
monkeypatch: pytest.MonkeyPatch,
|
||
|
|
) -> None:
|
||
|
|
"""JWT with non-UUID 'sub' claim should raise 401."""
|
||
|
|
monkeypatch.setattr(
|
||
|
|
"v1.profile.dependencies.config.supabase.jwt_secret", jwt_secret
|
||
|
|
)
|
||
|
|
monkeypatch.setattr(
|
||
|
|
"v1.profile.dependencies.config.supabase.public_scheme",
|
||
|
|
"http",
|
||
|
|
)
|
||
|
|
monkeypatch.setattr(
|
||
|
|
"v1.profile.dependencies.config.supabase.public_host",
|
||
|
|
"localhost",
|
||
|
|
)
|
||
|
|
monkeypatch.setattr(
|
||
|
|
"v1.profile.dependencies.config.supabase.kong_http_port",
|
||
|
|
8001,
|
||
|
|
)
|
||
|
|
|
||
|
|
valid_payload["sub"] = "not-a-valid-uuid"
|
||
|
|
token = self._create_token(valid_payload, jwt_secret)
|
||
|
|
|
||
|
|
with pytest.raises(HTTPException) as exc_info:
|
||
|
|
get_current_user(authorization=f"Bearer {token}")
|
||
|
|
|
||
|
|
assert exc_info.value.status_code == 401
|