from __future__ import annotations import asyncio import pytest from core.config.settings import RedisSettings from services.base.redis import RedisService, get_or_init_redis_client, redis_service class _FakeRedisClient: def __init__(self) -> None: self.closed = False async def ping(self) -> bool: return True async def info(self) -> dict[str, object]: return { "redis_version": "7.2", "connected_clients": 1, "used_memory_human": "1M", "uptime_in_seconds": 10, } async def aclose(self) -> None: self.closed = True @pytest.mark.asyncio async def test_initialize_success(monkeypatch: pytest.MonkeyPatch) -> None: service = RedisService(settings=RedisSettings(host="localhost", port=6379)) def _build_client(_: RedisService) -> _FakeRedisClient: return _FakeRedisClient() monkeypatch.setattr(RedisService, "_build_client", _build_client) result = await service.initialize() assert result is True assert service.is_initialized is True health = await service.health_check() assert health["status"] == "healthy" @pytest.mark.asyncio async def test_initialize_failure(monkeypatch: pytest.MonkeyPatch) -> None: service = RedisService(settings=RedisSettings(host="localhost", port=6379)) def _build_client(_: RedisService) -> _FakeRedisClient: raise RuntimeError("boom") monkeypatch.setattr(RedisService, "_build_client", _build_client) result = await service.initialize() assert result is False assert service.is_initialized is False @pytest.mark.asyncio async def test_close_is_idempotent() -> None: service = RedisService(settings=RedisSettings(host="localhost", port=6379)) assert await service.close() is True assert service.is_initialized is False @pytest.mark.asyncio async def test_health_check_uninitialized() -> None: service = RedisService(settings=RedisSettings(host="localhost", port=6379)) health = await service.health_check() assert health["status"] == "unhealthy" @pytest.mark.asyncio async def test_close_closes_client(monkeypatch: pytest.MonkeyPatch) -> None: service = RedisService(settings=RedisSettings(host="localhost", port=6379)) client = _FakeRedisClient() def _build_client(_: RedisService) -> _FakeRedisClient: return client monkeypatch.setattr(RedisService, "_build_client", _build_client) assert await service.initialize() is True assert await service.close() is True assert client.closed is True assert service.is_initialized is False def test_get_client_raises_before_init() -> None: service = RedisService(settings=RedisSettings(host="localhost", port=6379)) with pytest.raises(RuntimeError): service.get_client() @pytest.mark.asyncio async def test_get_or_init_redis_client_initializes_when_needed( monkeypatch: pytest.MonkeyPatch, ) -> None: fake_client = _FakeRedisClient() async def _fake_initialize() -> bool: return True monkeypatch.setattr( type(redis_service), "is_initialized", property(lambda _: False) ) monkeypatch.setattr(redis_service, "initialize", _fake_initialize) monkeypatch.setattr(redis_service, "get_client", lambda: fake_client) client = await get_or_init_redis_client() assert client is fake_client @pytest.mark.asyncio async def test_get_or_init_redis_client_raises_when_init_fails( monkeypatch: pytest.MonkeyPatch, ) -> None: async def _fake_initialize() -> bool: return False monkeypatch.setattr( type(redis_service), "is_initialized", property(lambda _: False) ) monkeypatch.setattr(redis_service, "initialize", _fake_initialize) with pytest.raises(RuntimeError, match="Redis service initialization failed"): await get_or_init_redis_client() @pytest.mark.asyncio async def test_get_or_init_redis_client_reinitializes_when_event_loop_changes( monkeypatch: pytest.MonkeyPatch, ) -> None: stale_client = _FakeRedisClient() fresh_client = _FakeRedisClient() call_count = {"initialize": 0} async def _fake_initialize() -> bool: call_count["initialize"] += 1 return True class _Loop: pass loop_obj = _Loop() monkeypatch.setattr(asyncio, "get_running_loop", lambda: loop_obj) monkeypatch.setattr(redis_service, "initialize", _fake_initialize) monkeypatch.setattr(redis_service, "get_client", lambda: fresh_client) monkeypatch.setattr(redis_service, "_client", stale_client, raising=False) monkeypatch.setattr(redis_service, "_loop_id", 123, raising=False) monkeypatch.setattr(redis_service, "_initialized", True, raising=False) client = await get_or_init_redis_client() assert call_count["initialize"] == 1 assert client is fresh_client