"""Unit tests for ``ScheduleItemService`` backed by an in-memory fake repository."""

from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock
from uuid import UUID, uuid4

import pytest
from fastapi import HTTPException

from core.auth.models import CurrentUser
from models.schedule_items import (
    ScheduleItem,
    ScheduleItemSourceType,
    ScheduleItemStatus,
)
from v1.schedule_items.schemas import (
    ScheduleItemCreateRequest,
    ScheduleItemMetadata,
    ScheduleItemUpdateRequest,
)
from v1.schedule_items.service import ScheduleItemService

# Fixed user id shared by every test: it is both the acting user and the
# default owner of mock items.
_USER_ID = UUID("00000000-0000-0000-0000-000000000001")


def _create_mock_schedule_item(
    item_id: UUID | None = None,
    owner_id: UUID = _USER_ID,
    title: str = "Test Event",
) -> ScheduleItem:
    """Build a ``MagicMock`` shaped like a ``ScheduleItem`` with fixed field values.

    Args:
        item_id: Identifier for the item. When omitted, a fresh ``uuid4()`` is
            generated on every call.
        owner_id: Owner of the item.
        title: Title of the item.

    Returns:
        A ``MagicMock(spec=ScheduleItem)`` with all attributes populated.
    """
    # A ``uuid4()`` default in the signature would be evaluated only once, at
    # function definition time, so every item would share the same id.
    # Generate it per call instead.
    if item_id is None:
        item_id = uuid4()

    item = MagicMock(spec=ScheduleItem)
    item.id = item_id
    item.owner_id = owner_id
    item.title = title
    item.description = None
    item.start_at = datetime(2026, 2, 28, 16, 0, 0, tzinfo=timezone.utc)
    item.end_at = datetime(2026, 2, 28, 17, 0, 0, tzinfo=timezone.utc)
    item.timezone = "UTC"
    item.extra_metadata = {}
    item.source_type = ScheduleItemSourceType.MANUAL
    item.status = ScheduleItemStatus.ACTIVE
    item.created_at = datetime(2026, 2, 27, 10, 0, 0, tzinfo=timezone.utc)
    item.updated_at = datetime(2026, 2, 27, 10, 0, 0, tzinfo=timezone.utc)
    return item


class FakeRepo:
    """In-memory repository stand-in that holds at most one schedule item.

    Lookups match on the item id only; ``owner_id`` arguments are accepted
    but ignored. The subscription-related methods return empty results.
    """

    def __init__(self, item: ScheduleItem | None) -> None:
        self._item = item

    def _lookup(self, item_id: UUID) -> ScheduleItem | None:
        """Return the stored item when its id equals ``item_id``, else ``None``."""
        if self._item and item_id == self._item.id:
            return self._item
        return None

    async def get_by_item_id(
        self, item_id: UUID, owner_id: UUID
    ) -> ScheduleItem | None:
        """Return the stored item if ``item_id`` matches; ``owner_id`` is ignored."""
        return self._lookup(item_id)

    async def get_by_id(self, entity_id: UUID) -> ScheduleItem | None:
        """Return the stored item if ``entity_id`` matches its id."""
        return self._lookup(entity_id)

    async def create(self, data: dict) -> ScheduleItem:
        """Return a new mock item built from ``data["owner_id"]`` and ``data["title"]``."""
        return _create_mock_schedule_item(
            owner_id=data["owner_id"],
            title=data["title"],
        )

    async def update_by_item_id(
        self, item_id: UUID, owner_id: UUID, data: dict
    ) -> ScheduleItem | None:
        """Apply ``data["title"]`` (if present) to the stored item and return it.

        Returns ``None`` when there is no stored item or the id does not match.
        """
        item = self._lookup(item_id)
        if item is None:
            return None
        if "title" in data:
            item.title = data["title"]
        return item

    async def delete_by_item_id(
        self, item_id: UUID, owner_id: UUID
    ) -> ScheduleItem | None:
        """Return the stored item when the id matches; nothing is actually removed."""
        return self._lookup(item_id)

    async def list_by_date_range(
        self, owner_id: UUID, start_at: datetime, end_at: datetime
    ) -> list[ScheduleItem]:
        """Return the stored item as a one-element list, or an empty list."""
        return [self._item] if self._item else []

    async def list_paginated(
        self,
        owner_id: UUID,
        *,
        page: int,
        page_size: int,
    ) -> tuple[list[ScheduleItem], int]:
        """Return ``(items, total)`` for the stored item; paging arguments are ignored."""
        del owner_id, page, page_size
        return ([self._item] if self._item else [], 1 if self._item else 0)

    async def create_subscription(self, data: dict):
        """Return a bare ``MagicMock`` in place of a subscription."""
        del data
        return MagicMock()

    async def list_subscribed_items_by_date_range(
        self,
        subscriber_id: UUID,
        start_at: datetime,
        end_at: datetime,
    ):
        """Always return an empty list."""
        del subscriber_id, start_at, end_at
        return []

    async def get_user_subscriptions(self, subscriber_id: UUID):
        """Always return an empty list."""
        del subscriber_id
        return []

    async def get_subscriptions_by_item_id(self, item_id: UUID):
        """Always return an empty list."""
        del item_id
        return []

    async def get_subscription(self, item_id: UUID, subscriber_id: UUID):
        """Always return ``None``."""
        del item_id, subscriber_id
        return None

    async def update_subscription_status(
        self, item_id: UUID, subscriber_id: UUID, status
    ) -> None:
        """No-op."""
        del item_id, subscriber_id, status

    async def delete_subscriptions_by_item_id(self, item_id: UUID) -> None:
        """No-op."""
        del item_id


