refactor: unify skills+cli runtime and streamline ag-ui flow
This commit is contained in:
@@ -0,0 +1,143 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from uuid import uuid4
|
||||
|
||||
import jwt
|
||||
import pytest
|
||||
|
||||
from core.auth.credential_issuer import ToolCredentialIssuer
|
||||
from core.auth.jwt_verifier import TokenValidationError
|
||||
|
||||
|
||||
_ISSUER = "https://example.com/auth/v1"
|
||||
_SECRET = "test-secret-key-for-testing-only"
|
||||
_ALGORITHM = "HS256"
|
||||
_TTL = 600
|
||||
|
||||
|
||||
def _make_issuer(**overrides) -> ToolCredentialIssuer:
|
||||
kwargs = {
|
||||
"jwt_secret": _SECRET,
|
||||
"jwt_algorithm": _ALGORITHM,
|
||||
"jwt_issuer": _ISSUER,
|
||||
"ttl_seconds": _TTL,
|
||||
}
|
||||
kwargs.update(overrides)
|
||||
return ToolCredentialIssuer(**kwargs)
|
||||
|
||||
|
||||
class TestToolCredentialIssuerIssue:
|
||||
def test_issue_returns_valid_jwt(self) -> None:
|
||||
issuer = _make_issuer()
|
||||
owner_id = str(uuid4())
|
||||
token = issuer.issue(owner_id=owner_id, mode="chat")
|
||||
assert isinstance(token, str)
|
||||
payload = jwt.decode(
|
||||
token,
|
||||
_SECRET,
|
||||
algorithms=[_ALGORITHM],
|
||||
audience="agent-tool-runtime",
|
||||
)
|
||||
assert payload["sub"] == owner_id
|
||||
assert payload["aud"] == "agent-tool-runtime"
|
||||
assert payload["iss"] == _ISSUER
|
||||
assert payload["purpose"] == "agent_tool_runtime"
|
||||
assert payload["mode"] == "chat"
|
||||
assert "exp" in payload
|
||||
assert "iat" in payload
|
||||
|
||||
def test_issue_defaults_mode_to_chat(self) -> None:
|
||||
issuer = _make_issuer()
|
||||
token = issuer.issue(owner_id=str(uuid4()))
|
||||
payload = jwt.decode(token, _SECRET, algorithms=[_ALGORITHM], audience="agent-tool-runtime")
|
||||
assert payload["mode"] == "chat"
|
||||
|
||||
def test_issue_automation_mode(self) -> None:
|
||||
issuer = _make_issuer()
|
||||
token = issuer.issue(owner_id=str(uuid4()), mode="automation")
|
||||
payload = jwt.decode(token, _SECRET, algorithms=[_ALGORITHM], audience="agent-tool-runtime")
|
||||
assert payload["mode"] == "automation"
|
||||
|
||||
def test_issue_rejects_unsupported_algorithm(self) -> None:
|
||||
with pytest.raises(TokenValidationError, match="Unsupported"):
|
||||
_make_issuer(jwt_algorithm="RS256")
|
||||
|
||||
|
||||
class TestToolCredentialIssuerVerify:
|
||||
def test_verify_returns_claims_for_valid_token(self) -> None:
|
||||
issuer = _make_issuer()
|
||||
owner_id = str(uuid4())
|
||||
token = issuer.issue(owner_id=owner_id, mode="chat")
|
||||
claims = issuer.verify(token)
|
||||
assert claims["sub"] == owner_id
|
||||
assert claims["purpose"] == "agent_tool_runtime"
|
||||
|
||||
def test_verify_rejects_expired_token(self) -> None:
|
||||
issuer = _make_issuer(ttl_seconds=0)
|
||||
token = issuer.issue(owner_id=str(uuid4()))
|
||||
time.sleep(1)
|
||||
with pytest.raises(TokenValidationError, match="expired"):
|
||||
issuer.verify(token)
|
||||
|
||||
def test_verify_rejects_wrong_secret(self) -> None:
|
||||
issuer = _make_issuer()
|
||||
token = issuer.issue(owner_id=str(uuid4()))
|
||||
wrong_issuer = _make_issuer(jwt_secret="wrong-secret")
|
||||
with pytest.raises(TokenValidationError, match="signature"):
|
||||
wrong_issuer.verify(token)
|
||||
|
||||
def test_verify_rejects_wrong_audience(self) -> None:
|
||||
payload = {
|
||||
"sub": str(uuid4()),
|
||||
"aud": "wrong-audience",
|
||||
"iss": _ISSUER,
|
||||
"purpose": "agent_tool_runtime",
|
||||
"exp": int(time.time()) + 600,
|
||||
"iat": int(time.time()),
|
||||
}
|
||||
token = jwt.encode(payload, _SECRET, algorithm=_ALGORITHM)
|
||||
issuer = _make_issuer()
|
||||
with pytest.raises(TokenValidationError):
|
||||
issuer.verify(token)
|
||||
|
||||
def test_verify_rejects_wrong_purpose(self) -> None:
|
||||
payload = {
|
||||
"sub": str(uuid4()),
|
||||
"aud": "agent-tool-runtime",
|
||||
"iss": _ISSUER,
|
||||
"purpose": "wrong_purpose",
|
||||
"exp": int(time.time()) + 600,
|
||||
"iat": int(time.time()),
|
||||
}
|
||||
token = jwt.encode(payload, _SECRET, algorithm=_ALGORITHM)
|
||||
issuer = _make_issuer()
|
||||
with pytest.raises(TokenValidationError, match="purpose"):
|
||||
issuer.verify(token)
|
||||
|
||||
def test_verify_rejects_wrong_issuer(self) -> None:
|
||||
issuer_a = _make_issuer()
|
||||
token = issuer_a.issue(owner_id=str(uuid4()))
|
||||
issuer_b = _make_issuer(jwt_issuer="https://other.example.com/auth/v1")
|
||||
with pytest.raises(TokenValidationError, match="issuer"):
|
||||
issuer_b.verify(token)
|
||||
|
||||
def test_verify_rejects_malformed_token(self) -> None:
|
||||
issuer = _make_issuer()
|
||||
with pytest.raises(TokenValidationError, match="decode"):
|
||||
issuer.verify("not-a-valid-jwt")
|
||||
|
||||
|
||||
class TestToolCredentialContext:
|
||||
def test_set_and_get_credential(self) -> None:
|
||||
from core.auth.tool_credential_context import (
|
||||
set_tool_credential,
|
||||
reset_tool_credential,
|
||||
get_tool_credential,
|
||||
)
|
||||
|
||||
assert get_tool_credential() is None
|
||||
token = set_tool_credential("test-credential")
|
||||
assert get_tool_credential() == "test-credential"
|
||||
reset_tool_credential(token)
|
||||
assert get_tool_credential() is None
|
||||
Reference in New Issue
Block a user