from __future__ import annotations from datetime import datetime, time, timezone from types import SimpleNamespace from uuid import uuid4 import pytest from models.automation_jobs import AutomationJobStatus, ScheduleType from v1.automation_jobs.repository import AutomationJobsRepository class _ExecuteResult: def __init__(self, value: object) -> None: self._value = value def scalar_one_or_none(self) -> object: return self._value def scalar_one(self) -> int: return self._value # type: ignore[return-value] class _ScalarRows: def __init__(self, rows: list[object]) -> None: self._rows = rows def all(self) -> list[object]: return self._rows class _ExecuteRowsResult: def __init__(self, rows: list[object]) -> None: self._rows = rows def scalars(self) -> _ScalarRows: return _ScalarRows(self._rows) class _FakeSession: def __init__(self) -> None: self.added: list[object] = [] self.flushed = False self._execute_result: object = None self._return_rows: bool = False def set_execute_result(self, value: object) -> None: self._execute_result = value self._return_rows = isinstance(value, list) async def execute(self, stmt): # noqa: ANN001 del stmt if self._return_rows: return _ExecuteRowsResult(self._execute_result) return _ExecuteResult(self._execute_result) def add(self, obj: object) -> None: self.added.append(obj) async def flush(self) -> None: self.flushed = True @pytest.fixture def fake_session() -> _FakeSession: return _FakeSession() @pytest.fixture def repository(fake_session: _FakeSession) -> AutomationJobsRepository: return AutomationJobsRepository(session=fake_session) # type: ignore[arg-type] @pytest.fixture def sample_job() -> SimpleNamespace: return SimpleNamespace( id=uuid4(), owner_id=uuid4(), bootstrap_key=None, title="Test Job", config={"input_template": "Hello {name}"}, schedule_type=ScheduleType.DAILY, run_at=datetime(2026, 3, 23, 0, 0, tzinfo=timezone.utc), next_run_at=datetime(2026, 3, 24, 0, 0, tzinfo=timezone.utc), timezone="UTC", status=AutomationJobStatus.ACTIVE, created_by=uuid4(), deleted_at=None, ) @pytest.mark.asyncio async def test_list_by_owner_returns_jobs( repository: AutomationJobsRepository, fake_session: _FakeSession, sample_job: SimpleNamespace, ) -> None: fake_session.set_execute_result([sample_job]) owner_id = uuid4() jobs = await repository.list_by_owner(owner_id) assert len(jobs) == 1 assert jobs[0].title == "Test Job" @pytest.mark.asyncio async def test_list_by_owner_returns_empty_list( repository: AutomationJobsRepository, fake_session: _FakeSession, ) -> None: fake_session.set_execute_result([]) owner_id = uuid4() jobs = await repository.list_by_owner(owner_id) assert jobs == [] @pytest.mark.asyncio async def test_get_by_id_returns_job( repository: AutomationJobsRepository, fake_session: _FakeSession, sample_job: SimpleNamespace, ) -> None: fake_session.set_execute_result(sample_job) job_id = uuid4() job = await repository.get_by_id(job_id) assert job is not None assert job.title == "Test Job" @pytest.mark.asyncio async def test_get_by_id_returns_none_when_not_found( repository: AutomationJobsRepository, fake_session: _FakeSession, ) -> None: fake_session.set_execute_result(None) job_id = uuid4() job = await repository.get_by_id(job_id) assert job is None @pytest.mark.asyncio async def test_count_user_jobs_returns_count( repository: AutomationJobsRepository, fake_session: _FakeSession, ) -> None: fake_session.set_execute_result(5) owner_id = uuid4() count = await repository.count_user_jobs(owner_id) assert count == 5 @pytest.mark.asyncio async def test_count_user_jobs_returns_zero_when_none( repository: AutomationJobsRepository, fake_session: _FakeSession, ) -> None: fake_session.set_execute_result(0) owner_id = uuid4() count = await repository.count_user_jobs(owner_id) assert count == 0 @pytest.mark.asyncio async def test_create_job( repository: AutomationJobsRepository, fake_session: _FakeSession, ) -> None: from v1.automation_jobs.schemas import AutomationJobCreateRequest from schemas.automation import AutomationJobConfig owner_id = uuid4() request = AutomationJobCreateRequest( title="New Job", schedule_type=ScheduleType.DAILY, run_at=time(0, 0), timezone="UTC", status=AutomationJobStatus.ACTIVE, config=AutomationJobConfig(input_template="Test"), ) job = await repository.create(owner_id, request) assert job.title == "New Job" assert job.owner_id == owner_id assert job.created_by == owner_id assert job.bootstrap_key is None assert job.schedule_type == ScheduleType.DAILY assert fake_session.flushed is True assert len(fake_session.added) == 1 @pytest.mark.asyncio async def test_soft_delete( repository: AutomationJobsRepository, fake_session: _FakeSession, ) -> None: job_id = uuid4() fake_session.set_execute_result(None) await repository.soft_delete(job_id) assert fake_session.flushed is True @pytest.mark.asyncio async def test_update_job_title( repository: AutomationJobsRepository, fake_session: _FakeSession, sample_job: SimpleNamespace, ) -> None: from v1.automation_jobs.schemas import AutomationJobUpdateRequest sample_job.title = "Updated Title" fake_session.set_execute_result(sample_job) request = AutomationJobUpdateRequest(title="Updated Title") job = await repository.update(sample_job.id, request) assert job is not None assert job.title == "Updated Title" @pytest.mark.asyncio async def test_update_job_run_at_recomputes_next_run_at( repository: AutomationJobsRepository, fake_session: _FakeSession, sample_job: SimpleNamespace, ) -> None: from v1.automation_jobs.schemas import AutomationJobUpdateRequest fake_session.set_execute_result(sample_job) request = AutomationJobUpdateRequest( run_at=time(12, 0), timezone="UTC", ) job = await repository.update(sample_job.id, request) assert job is not None assert fake_session.flushed is True @pytest.mark.asyncio async def test_update_returns_none_when_job_not_found( repository: AutomationJobsRepository, fake_session: _FakeSession, ) -> None: from v1.automation_jobs.schemas import AutomationJobUpdateRequest fake_session.set_execute_result(None) request = AutomationJobUpdateRequest(title="New Title") job = await repository.update(uuid4(), request) assert job is None @pytest.mark.asyncio async def test_update_with_no_changes_returns_existing_job( repository: AutomationJobsRepository, fake_session: _FakeSession, sample_job: SimpleNamespace, ) -> None: from v1.automation_jobs.schemas import AutomationJobUpdateRequest fake_session.set_execute_result(sample_job) request = AutomationJobUpdateRequest() job = await repository.update(sample_job.id, request) assert job is not None assert job.title == "Test Job"