"""Realtime inbox events: publish to per-recipient Redis streams and read them back.

Each recipient has one stream named ``inbox:events:<recipient_id>``. Every
stream entry carries a single ``event`` field holding a compact JSON envelope
(``event_id``, ``occurred_at``, ``user_id``, ``message_id``, ``event_type``,
``op``, ``version``, ``data``). ``read_inbox_events`` returns decoded entries
and ``to_inbox_sse_event`` renders one entry as a Server-Sent Events frame.
"""

from __future__ import annotations

import asyncio
from dataclasses import dataclass
from datetime import UTC, datetime
import inspect
import json
from typing import Any
from uuid import UUID, uuid4

from redis.exceptions import TimeoutError as RedisTimeoutError

from core.config.settings import config
from core.logging import get_logger
from models.inbox_messages import InboxMessage
from services.base.redis import get_or_init_redis_client

logger = get_logger("v1.inbox_messages.realtime")

INBOX_STREAM_PREFIX = "inbox:events"

EVENT_MESSAGE_CREATED = "INBOX_MESSAGE_CREATED"
EVENT_MESSAGE_READ_CHANGED = "INBOX_MESSAGE_READ_CHANGED"
EVENT_MESSAGE_STATUS_CHANGED = "INBOX_MESSAGE_STATUS_CHANGED"
EVENT_SNAPSHOT_REQUIRED = "INBOX_SNAPSHOT_REQUIRED"

# Name of the single field stored in every stream entry.
_EVENT_FIELD = "event"


@dataclass(frozen=True)
class InboxMessageEventSnapshot:
    """Immutable copy of the inbox-message fields needed to build an event."""

    message_id: UUID
    recipient_id: UUID
    sender_id: UUID | None
    message_type: str
    schedule_item_id: UUID | None
    friendship_id: UUID | None
    content: dict[str, Any] | None
    is_read: bool
    status: str
    created_at: datetime
    # Timestamp used for the event's ``occurred_at`` and ``version``.
    occurred_at: datetime


def _enum_value_or_str(value: Any) -> str:
    """Return ``value.value`` when the attribute exists, else ``str(value)``."""
    return value.value if hasattr(value, "value") else str(value)


def snapshot_from_inbox_message(message: InboxMessage) -> InboxMessageEventSnapshot:
    """Build an ``InboxMessageEventSnapshot`` from an ``InboxMessage``.

    Fallbacks applied when the model carries unusable values:

    * ``status`` rendered as ``"None"`` or ``""`` becomes ``"pending"``;
    * a non-datetime ``created_at`` becomes the current UTC time;
    * a non-datetime ``updated_at`` makes ``occurred_at`` equal ``created_at``;
    * a non-UUID ``id`` is replaced by a freshly generated ``uuid4()``.
    """
    message_type = _enum_value_or_str(message.message_type)

    status = _enum_value_or_str(message.status)
    if status in {"None", ""}:
        status = "pending"

    created_at = (
        message.created_at
        if isinstance(message.created_at, datetime)
        else datetime.now(UTC)
    )
    occurred_at = (
        message.updated_at if isinstance(message.updated_at, datetime) else created_at
    )
    message_id = message.id if isinstance(message.id, UUID) else uuid4()

    return InboxMessageEventSnapshot(
        message_id=message_id,
        recipient_id=message.recipient_id,
        sender_id=message.sender_id,
        message_type=message_type,
        schedule_item_id=message.schedule_item_id,
        friendship_id=message.friendship_id,
        content=message.content,
        is_read=bool(message.is_read),
        status=status,
        created_at=created_at,
        occurred_at=occurred_at,
    )


def to_inbox_sse_event(stream_id: str, event_type: str, payload: dict[str, Any]) -> str:
    """Render one event as a Server-Sent Events frame.

    CR and LF are stripped from ``stream_id`` and ``event_type`` so neither can
    inject extra SSE fields. The payload is serialised as compact ASCII-only
    JSON, so the ``data:`` line is always a single line.
    """
    safe_stream_id = str(stream_id).replace("\r", "").replace("\n", "")
    safe_event_type = str(event_type).replace("\r", "").replace("\n", "")
    data = json.dumps(payload, ensure_ascii=True, separators=(",", ":"))
    return f"id: {safe_stream_id}\nevent: {safe_event_type}\ndata: {data}\n\n"


def _stream_name(recipient_id: UUID) -> str:
    """Return the Redis stream key for ``recipient_id``."""
    return f"{INBOX_STREAM_PREFIX}:{recipient_id}"


