"""add notifications and user_notifications tables Revision ID: 20260411_0004 Revises: 20260411_0003 Create Date: 2026-04-11 12:00:00 """ from typing import Sequence, Union from alembic import op import sqlalchemy as sa from sqlalchemy.dialects import postgresql revision: str = "20260411_0004" down_revision: Union[str, Sequence[str], None] = "20260411_0003" branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None def upgrade() -> None: op.create_table( "notifications", sa.Column( "id", sa.UUID(), server_default=sa.text("gen_random_uuid()"), nullable=False ), sa.Column( "type", sa.String(length=32), server_default=sa.text("'system'"), nullable=False, ), sa.Column("title", sa.Text(), nullable=False), sa.Column("body", sa.Text(), nullable=False), sa.Column( "payload", postgresql.JSONB(astext_type=sa.Text()), server_default=sa.text("'{}'::jsonb"), nullable=False, ), sa.Column( "status", sa.String(length=16), server_default=sa.text("'published'"), nullable=False, ), sa.Column("published_at", sa.DateTime(timezone=True), nullable=True), sa.Column("revoked_at", sa.DateTime(timezone=True), nullable=True), sa.Column( "created_at", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False, ), sa.Column( "updated_at", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False, ), sa.Column("deleted_at", sa.DateTime(timezone=True), nullable=True), sa.CheckConstraint( "status IN ('draft', 'published', 'revoked')", name="ck_notifications_status", ), sa.CheckConstraint( "jsonb_typeof(payload) = 'object'", name="ck_notifications_payload_object", ), sa.PrimaryKeyConstraint("id"), ) op.create_index( "ix_notifications_status_created_at", "notifications", ["status", sa.text("created_at DESC")], ) op.create_index( "ix_notifications_published_at", "notifications", [sa.text("published_at DESC")], ) _enable_rls("notifications") op.create_table( "user_notifications", sa.Column( "id", sa.UUID(), server_default=sa.text("gen_random_uuid()"), nullable=False ), sa.Column("user_id", sa.UUID(), nullable=False), sa.Column("notification_id", sa.UUID(), nullable=False), sa.Column( "is_read", sa.Boolean(), server_default=sa.text("false"), nullable=False ), sa.Column("read_at", sa.DateTime(timezone=True), nullable=True), sa.Column( "created_at", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False, ), sa.Column( "updated_at", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False, ), sa.ForeignKeyConstraint(["user_id"], ["auth.users.id"], ondelete="CASCADE"), sa.ForeignKeyConstraint( ["notification_id"], ["notifications.id"], ondelete="CASCADE" ), sa.PrimaryKeyConstraint("id"), sa.UniqueConstraint( "user_id", "notification_id", name="uq_user_notifications_user_notification" ), ) op.create_index( "ix_user_notifications_user_created_at", "user_notifications", ["user_id", sa.text("created_at DESC")], ) op.create_index( "ix_user_notifications_user_unread", "user_notifications", ["user_id", "is_read"], ) _enable_rls("user_notifications") def downgrade() -> None: _drop_rls("user_notifications") op.drop_index("ix_user_notifications_user_unread", table_name="user_notifications") op.drop_index( "ix_user_notifications_user_created_at", table_name="user_notifications" ) op.drop_table("user_notifications") _drop_rls("notifications") op.drop_index("ix_notifications_published_at", table_name="notifications") op.drop_index("ix_notifications_status_created_at", table_name="notifications") op.drop_table("notifications") def _enable_rls(table_name: str) -> None: for role in ["anon", "authenticated"]: for action in ["select", "insert", "update", "delete"]: op.execute( f"DROP POLICY IF EXISTS {role}_{action}_{table_name} ON {table_name}" ) op.execute(f"ALTER TABLE {table_name} ENABLE ROW LEVEL SECURITY") for role in ["anon", "authenticated"]: op.execute( f"CREATE POLICY {role}_select_{table_name} ON {table_name} FOR SELECT TO {role} USING (false)" ) op.execute( f"CREATE POLICY {role}_insert_{table_name} ON {table_name} FOR INSERT TO {role} WITH CHECK (false)" ) op.execute( f"CREATE POLICY {role}_update_{table_name} ON {table_name} FOR UPDATE TO {role} USING (false) WITH CHECK (false)" ) op.execute( f"CREATE POLICY {role}_delete_{table_name} ON {table_name} FOR DELETE TO {role} USING (false)" ) def _drop_rls(table_name: str) -> None: for role in ["anon", "authenticated"]: for action in ["select", "insert", "update", "delete"]: op.execute( f"DROP POLICY IF EXISTS {role}_{action}_{table_name} ON {table_name}" ) op.execute(f"ALTER TABLE {table_name} DISABLE ROW LEVEL SECURITY")