diff --git a/backend/alembic/versions/20260411_0005_add_notification_static_sync_fields.py b/backend/alembic/versions/20260411_0005_add_notification_static_sync_fields.py new file mode 100644 index 0000000..60bef9b --- /dev/null +++ b/backend/alembic/versions/20260411_0005_add_notification_static_sync_fields.py @@ -0,0 +1,55 @@ +"""add notification static sync fields + +Revision ID: 20260411_0005 +Revises: 20260411_0004 +Create Date: 2026-04-11 16:00:00 +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + +revision: str = "20260411_0005" +down_revision: Union[str, Sequence[str], None] = "20260411_0004" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column( + "notifications", + sa.Column( + "source", + sa.String(length=32), + server_default=sa.text("'manual'"), + nullable=False, + ), + ) + op.add_column( + "notifications", + sa.Column("source_key", sa.String(length=128), nullable=True), + ) + op.add_column( + "notifications", + sa.Column("source_version", sa.Integer(), nullable=True), + ) + op.add_column( + "notifications", + sa.Column("content_hash", sa.String(length=64), nullable=True), + ) + op.create_index( + "uq_notifications_source_source_key", + "notifications", + ["source", "source_key"], + unique=True, + postgresql_where=sa.text("source_key IS NOT NULL"), + ) + + +def downgrade() -> None: + op.drop_index("uq_notifications_source_source_key", table_name="notifications") + op.drop_column("notifications", "content_hash") + op.drop_column("notifications", "source_version") + op.drop_column("notifications", "source_key") + op.drop_column("notifications", "source") diff --git a/backend/src/core/config/notification/__init__.py b/backend/src/core/config/notification/__init__.py new file mode 100644 index 0000000..9d48db4 --- /dev/null +++ b/backend/src/core/config/notification/__init__.py @@ -0,0 +1 @@ +from __future__ import annotations diff --git a/backend/src/core/config/notification/static_schema.py b/backend/src/core/config/notification/static_schema.py new file mode 100644 index 0000000..309f1ae --- /dev/null +++ b/backend/src/core/config/notification/static_schema.py @@ -0,0 +1,72 @@ +from __future__ import annotations + +from datetime import datetime +from pathlib import Path +from typing import ClassVar +from typing import Literal +from uuid import UUID + +import yaml +from pydantic import BaseModel, ConfigDict, Field, ValidationError, model_validator + +from v1.notifications.schemas import ( + NotificationPayload, + NotificationPayloadNone, +) + + +class StaticNotificationDefinition(BaseModel): + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + + source_key: str = Field(min_length=1, max_length=128) + version: int = Field(ge=1) + type: str = Field(default="system", min_length=1, max_length=32) + status: Literal["draft", "published", "revoked"] + deleted: bool = False + published_at: datetime | None = None + title: str = Field(min_length=1) + body: str = Field(min_length=1) + payload: NotificationPayload = NotificationPayloadNone(action="none") + + +class StaticNotificationTargets(BaseModel): + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + + mode: Literal["all_users", "user_ids"] + user_ids: list[UUID] | None = None + + @model_validator(mode="after") + def validate_target_mode(self) -> StaticNotificationTargets: + if self.mode == "all_users" and self.user_ids is not None: + raise ValueError("targets.user_ids must be absent when mode=all_users") + if self.mode == "user_ids": + if self.user_ids is None or len(self.user_ids) == 0: + raise ValueError( + "targets.user_ids must be a non-empty list when mode=user_ids" + ) + return self + + +class StaticNotificationFile(BaseModel): + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + + notification: StaticNotificationDefinition + targets: StaticNotificationTargets + + +class StaticNotificationDocument(BaseModel): + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + + path: Path + config: StaticNotificationFile + + +def load_static_notification_file(path: Path) -> StaticNotificationFile: + with path.open("r", encoding="utf-8") as file: + loaded: object = yaml.safe_load(file) or {} + if not isinstance(loaded, dict): + raise ValueError(f"Invalid static notification format: {path}") + try: + return StaticNotificationFile.model_validate(loaded) + except ValidationError as exc: + raise ValueError(f"Invalid static notification data: {path}") from exc diff --git a/backend/src/core/config/notification/static_sync.py b/backend/src/core/config/notification/static_sync.py new file mode 100644 index 0000000..fb948d1 --- /dev/null +++ b/backend/src/core/config/notification/static_sync.py @@ -0,0 +1,489 @@ +from __future__ import annotations + +import hashlib +import json +from collections.abc import Iterable +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import cast +from uuid import UUID + +from sqlalchemy import select +from sqlalchemy import delete +from sqlalchemy.dialects.postgresql import insert +from sqlalchemy.ext.asyncio import AsyncSession + +from core.db.session import AsyncSessionLocal +from core.logging import get_logger +from core.config.notification.static_schema import ( + StaticNotificationDocument, + StaticNotificationFile, + load_static_notification_file, +) +from models.auth_user import AuthUser +from models.notification import Notification +from models.user_notification import UserNotification + +logger = get_logger("core.config.notification.static_sync") + + +@dataclass(frozen=True) +class StaticNotificationSyncResult: + processed: int + created: int + updated: int + revoked: int + deleted: int + target_links_added: int + target_links_removed: int + dry_run: bool + + +def default_static_notification_path() -> Path: + return ( + Path(__file__).resolve().parents[1] + / "static" + / "notification" + / "notifications" + ) + + +def load_static_notification_documents( + *, path: Path | None = None, source_key: str | None = None +) -> list[StaticNotificationDocument]: + base_path = path or default_static_notification_path() + candidate_paths = list(_resolve_candidate_paths(base_path)) + documents: list[StaticNotificationDocument] = [] + seen_source_keys: dict[str, Path] = {} + + for candidate in candidate_paths: + config = load_static_notification_file(candidate) + document = StaticNotificationDocument(path=candidate, config=config) + key = document.config.notification.source_key + previous = seen_source_keys.get(key) + if previous is not None: + raise ValueError( + f"Duplicate static notification source_key '{key}' in {previous} and {candidate}" + ) + seen_source_keys[key] = candidate + if source_key is not None and key != source_key: + continue + documents.append(document) + + if source_key is not None and not documents: + raise ValueError(f"Static notification source_key not found: {source_key}") + return documents + + +async def sync_static_notifications( + *, + path: Path | None = None, + source_key: str | None = None, + dry_run: bool = False, + prune: bool = False, + reconcile_targets: bool = False, +) -> StaticNotificationSyncResult: + if source_key is not None and prune: + raise ValueError("--prune cannot be used together with --source-key") + documents = load_static_notification_documents(path=path, source_key=source_key) + created = 0 + updated = 0 + revoked = 0 + deleted_count = 0 + target_links_added = 0 + target_links_removed = 0 + source_keys = {document.config.notification.source_key for document in documents} + + async with AsyncSessionLocal() as session: + if dry_run: + for document in documents: + item_result = await _sync_document( + session=session, + document=document, + dry_run=True, + reconcile_targets=reconcile_targets, + ) + created += item_result.created + updated += item_result.updated + revoked += item_result.revoked + deleted_count += item_result.deleted + target_links_added += item_result.target_links_added + target_links_removed += item_result.target_links_removed + if prune: + prune_result = await _prune_missing_static_notifications( + session=session, + keep_source_keys=source_keys, + dry_run=True, + ) + deleted_count += prune_result.deleted + await session.rollback() + else: + async with session.begin(): + for document in documents: + item_result = await _sync_document( + session=session, + document=document, + dry_run=False, + reconcile_targets=reconcile_targets, + ) + created += item_result.created + updated += item_result.updated + revoked += item_result.revoked + deleted_count += item_result.deleted + target_links_added += item_result.target_links_added + target_links_removed += item_result.target_links_removed + if prune: + prune_result = await _prune_missing_static_notifications( + session=session, + keep_source_keys=source_keys, + dry_run=False, + ) + deleted_count += prune_result.deleted + + result = StaticNotificationSyncResult( + processed=len(documents), + created=created, + updated=updated, + revoked=revoked, + deleted=deleted_count, + target_links_added=target_links_added, + target_links_removed=target_links_removed, + dry_run=dry_run, + ) + logger.info( + "Static notification sync completed", + processed=result.processed, + created=result.created, + updated=result.updated, + revoked=result.revoked, + deleted=result.deleted, + target_links_added=result.target_links_added, + target_links_removed=result.target_links_removed, + dry_run=result.dry_run, + ) + return result + + +@dataclass(frozen=True) +class _ItemSyncResult: + created: int = 0 + updated: int = 0 + revoked: int = 0 + deleted: int = 0 + target_links_added: int = 0 + target_links_removed: int = 0 + + +async def _sync_document( + *, + session: AsyncSession, + document: StaticNotificationDocument, + dry_run: bool, + reconcile_targets: bool, +) -> _ItemSyncResult: + definition = document.config.notification + content_hash = build_static_notification_content_hash(document.config) + existing = await _get_notification_by_source_key( + session=session, + source="static", + source_key=definition.source_key, + ) + previous_status = existing.status if existing is not None else None + + created = 0 + updated = 0 + revoked = 0 + deleted_count = 0 + + if existing is None: + notification = Notification( + type=definition.type, + source="static", + source_key=definition.source_key, + source_version=definition.version, + content_hash=content_hash, + title=definition.title, + body=definition.body, + payload=_payload_to_dict(definition.payload), + status=definition.status, + published_at=_resolve_published_at(existing=None, config=definition), + revoked_at=_resolve_revoked_at(existing=None, config=definition), + deleted_at=_resolve_deleted_at(existing=None, config=definition), + ) + if not dry_run: + session.add(notification) + await session.flush() + created = 1 + if definition.deleted: + deleted_count = 1 + else: + notification = existing + changed = _apply_notification_updates( + notification=notification, + config=definition, + content_hash=content_hash, + ) + if changed: + updated = 1 + if definition.status == "revoked" and previous_status != "revoked": + revoked = 1 + elif definition.status == "revoked" and changed: + revoked = 1 + if definition.deleted: + deleted_count = 1 + + links_added = 0 + links_removed = 0 + if definition.status == "published" and not definition.deleted: + target_user_ids = await _resolve_target_user_ids( + session=session, + config=document.config, + ) + existing_target_ids = set() + if notification.id is not None: + existing_target_ids = await _get_existing_target_user_ids( + session=session, + notification_id=notification.id, + ) + if target_user_ids: + missing_target_ids = [ + user_id + for user_id in target_user_ids + if user_id not in existing_target_ids + ] + links_added = len(missing_target_ids) + if missing_target_ids and not dry_run: + await _insert_user_notifications( + session=session, + notification_id=notification.id, + user_ids=missing_target_ids, + ) + if reconcile_targets and notification.id is not None: + extra_target_ids = [ + user_id + for user_id in existing_target_ids + if user_id not in set(target_user_ids) + ] + links_removed = len(extra_target_ids) + if extra_target_ids and not dry_run: + await _delete_user_notifications( + session=session, + notification_id=notification.id, + user_ids=extra_target_ids, + ) + + logger.info( + "Processed static notification", + source_key=definition.source_key, + path=str(document.path), + created=created, + updated=updated, + revoked=revoked, + deleted=deleted_count, + target_links_added=links_added, + target_links_removed=links_removed, + dry_run=dry_run, + reconcile_targets=reconcile_targets, + ) + return _ItemSyncResult( + created=created, + updated=updated, + revoked=revoked, + deleted=deleted_count, + target_links_added=links_added, + target_links_removed=links_removed, + ) + + +def build_static_notification_content_hash(config: StaticNotificationFile) -> str: + normalized = config.model_dump(mode="json", exclude_none=True) + if normalized.get("targets", {}).get("mode") == "user_ids": + raw_user_ids = normalized["targets"].get("user_ids", []) + normalized["targets"]["user_ids"] = sorted(raw_user_ids) + payload = json.dumps(normalized, sort_keys=True, separators=(",", ":")) + return hashlib.sha256(payload.encode("utf-8")).hexdigest() + + +def _resolve_candidate_paths(path: Path) -> Iterable[Path]: + if path.is_file(): + if path.suffix.lower() not in {".yaml", ".yml"}: + raise ValueError(f"Unsupported static notification file: {path}") + yield path + return + if not path.exists(): + raise FileNotFoundError(f"Static notification path not found: {path}") + if not path.is_dir(): + raise ValueError( + f"Static notification path must be a file or directory: {path}" + ) + yield from sorted( + list(path.glob("*.yaml")) + list(path.glob("*.yml")), + key=lambda item: item.name, + ) + + +async def _get_notification_by_source_key( + *, session: AsyncSession, source: str, source_key: str +) -> Notification | None: + stmt = select(Notification).where( + Notification.source == source, + Notification.source_key == source_key, + ) + return (await session.execute(stmt)).scalar_one_or_none() + + +def _payload_to_dict(payload: object) -> dict[str, object]: + model_dump = getattr(payload, "model_dump", None) + if callable(model_dump): + return cast(dict[str, object], model_dump(mode="json", exclude_none=True)) + raise TypeError("Notification payload must be a Pydantic model") + + +def _resolve_published_at( + *, existing: Notification | None, config: object +) -> datetime | None: + published_at = getattr(config, "published_at", None) + status = getattr(config, "status") + if published_at is not None: + return published_at + if status == "published": + if existing is not None and existing.published_at is not None: + return existing.published_at + return datetime.now(timezone.utc) + return None + + +def _resolve_revoked_at( + *, existing: Notification | None, config: object +) -> datetime | None: + status = getattr(config, "status") + if status == "revoked": + if existing is not None and existing.revoked_at is not None: + return existing.revoked_at + return datetime.now(timezone.utc) + return None + + +def _resolve_deleted_at( + *, existing: Notification | None, config: object +) -> datetime | None: + deleted = bool(getattr(config, "deleted", False)) + if deleted: + if existing is not None and existing.deleted_at is not None: + return existing.deleted_at + return datetime.now(timezone.utc) + return None + + +def _apply_notification_updates( + *, notification: Notification, config: object, content_hash: str +) -> bool: + next_values = { + "type": getattr(config, "type"), + "source_version": getattr(config, "version"), + "content_hash": content_hash, + "title": getattr(config, "title"), + "body": getattr(config, "body"), + "payload": _payload_to_dict(getattr(config, "payload")), + "status": getattr(config, "status"), + "published_at": _resolve_published_at(existing=notification, config=config), + "revoked_at": _resolve_revoked_at(existing=notification, config=config), + "deleted_at": _resolve_deleted_at(existing=notification, config=config), + } + changed = False + for field_name, next_value in next_values.items(): + if getattr(notification, field_name) != next_value: + setattr(notification, field_name, next_value) + changed = True + return changed + + +async def _resolve_target_user_ids( + *, session: AsyncSession, config: StaticNotificationFile +) -> list[UUID]: + targets = config.targets + if targets.mode == "user_ids": + requested_user_ids = list(dict.fromkeys(targets.user_ids or [])) + result = await session.execute( + select(AuthUser.id).where(AuthUser.id.in_(requested_user_ids)) + ) + existing_user_ids = set(result.scalars().all()) + missing_user_ids = [ + str(user_id) + for user_id in requested_user_ids + if user_id not in existing_user_ids + ] + if missing_user_ids: + raise ValueError( + "Static notification target users not found: " + + ", ".join(sorted(missing_user_ids)) + ) + return requested_user_ids + result = await session.execute(select(AuthUser.id)) + return list(result.scalars().all()) + + +async def _get_existing_target_user_ids( + *, session: AsyncSession, notification_id: UUID +) -> set[UUID]: + result = await session.execute( + select(UserNotification.user_id).where( + UserNotification.notification_id == notification_id + ) + ) + return set(result.scalars().all()) + + +async def _insert_user_notifications( + *, session: AsyncSession, notification_id: UUID, user_ids: list[UUID] +) -> None: + if not user_ids: + return + stmt = insert(UserNotification).values( + [ + {"user_id": user_id, "notification_id": notification_id} + for user_id in user_ids + ] + ) + stmt = stmt.on_conflict_do_nothing(index_elements=["user_id", "notification_id"]) + await session.execute(stmt) + await session.flush() + + +async def _delete_user_notifications( + *, session: AsyncSession, notification_id: UUID, user_ids: list[UUID] +) -> None: + if not user_ids: + return + stmt = delete(UserNotification).where( + UserNotification.notification_id == notification_id, + UserNotification.user_id.in_(user_ids), + ) + await session.execute(stmt) + await session.flush() + + +async def _prune_missing_static_notifications( + *, session: AsyncSession, keep_source_keys: set[str], dry_run: bool +) -> _ItemSyncResult: + stmt = select(Notification).where(Notification.source == "static") + existing = list((await session.execute(stmt)).scalars().all()) + to_delete = [ + notification + for notification in existing + if notification.source_key is not None + and notification.source_key not in keep_source_keys + and notification.deleted_at is None + ] + if to_delete and not dry_run: + now = datetime.now(timezone.utc) + for notification in to_delete: + notification.deleted_at = now + if to_delete: + logger.info( + "Pruned missing static notifications", + source_keys=[notification.source_key for notification in to_delete], + dry_run=dry_run, + ) + return _ItemSyncResult(deleted=len(to_delete)) diff --git a/backend/src/core/config/static/notification/notifications/README.md b/backend/src/core/config/static/notification/notifications/README.md new file mode 100644 index 0000000..20905bc --- /dev/null +++ b/backend/src/core/config/static/notification/notifications/README.md @@ -0,0 +1,5 @@ +Place static notification YAML files in this directory. + +Protocol source of truth: + +- `docs/protocols/notification/static-notification-sync-protocol.md` diff --git a/backend/src/core/config/static/notification/notifications/welcome_points.yaml b/backend/src/core/config/static/notification/notifications/welcome_points.yaml new file mode 100644 index 0000000..c066140 --- /dev/null +++ b/backend/src/core/config/static/notification/notifications/welcome_points.yaml @@ -0,0 +1,14 @@ +notification: + source_key: welcome_points + version: 1 + type: system + status: published + title: 欢迎来到觅爻 + body: 你已获得新用户奖励,点击前往积分页查看当前余额。 + payload: + action: open_route + route: /points + tab: balance + +targets: + mode: all_users diff --git a/backend/src/core/runtime/cli.py b/backend/src/core/runtime/cli.py index f99b3d3..5b66397 100644 --- a/backend/src/core/runtime/cli.py +++ b/backend/src/core/runtime/cli.py @@ -1,10 +1,12 @@ from __future__ import annotations +import argparse import asyncio import subprocess import sys from pathlib import Path +from core.config.notification.static_sync import sync_static_notifications from core.config.initial.init_data import initialize_data from core.logging import get_logger @@ -96,11 +98,70 @@ async def bootstrap() -> bool: return True +async def run_sync_notifications( + *, + path: str | None = None, + source_key: str | None = None, + dry_run: bool = False, + prune: bool = False, + reconcile_targets: bool = False, +) -> bool: + logger.info( + "Running static notification sync", + path=path, + source_key=source_key, + dry_run=dry_run, + prune=prune, + reconcile_targets=reconcile_targets, + ) + try: + result = await sync_static_notifications( + path=Path(path) if path is not None else None, + source_key=source_key, + dry_run=dry_run, + prune=prune, + reconcile_targets=reconcile_targets, + ) + logger.info( + "Static notification sync finished", + processed=result.processed, + created=result.created, + updated=result.updated, + revoked=result.revoked, + deleted=result.deleted, + target_links_added=result.target_links_added, + target_links_removed=result.target_links_removed, + dry_run=result.dry_run, + ) + return True + except Exception as e: + logger.error("Static notification sync failed", error=str(e)) + return False + + +def _parse_sync_notifications_args(argv: list[str]) -> argparse.Namespace: + parser = argparse.ArgumentParser( + prog="python -m core.runtime.cli sync-notifications" + ) + parser.add_argument("--path", dest="path") + parser.add_argument("--source-key", dest="source_key") + parser.add_argument("--dry-run", dest="dry_run", action="store_true") + parser.add_argument("--prune", dest="prune", action="store_true") + parser.add_argument( + "--reconcile-targets", + dest="reconcile_targets", + action="store_true", + ) + return parser.parse_args(argv) + + def main() -> int: if len(sys.argv) < 2: logger.error("No command provided") logger.info("Usage: python -m core.runtime.cli ") - logger.info("Available commands: migrate, init-data, bootstrap") + logger.info( + "Available commands: migrate, init-data, bootstrap, sync-notifications" + ) return 1 command = sys.argv[1] @@ -111,9 +172,22 @@ def main() -> int: success = asyncio.run(run_init_data()) elif command == "bootstrap": success = asyncio.run(bootstrap()) + elif command == "sync-notifications": + args = _parse_sync_notifications_args(sys.argv[2:]) + success = asyncio.run( + run_sync_notifications( + path=args.path, + source_key=args.source_key, + dry_run=args.dry_run, + prune=args.prune, + reconcile_targets=args.reconcile_targets, + ) + ) else: logger.error("Unknown command", command=command) - logger.info("Available commands: migrate, init-data, bootstrap") + logger.info( + "Available commands: migrate, init-data, bootstrap, sync-notifications" + ) return 1 return 0 if success else 1 diff --git a/backend/src/models/notification.py b/backend/src/models/notification.py index 28c8e95..7a37ff1 100644 --- a/backend/src/models/notification.py +++ b/backend/src/models/notification.py @@ -31,6 +31,13 @@ class Notification(TimestampMixin, SoftDeleteMixin, Base): "ix_notifications_published_at", "published_at", ), + Index( + "uq_notifications_source_source_key", + "source", + "source_key", + unique=True, + postgresql_where=text("source_key IS NOT NULL"), + ), ) id: Mapped[uuid.UUID] = mapped_column( @@ -39,6 +46,12 @@ class Notification(TimestampMixin, SoftDeleteMixin, Base): type: Mapped[str] = mapped_column( String(32), nullable=False, server_default=text("'system'") ) + source: Mapped[str] = mapped_column( + String(32), nullable=False, server_default=text("'manual'") + ) + source_key: Mapped[str | None] = mapped_column(String(128), nullable=True) + source_version: Mapped[int | None] = mapped_column(nullable=True) + content_hash: Mapped[str | None] = mapped_column(String(64), nullable=True) title: Mapped[str] = mapped_column(Text, nullable=False) body: Mapped[str] = mapped_column(Text, nullable=False) payload: Mapped[dict[str, object]] = mapped_column( diff --git a/backend/src/v1/points/repository.py b/backend/src/v1/points/repository.py index e008007..9ce6bbc 100644 --- a/backend/src/v1/points/repository.py +++ b/backend/src/v1/points/repository.py @@ -90,7 +90,7 @@ class PointsRepository: input_tokens=command.input_tokens, output_tokens=command.output_tokens, cost=command.cost, - metadata_json=command.metadata, + metadata_json=command.metadata.model_dump(mode="json", exclude_none=True), ) self._session.add(entry) await self._session.flush() diff --git a/backend/tests/unit/test_static_notification_sync.py b/backend/tests/unit/test_static_notification_sync.py new file mode 100644 index 0000000..3548fdf --- /dev/null +++ b/backend/tests/unit/test_static_notification_sync.py @@ -0,0 +1,181 @@ +from __future__ import annotations + +from pathlib import Path +import textwrap + +import pytest + +from core.config.notification.static_schema import load_static_notification_file +from core.config.notification.static_sync import ( + build_static_notification_content_hash, + load_static_notification_documents, +) + + +def _write_yaml(path: Path, content: str) -> None: + path.write_text(textwrap.dedent(content).strip() + "\n", encoding="utf-8") + + +def test_load_static_notification_file_parses_valid_yaml(tmp_path: Path) -> None: + file_path = tmp_path / "welcome.yaml" + _write_yaml( + file_path, + """ + notification: + source_key: welcome_bonus + version: 1 + type: system + status: published + title: Welcome + body: Welcome to the app. + payload: + action: open_route + route: /points + tab: balance + targets: + mode: user_ids + user_ids: + - 11111111-1111-1111-1111-111111111111 + """, + ) + + loaded = load_static_notification_file(file_path) + + assert loaded.notification.source_key == "welcome_bonus" + assert loaded.notification.payload.action == "open_route" + assert loaded.targets.mode == "user_ids" + assert len(loaded.targets.user_ids or []) == 1 + + +def test_load_static_notification_file_rejects_invalid_targets(tmp_path: Path) -> None: + file_path = tmp_path / "invalid.yaml" + _write_yaml( + file_path, + """ + notification: + source_key: invalid_targets + version: 1 + type: system + status: published + title: Invalid + body: Invalid targets. + payload: + action: none + targets: + mode: all_users + user_ids: + - 11111111-1111-1111-1111-111111111111 + """, + ) + + with pytest.raises(ValueError, match="Invalid static notification data"): + load_static_notification_file(file_path) + + +def test_load_static_notification_documents_rejects_duplicate_source_key( + tmp_path: Path, +) -> None: + _write_yaml( + tmp_path / "first.yaml", + """ + notification: + source_key: duplicate_key + version: 1 + type: system + status: published + title: First + body: First body. + payload: + action: none + targets: + mode: all_users + """, + ) + _write_yaml( + tmp_path / "second.yaml", + """ + notification: + source_key: duplicate_key + version: 1 + type: system + status: published + title: Second + body: Second body. + payload: + action: none + targets: + mode: all_users + """, + ) + + with pytest.raises(ValueError, match="Duplicate static notification source_key"): + load_static_notification_documents(path=tmp_path) + + +def test_content_hash_changes_when_notification_changes(tmp_path: Path) -> None: + first_path = tmp_path / "first.yaml" + second_path = tmp_path / "second.yaml" + _write_yaml( + first_path, + """ + notification: + source_key: same_key + version: 1 + type: system + status: published + title: Title A + body: Body A. + payload: + action: none + targets: + mode: all_users + """, + ) + _write_yaml( + second_path, + """ + notification: + source_key: same_key + version: 1 + type: system + status: published + title: Title B + body: Body A. + payload: + action: none + targets: + mode: all_users + """, + ) + + first = load_static_notification_file(first_path) + second = load_static_notification_file(second_path) + + assert build_static_notification_content_hash( + first + ) != build_static_notification_content_hash(second) + + +def test_load_static_notification_file_supports_deleted_flag(tmp_path: Path) -> None: + file_path = tmp_path / "deleted.yaml" + _write_yaml( + file_path, + """ + notification: + source_key: deleted_notice + version: 1 + type: system + status: revoked + deleted: true + title: Deleted + body: Deleted body. + payload: + action: none + targets: + mode: all_users + """, + ) + + loaded = load_static_notification_file(file_path) + + assert loaded.notification.deleted is True diff --git a/docs/plans/static-notification-sync-plan.md b/docs/plans/static-notification-sync-plan.md index c799513..906b174 100644 --- a/docs/plans/static-notification-sync-plan.md +++ b/docs/plans/static-notification-sync-plan.md @@ -38,9 +38,7 @@ - 系统级离线推送 - 自动监听文件变化并实时同步 -- 通过文件删除自动删库 - 复杂运营后台 -- 严格对齐目标用户集合并自动删除既有投递记录 --- @@ -161,6 +159,7 @@ notification: body: 你已获得注册奖励,可前往积分中心查看。 payload: + deleted: false action: open_route route: /points entity_id: null @@ -222,6 +221,8 @@ targets: - 通知类型,当前默认 `system` - `status` - `draft/published/revoked` +- `deleted` + - 显式软删除主通知 - `published_at` - 发布时间 - `title/body/payload` @@ -239,6 +240,7 @@ targets: - `source_key` 必填且全局唯一 - `version >= 1` - `status` 只允许 `draft/published/revoked` +- `deleted` 为可选布尔值 - `payload` 必须符合现有通知 payload schema - `targets.mode='all_users'` 时不允许传 `user_ids` - `targets.mode='user_ids'` 时 `user_ids` 必填且不能为空 @@ -280,22 +282,39 @@ targets: ### 8.4 统一删除 -本阶段不使用“文件消失自动删库”语义。 +本阶段支持两种明确的下线方式: + +1. 在 YAML 中显式写 `deleted: true` +2. 执行同步时使用 `--prune`,将文件中已不存在的静态通知软删除 + +- `deleted: true` 语义: + +- 设置 `notifications.deleted_at` +- 不删除既有 `user_notifications` + +- `--prune` 语义: + +- 扫描范围内缺失的静态通知会被软删除 +- 不会删除非 `source='static'` 的通知 + +默认情况下,不因为文件消失自动删库。 原因: - 文件误删风险高 - 容易把版本控制操作误解释为业务删除 -如果需要下线,显式通过配置状态控制: +如果只是想临时停止用户可见,优先用: - `status: revoked` -如果未来确实需要静态配置触发软删除,再单独增加明确字段,不在本阶段默认启用。 +如果想做统一下线并保留审计主记录,可用: + +- `deleted: true` ### 8.5 目标用户变更 -第一阶段采用保守策略: +默认采用保守策略: - 新增目标用户时,补插入 `user_notifications` - 被移出目标集合的用户,不自动删除既有 `user_notifications` @@ -305,7 +324,9 @@ targets: - 防止误操作删除已投递历史 - 与“通知一旦发出就保留用户侧记录”的语义更一致 -如果未来需要严格对齐文件目标集合,再单独增加显式 `--reconcile-targets` 行为。 +如果执行同步时显式加上 `--reconcile-targets`,则: + +- 当前目标集合之外的既有 `user_notifications` 会被删除 --- @@ -377,8 +398,10 @@ PYTHONPATH=backend/src uv run python -m core.runtime.cli sync-notifications - `--path` - `--source-key` - `--dry-run` +- `--prune` +- `--reconcile-targets` -第一阶段不默认提供危险的全量清理参数。 +危险行为必须显式开启,不默认启用。 ### 10.2 infra 脚本 @@ -399,6 +422,7 @@ infra/scripts/register-notifications.sh ./infra/scripts/register-notifications.sh ./infra/scripts/register-notifications.sh --dry-run ./infra/scripts/register-notifications.sh --source-key welcome_bonus +./infra/scripts/register-notifications.sh --prune --reconcile-targets ``` --- @@ -435,8 +459,9 @@ infra/scripts/register-notifications.sh 6. 为通知模块补充按 `source/source_key` 查询与更新能力 7. 在 `core.runtime.cli` 中新增 `sync-notifications` 命令 8. 新增 `infra/scripts/register-notifications.sh` -9. 视需要补充 `notification_updated` Realtime 事件 -10. 编写最小测试和 dry-run 校验 +9. 支持 `--prune` 和 `--reconcile-targets` +10. 视需要补充 `notification_updated` Realtime 事件 +11. 编写最小测试和 dry-run 校验 --- @@ -447,7 +472,10 @@ infra/scripts/register-notifications.sh - [ ] 修改 `title/body/payload` 后,再同步可反映到数据库 - [ ] 用户侧已读状态在主通知内容更新后保持不变 - [ ] 将 `status` 改为 `revoked` 后,再同步可使通知在用户列表中失效 +- [ ] 将 `deleted` 改为 `true` 后,再同步可使通知从用户列表和未读数中消失 - [ ] `--dry-run` 可输出计划变更而不写库 +- [ ] `--prune` 可将文件中已不存在的静态通知软删除 +- [ ] `--reconcile-targets` 可严格对齐目标用户集合 - [ ] YAML 结构不合法时同步失败,并给出明确错误 - [ ] 脚本可按全量或按 `source_key` 手动触发同步 @@ -461,10 +489,13 @@ infra/scripts/register-notifications.sh - 新建通知同步 - 已有通知更新同步 - 撤销同步 +- 显式软删除同步 - 相同 `source_key` 幂等 upsert - 更新主通知时不重置 `user_notifications.is_read/read_at` - 新增目标用户时补插入接收关系 - 被移出目标集合时不删除既有接收关系 +- `--reconcile-targets` 下删除多余接收关系 +- `--prune` 下软删除缺失静态通知 脚本至少验证: @@ -479,6 +510,5 @@ infra/scripts/register-notifications.sh 只有在真实需求出现时,再考虑: - 用删除文件触发软删除 -- 严格对齐目标用户集合并清理历史接收关系 - 通过后台页面管理静态通知 - 将静态通知同步纳入更完整的发布工作流 diff --git a/docs/protocols/notification/static-notification-sync-protocol.md b/docs/protocols/notification/static-notification-sync-protocol.md new file mode 100644 index 0000000..cfa2e4b --- /dev/null +++ b/docs/protocols/notification/static-notification-sync-protocol.md @@ -0,0 +1,229 @@ +# Static Notification Sync Protocol + +This document defines the static notification file contract and database sync semantics. + +Protocol verification status: + +- Sync plan source: `docs/plans/static-notification-sync-plan.md` +- Static sync implementation source: `backend/src/core/config/notification/static_sync.py` +- Static sync schema source: `backend/src/core/config/notification/static_schema.py` + +## Compatibility strategy + +- Additive evolution only. +- Existing `source_key` values are stable identifiers and must not be repurposed. +- Changing notification content must not reset user read state. +- Removing a file has no effect by default; database pruning requires explicit CLI flag. + +## Static file location + +Static notification files live under: + +```text +backend/src/core/config/static/notification/notifications/*.yaml +``` + +The sync command scans all `*.yaml` files in that directory unless a specific `source_key` is requested. + +## File schema + +Each YAML file contains two top-level sections: + +- `notification` +- `targets` + +Example: + +```yaml +notification: + source_key: welcome_bonus + version: 1 + type: system + status: published + published_at: 2026-04-10T08:00:00Z + title: 新用户欢迎通知 + body: 你已获得注册奖励,可前往积分中心查看。 + payload: + action: open_route + route: /points + tab: balance + +targets: + mode: all_users +``` + +### notification + +- `source_key`: required, string, max 128, unique among static notifications +- `version`: required, integer, `>= 1` +- `type`: required, string, currently `system` +- `status`: required, one of `draft`, `published`, `revoked` +- `deleted`: optional, boolean, default `false`, soft-delete this notification +- `published_at`: optional ISO 8601 timestamp +- `title`: required, non-empty string +- `body`: required, non-empty string +- `payload`: required, must follow the notification payload protocol + +### targets + +- `mode`: required, one of `all_users`, `user_ids` +- `user_ids`: required only when `mode = user_ids` + +Rules: + +- `mode = all_users`: `user_ids` must be absent +- `mode = user_ids`: `user_ids` must be a non-empty UUID list + +## Payload contract + +`notification.payload` reuses the inbox notification payload schema. + +### action = "none" + +```yaml +payload: + action: none +``` + +### action = "open_route" + +```yaml +payload: + action: open_route + route: /points + entity_id: optional-id + tab: balance +``` + +Rules: + +- `route`: required, max 200 +- `entity_id`: optional, max 64 +- `tab`: optional, max 32 +- `url`: must be absent + +### action = "open_url" + +```yaml +payload: + action: open_url + url: https://example.com/page +``` + +Rules: + +- `url`: required, max 500 +- `route`, `entity_id`, `tab`: must be absent + +## Database mapping + +Static notifications map to `notifications` rows using: + +- `source = 'static'` +- `source_key = notification.source_key` + +Additional notification fields: + +- `source_version = notification.version` +- `content_hash = normalized content hash` + +Required uniqueness: + +- `UNIQUE(source, source_key)` where `source_key IS NOT NULL` + +## Sync semantics + +### Create + +If `(source='static', source_key=...)` does not exist: + +1. Create `notifications` +2. Create `user_notifications` for target users + +### Update + +If `(source='static', source_key=...)` already exists: + +1. Update `title`, `body`, `payload`, `status`, `published_at`, `source_version`, `content_hash` +2. Keep existing `user_notifications` +3. Do not reset `is_read` or `read_at` + +### Revoke + +If `notification.status = revoked`: + +1. Update `notifications.status = 'revoked'` +2. Set `revoked_at` +3. Keep existing `user_notifications` + +### Soft delete + +If `notification.deleted = true`: + +1. Set `notifications.deleted_at` +2. Keep existing `user_notifications` +3. Exclude the notification from list/unread queries + +### Draft + +If `notification.status = draft`: + +1. Keep or create the main `notifications` row +2. Do not expose it to users through list/unread queries +3. Do not create new `user_notifications` during sync + +### File deletion + +Deleting a YAML file has no database effect by default. + +Reason: + +- Prevent accidental revocation/deletion from filesystem changes + +If the CLI is executed with `--prune`, then static notifications missing from the scanned files are soft-deleted by setting `deleted_at`. + +### Target changes + +Default behavior: + +- Newly added targets receive new `user_notifications` +- Removed targets do not delete existing `user_notifications` + +This is intentionally non-destructive. + +If the CLI is executed with `--reconcile-targets`, existing `user_notifications` that are no longer part of the computed target set are deleted. + +## CLI contract + +Backend CLI command: + +```bash +PYTHONPATH=backend/src uv run python -m core.runtime.cli sync-notifications +``` + +Supported options: + +- `--path `: override static notification source path +- `--source-key `: sync only one notification +- `--dry-run`: validate and compute changes without writing to DB +- `--prune`: soft-delete static notifications missing from the scanned files +- `--reconcile-targets`: delete extra `user_notifications` not in the current target set + +Infra wrapper: + +```bash +./infra/scripts/register-notifications.sh +./infra/scripts/register-notifications.sh --dry-run +./infra/scripts/register-notifications.sh --source-key welcome_bonus +./infra/scripts/register-notifications.sh --prune --reconcile-targets +``` + +## Failure behavior + +- Invalid YAML structure must fail the sync run +- Duplicate `source_key` across files must fail the sync run +- Invalid payload structure must fail the sync run +- Missing target users are not auto-created +- Database write failure must fail the sync run + +No partial silent success is allowed. diff --git a/infra/scripts/register-notifications.sh b/infra/scripts/register-notifications.sh new file mode 100755 index 0000000..9f70151 --- /dev/null +++ b/infra/scripts/register-notifications.sh @@ -0,0 +1,19 @@ +#!/bin/bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "$0")/../.." && pwd)" +ENV_FILE="$ROOT_DIR/.env" +ENV_LOADER="$ROOT_DIR/infra/scripts/lib/env.sh" + +if [ ! -f "$ENV_FILE" ]; then + echo "Error: env file not found at $ENV_FILE" >&2 + exit 1 +fi + +# shellcheck disable=SC1090 +. "$ENV_LOADER" +load_env_file "$ENV_FILE" + +cd "$ROOT_DIR" + +PYTHONPATH=backend/src uv run python -m core.runtime.cli sync-notifications "$@"