class _RecordingRepo(FakeRepo):
    """``FakeRepo`` that remembers the payloads passed to ``create`` and ``update_by_item_id``."""

    def __init__(self, item: ScheduleItem | None) -> None:
        super().__init__(item)
        # Last ``data`` dict received by each write method (``None`` until called).
        self.created_data: dict | None = None
        self.updated_data: dict | None = None

    async def create(self, data: dict) -> ScheduleItem:
        self.created_data = data
        return await super().create(data)

    async def update_by_item_id(
        self, item_id: UUID, owner_id: UUID, data: dict
    ) -> ScheduleItem | None:
        self.updated_data = data
        return await super().update_by_item_id(item_id, owner_id, data)


def _make_service(
    repository: FakeRepo,
    session: AsyncMock,
    inbox_repository: MagicMock,
) -> ScheduleItemService:
    """Construct a ``ScheduleItemService`` acting as ``_USER_ID`` over the given fakes."""
    return ScheduleItemService(
        repository=repository,
        session=session,
        current_user=CurrentUser(id=_USER_ID),
        inbox_repository=inbox_repository,
    )


@pytest.fixture
def mock_session() -> AsyncMock:
    """Provide an ``AsyncMock`` session with awaitable ``commit`` and ``rollback``."""
    session = AsyncMock()
    session.commit = AsyncMock()
    session.rollback = AsyncMock()
    return session


@pytest.fixture
def mock_inbox_repository() -> MagicMock:
    """Provide a ``MagicMock`` standing in for the inbox repository."""
    return MagicMock()


@pytest.mark.asyncio
async def test_create_success(
    mock_session: AsyncMock, mock_inbox_repository: MagicMock
) -> None:
    """``create`` returns an item with the requested title and commits once."""
    request = ScheduleItemCreateRequest(
        title="Test Event",
        start_at=datetime(2026, 2, 28, 16, 0, 0, tzinfo=timezone.utc),
    )
    service = _make_service(FakeRepo(None), mock_session, mock_inbox_repository)

    result = await service.create(request)

    assert result.title == "Test Event"
    mock_session.commit.assert_awaited_once()


@pytest.mark.asyncio
async def test_create_invalid_end_at(
    mock_session: AsyncMock, mock_inbox_repository: MagicMock
) -> None:
    """``create`` raises HTTP 400 when ``end_at`` precedes ``start_at``."""
    request = ScheduleItemCreateRequest(
        title="Test Event",
        start_at=datetime(2026, 2, 28, 17, 0, 0, tzinfo=timezone.utc),
        end_at=datetime(2026, 2, 28, 16, 0, 0, tzinfo=timezone.utc),
    )
    service = _make_service(FakeRepo(None), mock_session, mock_inbox_repository)

    with pytest.raises(HTTPException) as exc_info:
        await service.create(request)

    assert exc_info.value.status_code == 400


