from __future__ import annotations

import hashlib
import hmac
import json
import logging
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from uuid import UUID, uuid4

import yaml

from core.config.settings import config
from core.http.errors import ApiProblemError, problem_payload
from models.creem_transaction import CreemTransaction
from schemas.domain.points import (
    ApplyPointsChangeCommand,
    PurchaseLedgerMetadata,
)
from schemas.enums import PointsBizType, PointsChangeType, PointsOperatorType
from v1.payments.creem_client import CreemClient, CreemProduct
from v1.payments.repository import PaymentRepository
from v1.points.invite_rewards import grant_invite_rewards_for_creem_payment
from v1.points.repository import PointsRepository

logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class CreemProductMapping:
    """One entry of the static product-code -> CREEM product mapping file."""

    # Identifier used to look the product up through the CREEM client.
    creem_product_id: str
    # Number of points credited to the buyer when the checkout completes.
    credits: int
    # Package category; the value "starter" triggers the one-per-email checks.
    type: str
    # Ascending sort key used when listing packages.
    sort_order: int = 0
    # Disabled mappings are skipped by the package listing.
    enabled: bool = True


# Parsed mapping file, populated on first use; None means "not loaded yet".
_creem_product_mappings_cache: dict[str, CreemProductMapping] | None = None


def _load_creem_product_mappings() -> dict[str, CreemProductMapping]:
    """Return the product mappings keyed by product code, loading them once.

    The YAML file is read on the first call and the parsed result is kept in
    a module-level cache until ``clear_creem_product_mappings_cache`` is
    called. Entries without a truthy ``creem_product_id`` are ignored.

    Returns:
        Mapping of product code to its ``CreemProductMapping``.
    """
    global _creem_product_mappings_cache
    if _creem_product_mappings_cache is not None:
        return _creem_product_mappings_cache

    mapping_path = (
        Path(__file__).parent.parent.parent
        / "core/config/static/packages/mapping.yaml"
    )
    with mapping_path.open("r", encoding="utf-8") as f:
        raw: Any = yaml.safe_load(f) or {}

    # A ``product_mappings:`` key with no value parses to None; treat that the
    # same as an absent/empty section instead of failing on ``None.items()``.
    product_mappings: Any = raw.get("product_mappings") or {}

    mappings: dict[str, CreemProductMapping] = {}
    for code, entry in product_mappings.items():
        if not entry.get("creem_product_id"):
            continue
        mappings[str(code)] = CreemProductMapping(
            creem_product_id=str(entry["creem_product_id"]),
            credits=int(entry["credits"]),
            type=str(entry["type"]),
            sort_order=int(entry.get("sort_order", 0)),
            enabled=bool(entry.get("enabled", True)),
        )

    _creem_product_mappings_cache = mappings
    return mappings


def clear_creem_product_mappings_cache() -> None:
    """Drop the cached mappings so the next lookup re-reads the YAML file."""
    global _creem_product_mappings_cache
    _creem_product_mappings_cache = None


@dataclass(frozen=True)
class PackageWithPrice:
    """A mapped package combined with the price reported by CREEM."""

    product_code: str
    creem_product_id: str
    credits: int
    type: str
    sort_order: int
    price_cents: int
    currency: str


@dataclass(frozen=True)
class CreateCheckoutResult:
    """Identifiers returned to the caller after a checkout is created."""

    checkout_id: str
    checkout_url: str


