"""Async tests for ``PointsService`` driven by an in-memory fake repository."""

from __future__ import annotations

from dataclasses import dataclass
from decimal import Decimal
from uuid import UUID, uuid4

import pytest

from core.config.settings import config
from schemas.domain.points import (
    AppendAuditLedgerCommand,
    ApplyPointsChangeCommand,
    PointsChargeSnapshot,
)
from v1.points.service import PointsService


@dataclass
class _FakeAccount:
    """Mutable stand-in for a user's points account; starts with 100 points."""

    balance: int = 100
    frozen_balance: int = 0
    lifetime_earned: int = 0
    lifetime_spent: int = 0
    version: int = 0


class _FakePointsRepository:
    """In-memory repository double that records what the service appends.

    Every lookup ignores its arguments: there is exactly one account, no
    ledger or audit event is ever reported as existing, and the usage
    snapshot handed to the constructor is returned for any run.
    """

    def __init__(self, *, usage_snapshot: PointsChargeSnapshot | None) -> None:
        self.account = _FakeAccount()
        self.usage_snapshot = usage_snapshot
        # Commands captured in call order so tests can inspect them.
        self.appended_ledger: list[ApplyPointsChangeCommand] = []
        self.appended_audit: list[AppendAuditLedgerCommand] = []
        self.claimed: bool = False

    async def get_or_create_user_points_for_update(
        self, *, user_id: UUID
    ) -> _FakeAccount:
        """Return the single shared account, whatever ``user_id`` is."""
        del user_id
        return self.account

    async def has_ledger_event(self, *, user_id: UUID, event_id: str) -> bool:
        """Always report that the ledger event does not exist."""
        del user_id, event_id
        return False

    async def append_ledger(
        self,
        *,
        command: ApplyPointsChangeCommand,
        balance_after: int,
    ) -> None:
        """Record the ledger command; ``balance_after`` is discarded."""
        del balance_after
        self.appended_ledger.append(command)

    async def append_audit_ledger(self, *, command: AppendAuditLedgerCommand) -> None:
        """Record the audit command."""
        self.appended_audit.append(command)

    async def has_audit_event(self, *, event_id: str) -> bool:
        """Always report that the audit event does not exist."""
        del event_id
        return False

    async def get_run_usage_snapshot(
        self,
        *,
        session_id: UUID,
        run_id: str,
    ) -> PointsChargeSnapshot | None:
        """Return the snapshot supplied at construction for any run."""
        del session_id, run_id
        return self.usage_snapshot

    async def claim_register_bonus(
        self,
        *,
        email_hash: str,
        user_email_snapshot: str,
        first_user_id: UUID,
        grant_event_id: str,
    ) -> bool:
        """Return True on the first claim only; later claims return False."""
        del email_hash, user_email_snapshot, first_user_id, grant_event_id
        was_claimed = self.claimed
        self.claimed = True
        return not was_claimed


def _build_service(
    usage_snapshot: PointsChargeSnapshot | None,
    *,
    bonus_already_claimed: bool = False,
) -> tuple[_FakePointsRepository, PointsService]:
    """Create a fake repository and a ``PointsService`` wired to it.

    The claimed flag is set on the repository before the service is built.
    """
    repo = _FakePointsRepository(usage_snapshot=usage_snapshot)
    if bonus_already_claimed:
        repo.claimed = True
    svc = PointsService(repository=repo)  # type: ignore[arg-type]
    return repo, svc


@pytest.mark.asyncio
async def test_consume_successful_run_points_writes_real_usage_to_audit() -> None:
    """A successful run charges the user and audits the snapshot's usage."""
    snapshot = PointsChargeSnapshot(
        message_id=uuid4(),
        message_seq=3,
        model_code="doubao-1.5-pro",
        input_tokens=123,
        output_tokens=456,
        cost=Decimal("0.023456"),
    )
    repo, svc = _build_service(snapshot)
    user_id = uuid4()
    session_id = uuid4()
    run_id = "run_123"

    outcome = await svc.consume_successful_run_points(
        user_id=user_id,
        session_id=session_id,
        run_id=run_id,
        operator_id=user_id,
        user_email="User@Example.com",
    )

    assert outcome.charged is True
    assert outcome.amount == 20
    assert repo.account.balance == 80
    assert len(repo.appended_ledger) == 1
    assert len(repo.appended_audit) == 1

    entry = repo.appended_audit[0]
    assert entry.billed_to == "user"
    assert entry.input_tokens == 123
    assert entry.output_tokens == 456
    assert entry.cost == Decimal("0.023456")
    assert entry.direction == -1
    assert entry.amount == 20
    # The mixed-case address passed in is expected back in lower case.
    assert entry.user_email_snapshot == "user@example.com"


@pytest.mark.asyncio
async def test_record_failed_run_platform_cost_writes_platform_audit_only() -> None:
    """A failed run is audited against the platform with no ledger entry."""
    snapshot = PointsChargeSnapshot(
        message_id=uuid4(),
        message_seq=1,
        model_code="doubao-1.5-pro",
        input_tokens=100,
        output_tokens=20,
        cost=Decimal("0.012300"),
    )
    repo, svc = _build_service(snapshot)

    outcome = await svc.record_failed_run_platform_cost(
        user_id=uuid4(),
        session_id=uuid4(),
        run_id="run_failed",
        operator_id=None,
        user_email="test@example.com",
        failure_kind="failed",
    )

    assert outcome.audited is True
    assert outcome.cost == Decimal("0.012300")
    assert repo.appended_ledger == []
    assert len(repo.appended_audit) == 1

    entry = repo.appended_audit[0]
    assert entry.billed_to == "platform"
    assert entry.direction == 0
    assert entry.amount == 0
    assert entry.cost == Decimal("0.012300")


@pytest.mark.asyncio
async def test_grant_register_bonus_if_eligible_first_time_grants() -> None:
    """The first claim grants the configured register bonus and records it."""
    repo, svc = _build_service(None)

    outcome = await svc.grant_register_bonus_if_eligible(
        user_id=uuid4(),
        user_email="NewUser@Example.com",
    )

    expected_bonus = int(config.points_policy.register_bonus_points)
    assert outcome.granted is True
    assert outcome.amount == expected_bonus
    # The fake account starts at 100 points.
    assert repo.account.balance == 100 + expected_bonus
    assert len(repo.appended_ledger) == 1
    assert len(repo.appended_audit) == 1

    entry = repo.appended_audit[0]
    assert entry.billed_to == "user"
    assert entry.change_type.value == "register"


@pytest.mark.asyncio
async def test_grant_register_bonus_if_eligible_second_time_skips() -> None:
    """When the bonus is already claimed, nothing is granted or recorded."""
    repo, svc = _build_service(None, bonus_already_claimed=True)

    outcome = await svc.grant_register_bonus_if_eligible(
        user_id=uuid4(),
        user_email="dup@example.com",
    )

    assert outcome.granted is False
    assert outcome.amount == 0
    assert repo.appended_ledger == []
    assert repo.appended_audit == []