# Unit tests for NotificationService and _parse_payload, exercised against an
# in-memory fake repository defined in this module.
from __future__ import annotations

from datetime import datetime
from uuid import UUID, uuid4

import pytest

from v1.notifications.service import NotificationService, _parse_payload
from v1.notifications.schemas import (
    NotificationPayloadNone,
    NotificationPayloadRoute,
    NotificationPayloadUrl,
)
from core.http.errors import ApiProblemError


class _FakeUserNotification:
    """Plain attribute holder standing in for a per-user notification row."""

    def __init__(
        self,
        *,
        id: UUID,
        user_id: UUID,
        notification_id: UUID,
        is_read: bool = False,
        read_at: datetime | None = None,
        created_at: datetime | None = None,
    ):
        self.id = id
        self.user_id = user_id
        self.notification_id = notification_id
        self.is_read = is_read
        self.read_at = read_at
        # Default to "now" when the caller does not pin a creation time.
        self.created_at = created_at if created_at is not None else datetime.now()


class _FakeNotification:
    """Plain attribute holder standing in for a notification row."""

    def __init__(
        self,
        *,
        id: UUID,
        type: str = "system",
        title: str = "Test",
        body: str = "Test body",
        payload: dict | None = None,
        status: str = "published",
        deleted_at: datetime | None = None,
        created_at: datetime | None = None,
    ):
        self.id = id
        self.type = type
        self.title = title
        self.body = body
        # Only substitute the default when no payload was given at all; an
        # explicitly passed empty dict is kept as-is.
        self.payload = payload if payload is not None else {"action": "none"}
        self.status = status
        self.deleted_at = deleted_at
        self.created_at = created_at if created_at is not None else datetime.now()


class _FakeNotificationRepository:
    """In-memory repository double backed by a list of (user row, notification) pairs.

    Besides answering queries, it records the arguments of ``mark_read`` /
    ``mark_all_read`` calls and counts ``commit`` calls.
    """

    def __init__(self) -> None:
        self._items: list[tuple[_FakeUserNotification, _FakeNotification]] = []
        self._mark_read_ids: list[UUID] = []
        self._mark_all_read_user_ids: list[UUID] = []
        self._commit_count = 0

    def add_item(self, un: _FakeUserNotification, n: _FakeNotification) -> None:
        """Register a (user notification, notification) pair with the fake store."""
        self._items.append((un, n))

    async def list_notifications(
        self,
        *,
        user_id: UUID,
        limit: int = 20,
        cursor: datetime | None = None,
    ) -> list[tuple[_FakeUserNotification, _FakeNotification]]:
        """Return up to ``limit`` visible pairs for ``user_id``, newest first.

        Visible means the notification is ``"published"`` and not deleted.
        When ``cursor`` is given, only pairs whose user row was created
        strictly before it are returned.
        """
        user_items = [
            (un, n)
            for un, n in self._items
            if un.user_id == user_id
            and n.status == "published"
            and n.deleted_at is None
        ]
        if cursor is not None:
            user_items = [(un, n) for un, n in user_items if un.created_at < cursor]
        user_items.sort(key=lambda pair: pair[0].created_at, reverse=True)
        return user_items[:limit]

    async def get_unread_count(self, *, user_id: UUID) -> int:
        """Count unread, published, non-deleted notifications of ``user_id``."""
        return sum(
            1
            for un, n in self._items
            if un.user_id == user_id
            and not un.is_read
            and n.status == "published"
            and n.deleted_at is None
        )

    async def get_user_notification(
        self,
        *,
        user_notification_id: UUID,
        user_id: UUID,
    ) -> tuple[_FakeUserNotification, _FakeNotification] | None:
        """Return the pair matching both ids, or ``None`` when there is none."""
        for un, n in self._items:
            if un.id == user_notification_id and un.user_id == user_id:
                return (un, n)
        return None

    async def mark_read(self, *, user_notification_id: UUID, user_id: UUID) -> bool:
        """Mark one user notification as read; return whether a row matched.

        The requested id is recorded even when nothing matches.
        """
        self._mark_read_ids.append(user_notification_id)
        for un, _ in self._items:
            if un.id == user_notification_id and un.user_id == user_id:
                un.is_read = True
                un.read_at = datetime.now()
                return True
        return False

    async def mark_all_read(self, *, user_id: UUID) -> int:
        """Mark every unread visible notification of ``user_id`` as read.

        Returns the number of rows that were flipped from unread to read.
        """
        self._mark_all_read_user_ids.append(user_id)
        count = 0
        for un, n in self._items:
            if (
                un.user_id == user_id
                and not un.is_read
                and n.status == "published"
                and n.deleted_at is None
            ):
                un.is_read = True
                un.read_at = datetime.now()
                count += 1
        return count

    async def commit(self) -> None:
        """Record that a commit was requested."""
        self._commit_count += 1


