Files
social-app/backend/alembic/versions/20260226_0002_session_and_agent_tables.py
T

345 lines
12 KiB
Python
Raw Normal View History

"""initial schema part 2: session and agent tables
Revision ID: 202602260002
Revises: 202602260001
Create Date: 2026-02-26 20:11:00
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
revision: str = "202602260002"
down_revision: Union[str, Sequence[str], None] = "202602260001"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table(
"sessions",
sa.Column("id", sa.UUID(), nullable=False),
sa.Column("user_id", sa.UUID(), nullable=False),
sa.Column("session_type", sa.String(length=20), nullable=False),
sa.Column("job_id", sa.UUID(), nullable=True),
sa.Column("title", sa.String(length=255), nullable=True),
sa.Column("status", sa.String(length=20), nullable=False),
sa.Column(
"last_activity_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"message_count", sa.Integer(), server_default=sa.text("0"), nullable=False
),
sa.Column(
"total_tokens", sa.Integer(), server_default=sa.text("0"), nullable=False
),
sa.Column(
"total_cost",
sa.Numeric(precision=12, scale=6),
server_default=sa.text("0"),
nullable=False,
),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column("deleted_at", sa.DateTime(timezone=True), nullable=True),
sa.PrimaryKeyConstraint("id"),
)
op.create_index("ix_sessions_job_id", "sessions", ["job_id"], unique=False)
op.execute(
"CREATE INDEX ix_sessions_user_session_type_last_activity ON sessions (user_id, session_type, last_activity_at DESC)"
)
op.execute(
"ALTER TABLE sessions ADD CONSTRAINT chk_session_type CHECK (session_type IN ('chat', 'automation'))"
)
op.execute(
"ALTER TABLE sessions ADD CONSTRAINT chk_sessions_type_job_consistency CHECK ((session_type = 'chat' AND job_id IS NULL) OR (session_type = 'automation' AND job_id IS NOT NULL))"
)
op.execute(
"ALTER TABLE sessions ADD CONSTRAINT chk_sessions_status CHECK (status IN ('pending', 'running', 'completed', 'failed'))"
)
op.create_foreign_key(
"fk_sessions_user_id",
"sessions",
"users",
["user_id"],
["id"],
referent_schema="auth",
ondelete="CASCADE",
)
op.create_foreign_key(
"fk_sessions_job_id_user_id",
"sessions",
"automation_jobs",
["job_id", "user_id"],
["id", "owner_id"],
ondelete="RESTRICT",
)
op.create_index(
"ix_sessions_job_user",
"sessions",
["job_id", "user_id"],
unique=False,
)
_enable_rls("sessions")
op.create_table(
"messages",
sa.Column("id", sa.UUID(), nullable=False),
sa.Column("session_id", sa.UUID(), nullable=False),
sa.Column("seq", sa.Integer(), nullable=False),
sa.Column("role", sa.String(length=20), nullable=False),
sa.Column("content", sa.Text(), nullable=False),
sa.Column("model_code", sa.String(length=50), nullable=True),
sa.Column("tool_name", sa.String(length=100), nullable=True),
sa.Column(
"input_tokens", sa.Integer(), nullable=False, server_default=sa.text("0")
),
sa.Column(
"output_tokens", sa.Integer(), nullable=False, server_default=sa.text("0")
),
sa.Column(
"cost",
sa.Numeric(precision=12, scale=6),
nullable=False,
server_default=sa.text("0"),
),
sa.Column(
"currency",
sa.String(length=3),
nullable=False,
server_default=sa.text("'USD'"),
),
sa.Column("latency_ms", sa.Integer(), nullable=True),
sa.Column("metadata", postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column("deleted_at", sa.DateTime(timezone=True), nullable=True),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("session_id", "seq", name="uq_messages_session_seq"),
)
op.create_index("ix_messages_session_id", "messages", ["session_id"], unique=False)
op.execute(
"ALTER TABLE messages ADD CONSTRAINT chk_message_role CHECK (role IN ('user', 'assistant', 'system', 'tool'))"
)
op.create_foreign_key(
"fk_messages_session_id",
"messages",
"sessions",
["session_id"],
["id"],
ondelete="CASCADE",
)
_enable_rls("messages")
op.create_table(
"user_agents",
sa.Column("id", sa.UUID(), nullable=False),
sa.Column("user_id", sa.UUID(), nullable=False),
sa.Column("llm_id", sa.UUID(), nullable=False),
sa.Column("agent_type", sa.String(length=20), nullable=False),
sa.Column(
"config",
postgresql.JSONB(astext_type=sa.Text()),
server_default="{}",
nullable=False,
),
sa.Column("status", sa.String(length=20), nullable=False),
sa.Column("created_by", sa.UUID(), nullable=True),
sa.Column("updated_by", sa.UUID(), nullable=True),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column("deleted_at", sa.DateTime(timezone=True), nullable=True),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("user_id", name="uq_user_agents_user_id"),
)
op.create_index(
"ix_user_agents_agent_type", "user_agents", ["agent_type"], unique=False
)
op.create_index("ix_user_agents_status", "user_agents", ["status"], unique=False)
op.execute(
"ALTER TABLE user_agents ADD CONSTRAINT chk_agent_type CHECK (agent_type IN ('INTENT_RECOGNITION', 'TASK_EXECUTION', 'RESULT_REPORTING'))"
)
op.create_foreign_key(
"fk_user_agents_user_id",
"user_agents",
"users",
["user_id"],
["id"],
referent_schema="auth",
ondelete="CASCADE",
)
op.create_foreign_key(
"fk_user_agents_llm_id",
"user_agents",
"llms",
["llm_id"],
["id"],
ondelete="RESTRICT",
)
op.create_foreign_key(
"fk_user_agents_created_by",
"user_agents",
"users",
["created_by"],
["id"],
referent_schema="auth",
ondelete="SET NULL",
)
op.create_foreign_key(
"fk_user_agents_updated_by",
"user_agents",
"users",
["updated_by"],
["id"],
referent_schema="auth",
ondelete="SET NULL",
)
_enable_rls("user_agents")
op.create_table(
"memories",
sa.Column("id", sa.UUID(), nullable=False),
sa.Column("owner_id", sa.UUID(), nullable=False),
sa.Column("agent_id", sa.UUID(), nullable=True),
sa.Column("memory_type", sa.String(length=20), nullable=False),
sa.Column("title", sa.String(length=255), nullable=True),
sa.Column("content", postgresql.JSONB(astext_type=sa.Text()), nullable=False),
sa.Column("source", sa.String(length=20), nullable=False),
sa.Column("status", sa.String(length=20), nullable=False),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(
"ix_memories_owner_type_status",
"memories",
["owner_id", "memory_type", "status"],
unique=False,
)
op.create_index(
"ix_memories_agent_type_status",
"memories",
["agent_id", "memory_type", "status"],
unique=False,
)
op.execute(
"ALTER TABLE memories ADD CONSTRAINT chk_memory_type_agent_id CHECK ((memory_type = 'work' AND agent_id IS NOT NULL) OR (memory_type = 'user' AND agent_id IS NULL))"
)
op.create_foreign_key(
"fk_memories_owner_id",
"memories",
"users",
["owner_id"],
["id"],
referent_schema="auth",
ondelete="CASCADE",
)
op.create_foreign_key(
"fk_memories_agent_id",
"memories",
"user_agents",
["agent_id"],
["id"],
ondelete="CASCADE",
)
_enable_rls("memories")
def downgrade() -> None:
_drop_rls("memories")
op.drop_constraint("fk_memories_agent_id", "memories", type_="foreignkey")
op.drop_constraint("fk_memories_owner_id", "memories", type_="foreignkey")
op.drop_table("memories")
_drop_rls("user_agents")
op.drop_constraint("fk_user_agents_updated_by", "user_agents", type_="foreignkey")
op.drop_constraint("fk_user_agents_created_by", "user_agents", type_="foreignkey")
op.drop_constraint("fk_user_agents_llm_id", "user_agents", type_="foreignkey")
op.drop_constraint("fk_user_agents_user_id", "user_agents", type_="foreignkey")
op.drop_table("user_agents")
_drop_rls("messages")
op.drop_constraint("fk_messages_session_id", "messages", type_="foreignkey")
op.drop_table("messages")
_drop_rls("sessions")
op.drop_constraint("fk_sessions_job_id_user_id", "sessions", type_="foreignkey")
op.drop_constraint("fk_sessions_user_id", "sessions", type_="foreignkey")
op.drop_table("sessions")
def _enable_rls(table_name: str) -> None:
for role in ["anon", "authenticated"]:
for action in ["select", "insert", "update", "delete"]:
op.execute(
f"DROP POLICY IF EXISTS {role}_{action}_{table_name} ON {table_name}"
)
op.execute(f"ALTER TABLE {table_name} ENABLE ROW LEVEL SECURITY")
for role in ["anon", "authenticated"]:
op.execute(
f"CREATE POLICY {role}_select_{table_name} ON {table_name} FOR SELECT TO {role} USING (false)"
)
op.execute(
f"CREATE POLICY {role}_insert_{table_name} ON {table_name} FOR INSERT TO {role} WITH CHECK (false)"
)
op.execute(
f"CREATE POLICY {role}_update_{table_name} ON {table_name} FOR UPDATE TO {role} USING (false) WITH CHECK (false)"
)
op.execute(
f"CREATE POLICY {role}_delete_{table_name} ON {table_name} FOR DELETE TO {role} USING (false)"
)
def _drop_rls(table_name: str) -> None:
for role in ["anon", "authenticated"]:
op.execute(f"DROP POLICY IF EXISTS {role}_delete_{table_name} ON {table_name}")
op.execute(f"DROP POLICY IF EXISTS {role}_update_{table_name} ON {table_name}")
op.execute(f"DROP POLICY IF EXISTS {role}_insert_{table_name} ON {table_name}")
op.execute(f"DROP POLICY IF EXISTS {role}_select_{table_name} ON {table_name}")
op.execute(f"ALTER TABLE {table_name} DISABLE ROW LEVEL SECURITY")