fix: 后端 JWT 验证改为 HS256 方式提升认证可靠性

This commit is contained in:
qzl
2026-03-10 17:43:55 +08:00
parent 5d839192ab
commit 95d6927724
4 changed files with 177 additions and 310 deletions
+137 -265
View File
@@ -1,8 +1,6 @@
from __future__ import annotations
from datetime import UTC, datetime, timedelta
from types import SimpleNamespace
from typing import Any, cast
from uuid import uuid4
import jwt
@@ -10,300 +8,174 @@ import pytest
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from core.auth.jwt_verifier import (
JwtVerifier,
TokenValidationError,
TokenVerifierUnavailableError,
)
from core.auth.jwt_verifier import JwtVerifier, TokenValidationError
def test_jwks_client_uses_supabase_auth_headers(
monkeypatch: pytest.MonkeyPatch,
) -> None:
captured: dict[str, Any] = {}
class _FakePyJWKClient:
def __init__(
self,
uri: str,
*,
headers: dict[str, Any] | None = None,
**_: Any,
) -> None:
captured["uri"] = uri
captured["headers"] = headers
monkeypatch.setattr("core.auth.jwt_verifier.jwt.PyJWKClient", _FakePyJWKClient)
JwtVerifier(
jwks_url="https://example.supabase.co/auth/v1/.well-known/jwks.json",
issuer="https://example.supabase.co/auth/v1",
audience="authenticated",
apikey="anon-key-value",
)
assert (
captured["uri"] == "https://example.supabase.co/auth/v1/.well-known/jwks.json"
)
assert captured["headers"] == {
"apikey": "anon-key-value",
"Authorization": "Bearer anon-key-value",
def _build_hs256_token(
*,
secret: str,
sub: str,
issuer: str | None = None,
audience: str | None = "authenticated",
) -> str:
now = datetime.now(UTC)
payload = {
"sub": sub,
"exp": now + timedelta(minutes=5),
}
if audience is not None:
payload["aud"] = audience
if issuer is not None:
payload["iss"] = issuer
return jwt.encode(payload, secret, algorithm="HS256")
def _set_jwks_client(verifier: JwtVerifier, client: Any) -> None:
cast(Any, verifier)._jwks_client = client
def _build_rsa_key_pair() -> tuple[str, str]:
def _build_rs256_token(
*, sub: str, issuer: str, audience: str = "authenticated"
) -> str:
now = datetime.now(UTC)
payload = {
"sub": sub,
"iss": issuer,
"aud": audience,
"exp": now + timedelta(minutes=5),
}
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
private_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
).decode("utf-8")
public_pem = (
private_key.public_key()
.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
.decode("utf-8")
)
return private_pem, public_pem
return jwt.encode(payload, private_pem, algorithm="RS256", headers={"kid": "kid-1"})
def _build_token(*, private_key: str, sub: str, audience: str, issuer: str) -> str:
now = datetime.now(UTC)
payload = {
"sub": sub,
"aud": audience,
"iss": issuer,
"exp": now + timedelta(minutes=5),
}
return jwt.encode(payload, private_key, algorithm="RS256", headers={"kid": "kid-1"})
def _build_expired_token(
*, private_key: str, sub: str, audience: str, issuer: str
) -> str:
now = datetime.now(UTC)
payload = {
"sub": sub,
"aud": audience,
"iss": issuer,
"exp": now - timedelta(minutes=1),
}
return jwt.encode(payload, private_key, algorithm="RS256", headers={"kid": "kid-1"})
def _build_hs256_token(*, secret: str, sub: str, audience: str, issuer: str) -> str:
now = datetime.now(UTC)
payload = {
"sub": sub,
"aud": audience,
"iss": issuer,
"exp": now + timedelta(minutes=5),
}
return jwt.encode(payload, secret, algorithm="HS256", headers={"kid": "kid-1"})
def test_verify_token_with_jwks_success() -> None:
user_id = uuid4()
audience = "authenticated"
issuer = "https://example.supabase.co/auth/v1"
private_key, public_key = _build_rsa_key_pair()
token = _build_token(
private_key=private_key,
sub=str(user_id),
audience=audience,
issuer=issuer,
)
def test_verify_hs256_token_success() -> None:
verifier = JwtVerifier(
jwks_url="https://example.supabase.co/auth/v1/.well-known/jwks.json",
issuer=issuer,
audience=audience,
apikey="anon-key",
issuer="https://example.supabase.co/auth/v1",
jwt_secret="test-secret",
jwt_algorithm="HS256",
)
_set_jwks_client(
verifier,
SimpleNamespace(
get_signing_key_from_jwt=lambda _: SimpleNamespace(key=public_key)
),
token = _build_hs256_token(
secret="test-secret",
sub=str(uuid4()),
issuer="https://example.supabase.co/auth/v1",
)
claims = verifier.verify(token)
assert claims["sub"] == str(user_id)
assert "sub" in claims
def test_verify_token_rejects_invalid_issuer() -> None:
audience = "authenticated"
issuer = "https://example.supabase.co/auth/v1"
private_key, public_key = _build_rsa_key_pair()
token_with_wrong_iss = _build_token(
private_key=private_key,
def test_verify_rejects_invalid_issuer() -> None:
verifier = JwtVerifier(
issuer="https://example.supabase.co/auth/v1",
jwt_secret="test-secret",
jwt_algorithm="HS256",
)
token = _build_hs256_token(
secret="test-secret",
sub=str(uuid4()),
audience=audience,
issuer="https://wrong-issuer.example.com/auth/v1",
)
verifier = JwtVerifier(
jwks_url="https://example.supabase.co/auth/v1/.well-known/jwks.json",
issuer=issuer,
audience=audience,
apikey="anon-key",
)
_set_jwks_client(
verifier,
SimpleNamespace(
get_signing_key_from_jwt=lambda _: SimpleNamespace(key=public_key)
),
)
with pytest.raises(TokenValidationError):
verifier.verify(token_with_wrong_iss)
verifier.verify(token)
def test_verify_token_rejects_hs256_token() -> None:
audience = "authenticated"
issuer = "https://example.supabase.co/auth/v1"
_, public_key = _build_rsa_key_pair()
hs_token = _build_hs256_token(
def test_verify_rejects_missing_audience() -> None:
verifier = JwtVerifier(
issuer="https://example.supabase.co/auth/v1",
jwt_secret="test-secret",
jwt_algorithm="HS256",
)
token = _build_hs256_token(
secret="test-secret",
sub=str(uuid4()),
audience=audience,
issuer=issuer,
)
verifier = JwtVerifier(
jwks_url="https://example.supabase.co/auth/v1/.well-known/jwks.json",
issuer=issuer,
audience=audience,
apikey="anon-key",
)
_set_jwks_client(
verifier,
SimpleNamespace(
get_signing_key_from_jwt=lambda _: SimpleNamespace(key=public_key)
),
audience=None,
)
with pytest.raises(TokenValidationError):
verifier.verify(hs_token)
def test_verify_token_rejects_expired_token() -> None:
audience = "authenticated"
issuer = "https://example.supabase.co/auth/v1"
private_key, public_key = _build_rsa_key_pair()
expired_token = _build_expired_token(
private_key=private_key,
sub=str(uuid4()),
audience=audience,
issuer=issuer,
)
verifier = JwtVerifier(
jwks_url="https://example.supabase.co/auth/v1/.well-known/jwks.json",
issuer=issuer,
audience=audience,
apikey="anon-key",
)
_set_jwks_client(
verifier,
SimpleNamespace(
get_signing_key_from_jwt=lambda _: SimpleNamespace(key=public_key)
),
)
with pytest.raises(TokenValidationError):
verifier.verify(expired_token)
def test_verify_token_rejects_invalid_audience() -> None:
audience = "authenticated"
issuer = "https://example.supabase.co/auth/v1"
private_key, public_key = _build_rsa_key_pair()
wrong_aud_token = _build_token(
private_key=private_key,
sub=str(uuid4()),
audience="anon",
issuer=issuer,
)
verifier = JwtVerifier(
jwks_url="https://example.supabase.co/auth/v1/.well-known/jwks.json",
issuer=issuer,
audience=audience,
apikey="anon-key",
)
_set_jwks_client(
verifier,
SimpleNamespace(
get_signing_key_from_jwt=lambda _: SimpleNamespace(key=public_key)
),
)
with pytest.raises(TokenValidationError):
verifier.verify(wrong_aud_token)
def test_verify_token_rejects_invalid_signature() -> None:
audience = "authenticated"
issuer = "https://example.supabase.co/auth/v1"
private_key, public_key = _build_rsa_key_pair()
valid_token = _build_token(
private_key=private_key,
sub=str(uuid4()),
audience=audience,
issuer=issuer,
)
tampered_token = f"{valid_token}x"
verifier = JwtVerifier(
jwks_url="https://example.supabase.co/auth/v1/.well-known/jwks.json",
issuer=issuer,
audience=audience,
apikey="anon-key",
)
_set_jwks_client(
verifier,
SimpleNamespace(
get_signing_key_from_jwt=lambda _: SimpleNamespace(key=public_key)
),
)
with pytest.raises(TokenValidationError):
verifier.verify(tampered_token)
def test_verify_token_maps_jwks_connection_error() -> None:
audience = "authenticated"
issuer = "https://example.supabase.co/auth/v1"
private_key, _ = _build_rsa_key_pair()
token = _build_token(
private_key=private_key,
sub=str(uuid4()),
audience=audience,
issuer=issuer,
)
verifier = JwtVerifier(
jwks_url="https://example.supabase.co/auth/v1/.well-known/jwks.json",
issuer=issuer,
audience=audience,
apikey="anon-key",
)
def _raise_connection_error(_: str) -> SimpleNamespace:
raise jwt.PyJWKClientConnectionError("network down")
_set_jwks_client(
verifier,
SimpleNamespace(get_signing_key_from_jwt=_raise_connection_error),
)
with pytest.raises(TokenVerifierUnavailableError):
verifier.verify(token)
def test_verify_accepts_token_without_issuer_claim() -> None:
verifier = JwtVerifier(
issuer="https://example.supabase.co/auth/v1",
jwt_secret="test-secret",
jwt_algorithm="HS256",
)
token = _build_hs256_token(
secret="test-secret",
sub=str(uuid4()),
)
claims = verifier.verify(token)
assert "sub" in claims
def test_verify_accepts_list_audience() -> None:
verifier = JwtVerifier(
issuer="https://example.supabase.co/auth/v1",
jwt_secret="test-secret",
jwt_algorithm="HS256",
)
token = jwt.encode(
{
"sub": str(uuid4()),
"aud": ["anonymous", "authenticated"],
"exp": datetime.now(UTC) + timedelta(minutes=5),
},
"test-secret",
algorithm="HS256",
)
claims = verifier.verify(token)
assert claims["aud"] == ["anonymous", "authenticated"]
def test_verify_rejects_rs256_token() -> None:
verifier = JwtVerifier(
issuer="https://example.supabase.co/auth/v1",
jwt_secret="test-secret",
jwt_algorithm="HS256",
)
token = _build_rs256_token(
sub=str(uuid4()),
issuer="https://example.supabase.co/auth/v1",
)
with pytest.raises(TokenValidationError):
verifier.verify(token)
def test_verify_rejects_expired_token() -> None:
verifier = JwtVerifier(
issuer="https://example.supabase.co/auth/v1",
jwt_secret="test-secret",
jwt_algorithm="HS256",
)
now = datetime.now(UTC)
token = jwt.encode(
{
"sub": str(uuid4()),
"iss": "https://example.supabase.co/auth/v1",
"aud": "authenticated",
"exp": now - timedelta(minutes=1),
},
"test-secret",
algorithm="HS256",
)
with pytest.raises(TokenValidationError):
verifier.verify(token)
def test_verify_rejects_unsupported_algorithm_setting() -> None:
with pytest.raises(TokenValidationError):
JwtVerifier(
issuer="https://example.supabase.co/auth/v1",
jwt_secret="test-secret",
jwt_algorithm="RS256",
)