Files
eryao/backend/alembic/versions/20260411_0003_notifications.py
T

102 lines
5.6 KiB
Python
Raw Normal View History

"""Create notification inbox schema.

Revision ID: 20260428_squash_0003
Revises: 20260428_squash_0002
Create Date: 2026-04-11 12:00:00

Squashed history: creates notifications (with static-sync fields, target_mode,
and final i18n jsonb title/body columns) and user_notifications in one step.
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# Revision identifiers, used by Alembic.
# `revision` / `down_revision` mirror the "Revision ID" / "Revises" lines of
# the module docstring; this migration has no branch label or extra dependency.
revision: str = "20260428_squash_0003"
down_revision: Union[str, Sequence[str], None] = "20260428_squash_0002"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Create the notification inbox tables and their indexes.

    Builds ``notifications`` first, then ``user_notifications`` (which holds
    foreign keys to ``auth.users.id`` and ``notifications.id``). Once a table
    and its indexes exist, the table name is handed to
    ``_enable_service_only_rls``.
    """

    def uuid_pk():
        # "id" column whose value is generated by the database.
        return sa.Column(
            "id",
            sa.UUID(),
            server_default=sa.text("gen_random_uuid()"),
            nullable=False,
        )

    def required_string(name, length, default_sql):
        # NOT NULL varchar; default_sql is the quoted SQL literal used as the
        # server-side default.
        return sa.Column(
            name,
            sa.String(length=length),
            server_default=sa.text(default_sql),
            nullable=False,
        )

    def json_document(name):
        # NOT NULL JSONB column that defaults to an empty JSON object.
        return sa.Column(
            name,
            postgresql.JSONB(astext_type=sa.Text()),
            server_default=sa.text("'{}'::jsonb"),
            nullable=False,
        )

    def nullable_timestamp(name):
        # Optional timezone-aware timestamp with no default.
        return sa.Column(name, sa.DateTime(timezone=True), nullable=True)

    def row_timestamp(name):
        # NOT NULL timezone-aware timestamp defaulting to now().
        return sa.Column(
            name,
            sa.DateTime(timezone=True),
            server_default=sa.text("now()"),
            nullable=False,
        )

    # --- notifications ---------------------------------------------------
    notification_columns = [
        uuid_pk(),
        required_string("type", 32, "'system'"),
        required_string("source", 32, "'manual'"),
        sa.Column("source_key", sa.String(length=128), nullable=True),
        sa.Column("source_version", sa.Integer(), nullable=True),
        sa.Column("content_hash", sa.String(length=64), nullable=True),
        json_document("title"),
        json_document("body"),
        json_document("payload"),
        required_string("status", 16, "'published'"),
        required_string("target_mode", 32, "'all_users'"),
        nullable_timestamp("published_at"),
        nullable_timestamp("revoked_at"),
        row_timestamp("created_at"),
        row_timestamp("updated_at"),
        nullable_timestamp("deleted_at"),
    ]
    notification_constraints = [
        sa.CheckConstraint(
            "status IN ('draft', 'published', 'revoked')",
            name="ck_notifications_status",
        ),
        sa.CheckConstraint(
            "target_mode IN ('new_users', 'exist_users', 'all_users', 'user_ids')",
            name="ck_notifications_target_mode",
        ),
        sa.CheckConstraint(
            "jsonb_typeof(payload) = 'object'",
            name="ck_notifications_payload_object",
        ),
        sa.PrimaryKeyConstraint("id"),
    ]
    op.create_table("notifications", *notification_columns, *notification_constraints)

    op.create_index(
        "ix_notifications_status_created_at",
        "notifications",
        ["status", sa.text("created_at DESC")],
    )
    op.create_index(
        "ix_notifications_published_at",
        "notifications",
        [sa.text("published_at DESC")],
    )
    # Partial unique index: (source, source_key) must be unique only for rows
    # that actually carry a source_key.
    op.create_index(
        "uq_notifications_source_source_key",
        "notifications",
        ["source", "source_key"],
        unique=True,
        postgresql_where=sa.text("source_key IS NOT NULL"),
    )
    _enable_service_only_rls("notifications")

    # --- user_notifications ----------------------------------------------
    user_notification_columns = [
        uuid_pk(),
        sa.Column("user_id", sa.UUID(), nullable=False),
        sa.Column("notification_id", sa.UUID(), nullable=False),
        sa.Column("is_read", sa.Boolean(), server_default=sa.text("false"), nullable=False),
        nullable_timestamp("read_at"),
        row_timestamp("created_at"),
        row_timestamp("updated_at"),
    ]
    user_notification_constraints = [
        # Rows are removed with their user or their notification.
        sa.ForeignKeyConstraint(["user_id"], ["auth.users.id"], ondelete="CASCADE"),
        sa.ForeignKeyConstraint(["notification_id"], ["notifications.id"], ondelete="CASCADE"),
        sa.PrimaryKeyConstraint("id"),
        # At most one row per (user, notification) pair.
        sa.UniqueConstraint(
            "user_id",
            "notification_id",
            name="uq_user_notifications_user_notification",
        ),
    ]
    op.create_table(
        "user_notifications",
        *user_notification_columns,
        *user_notification_constraints,
    )

    op.create_index(
        "ix_user_notifications_user_created_at",
        "user_notifications",
        ["user_id", sa.text("created_at DESC")],
    )
    op.create_index(
        "ix_user_notifications_user_unread",
        "user_notifications",
        ["user_id", "is_read"],
    )
    _enable_service_only_rls("user_notifications")
