"""Create creem_transactions table for CREEM payment integration. Revision ID: 20260511_0001 Revises: 20260428_0004 Create Date: 2026-05-11 00:01:00 """ from typing import Sequence, Union from alembic import op import sqlalchemy as sa from sqlalchemy.dialects import postgresql revision: str = "20260511_0001" down_revision: Union[str, Sequence[str], None] = "20260428_0004" branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None def upgrade() -> None: op.create_table( "creem_transactions", sa.Column("id", postgresql.UUID(as_uuid=True), nullable=False), sa.Column("user_id", postgresql.UUID(as_uuid=True), nullable=False), sa.Column("product_code", sa.String(length=32), nullable=False), sa.Column("creem_product_id", sa.String(length=128), nullable=False), sa.Column("checkout_id", sa.String(length=128), nullable=False), sa.Column("order_id", sa.String(length=128), nullable=True), sa.Column("customer_id", sa.String(length=128), nullable=True), sa.Column("status", sa.String(length=24), nullable=False), sa.Column("credits", sa.BigInteger(), nullable=False), sa.Column("amount_cents", sa.BigInteger(), nullable=False), sa.Column("currency", sa.String(length=8), nullable=False), sa.Column("creem_payload", postgresql.JSONB(astext_type=sa.Text()), server_default=sa.text("'{}'::jsonb"), nullable=False), sa.Column("ledger_event_id", sa.String(length=128), nullable=True), sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False), sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False), sa.CheckConstraint("status in ('pending', 'completed', 'failed', 'refunded')", name="ck_creem_transactions_status"), sa.PrimaryKeyConstraint("id"), sa.UniqueConstraint("checkout_id", name="uq_creem_transactions_checkout_id"), ) op.create_index("ix_creem_transactions_user_created_at", "creem_transactions", ["user_id", sa.text("created_at DESC")]) op.create_index("ix_creem_transactions_status_updated_at", "creem_transactions", ["status", sa.text("updated_at DESC")]) _enable_service_only_rls("creem_transactions") def downgrade() -> None: _drop_service_only_rls("creem_transactions") op.drop_table("creem_transactions") def _enable_service_only_rls(table_name: str) -> None: for role in ["anon", "authenticated"]: for action in ["select", "insert", "update", "delete"]: op.execute(f"DROP POLICY IF EXISTS {role}_{action}_{table_name} ON {table_name}") op.execute(f"ALTER TABLE {table_name} ENABLE ROW LEVEL SECURITY") for role in ["anon", "authenticated"]: op.execute(f"CREATE POLICY {role}_select_{table_name} ON {table_name} FOR SELECT TO {role} USING (false)") op.execute(f"CREATE POLICY {role}_insert_{table_name} ON {table_name} FOR INSERT TO {role} WITH CHECK (false)") op.execute(f"CREATE POLICY {role}_update_{table_name} ON {table_name} FOR UPDATE TO {role} USING (false) WITH CHECK (false)") op.execute(f"CREATE POLICY {role}_delete_{table_name} ON {table_name} FOR DELETE TO {role} USING (false)") def _drop_service_only_rls(table_name: str) -> None: for role in ["anon", "authenticated"]: for action in ["select", "insert", "update", "delete"]: op.execute(f"DROP POLICY IF EXISTS {role}_{action}_{table_name} ON {table_name}") op.execute(f"ALTER TABLE {table_name} DISABLE ROW LEVEL SECURITY")