2026-04-10 18:50:08 +08:00
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
from datetime import datetime
|
|
|
|
|
from uuid import UUID, uuid4
|
|
|
|
|
|
|
|
|
|
import pytest
|
|
|
|
|
|
|
|
|
|
from v1.notifications.service import NotificationService, _parse_payload
|
|
|
|
|
from v1.notifications.schemas import (
|
|
|
|
|
NotificationPayloadNone,
|
|
|
|
|
NotificationPayloadRoute,
|
|
|
|
|
NotificationPayloadUrl,
|
|
|
|
|
)
|
|
|
|
|
from core.http.errors import ApiProblemError
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class _FakeUserNotification:
|
|
|
|
|
def __init__(
|
|
|
|
|
self,
|
|
|
|
|
*,
|
|
|
|
|
id: UUID,
|
|
|
|
|
user_id: UUID,
|
|
|
|
|
notification_id: UUID,
|
|
|
|
|
is_read: bool = False,
|
|
|
|
|
read_at: datetime | None = None,
|
|
|
|
|
created_at: datetime | None = None,
|
|
|
|
|
):
|
|
|
|
|
self.id = id
|
|
|
|
|
self.user_id = user_id
|
|
|
|
|
self.notification_id = notification_id
|
|
|
|
|
self.is_read = is_read
|
|
|
|
|
self.read_at = read_at
|
|
|
|
|
self.created_at = created_at or datetime.now()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class _FakeNotification:
|
|
|
|
|
def __init__(
|
|
|
|
|
self,
|
|
|
|
|
*,
|
|
|
|
|
id: UUID,
|
|
|
|
|
type: str = "system",
|
|
|
|
|
title: str = "Test",
|
|
|
|
|
body: str = "Test body",
|
|
|
|
|
payload: dict | None = None,
|
|
|
|
|
status: str = "published",
|
|
|
|
|
deleted_at: datetime | None = None,
|
|
|
|
|
created_at: datetime | None = None,
|
|
|
|
|
):
|
|
|
|
|
self.id = id
|
|
|
|
|
self.type = type
|
|
|
|
|
self.title = title
|
|
|
|
|
self.body = body
|
|
|
|
|
self.payload = payload or {"action": "none"}
|
|
|
|
|
self.status = status
|
|
|
|
|
self.deleted_at = deleted_at
|
|
|
|
|
self.created_at = created_at or datetime.now()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class _FakeNotificationRepository:
|
|
|
|
|
def __init__(self) -> None:
|
|
|
|
|
self._items: list[tuple[_FakeUserNotification, _FakeNotification]] = []
|
|
|
|
|
self._mark_read_ids: list[UUID] = []
|
|
|
|
|
self._mark_all_read_user_ids: list[UUID] = []
|
2026-04-13 14:52:22 +08:00
|
|
|
self._commit_count = 0
|
2026-04-10 18:50:08 +08:00
|
|
|
|
|
|
|
|
def add_item(self, un: _FakeUserNotification, n: _FakeNotification) -> None:
|
|
|
|
|
self._items.append((un, n))
|
|
|
|
|
|
|
|
|
|
async def list_notifications(
|
|
|
|
|
self,
|
|
|
|
|
*,
|
|
|
|
|
user_id: UUID,
|
|
|
|
|
limit: int = 20,
|
|
|
|
|
cursor: datetime | None = None,
|
|
|
|
|
) -> list[tuple[_FakeUserNotification, _FakeNotification]]:
|
|
|
|
|
user_items = [
|
|
|
|
|
(un, n)
|
|
|
|
|
for un, n in self._items
|
|
|
|
|
if un.user_id == user_id
|
|
|
|
|
and n.status == "published"
|
|
|
|
|
and n.deleted_at is None
|
|
|
|
|
]
|
|
|
|
|
if cursor is not None:
|
|
|
|
|
user_items = [(un, n) for un, n in user_items if un.created_at < cursor]
|
|
|
|
|
user_items.sort(key=lambda x: x[0].created_at, reverse=True)
|
|
|
|
|
return user_items[:limit]
|
|
|
|
|
|
|
|
|
|
async def get_unread_count(self, *, user_id: UUID) -> int:
|
|
|
|
|
return sum(
|
|
|
|
|
1
|
|
|
|
|
for un, n in self._items
|
|
|
|
|
if un.user_id == user_id
|
|
|
|
|
and not un.is_read
|
|
|
|
|
and n.status == "published"
|
|
|
|
|
and n.deleted_at is None
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
async def get_user_notification(
|
|
|
|
|
self,
|
|
|
|
|
*,
|
|
|
|
|
user_notification_id: UUID,
|
|
|
|
|
user_id: UUID,
|
|
|
|
|
) -> tuple[_FakeUserNotification, _FakeNotification] | None:
|
|
|
|
|
for un, n in self._items:
|
|
|
|
|
if un.id == user_notification_id and un.user_id == user_id:
|
|
|
|
|
return (un, n)
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
async def mark_read(self, *, user_notification_id: UUID, user_id: UUID) -> bool:
|
|
|
|
|
self._mark_read_ids.append(user_notification_id)
|
|
|
|
|
for un, n in self._items:
|
|
|
|
|
if un.id == user_notification_id and un.user_id == user_id:
|
|
|
|
|
un.is_read = True
|
|
|
|
|
un.read_at = datetime.now()
|
|
|
|
|
return True
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
async def mark_all_read(self, *, user_id: UUID) -> int:
|
|
|
|
|
self._mark_all_read_user_ids.append(user_id)
|
|
|
|
|
count = 0
|
|
|
|
|
for un, n in self._items:
|
|
|
|
|
if (
|
|
|
|
|
un.user_id == user_id
|
|
|
|
|
and not un.is_read
|
|
|
|
|
and n.status == "published"
|
|
|
|
|
and n.deleted_at is None
|
|
|
|
|
):
|
|
|
|
|
un.is_read = True
|
|
|
|
|
un.read_at = datetime.now()
|
|
|
|
|
count += 1
|
|
|
|
|
return count
|
|
|
|
|
|
2026-04-13 14:52:22 +08:00
|
|
|
async def commit(self) -> None:
|
|
|
|
|
self._commit_count += 1
|
|
|
|
|
|
2026-04-10 18:50:08 +08:00
|
|
|
|
|
|
|
|
@pytest.fixture
def fake_repo() -> _FakeNotificationRepository:
    """Provide a freshly constructed, empty in-memory repository."""
    repository = _FakeNotificationRepository()
    return repository
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
def service(fake_repo: _FakeNotificationRepository) -> NotificationService:
    """Provide a ``NotificationService`` wired to the in-memory repository."""
    # The fake is not the repository type the service is annotated with,
    # hence the type-checker suppression.
    notification_service = NotificationService(repository=fake_repo)  # type: ignore[arg-type]
    return notification_service
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Two independently generated random user ids shared by the tests below.
USER_A, USER_B = uuid4(), uuid4()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _make_notification(
    *,
    user_id: UUID,
    notification_id: UUID | None = None,
    is_read: bool = False,
    read_at: datetime | None = None,
    title: str = "Test",
    body: str = "Test body",
    payload: dict | None = None,
    status: str = "published",
    deleted_at: datetime | None = None,
) -> tuple[_FakeUserNotification, _FakeNotification]:
    """Build a linked ``(user_notification, notification)`` pair of fakes.

    Args:
        user_id: Owner of the user-notification row.
        notification_id: Id shared by both objects; a random one is
            generated when omitted.
        is_read: Read flag of the user-notification row.
        read_at: Read timestamp of the user-notification row.
        title: Passed through to the notification.
        body: Passed through to the notification.
        payload: Passed through to the notification.
        status: Passed through to the notification.
        deleted_at: Passed through to the notification.

    Returns:
        The user-notification fake followed by the notification fake.  The
        user-notification row always receives a new random id.
    """
    if not notification_id:
        notification_id = uuid4()
    user_notification_id = uuid4()

    notification = _FakeNotification(
        id=notification_id,
        title=title,
        body=body,
        payload=payload,
        status=status,
        deleted_at=deleted_at,
    )
    user_notification = _FakeUserNotification(
        id=user_notification_id,
        user_id=user_id,
        notification_id=notification_id,
        is_read=is_read,
        read_at=read_at,
    )
    return user_notification, notification
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestListNotifications:
    """Checks on ``NotificationService.list_notifications``."""

    @pytest.mark.asyncio
    async def test_returns_only_user_a_notifications(
        self, service: NotificationService, fake_repo: _FakeNotificationRepository
    ):
        """Rows owned by another user are absent from the listing."""
        for owner, title in ((USER_A, "A1"), (USER_B, "B1")):
            fake_repo.add_item(*_make_notification(user_id=owner, title=title))

        page = await service.list_notifications(user_id=USER_A)

        assert len(page.items) == 1
        assert page.items[0].title == "A1"

    @pytest.mark.asyncio
    async def test_excludes_revoked_notifications(
        self, service: NotificationService, fake_repo: _FakeNotificationRepository
    ):
        """A notification whose status is ``"revoked"`` is not listed."""
        revoked_pair = _make_notification(user_id=USER_A, status="revoked")
        fake_repo.add_item(*revoked_pair)

        page = await service.list_notifications(user_id=USER_A)

        assert len(page.items) == 0

    @pytest.mark.asyncio
    async def test_excludes_deleted_notifications(
        self, service: NotificationService, fake_repo: _FakeNotificationRepository
    ):
        """A notification with ``deleted_at`` set is not listed."""
        deleted_pair = _make_notification(user_id=USER_A, deleted_at=datetime.now())
        fake_repo.add_item(*deleted_pair)

        page = await service.list_notifications(user_id=USER_A)

        assert len(page.items) == 0

    @pytest.mark.asyncio
    async def test_pagination_has_more(
        self, service: NotificationService, fake_repo: _FakeNotificationRepository
    ):
        """Three rows with ``limit=2`` give a full page and a next cursor."""
        for title in ("N0", "N1", "N2"):
            fake_repo.add_item(*_make_notification(user_id=USER_A, title=title))

        page = await service.list_notifications(user_id=USER_A, limit=2)

        assert len(page.items) == 2
        assert page.has_more is True
        assert page.next_cursor is not None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestUnreadCount:
    """Checks on ``NotificationService.get_unread_count``."""

    @pytest.mark.asyncio
    async def test_counts_unread_only(
        self, service: NotificationService, fake_repo: _FakeNotificationRepository
    ):
        """One read and one unread row yield a count of one."""
        for read_flag in (True, False):
            fake_repo.add_item(
                *_make_notification(user_id=USER_A, is_read=read_flag)
            )

        unread_total = await service.get_unread_count(user_id=USER_A)

        assert unread_total == 1

    @pytest.mark.asyncio
    async def test_excludes_revoked_from_count(
        self, service: NotificationService, fake_repo: _FakeNotificationRepository
    ):
        """An unread row on a revoked notification is not counted."""
        revoked_pair = _make_notification(
            user_id=USER_A, status="revoked", is_read=False
        )
        fake_repo.add_item(*revoked_pair)

        unread_total = await service.get_unread_count(user_id=USER_A)

        assert unread_total == 0

    @pytest.mark.asyncio
    async def test_user_b_unread_not_counted_for_user_a(
        self, service: NotificationService, fake_repo: _FakeNotificationRepository
    ):
        """Unread rows of another user do not contribute to the count."""
        other_users_pair = _make_notification(user_id=USER_B, is_read=False)
        fake_repo.add_item(*other_users_pair)

        unread_total = await service.get_unread_count(user_id=USER_A)

        assert unread_total == 0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestMarkRead:
    """Checks on ``NotificationService.mark_read``."""

    @pytest.mark.asyncio
    async def test_mark_read_success(
        self, service: NotificationService, fake_repo: _FakeNotificationRepository
    ):
        """Marking an unread row returns a result flagged as read."""
        user_notification, notification = _make_notification(
            user_id=USER_A, is_read=False
        )
        fake_repo.add_item(user_notification, notification)

        outcome = await service.mark_read(
            user_notification_id=user_notification.id, user_id=USER_A
        )

        assert outcome.is_read is True

    @pytest.mark.asyncio
    async def test_mark_read_idempotent(
        self, service: NotificationService, fake_repo: _FakeNotificationRepository
    ):
        """Marking an already-read row still returns a read result."""
        already_read_at = datetime.now()
        user_notification, notification = _make_notification(
            user_id=USER_A, is_read=True, read_at=already_read_at
        )
        fake_repo.add_item(user_notification, notification)

        outcome = await service.mark_read(
            user_notification_id=user_notification.id, user_id=USER_A
        )

        assert outcome.is_read is True

    @pytest.mark.asyncio
    async def test_mark_read_wrong_user_raises_404(
        self, service: NotificationService, fake_repo: _FakeNotificationRepository
    ):
        """Marking another user's row raises a 404 ``ApiProblemError``."""
        user_notification, notification = _make_notification(user_id=USER_A)
        fake_repo.add_item(user_notification, notification)

        with pytest.raises(ApiProblemError) as exc_info:
            await service.mark_read(
                user_notification_id=user_notification.id, user_id=USER_B
            )

        # Inspect the captured error only after the context manager exits.
        problem = exc_info.value
        assert problem.status_code == 404
        assert problem.code == "NOTIFICATION_NOT_FOUND"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestMarkAllRead:
    """Checks on ``NotificationService.mark_all_read``."""

    @pytest.mark.asyncio
    async def test_marks_all_unread_as_read(
        self, service: NotificationService, fake_repo: _FakeNotificationRepository
    ):
        """Two unread rows are reported as two updates."""
        for _ in range(2):
            fake_repo.add_item(*_make_notification(user_id=USER_A, is_read=False))

        changed = await service.mark_all_read(user_id=USER_A)

        assert changed == 2

    @pytest.mark.asyncio
    async def test_idempotent_when_all_read(
        self, service: NotificationService, fake_repo: _FakeNotificationRepository
    ):
        """With nothing unread, zero updates are reported."""
        read_pair = _make_notification(user_id=USER_A, is_read=True)
        fake_repo.add_item(*read_pair)

        changed = await service.mark_all_read(user_id=USER_A)

        assert changed == 0

    @pytest.mark.asyncio
    async def test_does_not_affect_other_user(
        self, service: NotificationService, fake_repo: _FakeNotificationRepository
    ):
        """Only the requesting user's rows are updated."""
        own_row, own_notification = _make_notification(
            user_id=USER_A, is_read=False
        )
        foreign_row, foreign_notification = _make_notification(
            user_id=USER_B, is_read=False
        )
        fake_repo.add_item(own_row, own_notification)
        fake_repo.add_item(foreign_row, foreign_notification)

        changed = await service.mark_all_read(user_id=USER_A)

        assert changed == 1
        assert foreign_row.is_read is False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestParsePayload:
    """Checks on ``_parse_payload`` for each ``action`` value."""

    def test_none_action(self):
        """``"none"`` parses to ``NotificationPayloadNone``."""
        parsed = _parse_payload({"action": "none"})

        assert isinstance(parsed, NotificationPayloadNone)
        assert parsed.action == "none"

    def test_open_route_action(self):
        """``"open_route"`` keeps its route, entity id and tab."""
        raw = {
            "action": "open_route",
            "route": "/history",
            "entity_id": "abc-123",
            "tab": "details",
        }

        parsed = _parse_payload(raw)

        assert isinstance(parsed, NotificationPayloadRoute)
        assert parsed.route == "/history"
        assert parsed.entity_id == "abc-123"
        assert parsed.tab == "details"

    def test_open_url_action(self):
        """``"open_url"`` keeps its url."""
        raw = {
            "action": "open_url",
            "url": "https://example.com",
        }

        parsed = _parse_payload(raw)

        assert isinstance(parsed, NotificationPayloadUrl)
        assert parsed.url == "https://example.com"

    def test_unknown_action_defaults_to_none(self):
        """An unrecognised action parses to ``NotificationPayloadNone``."""
        parsed = _parse_payload({"action": "unknown"})

        assert isinstance(parsed, NotificationPayloadNone)

    def test_missing_action_defaults_to_none(self):
        """An empty dict parses to ``NotificationPayloadNone``."""
        parsed = _parse_payload({})

        assert isinstance(parsed, NotificationPayloadNone)

    def test_open_route_minimal(self):
        """``"open_route"`` with only a route leaves the optionals ``None``."""
        raw = {
            "action": "open_route",
            "route": "/settings",
        }

        parsed = _parse_payload(raw)

        assert isinstance(parsed, NotificationPayloadRoute)
        assert parsed.route == "/settings"
        assert parsed.entity_id is None
        assert parsed.tab is None
|