diff --git a/.env.example b/.env.example index ded410f..5b97a2d 100644 --- a/.env.example +++ b/.env.example @@ -8,6 +8,7 @@ SOCIAL_RUNTIME__ENVIRONMENT=dev SOCIAL_RUNTIME__DEBUG=true SOCIAL_RUNTIME__LOG_LEVEL=INFO SOCIAL_RUNTIME__SQL_LOG_QUERIES=false +SOCIAL_RUNTIME__TRUSTED_PROXY_IPS=[] ############ # Web 服务器配置(Uvicorn) @@ -36,6 +37,13 @@ SOCIAL_WORKER__GROUPS__DEFAULT__CONCURRENCY=2 SOCIAL_WORKER__GROUPS__BULK__CONCURRENCY=1 +############ +# Automation 调度器配置 +############ +SOCIAL_AUTOMATION_SCHEDULER__ENABLED=true +SOCIAL_AUTOMATION_SCHEDULER__INTERVAL_SECONDS=60 +SOCIAL_AUTOMATION_SCHEDULER__BATCH_LIMIT=100 + ############ # Taskiq(可选,默认回落到 Redis URL) ############ @@ -87,5 +95,5 @@ SOCIAL_APP_VERSION__DOWNLOAD_BASE_URL= ############ # Test相关 ############ -SOCIAL_TEST__EMAIL=test@xunmee.com +SOCIAL_TEST__PHONE=+8613812345678 SOCIAL_TEST__PASSWORD=Test@123456 diff --git a/backend/alembic/versions/20260319_0002_auth_phone_session_and_profile_username.py b/backend/alembic/versions/20260319_0002_auth_phone_session_and_profile_username.py new file mode 100644 index 0000000..9349b75 --- /dev/null +++ b/backend/alembic/versions/20260319_0002_auth_phone_session_and_profile_username.py @@ -0,0 +1,179 @@ +"""auth phone-session and profile username trigger update + +Revision ID: 202603190002 +Revises: 20260313_0001 +Create Date: 2026-03-19 15:30:00 +""" + +from typing import Sequence, Union + +from alembic import op + +revision: str = "202603190002" +down_revision: Union[str, Sequence[str], None] = "20260313_0001" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.execute("DROP INDEX IF EXISTS ix_profiles_referred_by") + op.execute("ALTER TABLE profiles DROP COLUMN IF EXISTS referred_by") + + op.execute( + """ + CREATE OR REPLACE FUNCTION public.generate_profile_username_suffix(seed TEXT) + RETURNS TEXT + LANGUAGE plpgsql + SECURITY DEFINER + SET search_path = '' + AS $$ + DECLARE + hashed TEXT; + BEGIN + hashed := lower(encode(extensions.digest(seed, 'sha256'), 'base64')); + hashed := regexp_replace(hashed, '[^a-z0-9]', '', 'g'); + IF length(hashed) < 6 THEN + hashed := hashed || '000000'; + END IF; + RETURN substr(hashed, 1, 6); + END; + $$; + """ + ) + + op.execute( + """ + CREATE OR REPLACE FUNCTION public.create_profile_for_new_user() + RETURNS trigger + LANGUAGE plpgsql + SECURITY DEFINER + SET search_path = '' + AS $$ + DECLARE + base_seed TEXT; + candidate_username TEXT; + attempt INT := 0; + BEGIN + base_seed := coalesce(NEW.phone, NEW.id::text); + + LOOP + candidate_username := 'user_' || public.generate_profile_username_suffix(base_seed || ':' || attempt::text); + EXIT WHEN NOT EXISTS ( + SELECT 1 FROM public.profiles p WHERE p.username = candidate_username + ); + attempt := attempt + 1; + IF attempt >= 50 THEN + candidate_username := 'user_' || substr(replace(NEW.id::text, '-', ''), 1, 6); + EXIT; + END IF; + END LOOP; + + INSERT INTO public.profiles (id, username, avatar_url, bio, settings, created_at, updated_at) + VALUES ( + NEW.id, + candidate_username, + NULL, + NULL, + '{}'::jsonb, + now(), + now() + ) + ON CONFLICT (id) DO NOTHING; + + RETURN NEW; + END; + $$; + """ + ) + + +def downgrade() -> None: + op.execute( + """ + ALTER TABLE profiles ADD COLUMN referred_by UUID REFERENCES profiles(id) ON DELETE SET NULL + """ + ) + op.execute( + "CREATE INDEX IF NOT EXISTS ix_profiles_referred_by ON profiles(referred_by)" + ) + + op.execute( + """ + CREATE OR REPLACE FUNCTION public.create_profile_for_new_user() + RETURNS trigger + LANGUAGE plpgsql + SECURITY DEFINER + SET search_path = '' + AS $$ + DECLARE + invite_code_value TEXT; + referrer_id UUID; + new_code TEXT; + attempts INT := 0; + BEGIN + INSERT INTO public.profiles (id, username, avatar_url, bio, settings, referred_by, created_at, updated_at) + VALUES ( + NEW.id, + COALESCE( + NEW.raw_user_meta_data ->> 'username', + split_part(NEW.email, '@', 1), + 'user_' || substring(NEW.id::text, 1, 8) + ), + NULL, + NULL, + '{}'::jsonb, + NULL, + now(), + now() + ) + ON CONFLICT (id) DO NOTHING; + + LOOP + BEGIN + new_code := public.generate_invite_code(); + INSERT INTO public.invite_codes (code, owner_id, status, used_count, max_uses, expires_at, reward_config) + VALUES ( + new_code, + NEW.id, + 'active', + 0, + NULL, + NULL, + '{}'::jsonb + ); + EXIT; + EXCEPTION WHEN unique_violation THEN + attempts := attempts + 1; + IF attempts >= 100 THEN + RAISE EXCEPTION 'Failed to generate unique invite code after 100 attempts'; + END IF; + END; + END LOOP; + + invite_code_value := NEW.raw_user_meta_data ->> 'invite_code'; + IF invite_code_value IS NOT NULL AND length(invite_code_value) = 4 THEN + invite_code_value := upper(invite_code_value); + IF invite_code_value ~ '^[ABCDEFGHJKMNPQRSTUVWXYZ23456789]{4}$' THEN + UPDATE public.invite_codes + SET used_count = used_count + 1 + WHERE code = invite_code_value + AND status = 'active' + AND (max_uses IS NULL OR used_count < max_uses) + AND (expires_at IS NULL OR expires_at > NOW()) + RETURNING owner_id INTO referrer_id; + + IF referrer_id IS NOT NULL THEN + UPDATE public.profiles + SET referred_by = referrer_id + WHERE id = NEW.id; + END IF; + END IF; + END IF; + + RETURN NEW; + END; + $$; + """ + ) + + op.execute("DROP FUNCTION IF EXISTS public.generate_profile_username_suffix(TEXT)") diff --git a/backend/alembic/versions/20260319_0003_add_messages_visibility_mask.py b/backend/alembic/versions/20260319_0003_add_messages_visibility_mask.py new file mode 100644 index 0000000..df37b2f --- /dev/null +++ b/backend/alembic/versions/20260319_0003_add_messages_visibility_mask.py @@ -0,0 +1,43 @@ +"""add_messages_visibility_mask + +Revision ID: 20260319_0003 +Revises: 202603190002 +Create Date: 2026-03-19 18:10:00 + +""" + +from typing import Sequence + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = "20260319_0003" +down_revision: str | Sequence[str] | None = "202603190002" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + op.add_column( + "messages", + sa.Column( + "visibility_mask", + sa.BigInteger(), + nullable=False, + server_default=sa.text("0"), + ), + ) + op.create_index( + "ix_messages_session_seq_visibility", + "messages", + ["session_id", "seq", "visibility_mask"], + unique=False, + ) + op.execute("UPDATE messages SET visibility_mask = 1 WHERE visibility_mask = 0") + + +def downgrade() -> None: + op.drop_index("ix_messages_session_seq_visibility", table_name="messages") + op.drop_column("messages", "visibility_mask") diff --git a/backend/alembic/versions/20260319_0004_automation_job_config_for_memory.py b/backend/alembic/versions/20260319_0004_automation_job_config_for_memory.py new file mode 100644 index 0000000..90d7ffb --- /dev/null +++ b/backend/alembic/versions/20260319_0004_automation_job_config_for_memory.py @@ -0,0 +1,237 @@ +"""automation_job_config_for_memory + +Revision ID: 20260319_0004 +Revises: 20260319_0003 +Create Date: 2026-03-19 20:10:00 +""" + +from typing import Sequence + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + + +revision: str = "20260319_0004" +down_revision: str | Sequence[str] | None = "20260319_0003" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + op.add_column( + "automation_jobs", + sa.Column( + "config", + postgresql.JSONB(astext_type=sa.Text()), + nullable=False, + server_default=sa.text("'{}'::jsonb"), + ), + ) + + op.execute( + """ + UPDATE automation_jobs + SET config = jsonb_build_object( + 'agent_type', 'memory', + 'model_code', 'qwen3.5-flash', + 'enabled_tools', jsonb_build_array('calendar.read', 'user.lookup'), + 'input_template', prompt, + 'context', jsonb_build_object( + 'source', 'latest_chat', + 'window_mode', 'day', + 'window_count', 2 + ) + ) + """ + ) + + op.drop_column("automation_jobs", "prompt") + + op.execute( + """ + WITH ranked AS ( + SELECT + id, + row_number() OVER ( + PARTITION BY owner_id + ORDER BY updated_at DESC, created_at DESC, id DESC + ) AS rn + FROM public.automation_jobs + WHERE deleted_at IS NULL + AND config->>'agent_type' = 'memory' + ) + UPDATE public.automation_jobs aj + SET deleted_at = now() + FROM ranked r + WHERE aj.id = r.id + AND r.rn > 1 + """ + ) + + op.execute( + """ + CREATE UNIQUE INDEX IF NOT EXISTS ux_automation_jobs_owner_memory_active + ON public.automation_jobs(owner_id) + WHERE deleted_at IS NULL + AND config->>'agent_type' = 'memory' + """ + ) + + op.execute( + """ + CREATE OR REPLACE FUNCTION public.create_profile_for_new_user() + RETURNS trigger + LANGUAGE plpgsql + SECURITY DEFINER + SET search_path = '' + AS $$ + DECLARE + base_seed TEXT; + candidate_username TEXT; + attempt INT := 0; + BEGIN + base_seed := coalesce(NEW.phone, NEW.id::text); + + LOOP + candidate_username := 'user_' || public.generate_profile_username_suffix(base_seed || ':' || attempt::text); + EXIT WHEN NOT EXISTS ( + SELECT 1 FROM public.profiles p WHERE p.username = candidate_username + ); + attempt := attempt + 1; + IF attempt >= 50 THEN + candidate_username := 'user_' || substr(replace(NEW.id::text, '-', ''), 1, 6); + EXIT; + END IF; + END LOOP; + + INSERT INTO public.profiles (id, username, avatar_url, bio, settings, created_at, updated_at) + VALUES ( + NEW.id, + candidate_username, + NULL, + NULL, + '{}'::jsonb, + now(), + now() + ) + ON CONFLICT (id) DO NOTHING; + + BEGIN + IF NOT EXISTS ( + SELECT 1 + FROM public.automation_jobs aj + WHERE aj.owner_id = NEW.id + AND aj.deleted_at IS NULL + AND aj.config->>'agent_type' = 'memory' + ) THEN + INSERT INTO public.automation_jobs ( + id, + owner_id, + title, + config, + schedule_type, + run_at, + next_run_at, + timezone, + status, + created_by, + created_at, + updated_at + ) VALUES ( + gen_random_uuid(), + NEW.id, + 'Memory Agent', + jsonb_build_object( + 'agent_type', 'memory', + 'model_code', 'qwen3.5-flash', + 'enabled_tools', jsonb_build_array('calendar.read', 'user.lookup'), + 'input_template', '请基于最近聊天上下文生成一段可执行的记忆总结与建议。', + 'context', jsonb_build_object( + 'source', 'latest_chat', + 'window_mode', 'day', + 'window_count', 2 + ) + ), + 'daily', + now(), + now() + interval '1 day', + 'UTC', + 'active', + NEW.id, + now(), + now() + ); + END IF; + EXCEPTION WHEN unique_violation THEN + NULL; + END; + + RETURN NEW; + END; + $$; + """ + ) + + +def downgrade() -> None: + op.execute("DROP INDEX IF EXISTS ux_automation_jobs_owner_memory_active") + + op.add_column( + "automation_jobs", + sa.Column("prompt", sa.Text(), nullable=False, server_default=sa.text("''")), + ) + + op.execute( + """ + UPDATE automation_jobs + SET prompt = COALESCE(config->>'input_template', '') + """ + ) + + op.drop_column("automation_jobs", "config") + + op.execute( + """ + CREATE OR REPLACE FUNCTION public.create_profile_for_new_user() + RETURNS trigger + LANGUAGE plpgsql + SECURITY DEFINER + SET search_path = '' + AS $$ + DECLARE + base_seed TEXT; + candidate_username TEXT; + attempt INT := 0; + BEGIN + base_seed := coalesce(NEW.phone, NEW.id::text); + + LOOP + candidate_username := 'user_' || public.generate_profile_username_suffix(base_seed || ':' || attempt::text); + EXIT WHEN NOT EXISTS ( + SELECT 1 FROM public.profiles p WHERE p.username = candidate_username + ); + attempt := attempt + 1; + IF attempt >= 50 THEN + candidate_username := 'user_' || substr(replace(NEW.id::text, '-', ''), 1, 6); + EXIT; + END IF; + END LOOP; + + INSERT INTO public.profiles (id, username, avatar_url, bio, settings, created_at, updated_at) + VALUES ( + NEW.id, + candidate_username, + NULL, + NULL, + '{}'::jsonb, + now(), + now() + ) + ON CONFLICT (id) DO NOTHING; + + RETURN NEW; + END; + $$; + """ + ) diff --git a/infra/scripts/app.sh b/infra/scripts/app.sh index 83714c2..d267f4a 100755 --- a/infra/scripts/app.sh +++ b/infra/scripts/app.sh @@ -162,6 +162,7 @@ ${SOCIAL_WEB__WORKERS:-2} --log-level ${UVICORN_LOG_LEVEL}" WORKER_CRITICAL_CMD="cd '$ROOT_DIR' && PYTHONPATH=backend/src SOCIAL_RUNTIME__SERVICE_NAME=worker-critical uv run taskiq worker core.taskiq.app:critical_broker core.agentscope.runtime.tasks --workers ${SOCIAL_WORKER__GROUPS__CRITICAL__CONCURRENCY:-2}" WORKER_DEFAULT_CMD="cd '$ROOT_DIR' && PYTHONPATH=backend/src SOCIAL_RUNTIME__SERVICE_NAME=worker-default uv run taskiq worker core.taskiq.app:default_broker core.agentscope.runtime.tasks --workers ${SOCIAL_WORKER__GROUPS__DEFAULT__CONCURRENCY:-2}" WORKER_BULK_CMD="cd '$ROOT_DIR' && PYTHONPATH=backend/src SOCIAL_RUNTIME__SERVICE_NAME=worker-bulk uv run taskiq worker core.taskiq.app:bulk_broker core.agentscope.runtime.tasks --workers ${SOCIAL_WORKER__GROUPS__BULK__CONCURRENCY:-1}" + AUTOMATION_SCHEDULER_CMD="cd '$ROOT_DIR' && PYTHONPATH=backend/src SOCIAL_RUNTIME__SERVICE_NAME=automation-scheduler uv run python -m core.runtime.cli automation-scheduler" echo "Starting tmux workers in session '$SESSION_NAME'..." @@ -169,6 +170,7 @@ ${SOCIAL_WEB__WORKERS:-2} --log-level ${UVICORN_LOG_LEVEL}" tmux new-window -t "$SESSION_NAME" -n worker-critical "bash -lc \"$WORKER_CRITICAL_CMD; echo '[worker-critical] exited'; exec bash\"" tmux new-window -t "$SESSION_NAME" -n worker-default "bash -lc \"$WORKER_DEFAULT_CMD; echo '[worker-default] exited'; exec bash\"" tmux new-window -t "$SESSION_NAME" -n worker-bulk "bash -lc \"$WORKER_BULK_CMD; echo '[worker-bulk] exited'; exec bash\"" + tmux new-window -t "$SESSION_NAME" -n automation-scheduler "bash -lc \"$AUTOMATION_SCHEDULER_CMD; echo '[automation-scheduler] exited'; exec bash\"" echo "" echo "=== App Started ===" @@ -177,6 +179,7 @@ ${SOCIAL_WEB__WORKERS:-2} --log-level ${UVICORN_LOG_LEVEL}" echo " - worker-critical.log, worker-critical.error.log" echo " - worker-default.log, worker-default.error.log" echo " - worker-bulk.log, worker-bulk.error.log" + echo " - automation-scheduler.log, automation-scheduler.error.log" echo "" echo "tmux attach -t $SESSION_NAME" echo "tmux list-windows -t $SESSION_NAME" @@ -199,6 +202,7 @@ stop() { kill_matching_processes "uvicorn" "uv run uvicorn app:app" kill_matching_processes "taskiq workers" "uv run taskiq worker core.taskiq.app:" + kill_matching_processes "automation scheduler" "python -m core.runtime.cli automation-scheduler" kill_listening_processes "port ${WEB_PORT} listeners" "$WEB_PORT"