from __future__ import annotations from datetime import UTC, datetime from uuid import uuid4 import pytest from models.inbox_messages import InboxMessage from schemas.enums import InboxMessageStatus, InboxMessageType from v1.inbox_messages import realtime class _FakeRedis: def __init__(self) -> None: self.last_stream: str | None = None self.last_payload: str | None = None self.last_block: int | None = None async def xadd(self, stream: str, fields: dict[str, str]) -> str: self.last_stream = stream self.last_payload = fields.get("event") return "1743313300000-0" async def xread(self, _streams: dict[str, str], count: int, block: int): del count self.last_block = block return [ ( "inbox:events:test", [ ( "1743313300000-0", { "event": '{"event_id":"e1","event_type":"INBOX_MESSAGE_CREATED","op":"created"}', }, ) ], ) ] @pytest.mark.asyncio async def test_publish_inbox_message_created_writes_stream(monkeypatch) -> None: fake_redis = _FakeRedis() async def _fake_get_redis(): return fake_redis monkeypatch.setattr(realtime, "get_or_init_redis_client", _fake_get_redis) message = InboxMessage( id=uuid4(), recipient_id=uuid4(), sender_id=uuid4(), message_type=InboxMessageType.CALENDAR, friendship_id=None, schedule_item_id=uuid4(), group_id=None, content={"type": "invite"}, is_read=False, status=InboxMessageStatus.PENDING, created_by=uuid4(), ) message.created_at = datetime(2026, 3, 30, 7, 0, tzinfo=UTC) message.updated_at = datetime(2026, 3, 30, 7, 0, tzinfo=UTC) stream_id = await realtime.publish_inbox_message_created(message) assert stream_id == "1743313300000-0" assert fake_redis.last_stream == f"inbox:events:{message.recipient_id}" assert fake_redis.last_payload is not None assert '"op":"created"' in fake_redis.last_payload @pytest.mark.asyncio async def test_read_inbox_events_decodes_rows(monkeypatch) -> None: fake_redis = _FakeRedis() async def _fake_get_redis(): return fake_redis monkeypatch.setattr(realtime, "get_or_init_redis_client", _fake_get_redis) rows = await realtime.read_inbox_events( recipient_id=uuid4(), last_event_id=None, ) assert len(rows) == 1 assert rows[0]["id"] == "1743313300000-0" assert rows[0]["event"]["event_type"] == "INBOX_MESSAGE_CREATED" @pytest.mark.asyncio async def test_read_inbox_events_handles_redis_timeout(monkeypatch) -> None: class _TimeoutRedis(_FakeRedis): async def xread(self, _streams: dict[str, str], count: int, block: int): del _streams, count, block raise TimeoutError("read timeout") fake_redis = _TimeoutRedis() async def _fake_get_redis(): return fake_redis monkeypatch.setattr(realtime, "get_or_init_redis_client", _fake_get_redis) rows = await realtime.read_inbox_events(recipient_id=uuid4(), last_event_id=None) assert rows == []