96 lines
3.7 KiB
Python
96 lines
3.7 KiB
Python
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
from typing import Annotated
|
||
|
|
from uuid import UUID
|
||
|
|
|
||
|
|
import jwt
|
||
|
|
from fastapi import Depends, Header, HTTPException
|
||
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
|
|
||
|
|
from core.auth.models import CurrentUser
|
||
|
|
from core.config.settings import config
|
||
|
|
from core.db import get_db
|
||
|
|
from core.logging import get_logger
|
||
|
|
from v1.users.repository import SQLAlchemyUserRepository
|
||
|
|
from v1.users.service import UserService
|
||
|
|
|
||
|
|
logger = get_logger("v1.users.dependencies")
|
||
|
|
|
||
|
|
|
||
|
|
def _unauthorized(reason: str, **log_fields: object) -> HTTPException:
    """Log a JWT validation failure and build the 401 response for it.

    The exception is returned rather than raised so that the caller can
    write ``raise _unauthorized(...) from exc`` and keep the cause chain.

    Args:
        reason: Short failure description appended to the log message.
        **log_fields: Extra key/value pairs forwarded to the logger.

    Returns:
        An ``HTTPException`` with status 401 and detail ``"Unauthorized"``.
    """
    logger.warning(f"JWT validation failed: {reason}", **log_fields)
    # RFC 7235 requires a WWW-Authenticate challenge on every 401 response.
    return HTTPException(
        status_code=401,
        detail="Unauthorized",
        headers={"WWW-Authenticate": "Bearer"},
    )


def _jwt_failure_reason(exc: jwt.PyJWTError) -> str | None:
    """Return the log reason for a known PyJWT error, or None if unrecognized."""
    # Checked in order: in PyJWT, InvalidSignatureError derives from
    # DecodeError, so the more specific class has to be tested first.
    known_failures = (
        (jwt.ExpiredSignatureError, "token expired"),
        (jwt.InvalidAudienceError, "invalid audience"),
        (jwt.InvalidIssuerError, "invalid issuer"),
        (jwt.InvalidSignatureError, "invalid signature"),
        (jwt.DecodeError, "malformed token"),
    )
    for exc_type, reason in known_failures:
        if isinstance(exc, exc_type):
            return reason
    return None


def get_current_user(authorization: str | None = Header(default=None)) -> CurrentUser:
    """Authenticate the request from its ``Authorization: Bearer <jwt>`` header.

    The token is verified with the configured Supabase JWT secret (HS256),
    audience ``"authenticated"`` and issuer ``<public_url>/auth/v1``; the
    ``sub``, ``aud``, ``iss`` and ``exp`` claims are required.

    Args:
        authorization: Raw value of the ``Authorization`` request header.

    Returns:
        A ``CurrentUser`` whose ``id`` is the UUID parsed from the ``sub``
        claim. ``email`` and ``role`` are taken from the claims of the same
        name when they are strings, otherwise ``None``.

    Raises:
        HTTPException: 401 when the header is missing, is not a bearer
            credential, the token fails validation, or ``sub`` is not a
            UUID string; 503 when no JWT secret is configured.
    """
    if not authorization:
        raise _unauthorized("missing authorization header")

    scheme, _, token = authorization.partition(" ")
    # RFC 6750 allows one or more spaces between the scheme and the token,
    # so drop any surplus whitespace instead of treating it as token bytes.
    token = token.strip()
    if scheme.lower() != "bearer" or not token:
        raise _unauthorized("invalid authorization scheme")

    secret = config.supabase.jwt_secret
    if not secret:
        # Server misconfiguration, not a client error: report 503, not 401.
        logger.error("JWT validation failed: secret not configured")
        raise HTTPException(status_code=503, detail="JWT secret not configured")

    supabase_url = config.supabase.public_url.rstrip("/")
    expected_issuer = f"{supabase_url}/auth/v1"

    try:
        payload = jwt.decode(
            token,
            secret,
            algorithms=["HS256"],
            audience="authenticated",
            issuer=expected_issuer,
            options={
                "verify_aud": True,
                "verify_iss": True,
                "verify_exp": True,
                "require": ["sub", "aud", "iss", "exp"],
            },
        )
    except jwt.PyJWTError as exc:
        reason = _jwt_failure_reason(exc)
        if reason is not None:
            raise _unauthorized(reason) from exc
        raise _unauthorized("unknown error", error_type=type(exc).__name__) from exc

    subject = payload.get("sub")
    if not isinstance(subject, str) or not subject:
        raise _unauthorized("missing or invalid subject claim")

    try:
        user_id = UUID(subject)
    except ValueError as exc:
        raise _unauthorized("invalid UUID in subject") from exc

    logger.debug("JWT validation successful", user_id=str(user_id))

    # Optional claims: anything that is not a string is treated as absent.
    email = payload.get("email")
    role = payload.get("role")
    return CurrentUser(
        id=user_id,
        email=email if isinstance(email, str) else None,
        role=role if isinstance(role, str) else None,
    )
|
||
|
|
|
||
|
|
|
||
|
|
def get_user_service(
    session: Annotated[AsyncSession, Depends(get_db)],
    user: Annotated[CurrentUser, Depends(get_current_user)],
) -> UserService:
    """Build the ``UserService`` for the current request.

    Args:
        session: Database session provided by the ``get_db`` dependency.
        user: Authenticated caller provided by ``get_current_user``.

    Returns:
        A ``UserService`` wired with a ``SQLAlchemyUserRepository`` created
        from ``session``, the same ``session``, and ``user`` as the current
        user.
    """
    return UserService(
        repository=SQLAlchemyUserRepository(session),
        session=session,
        current_user=user,
    )
|