@pytest.fixture
def fake_repo() -> _FakeNotificationRepository:
    """Provide a fresh, empty fake repository for each test."""
    return _FakeNotificationRepository()


@pytest.fixture
def service(fake_repo: _FakeNotificationRepository) -> NotificationService:
    """Provide a NotificationService wired to the fake repository."""
    return NotificationService(repository=fake_repo)  # type: ignore[arg-type]


# Two distinct users, shared by all tests to check per-user isolation.
USER_A = uuid4()
USER_B = uuid4()


def _make_notification(
    *,
    user_id: UUID,
    notification_id: UUID | None = None,
    is_read: bool = False,
    read_at: datetime | None = None,
    title: str = "Test",
    body: str = "Test body",
    payload: dict | None = None,
    status: str = "published",
    deleted_at: datetime | None = None,
) -> tuple[_FakeUserNotification, _FakeNotification]:
    """Build a linked (user notification, notification) pair for ``user_id``.

    A random notification id is generated when ``notification_id`` is not
    supplied; the user notification always gets a fresh random id.
    """
    nid = notification_id or uuid4()
    unid = uuid4()
    n = _FakeNotification(
        id=nid,
        title=title,
        body=body,
        payload=payload,
        status=status,
        deleted_at=deleted_at,
    )
    un = _FakeUserNotification(
        id=unid,
        user_id=user_id,
        notification_id=nid,
        is_read=is_read,
        read_at=read_at,
    )
    return un, n


class TestListNotifications:
    """Tests for ``NotificationService.list_notifications``."""

    @pytest.mark.asyncio
    async def test_returns_only_user_a_notifications(
        self, service: NotificationService, fake_repo: _FakeNotificationRepository
    ):
        un_a, n_a = _make_notification(user_id=USER_A, title="A1")
        un_b, n_b = _make_notification(user_id=USER_B, title="B1")
        fake_repo.add_item(un_a, n_a)
        fake_repo.add_item(un_b, n_b)

        result = await service.list_notifications(user_id=USER_A)

        assert len(result.items) == 1
        assert result.items[0].title == "A1"

    @pytest.mark.asyncio
    async def test_excludes_revoked_notifications(
        self, service: NotificationService, fake_repo: _FakeNotificationRepository
    ):
        un, n = _make_notification(user_id=USER_A, status="revoked")
        fake_repo.add_item(un, n)

        result = await service.list_notifications(user_id=USER_A)

        assert len(result.items) == 0

    @pytest.mark.asyncio
    async def test_excludes_deleted_notifications(
        self, service: NotificationService, fake_repo: _FakeNotificationRepository
    ):
        un, n = _make_notification(user_id=USER_A, deleted_at=datetime.now())
        fake_repo.add_item(un, n)

        result = await service.list_notifications(user_id=USER_A)

        assert len(result.items) == 0

    @pytest.mark.asyncio
    async def test_pagination_has_more(
        self, service: NotificationService, fake_repo: _FakeNotificationRepository
    ):
        # Three items with a page size of two must leave one for the next page.
        for i in range(3):
            un, n = _make_notification(user_id=USER_A, title=f"N{i}")
            fake_repo.add_item(un, n)

        result = await service.list_notifications(user_id=USER_A, limit=2)

        assert len(result.items) == 2
        assert result.has_more is True
        assert result.next_cursor is not None


class TestUnreadCount:
    """Tests for ``NotificationService.get_unread_count``."""

    @pytest.mark.asyncio
    async def test_counts_unread_only(
        self, service: NotificationService, fake_repo: _FakeNotificationRepository
    ):
        un_read, n_read = _make_notification(user_id=USER_A, is_read=True)
        un_unread, n_unread = _make_notification(user_id=USER_A, is_read=False)
        fake_repo.add_item(un_read, n_read)
        fake_repo.add_item(un_unread, n_unread)

        count = await service.get_unread_count(user_id=USER_A)

        assert count == 1

    @pytest.mark.asyncio
    async def test_excludes_revoked_from_count(
        self, service: NotificationService, fake_repo: _FakeNotificationRepository
    ):
        un, n = _make_notification(user_id=USER_A, status="revoked", is_read=False)
        fake_repo.add_item(un, n)

        count = await service.get_unread_count(user_id=USER_A)

        assert count == 0

    @pytest.mark.asyncio
    async def test_user_b_unread_not_counted_for_user_a(
        self, service: NotificationService, fake_repo: _FakeNotificationRepository
    ):
        un, n = _make_notification(user_id=USER_B, is_read=False)
        fake_repo.add_item(un, n)

        count = await service.get_unread_count(user_id=USER_A)

        assert count == 0


