Files
eryao/backend/src/v1/notifications/service.py
T

153 lines
4.7 KiB
Python
Raw Normal View History

2026-04-10 18:50:08 +08:00
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
2026-04-10 18:50:08 +08:00
from uuid import UUID
from core.http.errors import ApiProblemError, problem_payload
from v1.notifications.repository import NotificationRepository
from v1.notifications.schemas import (
NotificationPayloadNone,
NotificationPayloadRoute,
NotificationPayloadUrl,
NotificationPayload,
)
@dataclass(frozen=True)
class NotificationListItem:
id: UUID
notification_id: UUID
type: str
title: str
body: str
payload: NotificationPayload
is_read: bool
read_at: datetime | None
created_at: datetime
@dataclass(frozen=True)
class NotificationListResult:
items: list[NotificationListItem]
next_cursor: datetime | None
has_more: bool
class NotificationService:
def __init__(self, repository: NotificationRepository) -> None:
self._repository = repository
async def list_notifications(
self,
*,
user_id: UUID,
limit: int = 20,
cursor: datetime | None = None,
) -> NotificationListResult:
actual_limit = min(limit, 50)
rows = await self._repository.list_notifications(
user_id=user_id,
limit=actual_limit + 1,
cursor=cursor,
)
has_more = len(rows) > actual_limit
items = rows[:actual_limit]
next_cursor = None
if has_more and items:
next_cursor = items[-1][0].created_at
list_items = []
for un, n in items:
payload = _parse_payload(n.payload)
list_items.append(
NotificationListItem(
id=un.id,
notification_id=n.id,
type=n.type,
title=n.title,
body=n.body,
payload=payload,
is_read=un.is_read,
read_at=un.read_at,
created_at=un.created_at,
)
)
return NotificationListResult(
items=list_items,
next_cursor=next_cursor,
has_more=has_more,
)
async def get_unread_count(self, *, user_id: UUID) -> int:
return await self._repository.get_unread_count(user_id=user_id)
async def mark_read(
self, *, user_notification_id: UUID, user_id: UUID
) -> NotificationListItem:
result = await self._repository.get_user_notification(
user_notification_id=user_notification_id,
user_id=user_id,
)
if result is None:
raise ApiProblemError(
status_code=404,
detail=problem_payload(
code="NOTIFICATION_NOT_FOUND",
detail="Notification not found or not owned by current user",
),
)
un, n = result
if not un.is_read:
await self._repository.mark_read(
user_notification_id=user_notification_id,
user_id=user_id,
)
await self._repository.commit()
2026-04-10 18:50:08 +08:00
payload = _parse_payload(n.payload)
return NotificationListItem(
id=un.id,
notification_id=n.id,
type=n.type,
title=n.title,
body=n.body,
payload=payload,
is_read=True,
read_at=un.read_at or datetime.now(timezone.utc),
2026-04-10 18:50:08 +08:00
created_at=un.created_at,
)
async def mark_all_read(self, *, user_id: UUID) -> int:
updated_count = await self._repository.mark_all_read(user_id=user_id)
if updated_count > 0:
await self._repository.commit()
return updated_count
async def link_notifications_for_registered_user(
self, *, user_id: UUID, is_first_registration: bool
) -> int:
return await self._repository.link_notifications_for_registered_user(
user_id=user_id, is_first_registration=is_first_registration
)
2026-04-10 18:50:08 +08:00
def _parse_payload(raw: dict[str, object]) -> NotificationPayload:
action = raw.get("action")
if action == "none":
return NotificationPayloadNone(action="none")
if action == "open_route":
return NotificationPayloadRoute(
action="open_route",
route=str(raw.get("route", "")),
entity_id=str(raw["entity_id"])
if "entity_id" in raw and raw["entity_id"] is not None
else None,
tab=str(raw["tab"]) if "tab" in raw and raw["tab"] is not None else None,
)
if action == "open_url":
return NotificationPayloadUrl(
action="open_url",
url=str(raw.get("url", "")),
)
return NotificationPayloadNone(action="none")