Files

469 lines
15 KiB
Python
Raw Permalink Normal View History

2026-04-10 18:50:08 +08:00
from __future__ import annotations
from datetime import datetime
from uuid import UUID, uuid4
import pytest
from v1.notifications.service import (
NotificationService,
_parse_payload,
resolve_i18n_text,
normalize_locale,
)
2026-04-10 18:50:08 +08:00
from v1.notifications.schemas import (
NotificationPayloadNone,
NotificationPayloadRoute,
NotificationPayloadUrl,
)
from core.http.errors import ApiProblemError
class _FakeUserNotification:
def __init__(
self,
*,
id: UUID,
user_id: UUID,
notification_id: UUID,
is_read: bool = False,
read_at: datetime | None = None,
created_at: datetime | None = None,
):
self.id = id
self.user_id = user_id
self.notification_id = notification_id
self.is_read = is_read
self.read_at = read_at
self.created_at = created_at or datetime.now()
class _FakeNotification:
def __init__(
self,
*,
id: UUID,
type: str = "system",
title: dict[str, str] | None = None,
body: dict[str, str] | None = None,
2026-04-10 18:50:08 +08:00
payload: dict | None = None,
status: str = "published",
deleted_at: datetime | None = None,
created_at: datetime | None = None,
):
self.id = id
self.type = type
self.title = title or {"zh": "Test"}
self.body = body or {"zh": "Test body"}
2026-04-10 18:50:08 +08:00
self.payload = payload or {"action": "none"}
self.status = status
self.deleted_at = deleted_at
self.created_at = created_at or datetime.now()
class _FakeNotificationRepository:
def __init__(self) -> None:
self._items: list[tuple[_FakeUserNotification, _FakeNotification]] = []
self._mark_read_ids: list[UUID] = []
self._mark_all_read_user_ids: list[UUID] = []
self._commit_count = 0
2026-04-10 18:50:08 +08:00
def add_item(self, un: _FakeUserNotification, n: _FakeNotification) -> None:
self._items.append((un, n))
async def list_notifications(
self,
*,
user_id: UUID,
limit: int = 20,
cursor: datetime | None = None,
) -> list[tuple[_FakeUserNotification, _FakeNotification]]:
user_items = [
(un, n)
for un, n in self._items
if un.user_id == user_id
and n.status == "published"
and n.deleted_at is None
]
if cursor is not None:
user_items = [(un, n) for un, n in user_items if un.created_at < cursor]
user_items.sort(key=lambda x: x[0].created_at, reverse=True)
return user_items[:limit]
async def get_unread_count(self, *, user_id: UUID) -> int:
return sum(
1
for un, n in self._items
if un.user_id == user_id
and not un.is_read
and n.status == "published"
and n.deleted_at is None
)
async def get_user_notification(
self,
*,
user_notification_id: UUID,
user_id: UUID,
) -> tuple[_FakeUserNotification, _FakeNotification] | None:
for un, n in self._items:
if un.id == user_notification_id and un.user_id == user_id:
return (un, n)
return None
async def mark_read(self, *, user_notification_id: UUID, user_id: UUID) -> bool:
self._mark_read_ids.append(user_notification_id)
for un, n in self._items:
if un.id == user_notification_id and un.user_id == user_id:
un.is_read = True
un.read_at = datetime.now()
return True
return False
async def mark_all_read(self, *, user_id: UUID) -> int:
self._mark_all_read_user_ids.append(user_id)
count = 0
for un, n in self._items:
if (
un.user_id == user_id
and not un.is_read
and n.status == "published"
and n.deleted_at is None
):
un.is_read = True
un.read_at = datetime.now()
count += 1
return count
async def commit(self) -> None:
self._commit_count += 1
2026-04-10 18:50:08 +08:00
@pytest.fixture
def fake_repo() -> _FakeNotificationRepository:
return _FakeNotificationRepository()
@pytest.fixture
def service(fake_repo: _FakeNotificationRepository) -> NotificationService:
return NotificationService(repository=fake_repo) # type: ignore[arg-type]
USER_A = uuid4()
USER_B = uuid4()
def _make_notification(
*,
user_id: UUID,
notification_id: UUID | None = None,
is_read: bool = False,
read_at: datetime | None = None,
title: dict[str, str] | None = None,
body: dict[str, str] | None = None,
2026-04-10 18:50:08 +08:00
payload: dict | None = None,
status: str = "published",
deleted_at: datetime | None = None,
) -> tuple[_FakeUserNotification, _FakeNotification]:
nid = notification_id or uuid4()
unid = uuid4()
n = _FakeNotification(
id=nid,
title=title,
body=body,
payload=payload,
status=status,
deleted_at=deleted_at,
)
un = _FakeUserNotification(
id=unid,
user_id=user_id,
notification_id=nid,
is_read=is_read,
read_at=read_at,
)
return un, n
class TestListNotifications:
@pytest.mark.asyncio
async def test_returns_only_user_a_notifications(
self, service: NotificationService, fake_repo: _FakeNotificationRepository
):
un_a, n_a = _make_notification(
user_id=USER_A, title={"zh": "A1"}, body={"zh": "A1 body"},
)
un_b, n_b = _make_notification(
user_id=USER_B, title={"zh": "B1"}, body={"zh": "B1 body"},
)
2026-04-10 18:50:08 +08:00
fake_repo.add_item(un_a, n_a)
fake_repo.add_item(un_b, n_b)
result = await service.list_notifications(user_id=USER_A)
assert len(result.items) == 1
assert result.items[0].title == "A1"
@pytest.mark.asyncio
async def test_excludes_revoked_notifications(
self, service: NotificationService, fake_repo: _FakeNotificationRepository
):
un, n = _make_notification(user_id=USER_A, status="revoked")
fake_repo.add_item(un, n)
result = await service.list_notifications(user_id=USER_A)
assert len(result.items) == 0
@pytest.mark.asyncio
async def test_excludes_deleted_notifications(
self, service: NotificationService, fake_repo: _FakeNotificationRepository
):
un, n = _make_notification(user_id=USER_A, deleted_at=datetime.now())
fake_repo.add_item(un, n)
result = await service.list_notifications(user_id=USER_A)
assert len(result.items) == 0
@pytest.mark.asyncio
async def test_pagination_has_more(
self, service: NotificationService, fake_repo: _FakeNotificationRepository
):
for i in range(3):
un, n = _make_notification(
user_id=USER_A, title={"zh": f"N{i}"}, body={"zh": f"N{i} body"},
)
2026-04-10 18:50:08 +08:00
fake_repo.add_item(un, n)
result = await service.list_notifications(user_id=USER_A, limit=2)
assert len(result.items) == 2
assert result.has_more is True
assert result.next_cursor is not None
class TestUnreadCount:
@pytest.mark.asyncio
async def test_counts_unread_only(
self, service: NotificationService, fake_repo: _FakeNotificationRepository
):
un_read, n_read = _make_notification(user_id=USER_A, is_read=True)
un_unread, n_unread = _make_notification(user_id=USER_A, is_read=False)
fake_repo.add_item(un_read, n_read)
fake_repo.add_item(un_unread, n_unread)
count = await service.get_unread_count(user_id=USER_A)
assert count == 1
@pytest.mark.asyncio
async def test_excludes_revoked_from_count(
self, service: NotificationService, fake_repo: _FakeNotificationRepository
):
un, n = _make_notification(user_id=USER_A, status="revoked", is_read=False)
fake_repo.add_item(un, n)
count = await service.get_unread_count(user_id=USER_A)
assert count == 0
@pytest.mark.asyncio
async def test_user_b_unread_not_counted_for_user_a(
self, service: NotificationService, fake_repo: _FakeNotificationRepository
):
un, n = _make_notification(user_id=USER_B, is_read=False)
fake_repo.add_item(un, n)
count = await service.get_unread_count(user_id=USER_A)
assert count == 0
class TestMarkRead:
@pytest.mark.asyncio
async def test_mark_read_success(
self, service: NotificationService, fake_repo: _FakeNotificationRepository
):
un, n = _make_notification(user_id=USER_A, is_read=False)
fake_repo.add_item(un, n)
result = await service.mark_read(user_notification_id=un.id, user_id=USER_A)
assert result.is_read is True
@pytest.mark.asyncio
async def test_mark_read_idempotent(
self, service: NotificationService, fake_repo: _FakeNotificationRepository
):
now = datetime.now()
un, n = _make_notification(user_id=USER_A, is_read=True, read_at=now)
fake_repo.add_item(un, n)
result = await service.mark_read(user_notification_id=un.id, user_id=USER_A)
assert result.is_read is True
@pytest.mark.asyncio
async def test_mark_read_wrong_user_raises_404(
self, service: NotificationService, fake_repo: _FakeNotificationRepository
):
un, n = _make_notification(user_id=USER_A)
fake_repo.add_item(un, n)
with pytest.raises(ApiProblemError) as exc_info:
await service.mark_read(user_notification_id=un.id, user_id=USER_B)
assert exc_info.value.status_code == 404
assert exc_info.value.code == "NOTIFICATION_NOT_FOUND"
class TestMarkAllRead:
@pytest.mark.asyncio
async def test_marks_all_unread_as_read(
self, service: NotificationService, fake_repo: _FakeNotificationRepository
):
un1, n1 = _make_notification(user_id=USER_A, is_read=False)
un2, n2 = _make_notification(user_id=USER_A, is_read=False)
fake_repo.add_item(un1, n1)
fake_repo.add_item(un2, n2)
updated = await service.mark_all_read(user_id=USER_A)
assert updated == 2
@pytest.mark.asyncio
async def test_idempotent_when_all_read(
self, service: NotificationService, fake_repo: _FakeNotificationRepository
):
un, n = _make_notification(user_id=USER_A, is_read=True)
fake_repo.add_item(un, n)
updated = await service.mark_all_read(user_id=USER_A)
assert updated == 0
@pytest.mark.asyncio
async def test_does_not_affect_other_user(
self, service: NotificationService, fake_repo: _FakeNotificationRepository
):
un_a, n_a = _make_notification(user_id=USER_A, is_read=False)
un_b, n_b = _make_notification(user_id=USER_B, is_read=False)
fake_repo.add_item(un_a, n_a)
fake_repo.add_item(un_b, n_b)
updated = await service.mark_all_read(user_id=USER_A)
assert updated == 1
assert un_b.is_read is False
class TestParsePayload:
def test_none_action(self):
payload = _parse_payload({"action": "none"})
assert isinstance(payload, NotificationPayloadNone)
assert payload.action == "none"
def test_open_route_action(self):
payload = _parse_payload(
{
"action": "open_route",
"route": "/history",
"entity_id": "abc-123",
"tab": "details",
}
)
assert isinstance(payload, NotificationPayloadRoute)
assert payload.route == "/history"
assert payload.entity_id == "abc-123"
assert payload.tab == "details"
def test_open_url_action(self):
payload = _parse_payload(
{
"action": "open_url",
"url": "https://example.com",
}
)
assert isinstance(payload, NotificationPayloadUrl)
assert payload.url == "https://example.com"
def test_unknown_action_defaults_to_none(self):
payload = _parse_payload({"action": "unknown"})
assert isinstance(payload, NotificationPayloadNone)
def test_missing_action_defaults_to_none(self):
payload = _parse_payload({})
assert isinstance(payload, NotificationPayloadNone)
def test_open_route_minimal(self):
payload = _parse_payload(
{
"action": "open_route",
"route": "/settings",
}
)
assert isinstance(payload, NotificationPayloadRoute)
assert payload.route == "/settings"
assert payload.entity_id is None
assert payload.tab is None
class TestResolveI18nText:
def test_exact_locale_match(self):
text = resolve_i18n_text({"zh": "你好", "en": "Hello"}, "en")
assert text == "Hello"
def test_falls_back_to_default(self):
text = resolve_i18n_text({"zh": "你好", "en": "Hello"}, "zh_Hant")
assert text == "你好"
def test_returns_empty_when_default_missing(self):
text = resolve_i18n_text({"en": "Hello"}, "zh_Hant")
assert text == ""
def test_empty_dict(self):
text = resolve_i18n_text({}, "en")
assert text == ""
class TestNormalizeLocale:
def test_known_locale_passthrough(self):
assert normalize_locale("zh") == "zh"
assert normalize_locale("zh_Hant") == "zh_Hant"
assert normalize_locale("en") == "en"
def test_none_returns_default(self):
assert normalize_locale(None) == "zh"
def test_zh_cn_maps_to_zh(self):
assert normalize_locale("zh_CN") == "zh"
assert normalize_locale("zh_Hans") == "zh"
def test_zh_tw_maps_to_hant(self):
assert normalize_locale("zh_TW") == "zh_Hant"
assert normalize_locale("zh-Hant") == "zh_Hant"
def test_unknown_returns_default(self):
assert normalize_locale("fr") == "zh"
assert normalize_locale("ja") == "zh"
class TestListNotificationsI18n:
@pytest.mark.asyncio
async def test_locale_en_returns_english(
self, service: NotificationService, fake_repo: _FakeNotificationRepository
):
un, n = _make_notification(
user_id=USER_A,
title={"zh": "你好", "en": "Hello"},
body={"zh": "正文", "en": "Body"},
)
fake_repo.add_item(un, n)
result = await service.list_notifications(user_id=USER_A, locale="en")
assert result.items[0].title == "Hello"
assert result.items[0].body == "Body"
@pytest.mark.asyncio
async def test_locale_zh_hant_falls_back_to_zh(
self, service: NotificationService, fake_repo: _FakeNotificationRepository
):
un, n = _make_notification(
user_id=USER_A,
title={"zh": "你好", "en": "Hello"},
body={"zh": "正文", "en": "Body"},
)
fake_repo.add_item(un, n)
result = await service.list_notifications(user_id=USER_A, locale="zh_Hant")
assert result.items[0].title == "你好"
assert result.items[0].body == "正文"