def _to_epoch_ms(value: datetime) -> int:
    """Convert ``value`` to integer milliseconds since the Unix epoch.

    NOTE(review): ``astimezone`` treats a naive datetime as local time —
    confirm that callers always pass timezone-aware values.
    """
    normalized = value.astimezone(UTC)
    return int(normalized.timestamp() * 1000)


def _resolve_occurred_at(snapshot: InboxMessageEventSnapshot) -> datetime:
    """Pick the event timestamp: ``occurred_at``, else ``created_at``, else now (UTC)."""
    if isinstance(snapshot.occurred_at, datetime):
        return snapshot.occurred_at
    if isinstance(snapshot.created_at, datetime):
        return snapshot.created_at
    return datetime.now(UTC)


def _safe_stream_block_ms(requested_ms: int) -> int:
    """Clamp a requested blocking time to ``[1, socket_timeout_ms - 100]``.

    The upper bound is derived from ``config.redis.socket_timeout`` (seconds);
    presumably the 100 ms margin lets a blocking read return before the socket
    timeout fires. An unusable timeout (``None``, non-numeric, non-finite)
    falls back to 5000 ms.
    """
    try:
        socket_timeout_ms = max(int(float(config.redis.socket_timeout) * 1000), 1)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int() of an infinite timeout.
        socket_timeout_ms = 5000
    safe_max = max(socket_timeout_ms - 100, 1)
    return max(1, min(int(requested_ms), safe_max))


def _message_to_payload(snapshot: InboxMessageEventSnapshot) -> dict[str, Any]:
    """Serialise a snapshot to the JSON-ready ``message`` object of a created event."""
    return {
        "id": str(snapshot.message_id),
        "recipient_id": str(snapshot.recipient_id),
        "sender_id": str(snapshot.sender_id) if snapshot.sender_id else None,
        "message_type": snapshot.message_type,
        "schedule_item_id": (
            str(snapshot.schedule_item_id) if snapshot.schedule_item_id else None
        ),
        "friendship_id": (
            str(snapshot.friendship_id) if snapshot.friendship_id else None
        ),
        "content": snapshot.content,
        "is_read": bool(snapshot.is_read),
        "status": snapshot.status,
        "created_at": snapshot.created_at.isoformat(),
    }


async def _publish_event(recipient_id: UUID, payload: dict[str, Any]) -> str:
    """Append ``payload`` to the recipient's stream and return the new entry ID.

    The client's ``xadd`` result is awaited only when it is awaitable, so both
    sync and async client objects are accepted.
    """
    redis = await get_or_init_redis_client()
    stream_name = _stream_name(recipient_id)
    event_json = json.dumps(payload, ensure_ascii=True, separators=(",", ":"))
    result = redis.xadd(stream_name, {_EVENT_FIELD: event_json})
    if inspect.isawaitable(result):
        return str(await result)
    return str(result)


def _as_snapshot(
    message: InboxMessage | InboxMessageEventSnapshot,
) -> InboxMessageEventSnapshot:
    """Return ``message`` unchanged if already a snapshot, else convert it."""
    if isinstance(message, InboxMessageEventSnapshot):
        return message
    return snapshot_from_inbox_message(message)


def _build_envelope(
    *,
    recipient_id: UUID,
    message_id: UUID,
    occurred_at: datetime,
    event_type: str,
    op: str,
    data: dict[str, Any],
) -> dict[str, Any]:
    """Build the common event envelope; ``version`` is ``occurred_at`` in epoch ms."""
    return {
        "event_id": str(uuid4()),
        "occurred_at": occurred_at.isoformat(),
        "user_id": str(recipient_id),
        "message_id": str(message_id),
        "event_type": event_type,
        "op": op,
        "version": _to_epoch_ms(occurred_at),
        "data": data,
    }


async def _publish_snapshot_event(
    snapshot: InboxMessageEventSnapshot,
    *,
    event_type: str,
    op: str,
    data: dict[str, Any],
) -> str:
    """Wrap ``data`` in an envelope for ``snapshot`` and publish it."""
    payload = _build_envelope(
        recipient_id=snapshot.recipient_id,
        message_id=snapshot.message_id,
        occurred_at=_resolve_occurred_at(snapshot),
        event_type=event_type,
        op=op,
        data=data,
    )
    return await _publish_event(snapshot.recipient_id, payload)


async def publish_inbox_message_created(
    message: InboxMessage | InboxMessageEventSnapshot,
) -> str:
    """Publish an ``INBOX_MESSAGE_CREATED`` event carrying the full message.

    Returns the ID of the stream entry that was written.
    """
    snapshot = _as_snapshot(message)
    return await _publish_snapshot_event(
        snapshot,
        event_type=EVENT_MESSAGE_CREATED,
        op="created",
        data={"message": _message_to_payload(snapshot)},
    )


