"""Generate a batch of redeem codes and export them to an xlsx file.

The script creates one ``RedeemCodeBatch``, one ``RedeemCode`` row per
generated code, and a ``SystemAuditLog`` entry, then writes a spreadsheet
listing the codes. The database commit and the spreadsheet are kept in
step: if anything fails, the transaction is rolled back and the file is
removed.
"""
from __future__ import annotations

import argparse
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
import secrets
import string
from uuid import UUID

from openpyxl import Workbook
from openpyxl.utils import get_column_letter
from sqlalchemy import select

from core.db.session import AsyncSessionLocal
from models.redeem_code import RedeemCode
from models.redeem_code_batch import RedeemCodeBatch
from models.system_audit_log import SystemAuditLog
from v1.payments.service import ProductMapping, _load_product_mappings

# Uppercase letters and digits, minus the characters "0", "O", "1" and "I".
CODE_ALPHABET = "".join(
    ch for ch in string.ascii_uppercase + string.digits if ch not in "0O1I"
)
# Codes to generate per package; paired with the packages in sort order.
DEFAULT_COUNTS = (100, 40, 20)
DEFAULT_CODE_LENGTH = 16


@dataclass(frozen=True)
class PackageSpec:
    """Snapshot of one product mapping that redeem codes are generated for."""

    product_code: str
    name: str
    credits: int
    sort_order: int
    mapping: ProductMapping


