feat: 静态通知同步 + 积分审计 JSONB 序列化修复

This commit is contained in:
qzl
2026-04-10 19:23:38 +08:00
parent 1cdaeb274e
commit 4b258bb4d0
13 changed files with 1196 additions and 14 deletions
@@ -0,0 +1 @@
from __future__ import annotations
@@ -0,0 +1,72 @@
from __future__ import annotations
from datetime import datetime
from pathlib import Path
from typing import ClassVar
from typing import Literal
from uuid import UUID
import yaml
from pydantic import BaseModel, ConfigDict, Field, ValidationError, model_validator
from v1.notifications.schemas import (
NotificationPayload,
NotificationPayloadNone,
)
class StaticNotificationDefinition(BaseModel):
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
source_key: str = Field(min_length=1, max_length=128)
version: int = Field(ge=1)
type: str = Field(default="system", min_length=1, max_length=32)
status: Literal["draft", "published", "revoked"]
deleted: bool = False
published_at: datetime | None = None
title: str = Field(min_length=1)
body: str = Field(min_length=1)
payload: NotificationPayload = NotificationPayloadNone(action="none")
class StaticNotificationTargets(BaseModel):
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
mode: Literal["all_users", "user_ids"]
user_ids: list[UUID] | None = None
@model_validator(mode="after")
def validate_target_mode(self) -> StaticNotificationTargets:
if self.mode == "all_users" and self.user_ids is not None:
raise ValueError("targets.user_ids must be absent when mode=all_users")
if self.mode == "user_ids":
if self.user_ids is None or len(self.user_ids) == 0:
raise ValueError(
"targets.user_ids must be a non-empty list when mode=user_ids"
)
return self
class StaticNotificationFile(BaseModel):
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
notification: StaticNotificationDefinition
targets: StaticNotificationTargets
class StaticNotificationDocument(BaseModel):
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
path: Path
config: StaticNotificationFile
def load_static_notification_file(path: Path) -> StaticNotificationFile:
with path.open("r", encoding="utf-8") as file:
loaded: object = yaml.safe_load(file) or {}
if not isinstance(loaded, dict):
raise ValueError(f"Invalid static notification format: {path}")
try:
return StaticNotificationFile.model_validate(loaded)
except ValidationError as exc:
raise ValueError(f"Invalid static notification data: {path}") from exc
@@ -0,0 +1,489 @@
from __future__ import annotations
import hashlib
import json
from collections.abc import Iterable
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import cast
from uuid import UUID
from sqlalchemy import select
from sqlalchemy import delete
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.asyncio import AsyncSession
from core.db.session import AsyncSessionLocal
from core.logging import get_logger
from core.config.notification.static_schema import (
StaticNotificationDocument,
StaticNotificationFile,
load_static_notification_file,
)
from models.auth_user import AuthUser
from models.notification import Notification
from models.user_notification import UserNotification
logger = get_logger("core.config.notification.static_sync")
@dataclass(frozen=True)
class StaticNotificationSyncResult:
processed: int
created: int
updated: int
revoked: int
deleted: int
target_links_added: int
target_links_removed: int
dry_run: bool
def default_static_notification_path() -> Path:
return (
Path(__file__).resolve().parents[1]
/ "static"
/ "notification"
/ "notifications"
)
def load_static_notification_documents(
*, path: Path | None = None, source_key: str | None = None
) -> list[StaticNotificationDocument]:
base_path = path or default_static_notification_path()
candidate_paths = list(_resolve_candidate_paths(base_path))
documents: list[StaticNotificationDocument] = []
seen_source_keys: dict[str, Path] = {}
for candidate in candidate_paths:
config = load_static_notification_file(candidate)
document = StaticNotificationDocument(path=candidate, config=config)
key = document.config.notification.source_key
previous = seen_source_keys.get(key)
if previous is not None:
raise ValueError(
f"Duplicate static notification source_key '{key}' in {previous} and {candidate}"
)
seen_source_keys[key] = candidate
if source_key is not None and key != source_key:
continue
documents.append(document)
if source_key is not None and not documents:
raise ValueError(f"Static notification source_key not found: {source_key}")
return documents
async def sync_static_notifications(
*,
path: Path | None = None,
source_key: str | None = None,
dry_run: bool = False,
prune: bool = False,
reconcile_targets: bool = False,
) -> StaticNotificationSyncResult:
if source_key is not None and prune:
raise ValueError("--prune cannot be used together with --source-key")
documents = load_static_notification_documents(path=path, source_key=source_key)
created = 0
updated = 0
revoked = 0
deleted_count = 0
target_links_added = 0
target_links_removed = 0
source_keys = {document.config.notification.source_key for document in documents}
async with AsyncSessionLocal() as session:
if dry_run:
for document in documents:
item_result = await _sync_document(
session=session,
document=document,
dry_run=True,
reconcile_targets=reconcile_targets,
)
created += item_result.created
updated += item_result.updated
revoked += item_result.revoked
deleted_count += item_result.deleted
target_links_added += item_result.target_links_added
target_links_removed += item_result.target_links_removed
if prune:
prune_result = await _prune_missing_static_notifications(
session=session,
keep_source_keys=source_keys,
dry_run=True,
)
deleted_count += prune_result.deleted
await session.rollback()
else:
async with session.begin():
for document in documents:
item_result = await _sync_document(
session=session,
document=document,
dry_run=False,
reconcile_targets=reconcile_targets,
)
created += item_result.created
updated += item_result.updated
revoked += item_result.revoked
deleted_count += item_result.deleted
target_links_added += item_result.target_links_added
target_links_removed += item_result.target_links_removed
if prune:
prune_result = await _prune_missing_static_notifications(
session=session,
keep_source_keys=source_keys,
dry_run=False,
)
deleted_count += prune_result.deleted
result = StaticNotificationSyncResult(
processed=len(documents),
created=created,
updated=updated,
revoked=revoked,
deleted=deleted_count,
target_links_added=target_links_added,
target_links_removed=target_links_removed,
dry_run=dry_run,
)
logger.info(
"Static notification sync completed",
processed=result.processed,
created=result.created,
updated=result.updated,
revoked=result.revoked,
deleted=result.deleted,
target_links_added=result.target_links_added,
target_links_removed=result.target_links_removed,
dry_run=result.dry_run,
)
return result
@dataclass(frozen=True)
class _ItemSyncResult:
created: int = 0
updated: int = 0
revoked: int = 0
deleted: int = 0
target_links_added: int = 0
target_links_removed: int = 0
async def _sync_document(
*,
session: AsyncSession,
document: StaticNotificationDocument,
dry_run: bool,
reconcile_targets: bool,
) -> _ItemSyncResult:
definition = document.config.notification
content_hash = build_static_notification_content_hash(document.config)
existing = await _get_notification_by_source_key(
session=session,
source="static",
source_key=definition.source_key,
)
previous_status = existing.status if existing is not None else None
created = 0
updated = 0
revoked = 0
deleted_count = 0
if existing is None:
notification = Notification(
type=definition.type,
source="static",
source_key=definition.source_key,
source_version=definition.version,
content_hash=content_hash,
title=definition.title,
body=definition.body,
payload=_payload_to_dict(definition.payload),
status=definition.status,
published_at=_resolve_published_at(existing=None, config=definition),
revoked_at=_resolve_revoked_at(existing=None, config=definition),
deleted_at=_resolve_deleted_at(existing=None, config=definition),
)
if not dry_run:
session.add(notification)
await session.flush()
created = 1
if definition.deleted:
deleted_count = 1
else:
notification = existing
changed = _apply_notification_updates(
notification=notification,
config=definition,
content_hash=content_hash,
)
if changed:
updated = 1
if definition.status == "revoked" and previous_status != "revoked":
revoked = 1
elif definition.status == "revoked" and changed:
revoked = 1
if definition.deleted:
deleted_count = 1
links_added = 0
links_removed = 0
if definition.status == "published" and not definition.deleted:
target_user_ids = await _resolve_target_user_ids(
session=session,
config=document.config,
)
existing_target_ids = set()
if notification.id is not None:
existing_target_ids = await _get_existing_target_user_ids(
session=session,
notification_id=notification.id,
)
if target_user_ids:
missing_target_ids = [
user_id
for user_id in target_user_ids
if user_id not in existing_target_ids
]
links_added = len(missing_target_ids)
if missing_target_ids and not dry_run:
await _insert_user_notifications(
session=session,
notification_id=notification.id,
user_ids=missing_target_ids,
)
if reconcile_targets and notification.id is not None:
extra_target_ids = [
user_id
for user_id in existing_target_ids
if user_id not in set(target_user_ids)
]
links_removed = len(extra_target_ids)
if extra_target_ids and not dry_run:
await _delete_user_notifications(
session=session,
notification_id=notification.id,
user_ids=extra_target_ids,
)
logger.info(
"Processed static notification",
source_key=definition.source_key,
path=str(document.path),
created=created,
updated=updated,
revoked=revoked,
deleted=deleted_count,
target_links_added=links_added,
target_links_removed=links_removed,
dry_run=dry_run,
reconcile_targets=reconcile_targets,
)
return _ItemSyncResult(
created=created,
updated=updated,
revoked=revoked,
deleted=deleted_count,
target_links_added=links_added,
target_links_removed=links_removed,
)
def build_static_notification_content_hash(config: StaticNotificationFile) -> str:
normalized = config.model_dump(mode="json", exclude_none=True)
if normalized.get("targets", {}).get("mode") == "user_ids":
raw_user_ids = normalized["targets"].get("user_ids", [])
normalized["targets"]["user_ids"] = sorted(raw_user_ids)
payload = json.dumps(normalized, sort_keys=True, separators=(",", ":"))
return hashlib.sha256(payload.encode("utf-8")).hexdigest()
def _resolve_candidate_paths(path: Path) -> Iterable[Path]:
if path.is_file():
if path.suffix.lower() not in {".yaml", ".yml"}:
raise ValueError(f"Unsupported static notification file: {path}")
yield path
return
if not path.exists():
raise FileNotFoundError(f"Static notification path not found: {path}")
if not path.is_dir():
raise ValueError(
f"Static notification path must be a file or directory: {path}"
)
yield from sorted(
list(path.glob("*.yaml")) + list(path.glob("*.yml")),
key=lambda item: item.name,
)
async def _get_notification_by_source_key(
*, session: AsyncSession, source: str, source_key: str
) -> Notification | None:
stmt = select(Notification).where(
Notification.source == source,
Notification.source_key == source_key,
)
return (await session.execute(stmt)).scalar_one_or_none()
def _payload_to_dict(payload: object) -> dict[str, object]:
model_dump = getattr(payload, "model_dump", None)
if callable(model_dump):
return cast(dict[str, object], model_dump(mode="json", exclude_none=True))
raise TypeError("Notification payload must be a Pydantic model")
def _resolve_published_at(
*, existing: Notification | None, config: object
) -> datetime | None:
published_at = getattr(config, "published_at", None)
status = getattr(config, "status")
if published_at is not None:
return published_at
if status == "published":
if existing is not None and existing.published_at is not None:
return existing.published_at
return datetime.now(timezone.utc)
return None
def _resolve_revoked_at(
*, existing: Notification | None, config: object
) -> datetime | None:
status = getattr(config, "status")
if status == "revoked":
if existing is not None and existing.revoked_at is not None:
return existing.revoked_at
return datetime.now(timezone.utc)
return None
def _resolve_deleted_at(
*, existing: Notification | None, config: object
) -> datetime | None:
deleted = bool(getattr(config, "deleted", False))
if deleted:
if existing is not None and existing.deleted_at is not None:
return existing.deleted_at
return datetime.now(timezone.utc)
return None
def _apply_notification_updates(
*, notification: Notification, config: object, content_hash: str
) -> bool:
next_values = {
"type": getattr(config, "type"),
"source_version": getattr(config, "version"),
"content_hash": content_hash,
"title": getattr(config, "title"),
"body": getattr(config, "body"),
"payload": _payload_to_dict(getattr(config, "payload")),
"status": getattr(config, "status"),
"published_at": _resolve_published_at(existing=notification, config=config),
"revoked_at": _resolve_revoked_at(existing=notification, config=config),
"deleted_at": _resolve_deleted_at(existing=notification, config=config),
}
changed = False
for field_name, next_value in next_values.items():
if getattr(notification, field_name) != next_value:
setattr(notification, field_name, next_value)
changed = True
return changed
async def _resolve_target_user_ids(
*, session: AsyncSession, config: StaticNotificationFile
) -> list[UUID]:
targets = config.targets
if targets.mode == "user_ids":
requested_user_ids = list(dict.fromkeys(targets.user_ids or []))
result = await session.execute(
select(AuthUser.id).where(AuthUser.id.in_(requested_user_ids))
)
existing_user_ids = set(result.scalars().all())
missing_user_ids = [
str(user_id)
for user_id in requested_user_ids
if user_id not in existing_user_ids
]
if missing_user_ids:
raise ValueError(
"Static notification target users not found: "
+ ", ".join(sorted(missing_user_ids))
)
return requested_user_ids
result = await session.execute(select(AuthUser.id))
return list(result.scalars().all())
async def _get_existing_target_user_ids(
*, session: AsyncSession, notification_id: UUID
) -> set[UUID]:
result = await session.execute(
select(UserNotification.user_id).where(
UserNotification.notification_id == notification_id
)
)
return set(result.scalars().all())
async def _insert_user_notifications(
*, session: AsyncSession, notification_id: UUID, user_ids: list[UUID]
) -> None:
if not user_ids:
return
stmt = insert(UserNotification).values(
[
{"user_id": user_id, "notification_id": notification_id}
for user_id in user_ids
]
)
stmt = stmt.on_conflict_do_nothing(index_elements=["user_id", "notification_id"])
await session.execute(stmt)
await session.flush()
async def _delete_user_notifications(
*, session: AsyncSession, notification_id: UUID, user_ids: list[UUID]
) -> None:
if not user_ids:
return
stmt = delete(UserNotification).where(
UserNotification.notification_id == notification_id,
UserNotification.user_id.in_(user_ids),
)
await session.execute(stmt)
await session.flush()
async def _prune_missing_static_notifications(
*, session: AsyncSession, keep_source_keys: set[str], dry_run: bool
) -> _ItemSyncResult:
stmt = select(Notification).where(Notification.source == "static")
existing = list((await session.execute(stmt)).scalars().all())
to_delete = [
notification
for notification in existing
if notification.source_key is not None
and notification.source_key not in keep_source_keys
and notification.deleted_at is None
]
if to_delete and not dry_run:
now = datetime.now(timezone.utc)
for notification in to_delete:
notification.deleted_at = now
if to_delete:
logger.info(
"Pruned missing static notifications",
source_keys=[notification.source_key for notification in to_delete],
dry_run=dry_run,
)
return _ItemSyncResult(deleted=len(to_delete))
@@ -0,0 +1,5 @@
Place static notification YAML files in this directory.
Protocol source of truth:
- `docs/protocols/notification/static-notification-sync-protocol.md`
@@ -0,0 +1,14 @@
notification:
source_key: welcome_points
version: 1
type: system
status: published
title: 欢迎来到觅爻
body: 你已获得新用户奖励,点击前往积分页查看当前余额。
payload:
action: open_route
route: /points
tab: balance
targets:
mode: all_users
+76 -2
View File
@@ -1,10 +1,12 @@
from __future__ import annotations
import argparse
import asyncio
import subprocess
import sys
from pathlib import Path
from core.config.notification.static_sync import sync_static_notifications
from core.config.initial.init_data import initialize_data
from core.logging import get_logger
@@ -96,11 +98,70 @@ async def bootstrap() -> bool:
return True
async def run_sync_notifications(
*,
path: str | None = None,
source_key: str | None = None,
dry_run: bool = False,
prune: bool = False,
reconcile_targets: bool = False,
) -> bool:
logger.info(
"Running static notification sync",
path=path,
source_key=source_key,
dry_run=dry_run,
prune=prune,
reconcile_targets=reconcile_targets,
)
try:
result = await sync_static_notifications(
path=Path(path) if path is not None else None,
source_key=source_key,
dry_run=dry_run,
prune=prune,
reconcile_targets=reconcile_targets,
)
logger.info(
"Static notification sync finished",
processed=result.processed,
created=result.created,
updated=result.updated,
revoked=result.revoked,
deleted=result.deleted,
target_links_added=result.target_links_added,
target_links_removed=result.target_links_removed,
dry_run=result.dry_run,
)
return True
except Exception as e:
logger.error("Static notification sync failed", error=str(e))
return False
def _parse_sync_notifications_args(argv: list[str]) -> argparse.Namespace:
parser = argparse.ArgumentParser(
prog="python -m core.runtime.cli sync-notifications"
)
parser.add_argument("--path", dest="path")
parser.add_argument("--source-key", dest="source_key")
parser.add_argument("--dry-run", dest="dry_run", action="store_true")
parser.add_argument("--prune", dest="prune", action="store_true")
parser.add_argument(
"--reconcile-targets",
dest="reconcile_targets",
action="store_true",
)
return parser.parse_args(argv)
def main() -> int:
if len(sys.argv) < 2:
logger.error("No command provided")
logger.info("Usage: python -m core.runtime.cli <command>")
logger.info("Available commands: migrate, init-data, bootstrap")
logger.info(
"Available commands: migrate, init-data, bootstrap, sync-notifications"
)
return 1
command = sys.argv[1]
@@ -111,9 +172,22 @@ def main() -> int:
success = asyncio.run(run_init_data())
elif command == "bootstrap":
success = asyncio.run(bootstrap())
elif command == "sync-notifications":
args = _parse_sync_notifications_args(sys.argv[2:])
success = asyncio.run(
run_sync_notifications(
path=args.path,
source_key=args.source_key,
dry_run=args.dry_run,
prune=args.prune,
reconcile_targets=args.reconcile_targets,
)
)
else:
logger.error("Unknown command", command=command)
logger.info("Available commands: migrate, init-data, bootstrap")
logger.info(
"Available commands: migrate, init-data, bootstrap, sync-notifications"
)
return 1
return 0 if success else 1
+13
View File
@@ -31,6 +31,13 @@ class Notification(TimestampMixin, SoftDeleteMixin, Base):
"ix_notifications_published_at",
"published_at",
),
Index(
"uq_notifications_source_source_key",
"source",
"source_key",
unique=True,
postgresql_where=text("source_key IS NOT NULL"),
),
)
id: Mapped[uuid.UUID] = mapped_column(
@@ -39,6 +46,12 @@ class Notification(TimestampMixin, SoftDeleteMixin, Base):
type: Mapped[str] = mapped_column(
String(32), nullable=False, server_default=text("'system'")
)
source: Mapped[str] = mapped_column(
String(32), nullable=False, server_default=text("'manual'")
)
source_key: Mapped[str | None] = mapped_column(String(128), nullable=True)
source_version: Mapped[int | None] = mapped_column(nullable=True)
content_hash: Mapped[str | None] = mapped_column(String(64), nullable=True)
title: Mapped[str] = mapped_column(Text, nullable=False)
body: Mapped[str] = mapped_column(Text, nullable=False)
payload: Mapped[dict[str, object]] = mapped_column(
+1 -1
View File
@@ -90,7 +90,7 @@ class PointsRepository:
input_tokens=command.input_tokens,
output_tokens=command.output_tokens,
cost=command.cost,
metadata_json=command.metadata,
metadata_json=command.metadata.model_dump(mode="json", exclude_none=True),
)
self._session.add(entry)
await self._session.flush()