async def publish_inbox_message_read_changed(
    message: InboxMessage | InboxMessageEventSnapshot,
) -> str:
    """Publish an ``INBOX_MESSAGE_READ_CHANGED`` event with the current ``is_read``.

    Returns the ID of the stream entry that was written.
    """
    snapshot = _as_snapshot(message)
    return await _publish_snapshot_event(
        snapshot,
        event_type=EVENT_MESSAGE_READ_CHANGED,
        op="read_changed",
        data={"is_read": bool(snapshot.is_read)},
    )


async def publish_inbox_message_status_changed(
    message: InboxMessage | InboxMessageEventSnapshot,
) -> str:
    """Publish an ``INBOX_MESSAGE_STATUS_CHANGED`` event with the current ``status``.

    Returns the ID of the stream entry that was written.
    """
    snapshot = _as_snapshot(message)
    return await _publish_snapshot_event(
        snapshot,
        event_type=EVENT_MESSAGE_STATUS_CHANGED,
        op="status_changed",
        data={"status": snapshot.status},
    )


async def publish_inbox_snapshot_required(
    *, recipient_id: UUID, message_id: UUID
) -> str:
    """Publish an ``INBOX_SNAPSHOT_REQUIRED`` event with empty ``data``.

    The event is timestamped with the current UTC time. Returns the ID of the
    stream entry that was written.
    """
    payload = _build_envelope(
        recipient_id=recipient_id,
        message_id=message_id,
        occurred_at=datetime.now(UTC),
        event_type=EVENT_SNAPSHOT_REQUIRED,
        op="snapshot_required",
        data={},
    )
    return await _publish_event(recipient_id, payload)


def _decode_stream_entry(entry: Any) -> dict[str, Any] | None:
    """Decode one raw ``(entry_id, fields)`` pair; return ``None`` if unusable."""
    if not isinstance(entry, (list, tuple)) or len(entry) != 2:
        return None
    entry_id_raw, fields = entry

    if isinstance(entry_id_raw, bytes):
        stream_id = entry_id_raw.decode("utf-8", errors="replace")
    elif isinstance(entry_id_raw, str):
        stream_id = entry_id_raw
    else:
        return None

    if not isinstance(fields, dict):
        return None

    # A client that does not decode responses returns bytes keys as well as
    # bytes values, so the field must be looked up under both key types.
    payload_raw = fields.get(_EVENT_FIELD)
    if payload_raw is None:
        payload_raw = fields.get(_EVENT_FIELD.encode("utf-8"))
    if isinstance(payload_raw, bytes):
        payload_raw = payload_raw.decode("utf-8", errors="replace")
    if not isinstance(payload_raw, str):
        return None

    try:
        payload = json.loads(payload_raw)
    except (TypeError, ValueError):
        logger.warning(
            "Discard malformed inbox stream payload", stream_id=stream_id
        )
        return None
    if not isinstance(payload, dict):
        return None

    return {"id": stream_id, "event": payload}


async def read_inbox_events(
    *,
    recipient_id: UUID,
    last_event_id: str | None,
    count: int = 100,
    block_ms: int = 5000,
) -> list[dict[str, Any]]:
    """Read events that follow ``last_event_id`` from the recipient's stream.

    Args:
        recipient_id: Owner of the stream to read.
        last_event_id: Stream ID to resume after; falsy values read from ``0-0``.
        count: Maximum number of entries requested from Redis.
        block_ms: Requested blocking time, clamped by ``_safe_stream_block_ms``.

    Returns:
        A list of ``{"id": <stream id>, "event": <decoded JSON object>}`` dicts.
        The list is empty on timeout, on an empty response, or when the
        response does not have the expected shape. Entries that are malformed
        are skipped.
    """
    redis = await get_or_init_redis_client()
    stream = _stream_name(recipient_id)
    start_id = last_event_id if last_event_id else "0-0"
    safe_block_ms = _safe_stream_block_ms(block_ms)

    try:
        raw = redis.xread({stream: start_id}, count=count, block=safe_block_ms)
        response = await raw if inspect.isawaitable(raw) else raw
    except (TimeoutError, asyncio.TimeoutError, RedisTimeoutError):
        # A timeout just means nothing arrived within the blocking window.
        return []

    if not response:
        return []

    # Only one stream is requested, so only the first result is relevant.
    first = response[0]
    if not isinstance(first, (list, tuple)) or len(first) != 2:
        return []
    entries_raw = first[1]
    if not isinstance(entries_raw, list):
        return []

    rows: list[dict[str, Any]] = []
    for entry in entries_raw:
        row = _decode_stream_entry(entry)
        if row is not None:
            rows.append(row)
    return rows