Files
social-app/backend/src/core/auth/jwt_verifier.py
T
qzl 6fe2e7b6c3 refactor: 迁移本地 Supabase 到云端,使用 JWKS 进行 JWT 验证
- 新增 JwtVerifier 支持 RS256 + JWKS 验证
- 简化 docker-compose,删除本地 Supabase 服务(kong/auth/storage等)
- 删除冗余的 Supabase 配置文件(volumes目录)
- 适配测试用例以支持新配置方式
- 更新运行时文档和迁移计划
2026-03-09 18:03:04 +08:00

53 lines
1.6 KiB
Python

from __future__ import annotations
from typing import Any, cast
import jwt
class TokenValidationError(Exception):
    """Signal that a token was rejected.

    ``JwtVerifier.verify`` raises this when the signing key cannot be
    resolved, when decoding or claim validation fails, or when the decoded
    payload is not a JSON object.
    """
class TokenVerifierUnavailableError(Exception):
    """Signal that verification could not be attempted.

    ``JwtVerifier.verify`` raises this when the JWKS document cannot be
    fetched, i.e. the failure lies with the key source rather than the token.
    """
class JwtVerifier:
    """Verify RS256-signed JWTs against keys published at a JWKS endpoint.

    The signing key is resolved through a ``jwt.PyJWKClient`` built from the
    JWKS URL, and the token's ``iss`` / ``aud`` claims are checked against the
    values supplied at construction time.
    """

    def __init__(self, jwks_url: str, issuer: str, audience: str) -> None:
        """Remember the expected claims and create the JWKS client.

        Args:
            jwks_url: URL of the JWKS document, passed to ``jwt.PyJWKClient``.
            issuer: Value the token's ``iss`` claim must match.
            audience: Value the token's ``aud`` claim must match.
        """
        self._issuer: str = issuer
        self._audience: str = audience
        self._jwks_client: jwt.PyJWKClient = jwt.PyJWKClient(jwks_url)

    def verify(self, token: str) -> dict[str, Any]:
        """Validate *token* and return its decoded claims.

        Args:
            token: The encoded JWT to verify.

        Returns:
            The decoded payload as a dictionary of claims.

        Raises:
            TokenVerifierUnavailableError: The JWKS document could not be
                fetched.
            TokenValidationError: The token is malformed, its signing key
                cannot be resolved, its signature or claims are invalid, or
                its payload is not a JSON object.
        """
        try:
            key = self._jwks_client.get_signing_key_from_jwt(token)
        except jwt.PyJWKClientConnectionError as exc:
            # Must precede PyJWKClientError: the connection error is its
            # subclass and maps to a different (availability) failure.
            raise TokenVerifierUnavailableError("Unable to fetch JWKS") from exc
        except jwt.PyJWKClientError as exc:
            raise TokenValidationError("Unable to resolve signing key") from exc
        except jwt.PyJWTError as exc:
            # The JWKS client decodes the token header before looking up the
            # key, so a malformed token raises DecodeError here. That is not a
            # PyJWKClientError and would otherwise escape as a raw PyJWT
            # exception instead of TokenValidationError.
            raise TokenValidationError("Token validation failed") from exc

        try:
            payload = jwt.decode(
                token,
                key.key,
                algorithms=["RS256"],
                audience=self._audience,
                issuer=self._issuer,
                options={"require": ["sub", "aud", "iss", "exp"]},
            )
        except jwt.PyJWTError as exc:
            # PyJWTError is the base class of the expired / audience / issuer /
            # signature / decode errors, so one clause covers them all.
            raise TokenValidationError("Token validation failed") from exc

        if not isinstance(payload, dict):
            raise TokenValidationError("Token payload must be a JSON object")
        return cast(dict[str, Any], payload)