Files
eryao/backend/src/v1/notifications/router.py
T
ZL-Q a940f2ea47 feat(notification): 通知标题和正文支持多语言
- 通知静态配置支持 title/body i18n
- 前端通知列表和详情页展示本地化内容
- 新增数据库迁移脚本
- 更新通知协议文档
2026-04-28 17:20:17 +08:00

153 lines
4.9 KiB
Python

from __future__ import annotations
from datetime import datetime
from typing import Annotated
from fastapi import APIRouter, Depends, Query
from core.logging import get_logger
from core.auth.models import CurrentUser
from core.http.errors import ApiProblemError, problem_payload
from v1.notifications.dependencies import get_notification_service
from v1.notifications.schemas import (
MarkAllReadResponse,
NotificationItemResponse,
NotificationListResponse,
UnreadCountResponse,
)
from v1.notifications.service import NotificationService, normalize_locale
from v1.users.dependencies import get_current_user
router = APIRouter(prefix="/notifications", tags=["notifications"])
logger = get_logger("v1.notifications.router")
def _parse_cursor(cursor: str | None) -> datetime | None:
if cursor is None:
return None
try:
return datetime.fromisoformat(cursor.replace("Z", "+00:00"))
except ValueError as exc:
raise ApiProblemError(
status_code=422,
detail=problem_payload(
code="NOTIFICATION_INVALID_CURSOR",
detail="Notification cursor must be an ISO 8601 datetime",
params={"cursor": cursor},
),
) from exc
@router.get("", response_model=NotificationListResponse)
async def list_notifications(
service: Annotated[NotificationService, Depends(get_notification_service)],
current_user: Annotated[CurrentUser, Depends(get_current_user)],
limit: int = Query(default=20, ge=1, le=50),
cursor: str | None = Query(default=None),
locale: str | None = Query(default=None),
) -> NotificationListResponse:
result = await service.list_notifications(
user_id=current_user.id,
limit=limit,
cursor=_parse_cursor(cursor),
locale=normalize_locale(locale),
)
logger.info(
"Notification list fetched",
user_id=str(current_user.id),
limit=limit,
item_count=len(result.items),
has_more=result.has_more,
)
items = []
for item in result.items:
items.append(
NotificationItemResponse(
id=str(item.id),
notificationId=str(item.notification_id),
type=item.type,
title=item.title,
body=item.body,
payload=item.payload,
isRead=item.is_read,
readAt=item.read_at,
createdAt=item.created_at,
)
)
return NotificationListResponse(
items=items,
nextCursor=result.next_cursor.isoformat() if result.next_cursor else None,
hasMore=result.has_more,
)
@router.get("/unread-count", response_model=UnreadCountResponse)
async def get_unread_count(
service: Annotated[NotificationService, Depends(get_notification_service)],
current_user: Annotated[CurrentUser, Depends(get_current_user)],
) -> UnreadCountResponse:
count = await service.get_unread_count(user_id=current_user.id)
logger.info(
"Notification unread count fetched",
user_id=str(current_user.id),
count=count,
)
return UnreadCountResponse(count=count)
@router.patch("/{notification_id}/read", response_model=NotificationItemResponse)
async def mark_notification_read(
notification_id: str,
service: Annotated[NotificationService, Depends(get_notification_service)],
current_user: Annotated[CurrentUser, Depends(get_current_user)],
locale: str | None = Query(default=None),
) -> NotificationItemResponse:
from uuid import UUID
try:
uid = UUID(notification_id)
except ValueError:
raise ApiProblemError(
status_code=404,
detail=problem_payload(
code="NOTIFICATION_NOT_FOUND",
detail="Notification not found or not owned by current user",
),
)
item = await service.mark_read(
user_notification_id=uid,
user_id=current_user.id,
locale=normalize_locale(locale),
)
logger.info(
"Notification marked as read",
user_id=str(current_user.id),
user_notification_id=str(uid),
)
return NotificationItemResponse(
id=str(item.id),
notificationId=str(item.notification_id),
type=item.type,
title=item.title,
body=item.body,
payload=item.payload,
isRead=item.is_read,
readAt=item.read_at,
createdAt=item.created_at,
)
@router.patch("/mark-all-read", response_model=MarkAllReadResponse)
async def mark_all_read(
service: Annotated[NotificationService, Depends(get_notification_service)],
current_user: Annotated[CurrentUser, Depends(get_current_user)],
) -> MarkAllReadResponse:
updated_count = await service.mark_all_read(user_id=current_user.id)
logger.info(
"All notifications marked as read",
user_id=str(current_user.id),
updated_count=updated_count,
)
return MarkAllReadResponse(updatedCount=updated_count)