@pytest.mark.asyncio
async def test_get_by_id_success(
    mock_session: AsyncMock, mock_inbox_repository: MagicMock
) -> None:
    """``get_by_id`` returns the item stored in the repository."""
    item = _create_mock_schedule_item()
    service = _make_service(FakeRepo(item), mock_session, mock_inbox_repository)

    result = await service.get_by_id(item.id)

    assert result.id == item.id


@pytest.mark.asyncio
async def test_get_by_id_not_found(
    mock_session: AsyncMock, mock_inbox_repository: MagicMock
) -> None:
    """``get_by_id`` raises HTTP 404 when the repository has no matching item."""
    service = _make_service(FakeRepo(None), mock_session, mock_inbox_repository)

    with pytest.raises(HTTPException) as exc_info:
        await service.get_by_id(uuid4())

    assert exc_info.value.status_code == 404


@pytest.mark.asyncio
async def test_update_success(
    mock_session: AsyncMock, mock_inbox_repository: MagicMock
) -> None:
    """``update`` returns the item carrying the new title."""
    item = _create_mock_schedule_item()
    service = _make_service(FakeRepo(item), mock_session, mock_inbox_repository)

    result = await service.update(item.id, ScheduleItemUpdateRequest(title="Updated"))

    assert result.title == "Updated"


@pytest.mark.asyncio
async def test_delete_success(
    mock_session: AsyncMock, mock_inbox_repository: MagicMock
) -> None:
    """``delete`` commits the session exactly once for an existing item."""
    item = _create_mock_schedule_item()
    service = _make_service(FakeRepo(item), mock_session, mock_inbox_repository)

    await service.delete(item.id)

    mock_session.commit.assert_awaited_once()


@pytest.mark.asyncio
async def test_create_maps_metadata_to_extra_metadata(
    mock_session: AsyncMock, mock_inbox_repository: MagicMock
) -> None:
    """``create`` passes request ``metadata`` to the repository as ``extra_metadata``."""
    repo = _RecordingRepo(None)
    request = ScheduleItemCreateRequest(
        title="Roadmap",
        start_at=datetime(2026, 2, 28, 16, 0, 0, tzinfo=timezone.utc),
        metadata=ScheduleItemMetadata(
            location="会议室A",
            color="#4F46E5",
            reminder_minutes=15,
            version=1,
        ),
    )
    service = _make_service(repo, mock_session, mock_inbox_repository)

    await service.create(request)

    captured = repo.created_data
    assert captured is not None
    assert "extra_metadata" in captured
    assert captured["extra_metadata"]["location"] == "会议室A"
    assert captured["extra_metadata"]["reminder_minutes"] == 15
    assert "metadata" not in captured


@pytest.mark.asyncio
async def test_update_maps_metadata_to_extra_metadata(
    mock_session: AsyncMock, mock_inbox_repository: MagicMock
) -> None:
    """``update`` passes request ``metadata`` to the repository as ``extra_metadata``."""
    item = _create_mock_schedule_item()
    repo = _RecordingRepo(item)
    service = _make_service(repo, mock_session, mock_inbox_repository)

    await service.update(
        item.id,
        ScheduleItemUpdateRequest(
            metadata=ScheduleItemMetadata(
                location="线上会议",
                color="#3B82F6",
                reminder_minutes=30,
                version=1,
            )
        ),
    )

    captured = repo.updated_data
    assert captured is not None
    assert "extra_metadata" in captured
    assert captured["extra_metadata"]["location"] == "线上会议"
    assert captured["extra_metadata"]["reminder_minutes"] == 30
    assert "metadata" not in captured


@pytest.mark.asyncio
async def test_update_maps_null_metadata_to_extra_metadata_null(
    mock_session: AsyncMock,
    mock_inbox_repository: MagicMock,
) -> None:
    """``update`` with explicit ``metadata=None`` sends ``extra_metadata=None``."""
    item = _create_mock_schedule_item()
    repo = _RecordingRepo(item)
    service = _make_service(repo, mock_session, mock_inbox_repository)

    await service.update(
        item.id,
        ScheduleItemUpdateRequest(metadata=None),
    )

    captured = repo.updated_data
    assert captured is not None
    assert "extra_metadata" in captured
    assert captured["extra_metadata"] is None
    assert "metadata" not in captured