def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the script.

    Returns:
        Namespace with ``batch_key``, ``created_by``, ``notes`` and
        ``output_dir``.
    """
    parser = argparse.ArgumentParser(
        description="Generate redeem codes for regular packages."
    )
    parser.add_argument(
        "--batch-key",
        default=None,
        help="Unique batch key. Defaults to redeem-YYYYmmddHHMMSS.",
    )
    parser.add_argument(
        "--created-by",
        default="infra_script",
        help="Operator name stored on the redeem code batch.",
    )
    parser.add_argument("--notes", default=None, help="Optional batch notes.")
    parser.add_argument(
        "--output-dir",
        default="var/redeem-codes",
        help="Directory for the generated xlsx file.",
    )
    return parser.parse_args()


def load_regular_packages() -> list[PackageSpec]:
    """Return the enabled, non-starter packages ordered by ``sort_order``.

    The product code doubles as the package name.

    Raises:
        RuntimeError: If the number of matching packages differs from the
            number of entries in ``DEFAULT_COUNTS``.
    """
    # One count per package is required, so derive the expectation from
    # DEFAULT_COUNTS instead of repeating the number here.
    expected = len(DEFAULT_COUNTS)
    mappings = _load_product_mappings()
    packages = [
        PackageSpec(
            product_code=code,
            name=code,
            credits=mapping.credits,
            sort_order=mapping.sort_order,
            mapping=mapping,
        )
        for code, mapping in mappings.items()
        if mapping.enabled and mapping.type != "starter"
    ]
    packages.sort(key=lambda item: item.sort_order)
    if len(packages) != expected:
        raise RuntimeError(
            f"Expected exactly {expected} non-starter packages, found {len(packages)}"
        )
    return packages


def generate_codes(total: int, *, length: int = DEFAULT_CODE_LENGTH) -> list[str]:
    """Generate ``total`` distinct random codes drawn from ``CODE_ALPHABET``.

    Args:
        total: Number of distinct codes wanted. Zero or less yields ``[]``.
        length: Number of characters per code.

    Returns:
        The codes as a sorted list.

    Raises:
        ValueError: If ``total`` exceeds the number of distinct codes that
            exist for ``length``; without this check the loop below could
            never finish.
    """
    # A non-positive length always produces the empty string, i.e. exactly
    # one possible code.
    capacity = len(CODE_ALPHABET) ** max(length, 0)
    if total > capacity:
        raise ValueError(
            f"Cannot generate {total} distinct codes of length {length}: "
            f"only {capacity} possible"
        )
    codes: set[str] = set()
    # The set absorbs duplicate draws; keep drawing until enough are unique.
    while len(codes) < total:
        codes.add("".join(secrets.choice(CODE_ALPHABET) for _ in range(length)))
    return sorted(codes)


def build_workbook(
    *,
    batch_id: UUID,
    batch_key: str,
    rows: list[RedeemCode],
) -> Workbook:
    """Build the spreadsheet listing every code of a batch.

    Args:
        batch_id: Identifier of the batch, written on every data row.
        batch_key: Key of the batch, written on every data row.
        rows: Redeem codes to list, one spreadsheet row each.

    Returns:
        A workbook with a single ``redeem_codes`` sheet: a header row
        followed by one row per code.

    Raises:
        RuntimeError: If the new workbook has no active sheet.
    """
    workbook = Workbook()
    sheet = workbook.active
    if sheet is None:
        raise RuntimeError("Failed to create redeem code workbook sheet")
    sheet.title = "redeem_codes"
    sheet.append(
        [
            "batch_id",
            "batch_key",
            "code_id",
            "code",
            "package_product_code",
            "package_name",
            "credits",
            "status",
            "redeemed_by_user_id",
            "redeemed_at",
        ]
    )
    for row in rows:
        sheet.append(
            [
                str(batch_id),
                batch_key,
                str(row.id),
                row.code,
                row.package_product_code,
                row.package_name_snapshot,
                int(row.credits),
                row.status,
                # The redemption columns are written empty.
                "",
                "",
            ]
        )
    # Size each column to its longest value, clamped to 12..42 characters.
    for column_index, column in enumerate(sheet.columns, start=1):
        column_letter = get_column_letter(column_index)
        max_length = max(len(str(cell.value or "")) for cell in column)
        sheet.column_dimensions[column_letter].width = min(max(max_length + 2, 12), 42)
    return workbook


async def main() -> None:
    """Create a redeem code batch in the database and export it to xlsx.

    Raises:
        RuntimeError: If the output file already exists, or if any
            generated code is already present in the database.
    """
    args = parse_args()
    now = datetime.now(timezone.utc)
    batch_key = args.batch_key or f"redeem-{now:%Y%m%d%H%M%S}"
    output_dir = Path(args.output_dir)
    output_path = output_dir / f"{batch_key}.xlsx"
    # Refuse to overwrite the export of an earlier run.
    if output_path.exists():
        raise RuntimeError(f"Output file already exists: {output_path}")

    packages = load_regular_packages()
    total = sum(DEFAULT_COUNTS)
    codes = generate_codes(total)

    async with AsyncSessionLocal() as session:
        existing_codes = (
            (
                await session.execute(
                    select(RedeemCode.code).where(RedeemCode.code.in_(codes))
                )
            )
            .scalars()
            .all()
        )
        if existing_codes:
            raise RuntimeError("Generated redeem code collision; rerun the script")

        batch = RedeemCodeBatch(
            batch_key=batch_key,
            created_by=args.created_by,
            notes=args.notes,
        )
        session.add(batch)
        # Flush so that batch.id is available for the rows below.
        await session.flush()

        rows: list[RedeemCode] = []
        offset = 0
        # Hand each package its own consecutive slice of the code list.
        for package, count in zip(packages, DEFAULT_COUNTS, strict=True):
            for code in codes[offset : offset + count]:
                row = RedeemCode(
                    batch_id=batch.id,
                    code=code,
                    package_product_code=package.product_code,
                    package_type=package.mapping.type,
                    package_name_snapshot=package.name,
                    credits=package.credits,
                    sort_order=package.sort_order,
                    status="active",
                )
                session.add(row)
                rows.append(row)
            offset += count

        session.add(
            SystemAuditLog(
                actor_user_id=None,
                target_user_id=None,
                action="redeem_code.batch_generate",
                entity_type="redeem_code_batch",
                entity_id=batch.id,
                metadata_json={
                    "batch_key": batch_key,
                    "counts": {
                        package.product_code: count
                        for package, count in zip(
                            packages, DEFAULT_COUNTS, strict=True
                        )
                    },
                    "total": total,
                    "output_path": str(output_path),
                },
            )
        )

        try:
            # Flush before building the workbook, which reads row.id.
            await session.flush()
            output_dir.mkdir(parents=True, exist_ok=True)
            workbook = build_workbook(batch_id=batch.id, batch_key=batch_key, rows=rows)
            workbook.save(output_path)
            await session.commit()
        except Exception:
            # Delete the spreadsheet first so that a failing rollback cannot
            # leave behind a file of codes that were never committed; the
            # rollback still runs if the deletion itself fails.
            try:
                output_path.unlink(missing_ok=True)
            finally:
                await session.rollback()
            raise

    print(f"Generated {total} redeem codes")
    print(f"Batch key: {batch_key}")
    print(f"Excel: {output_path}")


if __name__ == "__main__":
    asyncio.run(main())