class CreemService:
    """Package listing, checkout creation and webhook handling for CREEM."""

    def __init__(
        self,
        *,
        payment_repo: PaymentRepository,
        points_repo: PointsRepository,
        client: CreemClient,
    ) -> None:
        self._payment_repo: PaymentRepository = payment_repo
        self._points_repo: PointsRepository = points_repo
        self._client: CreemClient = client

    async def get_packages_with_prices(self) -> list[PackageWithPrice]:
        """Get all packages with dynamic prices from CREEM API.

        Disabled mappings are skipped. Mappings whose product is not returned
        by the CREEM client are skipped with a warning.

        Returns:
            Enabled packages sorted by ascending ``sort_order``.
        """
        mappings = _load_creem_product_mappings()
        products = await self._client.get_products()
        product_by_id: dict[str, CreemProduct] = {p.product_id: p for p in products}

        result: list[PackageWithPrice] = []
        for code, mapping in mappings.items():
            if not mapping.enabled:
                continue
            product = product_by_id.get(mapping.creem_product_id)
            if product is None:
                logger.warning(
                    "CREEM product not found: code=%s product_id=%s",
                    code,
                    mapping.creem_product_id,
                )
                continue
            result.append(
                PackageWithPrice(
                    product_code=code,
                    creem_product_id=mapping.creem_product_id,
                    credits=mapping.credits,
                    type=mapping.type,
                    sort_order=mapping.sort_order,
                    price_cents=product.price_cents,
                    currency=product.currency,
                )
            )

        result.sort(key=lambda p: p.sort_order)
        return result

    async def create_checkout(
        self,
        *,
        user_id: UUID,
        user_email: str,
        product_code: str,
    ) -> CreateCheckoutResult:
        """Create a CREEM checkout session.

        A pending ``CreemTransaction`` is inserted and committed once CREEM
        has returned the checkout.

        Args:
            user_id: Buyer; stored on the transaction and sent as metadata.
            user_email: Buyer email; stripped and lower-cased before use.
            product_code: Key into the static product mapping.

        Returns:
            The checkout id and the URL returned by the CREEM client.

        Raises:
            ApiProblemError: 404 when ``product_code`` has no mapping or the
                CREEM client returns no product; 422 when a starter pack is
                requested without an email; 409 when the register-bonus claim
                for the email already records a starter pack purchase.
        """
        mappings = _load_creem_product_mappings()
        mapping = mappings.get(product_code)
        if mapping is None:
            raise ApiProblemError(
                status_code=404,
                detail=problem_payload(
                    code="PAYMENT_PRODUCT_NOT_FOUND",
                    detail=f"Product not found: {product_code}",
                ),
            )
        # NOTE(review): ``mapping.enabled`` is not checked here although the
        # package listing hides disabled packages — confirm whether disabled
        # products are meant to stay purchasable.

        is_starter = mapping.type == "starter"
        normalized_email = user_email.strip().lower()
        email_hash = (
            self._build_email_hash(normalized_email) if normalized_email else None
        )

        if is_starter:
            if not email_hash:
                raise ApiProblemError(
                    status_code=422,
                    detail=problem_payload(
                        code="PAYMENT_STARTER_PACK_INELIGIBLE",
                        detail="Email required for starter pack purchase",
                    ),
                )
            claim = await self._payment_repo.get_register_bonus_claim(
                email_hash=email_hash
            )
            if claim is not None and claim.has_purchased_starter_pack:
                raise ApiProblemError(
                    status_code=409,
                    detail=problem_payload(
                        code="PAYMENT_STARTER_PACK_INELIGIBLE",
                        detail="Starter pack already purchased for this email",
                    ),
                )

        # The live product supplies the price/currency stored on the transaction.
        product = await self._client.get_product(mapping.creem_product_id)
        if product is None:
            raise ApiProblemError(
                status_code=404,
                detail=problem_payload(
                    code="PAYMENT_PRODUCT_NOT_FOUND",
                    detail=f"CREEM product not found: {mapping.creem_product_id}",
                ),
            )

        success_url = config.creem.success_url
        checkout = await self._client.create_checkout(
            product_id=mapping.creem_product_id,
            success_url=success_url,
            customer_email=normalized_email or None,
            metadata={
                "user_id": str(user_id),
                "product_code": product_code,
            },
        )

        transaction = CreemTransaction(
            id=uuid4(),
            user_id=user_id,
            product_code=product_code,
            creem_product_id=mapping.creem_product_id,
            checkout_id=checkout.checkout_id,
            status="pending",
            credits=mapping.credits,
            amount_cents=product.price_cents,
            currency=product.currency,
            creem_payload={"checkout_url": checkout.checkout_url},
        )
        await self._payment_repo.insert_creem_transaction(transaction=transaction)
        await self._payment_repo.commit()

        logger.info(
            "CREEM checkout created: user_id=%s product_code=%s checkout_id=%s",
            user_id,
            product_code,
            checkout.checkout_id,
        )
        return CreateCheckoutResult(
            checkout_id=checkout.checkout_id,
            checkout_url=checkout.checkout_url,
        )

    async def handle_webhook(
        self,
        *,
        payload: bytes,
        signature: str,
    ) -> None:
        """Handle CREEM webhook notification.

        The call logs and returns without raising when the webhook secret is
        not configured, the signature does not verify, or the body is not a
        JSON object. Only ``checkout.completed`` events are processed; every
        other event type is ignored.

        Args:
            payload: Raw request body, exactly as received (it is what the
                signature is verified against).
            signature: Signature value supplied with the request.
        """
        settings = config.creem
        secret = settings.webhook_secret
        if secret is None:
            logger.error("CREEM webhook_secret not configured")
            return

        secret_value = secret.get_secret_value()
        if not CreemClient.verify_webhook_signature(payload, signature, secret_value):
            logger.warning("CREEM webhook signature verification failed")
            return

        try:
            event: Any = json.loads(payload)
        except (json.JSONDecodeError, UnicodeDecodeError):
            # json.loads decodes bytes itself, so undecodable bodies surface as
            # UnicodeDecodeError rather than JSONDecodeError.
            logger.warning("CREEM webhook payload is not valid JSON")
            return

        # Valid JSON can still be a list/string/number; only objects have
        # the eventType/object envelope read below.
        if not isinstance(event, dict):
            logger.warning("CREEM webhook payload is not a JSON object")
            return

        event_type = event.get("eventType", "")
        if event_type != "checkout.completed":
            return

        obj = event.get("object", {})
        if not isinstance(obj, dict):
            # e.g. "object": null — nothing to look a checkout up with.
            logger.warning("CREEM checkout.completed webhook has no object payload")
            return

        await self._handle_checkout_completed(obj)

    async def _handle_checkout_completed(self, obj: dict[str, Any]) -> None:
        """Credit the buyer for a completed checkout and mark it completed.

        Looks the transaction up by checkout id, skips unknown or already
        completed ones, then updates the points account, appends a purchase
        ledger entry, updates the transaction, grants invite rewards, records
        a starter-pack claim when applicable, and commits.

        Args:
            obj: The ``object`` member of the ``checkout.completed`` event.
        """
        # CREEM webhook structure: checkout_id is in "id", order_id in "order.id", customer_id in "customer.id"
        checkout_id = obj.get("id", "")
        order_obj = obj.get("order", {})
        order_id = order_obj.get("id") if isinstance(order_obj, dict) else None
        customer_obj = obj.get("customer", {})
        customer_id = customer_obj.get("id") if isinstance(customer_obj, dict) else None
        raw_email = customer_obj.get("email") if isinstance(customer_obj, dict) else None
        # A null/non-string email must not be stringified: str(None) would be
        # treated as the address "None" by the reward and claim logic below.
        customer_email = raw_email if isinstance(raw_email, str) else ""

        txn = await self._payment_repo.get_creem_transaction_by_checkout_id(
            checkout_id=checkout_id
        )
        if txn is None:
            logger.warning(
                "CREEM checkout.completed for unknown checkout_id: %s",
                checkout_id,
            )
            return

        # Webhooks can be delivered more than once; never credit twice.
        # NOTE(review): this check runs before the points row is fetched
        # "for update" — confirm the transaction lookup itself prevents two
        # concurrent deliveries from both passing it.
        if txn.status == "completed":
            logger.info(
                "CREEM checkout already completed: checkout_id=%s",
                checkout_id,
            )
            return

        user_id = txn.user_id
        credits = txn.credits

        account = await self._payment_repo.get_or_create_user_points_for_update(
            user_id=user_id
        )
        balance = int(account.balance)
        new_balance = balance + credits
        account.balance = new_balance
        account.lifetime_earned = int(account.lifetime_earned) + credits
        account.version = int(account.version) + 1

        event_id = f"payment.creem:{checkout_id}"
        metadata_obj = PurchaseLedgerMetadata(
            operator_type=PointsOperatorType.SYSTEM,
            run_id=event_id,
            ext={
                "source": "creem",
                "platform": "web",
                "product_code": txn.product_code,
                "transaction_id": checkout_id,
                "creem_product_id": txn.creem_product_id,
                "order_id": order_id or "",
                "customer_id": customer_id or "",
                "creem_transaction_id": str(txn.id),
            },
        )
        ledger_command = ApplyPointsChangeCommand(
            user_id=user_id,
            change_type=PointsChangeType.PURCHASE,
            biz_type=PointsBizType.PAYMENT,
            biz_id=txn.id,
            event_id=event_id,
            amount=credits,
            direction=1,
            operator_id=None,
            metadata=metadata_obj,
        )
        await self._points_repo.append_ledger(
            command=ledger_command,
            balance_after=new_balance,
        )

        txn.order_id = order_id
        txn.customer_id = customer_id
        txn.status = "completed"
        txn.ledger_event_id = event_id
        # Replaces the payload stored at checkout creation with the webhook object.
        txn.creem_payload = obj

        paid_at = datetime.now(timezone.utc)
        logger.info(
            "CREEM payment completed: user_id=%s checkout_id=%s credits=%d new_balance=%d",
            user_id,
            checkout_id,
            credits,
            new_balance,
        )

        await grant_invite_rewards_for_creem_payment(
            repository=self._points_repo,
            invitee_user_id=user_id,
            invitee_email=customer_email,
            creem_transaction_id=txn.id,
            paid_at=paid_at,
        )

        mappings = _load_creem_product_mappings()
        mapping = mappings.get(txn.product_code)
        if mapping and mapping.type == "starter":
            normalized_email = customer_email.strip().lower()
            if normalized_email:
                email_hash = self._build_email_hash(normalized_email)
                _ = await self._payment_repo.upsert_register_bonus_claim_for_starter_pack(
                    email_hash=email_hash,
                    user_email_snapshot=normalized_email,
                    first_user_id_snapshot=user_id,
                )

        await self._payment_repo.commit()

    @staticmethod
    def _build_email_hash(normalized_email: str) -> str:
        """Return the hex HMAC-SHA256 of an already-normalized email.

        The key is the stripped ``register_bonus_hmac_key`` secret from the
        points policy configuration.
        """
        key = config.points_policy.register_bonus_hmac_key.get_secret_value().strip()
        digest = hmac.new(
            key=key.encode("utf-8"),
            msg=normalized_email.encode("utf-8"),
            digestmod=hashlib.sha256,
        )
        return digest.hexdigest()