def downgrade() -> None:
    """Drop both notification tables, undoing ``upgrade``.

    For each table, ``_drop_service_only_rls`` is called first and the table
    is dropped afterwards.
    """
    # user_notifications goes first: it has a foreign key to notifications.
    for table_name in ("user_notifications", "notifications"):
        _drop_service_only_rls(table_name)
        op.drop_table(table_name)
def _enable_service_only_rls(table_name: str) -> None:
    """Enable row level security on a table and add deny-all policies.

    For each of the ``anon`` and ``authenticated`` roles, one policy per
    action (SELECT, INSERT, UPDATE, DELETE) is created with a ``false``
    predicate. Policies with the same names are dropped beforehand.

    Args:
        table_name: Table to protect. It is interpolated into the SQL
            unquoted, so it must be a trusted identifier.
    """
    client_roles = ("anon", "authenticated")

    # Clear any same-named policies first; presumably so the CREATE POLICY
    # statements below do not fail on an existing policy.
    stale_policy_drops = [
        f"DROP POLICY IF EXISTS {role}_{action}_{table_name} ON {table_name}"
        for role in client_roles
        for action in ("select", "insert", "update", "delete")
    ]
    for statement in stale_policy_drops:
        op.execute(statement)

    op.execute(f"ALTER TABLE {table_name} ENABLE ROW LEVEL SECURITY")

    for role in client_roles:
        deny_all_policies = (
            f"CREATE POLICY {role}_select_{table_name} ON {table_name} FOR SELECT TO {role} USING (false)",
            f"CREATE POLICY {role}_insert_{table_name} ON {table_name} FOR INSERT TO {role} WITH CHECK (false)",
            f"CREATE POLICY {role}_update_{table_name} ON {table_name} FOR UPDATE TO {role} USING (false) WITH CHECK (false)",
            f"CREATE POLICY {role}_delete_{table_name} ON {table_name} FOR DELETE TO {role} USING (false)",
        )
        for statement in deny_all_policies:
            op.execute(statement)
def _drop_service_only_rls(table_name: str) -> None:
    """Drop the per-role, per-action policies and disable row level security.

    Issues ``DROP POLICY IF EXISTS`` for every ``{role}_{action}_{table}``
    policy name over the ``anon`` / ``authenticated`` roles and the four
    actions, then disables row level security on the table.

    Args:
        table_name: Table to revert. It is interpolated into the SQL
            unquoted, so it must be a trusted identifier.
    """
    drop_statements = [
        f"DROP POLICY IF EXISTS {role}_{action}_{table_name} ON {table_name}"
        for role in ("anon", "authenticated")
        for action in ("select", "insert", "update", "delete")
    ]
    for statement in drop_statements:
        op.execute(statement)

    op.execute(f"ALTER TABLE {table_name} DISABLE ROW LEVEL SECURITY")