227 lines
6.6 KiB
Python
227 lines
6.6 KiB
Python
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import argparse
|
||
|
|
import asyncio
|
||
|
|
from dataclasses import dataclass
|
||
|
|
from datetime import datetime, timezone
|
||
|
|
from pathlib import Path
|
||
|
|
import secrets
|
||
|
|
import string
|
||
|
|
from uuid import UUID
|
||
|
|
|
||
|
|
from openpyxl import Workbook
|
||
|
|
from openpyxl.utils import get_column_letter
|
||
|
|
from sqlalchemy import select
|
||
|
|
|
||
|
|
from core.db.session import AsyncSessionLocal
|
||
|
|
from models.redeem_code import RedeemCode
|
||
|
|
from models.redeem_code_batch import RedeemCodeBatch
|
||
|
|
from models.system_audit_log import SystemAuditLog
|
||
|
|
from v1.payments.service import ProductMapping, _load_product_mappings
|
||
|
|
|
||
|
|
CODE_ALPHABET = "".join(
|
||
|
|
ch for ch in string.ascii_uppercase + string.digits if ch not in "0O1I"
|
||
|
|
)
|
||
|
|
DEFAULT_COUNTS = (100, 40, 20)
|
||
|
|
DEFAULT_CODE_LENGTH = 16
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass(frozen=True)
|
||
|
|
class PackageSpec:
|
||
|
|
product_code: str
|
||
|
|
name: str
|
||
|
|
credits: int
|
||
|
|
sort_order: int
|
||
|
|
mapping: ProductMapping
|
||
|
|
|
||
|
|
|
||
|
|
def parse_args() -> argparse.Namespace:
|
||
|
|
parser = argparse.ArgumentParser(
|
||
|
|
description="Generate redeem codes for regular packages."
|
||
|
|
)
|
||
|
|
parser.add_argument(
|
||
|
|
"--batch-key",
|
||
|
|
default=None,
|
||
|
|
help="Unique batch key. Defaults to redeem-YYYYmmddHHMMSS.",
|
||
|
|
)
|
||
|
|
parser.add_argument(
|
||
|
|
"--created-by",
|
||
|
|
default="infra_script",
|
||
|
|
help="Operator name stored on the redeem code batch.",
|
||
|
|
)
|
||
|
|
parser.add_argument("--notes", default=None, help="Optional batch notes.")
|
||
|
|
parser.add_argument(
|
||
|
|
"--output-dir",
|
||
|
|
default="var/redeem-codes",
|
||
|
|
help="Directory for the generated xlsx file.",
|
||
|
|
)
|
||
|
|
return parser.parse_args()
|
||
|
|
|
||
|
|
|
||
|
|
def load_regular_packages() -> list[PackageSpec]:
|
||
|
|
mappings = _load_product_mappings()
|
||
|
|
packages = [
|
||
|
|
PackageSpec(
|
||
|
|
product_code=code,
|
||
|
|
name=code,
|
||
|
|
credits=mapping.credits,
|
||
|
|
sort_order=mapping.sort_order,
|
||
|
|
mapping=mapping,
|
||
|
|
)
|
||
|
|
for code, mapping in mappings.items()
|
||
|
|
if mapping.enabled and mapping.type != "starter"
|
||
|
|
]
|
||
|
|
packages.sort(key=lambda item: item.sort_order)
|
||
|
|
if len(packages) != 3:
|
||
|
|
raise RuntimeError(
|
||
|
|
f"Expected exactly 3 non-starter packages, found {len(packages)}"
|
||
|
|
)
|
||
|
|
return packages
|
||
|
|
|
||
|
|
|
||
|
|
def generate_codes(total: int, *, length: int = DEFAULT_CODE_LENGTH) -> list[str]:
|
||
|
|
codes: set[str] = set()
|
||
|
|
while len(codes) < total:
|
||
|
|
codes.add("".join(secrets.choice(CODE_ALPHABET) for _ in range(length)))
|
||
|
|
return sorted(codes)
|
||
|
|
|
||
|
|
|
||
|
|
def build_workbook(
|
||
|
|
*,
|
||
|
|
batch_id: UUID,
|
||
|
|
batch_key: str,
|
||
|
|
rows: list[RedeemCode],
|
||
|
|
) -> Workbook:
|
||
|
|
workbook = Workbook()
|
||
|
|
sheet = workbook.active
|
||
|
|
if sheet is None:
|
||
|
|
raise RuntimeError("Failed to create redeem code workbook sheet")
|
||
|
|
sheet.title = "redeem_codes"
|
||
|
|
sheet.append(
|
||
|
|
[
|
||
|
|
"batch_id",
|
||
|
|
"batch_key",
|
||
|
|
"code_id",
|
||
|
|
"code",
|
||
|
|
"package_product_code",
|
||
|
|
"package_name",
|
||
|
|
"credits",
|
||
|
|
"status",
|
||
|
|
"redeemed_by_user_id",
|
||
|
|
"redeemed_at",
|
||
|
|
]
|
||
|
|
)
|
||
|
|
for row in rows:
|
||
|
|
sheet.append(
|
||
|
|
[
|
||
|
|
str(batch_id),
|
||
|
|
batch_key,
|
||
|
|
str(row.id),
|
||
|
|
row.code,
|
||
|
|
row.package_product_code,
|
||
|
|
row.package_name_snapshot,
|
||
|
|
int(row.credits),
|
||
|
|
row.status,
|
||
|
|
"",
|
||
|
|
"",
|
||
|
|
]
|
||
|
|
)
|
||
|
|
|
||
|
|
for column_index, column in enumerate(sheet.columns, start=1):
|
||
|
|
column_letter = get_column_letter(column_index)
|
||
|
|
max_length = max(len(str(cell.value or "")) for cell in column)
|
||
|
|
sheet.column_dimensions[column_letter].width = min(max(max_length + 2, 12), 42)
|
||
|
|
return workbook
|
||
|
|
|
||
|
|
|
||
|
|
async def main() -> None:
|
||
|
|
args = parse_args()
|
||
|
|
now = datetime.now(timezone.utc)
|
||
|
|
batch_key = args.batch_key or f"redeem-{now:%Y%m%d%H%M%S}"
|
||
|
|
output_dir = Path(args.output_dir)
|
||
|
|
output_path = output_dir / f"{batch_key}.xlsx"
|
||
|
|
if output_path.exists():
|
||
|
|
raise RuntimeError(f"Output file already exists: {output_path}")
|
||
|
|
|
||
|
|
packages = load_regular_packages()
|
||
|
|
total = sum(DEFAULT_COUNTS)
|
||
|
|
codes = generate_codes(total)
|
||
|
|
|
||
|
|
async with AsyncSessionLocal() as session:
|
||
|
|
existing_codes = (
|
||
|
|
(
|
||
|
|
await session.execute(
|
||
|
|
select(RedeemCode.code).where(RedeemCode.code.in_(codes))
|
||
|
|
)
|
||
|
|
)
|
||
|
|
.scalars()
|
||
|
|
.all()
|
||
|
|
)
|
||
|
|
if existing_codes:
|
||
|
|
raise RuntimeError("Generated redeem code collision; rerun the script")
|
||
|
|
|
||
|
|
batch = RedeemCodeBatch(
|
||
|
|
batch_key=batch_key,
|
||
|
|
created_by=args.created_by,
|
||
|
|
notes=args.notes,
|
||
|
|
)
|
||
|
|
session.add(batch)
|
||
|
|
await session.flush()
|
||
|
|
|
||
|
|
rows: list[RedeemCode] = []
|
||
|
|
offset = 0
|
||
|
|
for package, count in zip(packages, DEFAULT_COUNTS, strict=True):
|
||
|
|
for code in codes[offset : offset + count]:
|
||
|
|
row = RedeemCode(
|
||
|
|
batch_id=batch.id,
|
||
|
|
code=code,
|
||
|
|
package_product_code=package.product_code,
|
||
|
|
package_type=package.mapping.type,
|
||
|
|
package_name_snapshot=package.name,
|
||
|
|
credits=package.credits,
|
||
|
|
sort_order=package.sort_order,
|
||
|
|
status="active",
|
||
|
|
)
|
||
|
|
session.add(row)
|
||
|
|
rows.append(row)
|
||
|
|
offset += count
|
||
|
|
|
||
|
|
session.add(
|
||
|
|
SystemAuditLog(
|
||
|
|
actor_user_id=None,
|
||
|
|
target_user_id=None,
|
||
|
|
action="redeem_code.batch_generate",
|
||
|
|
entity_type="redeem_code_batch",
|
||
|
|
entity_id=batch.id,
|
||
|
|
metadata_json={
|
||
|
|
"batch_key": batch_key,
|
||
|
|
"counts": {
|
||
|
|
package.product_code: count
|
||
|
|
for package, count in zip(packages, DEFAULT_COUNTS, strict=True)
|
||
|
|
},
|
||
|
|
"total": total,
|
||
|
|
"output_path": str(output_path),
|
||
|
|
},
|
||
|
|
)
|
||
|
|
)
|
||
|
|
|
||
|
|
try:
|
||
|
|
await session.flush()
|
||
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
||
|
|
workbook = build_workbook(batch_id=batch.id, batch_key=batch_key, rows=rows)
|
||
|
|
workbook.save(output_path)
|
||
|
|
await session.commit()
|
||
|
|
except Exception:
|
||
|
|
await session.rollback()
|
||
|
|
if output_path.exists():
|
||
|
|
output_path.unlink()
|
||
|
|
raise
|
||
|
|
|
||
|
|
print(f"Generated {total} redeem codes")
|
||
|
|
print(f"Batch key: {batch_key}")
|
||
|
|
print(f"Excel: {output_path}")
|
||
|
|
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
asyncio.run(main())
|