Files
social-app/backend/tests/unit/v1/schedule_items/test_service.py
T

543 lines
16 KiB
Python

from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock
from uuid import UUID, uuid4
import pytest
from core.http.errors import ApiProblemError
from sqlalchemy.exc import SQLAlchemyError
from core.auth.models import CurrentUser
from models.schedule_items import (
ScheduleItem,
ScheduleItemSourceType,
ScheduleItemStatus,
)
from v1.schedule_items.schemas import (
ScheduleItemCreateRequest,
ScheduleItemListRequest,
ScheduleItemMetadata,
ScheduleItemUpdateRequest,
)
from v1.schedule_items.service import ScheduleItemService
def _create_mock_schedule_item(
item_id: UUID = uuid4(),
owner_id: UUID = UUID("00000000-0000-0000-0000-000000000001"),
title: str = "Test Event",
) -> ScheduleItem:
item = MagicMock(spec=ScheduleItem)
item.id = item_id
item.owner_id = owner_id
item.title = title
item.description = None
item.start_at = datetime(2026, 2, 28, 16, 0, 0, tzinfo=timezone.utc)
item.end_at = datetime(2026, 2, 28, 17, 0, 0, tzinfo=timezone.utc)
item.timezone = "UTC"
item.extra_metadata = {}
item.source_type = ScheduleItemSourceType.MANUAL
item.status = ScheduleItemStatus.ACTIVE
item.created_at = datetime(2026, 2, 27, 10, 0, 0, tzinfo=timezone.utc)
item.updated_at = datetime(2026, 2, 27, 10, 0, 0, tzinfo=timezone.utc)
return item
class FakeRepo:
def __init__(self, item: ScheduleItem | None) -> None:
self._item = item
self.archive_expired_called = 0
async def get_by_item_id(
self, item_id: UUID, owner_id: UUID
) -> ScheduleItem | None:
if self._item and item_id == self._item.id:
return self._item
return None
async def get_by_id(self, entity_id: UUID) -> ScheduleItem | None:
if self._item and entity_id == self._item.id:
return self._item
return None
async def create(self, data: dict) -> ScheduleItem:
return _create_mock_schedule_item(
owner_id=data["owner_id"],
title=data["title"],
)
async def update_by_item_id(
self, item_id: UUID, owner_id: UUID, data: dict
) -> ScheduleItem | None:
if not self._item or item_id != self._item.id:
return None
if "title" in data:
self._item.title = data["title"]
return self._item
async def delete_by_item_id(
self, item_id: UUID, owner_id: UUID
) -> ScheduleItem | None:
if not self._item or item_id != self._item.id:
return None
return self._item
async def list_by_date_range(
self, owner_id: UUID, start_at: datetime, end_at: datetime
) -> list[ScheduleItem]:
return [self._item] if self._item else []
async def list_paginated(
self,
owner_id: UUID,
*,
page: int,
page_size: int,
query: str | None = None,
) -> tuple[list[ScheduleItem], int]:
del owner_id, page, page_size, query
return ([self._item] if self._item else [], 1 if self._item else 0)
async def create_subscription(self, data: dict):
del data
return MagicMock()
async def list_subscribed_items_by_date_range(
self,
subscriber_id: UUID,
start_at: datetime,
end_at: datetime,
):
del subscriber_id, start_at, end_at
if self._item is None:
return []
subscription = MagicMock()
subscription.permission = 1
return [(self._item, subscription)]
async def archive_expired_subscribed_items(
self,
subscriber_id: UUID,
now_at: datetime,
) -> int:
del subscriber_id, now_at
self.archive_expired_called += 1
return 0
async def get_user_subscriptions(self, subscriber_id: UUID):
del subscriber_id
return []
async def get_subscriptions_by_item_id(self, item_id: UUID):
del item_id
return []
async def get_subscription(self, item_id: UUID, subscriber_id: UUID):
del item_id, subscriber_id
return None
async def update_subscription_status(
self, item_id: UUID, subscriber_id: UUID, status
):
del item_id, subscriber_id, status
async def delete_subscriptions_by_item_id(self, item_id: UUID):
del item_id
@pytest.fixture
def mock_session() -> AsyncMock:
session = AsyncMock()
session.commit = AsyncMock()
session.rollback = AsyncMock()
return session
@pytest.fixture
def mock_inbox_repository() -> MagicMock:
return MagicMock()
@pytest.mark.asyncio
async def test_create_success(
mock_session: AsyncMock, mock_inbox_repository: MagicMock
) -> None:
user_id = UUID("00000000-0000-0000-0000-000000000001")
request = ScheduleItemCreateRequest(
title="Test Event",
start_at=datetime(2026, 2, 28, 16, 0, 0, tzinfo=timezone.utc),
timezone="UTC",
)
service = ScheduleItemService(
repository=FakeRepo(None),
session=mock_session,
current_user=CurrentUser(id=user_id),
inbox_repository=mock_inbox_repository,
)
result = await service.create(request)
assert result.title == "Test Event"
mock_session.commit.assert_awaited_once()
@pytest.mark.asyncio
async def test_create_invalid_end_at(
mock_session: AsyncMock, mock_inbox_repository: MagicMock
) -> None:
user_id = UUID("00000000-0000-0000-0000-000000000001")
request = ScheduleItemCreateRequest(
title="Test Event",
start_at=datetime(2026, 2, 28, 17, 0, 0, tzinfo=timezone.utc),
end_at=datetime(2026, 2, 28, 16, 0, 0, tzinfo=timezone.utc),
timezone="UTC",
)
service = ScheduleItemService(
repository=FakeRepo(None),
session=mock_session,
current_user=CurrentUser(id=user_id),
inbox_repository=mock_inbox_repository,
)
with pytest.raises(ApiProblemError) as exc_info:
await service.create(request)
assert exc_info.value.status_code == 400
@pytest.mark.asyncio
async def test_get_by_id_success(
mock_session: AsyncMock, mock_inbox_repository: MagicMock
) -> None:
user_id = UUID("00000000-0000-0000-0000-000000000001")
item = _create_mock_schedule_item()
service = ScheduleItemService(
repository=FakeRepo(item),
session=mock_session,
current_user=CurrentUser(id=user_id),
inbox_repository=mock_inbox_repository,
)
result = await service.get_by_id(item.id)
assert result.id == item.id
@pytest.mark.asyncio
async def test_get_by_id_not_found(
mock_session: AsyncMock, mock_inbox_repository: MagicMock
) -> None:
user_id = UUID("00000000-0000-0000-0000-000000000001")
service = ScheduleItemService(
repository=FakeRepo(None),
session=mock_session,
current_user=CurrentUser(id=user_id),
inbox_repository=mock_inbox_repository,
)
with pytest.raises(ApiProblemError) as exc_info:
await service.get_by_id(uuid4())
assert exc_info.value.status_code == 404
@pytest.mark.asyncio
async def test_update_success(
mock_session: AsyncMock, mock_inbox_repository: MagicMock
) -> None:
user_id = UUID("00000000-0000-0000-0000-000000000001")
item = _create_mock_schedule_item()
service = ScheduleItemService(
repository=FakeRepo(item),
session=mock_session,
current_user=CurrentUser(id=user_id),
inbox_repository=mock_inbox_repository,
)
result = await service.update(item.id, ScheduleItemUpdateRequest(title="Updated"))
assert result.title == "Updated"
@pytest.mark.asyncio
async def test_delete_success(
mock_session: AsyncMock, mock_inbox_repository: MagicMock
) -> None:
user_id = UUID("00000000-0000-0000-0000-000000000001")
item = _create_mock_schedule_item()
service = ScheduleItemService(
repository=FakeRepo(item),
session=mock_session,
current_user=CurrentUser(id=user_id),
inbox_repository=mock_inbox_repository,
)
await service.delete(item.id)
mock_session.commit.assert_awaited_once()
@pytest.mark.asyncio
async def test_create_maps_metadata_to_extra_metadata(
mock_session: AsyncMock, mock_inbox_repository: MagicMock
) -> None:
user_id = UUID("00000000-0000-0000-0000-000000000001")
captured: dict | None = None
class CaptureRepo(FakeRepo):
async def create(self, data: dict) -> ScheduleItem:
nonlocal captured
captured = data
return _create_mock_schedule_item(
owner_id=data["owner_id"], title=data["title"]
)
request = ScheduleItemCreateRequest(
title="Roadmap",
start_at=datetime(2026, 2, 28, 16, 0, 0, tzinfo=timezone.utc),
timezone="UTC",
metadata=ScheduleItemMetadata(
location="会议室A",
color="#4F46E5",
reminder_minutes=15,
version=1,
),
)
service = ScheduleItemService(
repository=CaptureRepo(None),
session=mock_session,
current_user=CurrentUser(id=user_id),
inbox_repository=mock_inbox_repository,
)
await service.create(request)
assert captured is not None
assert "extra_metadata" in captured
assert captured["extra_metadata"]["location"] == "会议室A"
assert captured["extra_metadata"]["reminder_minutes"] == 15
assert "metadata" not in captured
@pytest.mark.asyncio
async def test_update_maps_metadata_to_extra_metadata(
mock_session: AsyncMock, mock_inbox_repository: MagicMock
) -> None:
user_id = UUID("00000000-0000-0000-0000-000000000001")
item = _create_mock_schedule_item()
captured: dict | None = None
class CaptureRepo(FakeRepo):
async def update_by_item_id(
self, item_id: UUID, owner_id: UUID, data: dict
) -> ScheduleItem | None:
nonlocal captured
captured = data
return await super().update_by_item_id(item_id, owner_id, data)
service = ScheduleItemService(
repository=CaptureRepo(item),
session=mock_session,
current_user=CurrentUser(id=user_id),
inbox_repository=mock_inbox_repository,
)
await service.update(
item.id,
ScheduleItemUpdateRequest(
metadata=ScheduleItemMetadata(
location="线上会议",
color="#3B82F6",
reminder_minutes=30,
version=1,
)
),
)
assert captured is not None
assert "extra_metadata" in captured
assert captured["extra_metadata"]["location"] == "线上会议"
assert captured["extra_metadata"]["reminder_minutes"] == 30
assert "metadata" not in captured
@pytest.mark.asyncio
async def test_update_maps_null_metadata_to_extra_metadata_null(
mock_session: AsyncMock,
mock_inbox_repository: MagicMock,
) -> None:
user_id = UUID("00000000-0000-0000-0000-000000000001")
item = _create_mock_schedule_item()
captured: dict | None = None
class CaptureRepo(FakeRepo):
async def update_by_item_id(
self, item_id: UUID, owner_id: UUID, data: dict
) -> ScheduleItem | None:
nonlocal captured
captured = data
return await super().update_by_item_id(item_id, owner_id, data)
service = ScheduleItemService(
repository=CaptureRepo(item),
session=mock_session,
current_user=CurrentUser(id=user_id),
inbox_repository=mock_inbox_repository,
)
await service.update(
item.id,
ScheduleItemUpdateRequest(metadata=None),
)
assert captured is not None
assert "extra_metadata" in captured
assert captured["extra_metadata"] is None
assert "metadata" not in captured
@pytest.mark.asyncio
async def test_list_by_date_range_archives_expired_before_query(
mock_session: AsyncMock,
mock_inbox_repository: MagicMock,
) -> None:
user_id = UUID("00000000-0000-0000-0000-000000000001")
item = _create_mock_schedule_item()
repo = FakeRepo(item)
service = ScheduleItemService(
repository=repo,
session=mock_session,
current_user=CurrentUser(id=user_id),
inbox_repository=mock_inbox_repository,
)
await service.list_by_date_range(
request=ScheduleItemListRequest(
start_at=datetime(2026, 2, 1, 0, 0, tzinfo=timezone.utc),
end_at=datetime(2026, 3, 1, 0, 0, tzinfo=timezone.utc),
),
)
assert repo.archive_expired_called == 1
@pytest.mark.asyncio
async def test_list_by_date_range_commits_when_archived_changed(
mock_session: AsyncMock,
mock_inbox_repository: MagicMock,
) -> None:
user_id = UUID("00000000-0000-0000-0000-000000000001")
item = _create_mock_schedule_item()
class ArchiveRepo(FakeRepo):
async def archive_expired_subscribed_items(
self,
subscriber_id: UUID,
now_at: datetime,
) -> int:
del subscriber_id, now_at
self.archive_expired_called += 1
return 2
repo = ArchiveRepo(item)
service = ScheduleItemService(
repository=repo,
session=mock_session,
current_user=CurrentUser(id=user_id),
inbox_repository=mock_inbox_repository,
)
await service.list_by_date_range(
request=ScheduleItemListRequest(
start_at=datetime(2026, 2, 1, 0, 0, tzinfo=timezone.utc),
end_at=datetime(2026, 3, 1, 0, 0, tzinfo=timezone.utc),
),
)
mock_session.commit.assert_awaited_once()
@pytest.mark.asyncio
async def test_list_by_date_range_rolls_back_when_query_fails_after_archive(
mock_session: AsyncMock,
mock_inbox_repository: MagicMock,
) -> None:
user_id = UUID("00000000-0000-0000-0000-000000000001")
item = _create_mock_schedule_item()
class FailingRepo(FakeRepo):
async def archive_expired_subscribed_items(
self,
subscriber_id: UUID,
now_at: datetime,
) -> int:
del subscriber_id, now_at
return 1
async def list_subscribed_items_by_date_range(
self,
subscriber_id: UUID,
start_at: datetime,
end_at: datetime,
):
del subscriber_id, start_at, end_at
raise SQLAlchemyError("db unavailable")
service = ScheduleItemService(
repository=FailingRepo(item),
session=mock_session,
current_user=CurrentUser(id=user_id),
inbox_repository=mock_inbox_repository,
)
with pytest.raises(ApiProblemError) as exc_info:
await service.list_by_date_range(
request=ScheduleItemListRequest(
start_at=datetime(2026, 2, 1, 0, 0, tzinfo=timezone.utc),
end_at=datetime(2026, 3, 1, 0, 0, tzinfo=timezone.utc),
),
)
assert exc_info.value.status_code == 503
mock_session.rollback.assert_awaited_once()
mock_session.commit.assert_not_awaited()
@pytest.mark.asyncio
async def test_get_by_id_maps_legacy_completed_to_archived(
mock_session: AsyncMock,
mock_inbox_repository: MagicMock,
) -> None:
user_id = UUID("00000000-0000-0000-0000-000000000001")
item = _create_mock_schedule_item()
setattr(item, "status", "completed")
service = ScheduleItemService(
repository=FakeRepo(item),
session=mock_session,
current_user=CurrentUser(id=user_id),
inbox_repository=mock_inbox_repository,
)
result = await service.get_by_id(item.id)
assert result.status == ScheduleItemStatus.ARCHIVED
@pytest.mark.asyncio
async def test_get_by_id_maps_legacy_canceled_to_archived(
mock_session: AsyncMock,
mock_inbox_repository: MagicMock,
) -> None:
user_id = UUID("00000000-0000-0000-0000-000000000001")
item = _create_mock_schedule_item()
setattr(item, "status", "canceled")
service = ScheduleItemService(
repository=FakeRepo(item),
session=mock_session,
current_user=CurrentUser(id=user_id),
inbox_repository=mock_inbox_repository,
)
result = await service.get_by_id(item.id)
assert result.status == ScheduleItemStatus.ARCHIVED