Files
eryao/infra/scripts/generate_redeem_codes.py
2026-05-21 16:26:58 +08:00

227 lines
6.6 KiB
Python

from __future__ import annotations
import argparse
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
import secrets
import string
from uuid import UUID
from openpyxl import Workbook
from openpyxl.utils import get_column_letter
from sqlalchemy import select
from core.db.session import AsyncSessionLocal
from models.redeem_code import RedeemCode
from models.redeem_code_batch import RedeemCodeBatch
from models.system_audit_log import SystemAuditLog
from v1.payments.service import ProductMapping, _load_product_mappings
CODE_ALPHABET = "".join(
ch for ch in string.ascii_uppercase + string.digits if ch not in "0O1I"
)
DEFAULT_COUNTS = (100, 40, 20)
DEFAULT_CODE_LENGTH = 16
@dataclass(frozen=True)
class PackageSpec:
product_code: str
name: str
credits: int
sort_order: int
mapping: ProductMapping
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Generate redeem codes for regular packages."
)
parser.add_argument(
"--batch-key",
default=None,
help="Unique batch key. Defaults to redeem-YYYYmmddHHMMSS.",
)
parser.add_argument(
"--created-by",
default="infra_script",
help="Operator name stored on the redeem code batch.",
)
parser.add_argument("--notes", default=None, help="Optional batch notes.")
parser.add_argument(
"--output-dir",
default="var/redeem-codes",
help="Directory for the generated xlsx file.",
)
return parser.parse_args()
def load_regular_packages() -> list[PackageSpec]:
mappings = _load_product_mappings()
packages = [
PackageSpec(
product_code=code,
name=code,
credits=mapping.credits,
sort_order=mapping.sort_order,
mapping=mapping,
)
for code, mapping in mappings.items()
if mapping.enabled and mapping.type != "starter"
]
packages.sort(key=lambda item: item.sort_order)
if len(packages) != 3:
raise RuntimeError(
f"Expected exactly 3 non-starter packages, found {len(packages)}"
)
return packages
def generate_codes(total: int, *, length: int = DEFAULT_CODE_LENGTH) -> list[str]:
codes: set[str] = set()
while len(codes) < total:
codes.add("".join(secrets.choice(CODE_ALPHABET) for _ in range(length)))
return sorted(codes)
def build_workbook(
*,
batch_id: UUID,
batch_key: str,
rows: list[RedeemCode],
) -> Workbook:
workbook = Workbook()
sheet = workbook.active
if sheet is None:
raise RuntimeError("Failed to create redeem code workbook sheet")
sheet.title = "redeem_codes"
sheet.append(
[
"batch_id",
"batch_key",
"code_id",
"code",
"package_product_code",
"package_name",
"credits",
"status",
"redeemed_by_user_id",
"redeemed_at",
]
)
for row in rows:
sheet.append(
[
str(batch_id),
batch_key,
str(row.id),
row.code,
row.package_product_code,
row.package_name_snapshot,
int(row.credits),
row.status,
"",
"",
]
)
for column_index, column in enumerate(sheet.columns, start=1):
column_letter = get_column_letter(column_index)
max_length = max(len(str(cell.value or "")) for cell in column)
sheet.column_dimensions[column_letter].width = min(max(max_length + 2, 12), 42)
return workbook
async def main() -> None:
args = parse_args()
now = datetime.now(timezone.utc)
batch_key = args.batch_key or f"redeem-{now:%Y%m%d%H%M%S}"
output_dir = Path(args.output_dir)
output_path = output_dir / f"{batch_key}.xlsx"
if output_path.exists():
raise RuntimeError(f"Output file already exists: {output_path}")
packages = load_regular_packages()
total = sum(DEFAULT_COUNTS)
codes = generate_codes(total)
async with AsyncSessionLocal() as session:
existing_codes = (
(
await session.execute(
select(RedeemCode.code).where(RedeemCode.code.in_(codes))
)
)
.scalars()
.all()
)
if existing_codes:
raise RuntimeError("Generated redeem code collision; rerun the script")
batch = RedeemCodeBatch(
batch_key=batch_key,
created_by=args.created_by,
notes=args.notes,
)
session.add(batch)
await session.flush()
rows: list[RedeemCode] = []
offset = 0
for package, count in zip(packages, DEFAULT_COUNTS, strict=True):
for code in codes[offset : offset + count]:
row = RedeemCode(
batch_id=batch.id,
code=code,
package_product_code=package.product_code,
package_type=package.mapping.type,
package_name_snapshot=package.name,
credits=package.credits,
sort_order=package.sort_order,
status="active",
)
session.add(row)
rows.append(row)
offset += count
session.add(
SystemAuditLog(
actor_user_id=None,
target_user_id=None,
action="redeem_code.batch_generate",
entity_type="redeem_code_batch",
entity_id=batch.id,
metadata_json={
"batch_key": batch_key,
"counts": {
package.product_code: count
for package, count in zip(packages, DEFAULT_COUNTS, strict=True)
},
"total": total,
"output_path": str(output_path),
},
)
)
try:
await session.flush()
output_dir.mkdir(parents=True, exist_ok=True)
workbook = build_workbook(batch_id=batch.id, batch_key=batch_key, rows=rows)
workbook.save(output_path)
await session.commit()
except Exception:
await session.rollback()
if output_path.exists():
output_path.unlink()
raise
print(f"Generated {total} redeem codes")
print(f"Batch key: {batch_key}")
print(f"Excel: {output_path}")
if __name__ == "__main__":
asyncio.run(main())