class TestMarkRead:
    """Tests for ``NotificationService.mark_read``."""

    @pytest.mark.asyncio
    async def test_mark_read_success(
        self, service: NotificationService, fake_repo: _FakeNotificationRepository
    ):
        un, n = _make_notification(user_id=USER_A, is_read=False)
        fake_repo.add_item(un, n)

        result = await service.mark_read(user_notification_id=un.id, user_id=USER_A)

        assert result.is_read is True

    @pytest.mark.asyncio
    async def test_mark_read_idempotent(
        self, service: NotificationService, fake_repo: _FakeNotificationRepository
    ):
        now = datetime.now()
        un, n = _make_notification(user_id=USER_A, is_read=True, read_at=now)
        fake_repo.add_item(un, n)

        result = await service.mark_read(user_notification_id=un.id, user_id=USER_A)

        assert result.is_read is True

    @pytest.mark.asyncio
    async def test_mark_read_wrong_user_raises_404(
        self, service: NotificationService, fake_repo: _FakeNotificationRepository
    ):
        un, n = _make_notification(user_id=USER_A)
        fake_repo.add_item(un, n)

        with pytest.raises(ApiProblemError) as exc_info:
            await service.mark_read(user_notification_id=un.id, user_id=USER_B)

        # Inspect the captured error after the ``with`` block: statements placed
        # after the raising call inside the block would never execute.
        assert exc_info.value.status_code == 404
        assert exc_info.value.code == "NOTIFICATION_NOT_FOUND"


class TestMarkAllRead:
    """Tests for ``NotificationService.mark_all_read``."""

    @pytest.mark.asyncio
    async def test_marks_all_unread_as_read(
        self, service: NotificationService, fake_repo: _FakeNotificationRepository
    ):
        un1, n1 = _make_notification(user_id=USER_A, is_read=False)
        un2, n2 = _make_notification(user_id=USER_A, is_read=False)
        fake_repo.add_item(un1, n1)
        fake_repo.add_item(un2, n2)

        updated = await service.mark_all_read(user_id=USER_A)

        assert updated == 2

    @pytest.mark.asyncio
    async def test_idempotent_when_all_read(
        self, service: NotificationService, fake_repo: _FakeNotificationRepository
    ):
        un, n = _make_notification(user_id=USER_A, is_read=True)
        fake_repo.add_item(un, n)

        updated = await service.mark_all_read(user_id=USER_A)

        assert updated == 0

    @pytest.mark.asyncio
    async def test_does_not_affect_other_user(
        self, service: NotificationService, fake_repo: _FakeNotificationRepository
    ):
        un_a, n_a = _make_notification(user_id=USER_A, is_read=False)
        un_b, n_b = _make_notification(user_id=USER_B, is_read=False)
        fake_repo.add_item(un_a, n_a)
        fake_repo.add_item(un_b, n_b)

        updated = await service.mark_all_read(user_id=USER_A)

        assert updated == 1
        assert un_b.is_read is False


class TestParsePayload:
    """Tests for ``_parse_payload`` mapping raw dicts to payload schemas."""

    def test_none_action(self):
        payload = _parse_payload({"action": "none"})
        assert isinstance(payload, NotificationPayloadNone)
        assert payload.action == "none"

    def test_open_route_action(self):
        payload = _parse_payload(
            {
                "action": "open_route",
                "route": "/history",
                "entity_id": "abc-123",
                "tab": "details",
            }
        )
        assert isinstance(payload, NotificationPayloadRoute)
        assert payload.route == "/history"
        assert payload.entity_id == "abc-123"
        assert payload.tab == "details"

    def test_open_url_action(self):
        payload = _parse_payload(
            {
                "action": "open_url",
                "url": "https://example.com",
            }
        )
        assert isinstance(payload, NotificationPayloadUrl)
        assert payload.url == "https://example.com"

    def test_unknown_action_defaults_to_none(self):
        payload = _parse_payload({"action": "unknown"})
        assert isinstance(payload, NotificationPayloadNone)

    def test_missing_action_defaults_to_none(self):
        payload = _parse_payload({})
        assert isinstance(payload, NotificationPayloadNone)

    def test_open_route_minimal(self):
        payload = _parse_payload(
            {
                "action": "open_route",
                "route": "/settings",
            }
        )
        assert isinstance(payload, NotificationPayloadRoute)
        assert payload.route == "/settings"
        assert payload.entity_id is None
        assert payload.tab is None