test: add calendar sharing tests and update API docs
This commit is contained in:
@@ -0,0 +1,156 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timezone
|
||||
from typing import Callable
|
||||
from uuid import UUID, uuid4
|
||||
|
||||
from fastapi import HTTPException
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
from app import app
|
||||
from v1.inbox_messages.dependencies import get_inbox_message_service
|
||||
from v1.inbox_messages.schemas import (
|
||||
InboxMessageAcceptRequest,
|
||||
InboxMessageListRequest,
|
||||
InboxMessageResponse,
|
||||
InboxMessageStatus,
|
||||
InboxMessageType,
|
||||
)
|
||||
from v1.inbox_messages.service import InboxMessageService
|
||||
|
||||
|
||||
class FakeInboxMessageService:
|
||||
def __init__(
|
||||
self,
|
||||
messages: list[InboxMessageResponse],
|
||||
accepted: InboxMessageResponse,
|
||||
dismissed: InboxMessageResponse,
|
||||
) -> None:
|
||||
self._messages = messages
|
||||
self._accepted = accepted
|
||||
self._dismissed = dismissed
|
||||
|
||||
async def list_messages(
|
||||
self, request: InboxMessageListRequest
|
||||
) -> list[InboxMessageResponse]:
|
||||
if request.status is None:
|
||||
return self._messages
|
||||
return [
|
||||
message for message in self._messages if message.status == request.status
|
||||
]
|
||||
|
||||
async def accept_invitation(
|
||||
self,
|
||||
message_id: UUID,
|
||||
request: InboxMessageAcceptRequest,
|
||||
) -> InboxMessageResponse:
|
||||
if message_id != self._accepted.id:
|
||||
raise HTTPException(status_code=404, detail="Inbox message not found")
|
||||
if not request.permission_view:
|
||||
raise HTTPException(status_code=400, detail="permission_view is required")
|
||||
return self._accepted
|
||||
|
||||
async def dismiss_invitation(self, message_id: UUID) -> InboxMessageResponse:
|
||||
if message_id != self._dismissed.id:
|
||||
raise HTTPException(status_code=404, detail="Inbox message not found")
|
||||
return self._dismissed
|
||||
|
||||
|
||||
def _override_inbox_message_service(
|
||||
service: FakeInboxMessageService,
|
||||
) -> Callable[[], InboxMessageService]:
|
||||
def _get_service() -> InboxMessageService:
|
||||
return service # type: ignore[return-value]
|
||||
|
||||
return _get_service
|
||||
|
||||
|
||||
def _build_message(
|
||||
message_id: UUID,
|
||||
status: InboxMessageStatus,
|
||||
) -> InboxMessageResponse:
|
||||
return InboxMessageResponse(
|
||||
id=message_id,
|
||||
recipient_id=uuid4(),
|
||||
sender_id=uuid4(),
|
||||
message_type=InboxMessageType.CALENDAR,
|
||||
schedule_item_id=uuid4(),
|
||||
content='{"permission": 1}',
|
||||
is_read=False,
|
||||
status=status,
|
||||
created_at=datetime(2026, 2, 28, 9, 0, 0, tzinfo=timezone.utc),
|
||||
)
|
||||
|
||||
|
||||
def test_list_inbox_messages_returns_200() -> None:
|
||||
pending_message = _build_message(uuid4(), InboxMessageStatus.PENDING)
|
||||
accepted_message = _build_message(uuid4(), InboxMessageStatus.ACCEPTED)
|
||||
service = FakeInboxMessageService(
|
||||
messages=[pending_message, accepted_message],
|
||||
accepted=accepted_message,
|
||||
dismissed=_build_message(uuid4(), InboxMessageStatus.DISMISSED),
|
||||
)
|
||||
app.dependency_overrides[get_inbox_message_service] = (
|
||||
_override_inbox_message_service(service)
|
||||
)
|
||||
|
||||
client = TestClient(app)
|
||||
try:
|
||||
response = client.get("/api/v1/inbox/messages", params={"status": "pending"})
|
||||
assert response.status_code == 200
|
||||
body = response.json()
|
||||
assert len(body) == 1
|
||||
assert body[0]["status"] == "pending"
|
||||
finally:
|
||||
app.dependency_overrides = {}
|
||||
|
||||
|
||||
def test_accept_inbox_message_returns_200() -> None:
|
||||
accepted_message = _build_message(uuid4(), InboxMessageStatus.ACCEPTED)
|
||||
service = FakeInboxMessageService(
|
||||
messages=[accepted_message],
|
||||
accepted=accepted_message,
|
||||
dismissed=_build_message(uuid4(), InboxMessageStatus.DISMISSED),
|
||||
)
|
||||
app.dependency_overrides[get_inbox_message_service] = (
|
||||
_override_inbox_message_service(service)
|
||||
)
|
||||
|
||||
client = TestClient(app)
|
||||
try:
|
||||
response = client.post(
|
||||
f"/api/v1/inbox/messages/{accepted_message.id}/accept",
|
||||
json={
|
||||
"permission_view": True,
|
||||
"permission_edit": True,
|
||||
"permission_invite": False,
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
body = response.json()
|
||||
assert body["id"] == str(accepted_message.id)
|
||||
assert body["status"] == "accepted"
|
||||
finally:
|
||||
app.dependency_overrides = {}
|
||||
|
||||
|
||||
def test_dismiss_inbox_message_returns_200() -> None:
|
||||
dismissed_message = _build_message(uuid4(), InboxMessageStatus.DISMISSED)
|
||||
service = FakeInboxMessageService(
|
||||
messages=[dismissed_message],
|
||||
accepted=_build_message(uuid4(), InboxMessageStatus.ACCEPTED),
|
||||
dismissed=dismissed_message,
|
||||
)
|
||||
app.dependency_overrides[get_inbox_message_service] = (
|
||||
_override_inbox_message_service(service)
|
||||
)
|
||||
|
||||
client = TestClient(app)
|
||||
try:
|
||||
response = client.post(f"/api/v1/inbox/messages/{dismissed_message.id}/dismiss")
|
||||
assert response.status_code == 200
|
||||
body = response.json()
|
||||
assert body["id"] == str(dismissed_message.id)
|
||||
assert body["status"] == "dismissed"
|
||||
finally:
|
||||
app.dependency_overrides = {}
|
||||
@@ -0,0 +1,68 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Callable
|
||||
from uuid import UUID, uuid4
|
||||
|
||||
from fastapi import HTTPException
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
from app import app
|
||||
from v1.schedule_items.dependencies import get_schedule_item_service
|
||||
from v1.schedule_items.schemas import (
|
||||
ScheduleItemShareRequest,
|
||||
ScheduleItemShareResponse,
|
||||
)
|
||||
from v1.schedule_items.service import ScheduleItemService
|
||||
|
||||
|
||||
class FakeScheduleItemShareService:
|
||||
def __init__(self, item_id: UUID) -> None:
|
||||
self._item_id = item_id
|
||||
self.last_share_request: ScheduleItemShareRequest | None = None
|
||||
|
||||
async def share(
|
||||
self,
|
||||
item_id: UUID,
|
||||
request: ScheduleItemShareRequest,
|
||||
) -> ScheduleItemShareResponse:
|
||||
if item_id != self._item_id:
|
||||
raise HTTPException(status_code=404, detail="Schedule item not found")
|
||||
self.last_share_request = request
|
||||
return ScheduleItemShareResponse(message="Calendar invitation sent")
|
||||
|
||||
|
||||
def _override_schedule_item_service(
|
||||
service: FakeScheduleItemShareService,
|
||||
) -> Callable[[], ScheduleItemService]:
|
||||
def _get_service() -> ScheduleItemService:
|
||||
return service # type: ignore[return-value]
|
||||
|
||||
return _get_service
|
||||
|
||||
|
||||
def test_share_schedule_item_returns_200() -> None:
|
||||
item_id = uuid4()
|
||||
service = FakeScheduleItemShareService(item_id=item_id)
|
||||
app.dependency_overrides[get_schedule_item_service] = (
|
||||
_override_schedule_item_service(service)
|
||||
)
|
||||
|
||||
client = TestClient(app)
|
||||
try:
|
||||
response = client.post(
|
||||
f"/api/v1/schedule-items/{item_id}/share",
|
||||
json={
|
||||
"email": "friend@example.com",
|
||||
"permission_view": True,
|
||||
"permission_edit": False,
|
||||
"permission_invite": True,
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
body = response.json()
|
||||
assert body["message"] == "Calendar invitation sent"
|
||||
assert service.last_share_request is not None
|
||||
assert service.last_share_request.email == "friend@example.com"
|
||||
assert service.last_share_request.permission_invite is True
|
||||
finally:
|
||||
app.dependency_overrides = {}
|
||||
@@ -0,0 +1,88 @@
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
from uuid import uuid4
|
||||
|
||||
import pytest
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
|
||||
from models.inbox_messages import InboxMessageType
|
||||
from v1.inbox_messages.repository import SQLAlchemyInboxMessageRepository
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_adds_message_and_flushes() -> None:
|
||||
session = AsyncMock()
|
||||
session.add = MagicMock()
|
||||
repository = SQLAlchemyInboxMessageRepository(session)
|
||||
recipient_id = uuid4()
|
||||
|
||||
result = await repository.create(
|
||||
{
|
||||
"recipient_id": recipient_id,
|
||||
"sender_id": uuid4(),
|
||||
"message_type": InboxMessageType.CALENDAR,
|
||||
"schedule_item_id": uuid4(),
|
||||
"content": "invite",
|
||||
"created_by": uuid4(),
|
||||
}
|
||||
)
|
||||
|
||||
session.add.assert_called_once_with(result)
|
||||
session.flush.assert_awaited_once()
|
||||
assert result.recipient_id == recipient_id
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_by_id_returns_message_when_exists() -> None:
|
||||
session = AsyncMock()
|
||||
repository = SQLAlchemyInboxMessageRepository(session)
|
||||
expected = MagicMock()
|
||||
execute_result = MagicMock()
|
||||
execute_result.scalar_one_or_none.return_value = expected
|
||||
session.execute.return_value = execute_result
|
||||
|
||||
result = await repository.get_by_id(uuid4(), uuid4())
|
||||
|
||||
assert result is expected
|
||||
session.execute.assert_awaited_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_by_recipient_returns_messages() -> None:
|
||||
session = AsyncMock()
|
||||
repository = SQLAlchemyInboxMessageRepository(session)
|
||||
message_one = MagicMock()
|
||||
message_two = MagicMock()
|
||||
execute_result = MagicMock()
|
||||
execute_result.scalars.return_value.all.return_value = [message_one, message_two]
|
||||
session.execute.return_value = execute_result
|
||||
|
||||
result = await repository.list_by_recipient(uuid4(), "pending")
|
||||
|
||||
assert result == [message_one, message_two]
|
||||
session.execute.assert_awaited_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_update_status_returns_updated_message_and_flushes() -> None:
|
||||
session = AsyncMock()
|
||||
repository = SQLAlchemyInboxMessageRepository(session)
|
||||
updated = MagicMock()
|
||||
execute_result = MagicMock()
|
||||
execute_result.scalar_one_or_none.return_value = updated
|
||||
session.execute.return_value = execute_result
|
||||
|
||||
result = await repository.update_status(uuid4(), uuid4(), "dismissed")
|
||||
|
||||
assert result is updated
|
||||
session.execute.assert_awaited_once()
|
||||
session.flush.assert_awaited_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_by_id_propagates_sqlalchemy_error() -> None:
|
||||
session = AsyncMock()
|
||||
repository = SQLAlchemyInboxMessageRepository(session)
|
||||
session.execute.side_effect = SQLAlchemyError("boom")
|
||||
|
||||
with pytest.raises(SQLAlchemyError):
|
||||
await repository.get_by_id(uuid4(), uuid4())
|
||||
@@ -0,0 +1,180 @@
|
||||
from datetime import datetime, timezone
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
from uuid import UUID, uuid4
|
||||
|
||||
import pytest
|
||||
from fastapi import HTTPException
|
||||
|
||||
from core.auth.models import CurrentUser
|
||||
from models.inbox_messages import (
|
||||
InboxMessage,
|
||||
InboxMessageStatus as InboxMessageModelStatus,
|
||||
InboxMessageType as InboxMessageModelType,
|
||||
)
|
||||
from models.schedule_subscriptions import ScheduleSubscription, SubscriptionStatus
|
||||
from v1.inbox_messages.schemas import InboxMessageAcceptRequest, InboxMessageListRequest
|
||||
from v1.inbox_messages.service import InboxMessageService
|
||||
|
||||
|
||||
def _build_message(
|
||||
*,
|
||||
message_id: UUID,
|
||||
recipient_id: UUID,
|
||||
status: InboxMessageModelStatus = InboxMessageModelStatus.PENDING,
|
||||
message_type: InboxMessageModelType = InboxMessageModelType.CALENDAR,
|
||||
schedule_item_id: UUID | None = None,
|
||||
) -> InboxMessage:
|
||||
message = MagicMock(spec=InboxMessage)
|
||||
message.id = message_id
|
||||
message.recipient_id = recipient_id
|
||||
message.sender_id = uuid4()
|
||||
message.message_type = message_type
|
||||
message.schedule_item_id = schedule_item_id
|
||||
message.content = "calendar invite"
|
||||
message.is_read = False
|
||||
message.status = status
|
||||
message.created_at = datetime(2026, 2, 28, 10, 0, 0, tzinfo=timezone.utc)
|
||||
return message
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_messages_returns_messages() -> None:
|
||||
user_id = uuid4()
|
||||
repo = AsyncMock()
|
||||
repo.list_by_recipient.return_value = [
|
||||
_build_message(
|
||||
message_id=uuid4(),
|
||||
recipient_id=user_id,
|
||||
schedule_item_id=uuid4(),
|
||||
)
|
||||
]
|
||||
session = AsyncMock()
|
||||
service = InboxMessageService(
|
||||
repository=repo,
|
||||
session=session,
|
||||
current_user=CurrentUser(id=user_id),
|
||||
)
|
||||
|
||||
result = await service.list_messages(InboxMessageListRequest())
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].recipient_id == user_id
|
||||
assert result[0].status.value == "pending"
|
||||
repo.list_by_recipient.assert_awaited_once_with(user_id, None)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_accept_invitation_creates_subscription() -> None:
|
||||
user_id = uuid4()
|
||||
message_id = uuid4()
|
||||
item_id = uuid4()
|
||||
pending_message = _build_message(
|
||||
message_id=message_id,
|
||||
recipient_id=user_id,
|
||||
schedule_item_id=item_id,
|
||||
)
|
||||
accepted_message = _build_message(
|
||||
message_id=message_id,
|
||||
recipient_id=user_id,
|
||||
status=InboxMessageModelStatus.ACCEPTED,
|
||||
schedule_item_id=item_id,
|
||||
)
|
||||
|
||||
repo = AsyncMock()
|
||||
repo.get_by_id.return_value = pending_message
|
||||
repo.update_status.return_value = accepted_message
|
||||
|
||||
session = AsyncMock()
|
||||
session.add = MagicMock()
|
||||
|
||||
service = InboxMessageService(
|
||||
repository=repo,
|
||||
session=session,
|
||||
current_user=CurrentUser(id=user_id),
|
||||
)
|
||||
|
||||
result = await service.accept_invitation(
|
||||
message_id,
|
||||
InboxMessageAcceptRequest(
|
||||
permission_view=True,
|
||||
permission_edit=True,
|
||||
permission_invite=False,
|
||||
),
|
||||
)
|
||||
|
||||
session.add.assert_called_once()
|
||||
subscription = session.add.call_args.args[0]
|
||||
assert isinstance(subscription, ScheduleSubscription)
|
||||
assert subscription.item_id == item_id
|
||||
assert subscription.subscriber_id == user_id
|
||||
assert subscription.permission == 3
|
||||
assert subscription.status == SubscriptionStatus.ACTIVE
|
||||
repo.update_status.assert_awaited_once_with(message_id, user_id, "accepted")
|
||||
session.commit.assert_awaited_once()
|
||||
assert result.status.value == "accepted"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_dismiss_invitation_updates_status() -> None:
|
||||
user_id = uuid4()
|
||||
message_id = uuid4()
|
||||
pending_message = _build_message(
|
||||
message_id=message_id,
|
||||
recipient_id=user_id,
|
||||
schedule_item_id=uuid4(),
|
||||
)
|
||||
dismissed_message = _build_message(
|
||||
message_id=message_id,
|
||||
recipient_id=user_id,
|
||||
status=InboxMessageModelStatus.DISMISSED,
|
||||
schedule_item_id=uuid4(),
|
||||
)
|
||||
|
||||
repo = AsyncMock()
|
||||
repo.get_by_id.return_value = pending_message
|
||||
repo.update_status.return_value = dismissed_message
|
||||
|
||||
session = AsyncMock()
|
||||
service = InboxMessageService(
|
||||
repository=repo,
|
||||
session=session,
|
||||
current_user=CurrentUser(id=user_id),
|
||||
)
|
||||
|
||||
result = await service.dismiss_invitation(message_id)
|
||||
|
||||
repo.update_status.assert_awaited_once_with(message_id, user_id, "dismissed")
|
||||
session.commit.assert_awaited_once()
|
||||
assert result.status.value == "dismissed"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_accept_noncalendar_message_fails() -> None:
|
||||
user_id = uuid4()
|
||||
message_id = uuid4()
|
||||
non_calendar_message = _build_message(
|
||||
message_id=message_id,
|
||||
recipient_id=user_id,
|
||||
message_type=InboxMessageModelType.FRIEND_REQUEST,
|
||||
schedule_item_id=None,
|
||||
)
|
||||
|
||||
repo = AsyncMock()
|
||||
repo.get_by_id.return_value = non_calendar_message
|
||||
|
||||
session = AsyncMock()
|
||||
session.add = MagicMock()
|
||||
|
||||
service = InboxMessageService(
|
||||
repository=repo,
|
||||
session=session,
|
||||
current_user=CurrentUser(id=user_id),
|
||||
)
|
||||
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
await service.accept_invitation(message_id, InboxMessageAcceptRequest())
|
||||
|
||||
assert exc_info.value.status_code == 400
|
||||
assert exc_info.value.detail == "Message is not a calendar invitation"
|
||||
session.add.assert_not_called()
|
||||
session.commit.assert_not_awaited()
|
||||
@@ -7,8 +7,10 @@ from uuid import UUID, uuid4
|
||||
|
||||
import pytest
|
||||
from fastapi import HTTPException
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
|
||||
from core.auth.models import CurrentUser
|
||||
from models.inbox_messages import InboxMessage, InboxMessageType
|
||||
from models.schedule_items import ScheduleItem
|
||||
from v1.auth.schemas import UserByEmailResponse
|
||||
from v1.schedule_items.repository import ScheduleItemRepository
|
||||
@@ -72,6 +74,16 @@ class AuthGatewayStub:
|
||||
)
|
||||
|
||||
|
||||
class AuthGatewayInvalidIdStub:
|
||||
async def get_user_by_email(self, email: str) -> UserByEmailResponse:
|
||||
return UserByEmailResponse(
|
||||
id="not-a-uuid",
|
||||
email=email,
|
||||
created_at="2026-02-28T10:00:00Z",
|
||||
email_confirmed_at=None,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_share_forbidden_when_not_owner() -> None:
|
||||
owner_id = UUID("00000000-0000-0000-0000-000000000001")
|
||||
@@ -99,3 +111,127 @@ async def test_share_forbidden_when_not_owner() -> None:
|
||||
)
|
||||
|
||||
assert exc_info.value.status_code == 403
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_share_success_creates_calendar_invitation_message() -> None:
|
||||
owner_id = UUID("00000000-0000-0000-0000-000000000001")
|
||||
item_id = uuid4()
|
||||
session = AsyncMock()
|
||||
session.add = MagicMock()
|
||||
service = ScheduleItemService(
|
||||
repository=cast(
|
||||
ScheduleItemRepository,
|
||||
ShareRepo(_build_item(item_id=item_id, owner_id=owner_id)),
|
||||
),
|
||||
session=session,
|
||||
current_user=CurrentUser(id=owner_id),
|
||||
auth_gateway=AuthGatewayStub(),
|
||||
)
|
||||
|
||||
result = await service.share(
|
||||
item_id,
|
||||
ScheduleItemShareRequest(
|
||||
email="friend@example.com",
|
||||
permission_view=True,
|
||||
permission_edit=True,
|
||||
permission_invite=False,
|
||||
),
|
||||
)
|
||||
|
||||
assert result.message == "Calendar invitation sent"
|
||||
session.add.assert_called_once()
|
||||
message = session.add.call_args.args[0]
|
||||
assert isinstance(message, InboxMessage)
|
||||
assert message.sender_id == owner_id
|
||||
assert message.schedule_item_id == item_id
|
||||
assert message.message_type == InboxMessageType.CALENDAR
|
||||
assert message.content == '{"permission": 5}'
|
||||
session.commit.assert_awaited_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_share_returns_not_found_when_item_missing() -> None:
|
||||
requester_id = UUID("00000000-0000-0000-0000-000000000002")
|
||||
service = ScheduleItemService(
|
||||
repository=cast(ScheduleItemRepository, ShareRepo(None)),
|
||||
session=AsyncMock(),
|
||||
current_user=CurrentUser(id=requester_id),
|
||||
auth_gateway=AuthGatewayStub(),
|
||||
)
|
||||
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
await service.share(
|
||||
uuid4(),
|
||||
ScheduleItemShareRequest(
|
||||
email="friend@example.com",
|
||||
permission_view=True,
|
||||
permission_edit=False,
|
||||
permission_invite=False,
|
||||
),
|
||||
)
|
||||
|
||||
assert exc_info.value.status_code == 404
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_share_invalid_auth_user_id_returns_503() -> None:
|
||||
owner_id = UUID("00000000-0000-0000-0000-000000000001")
|
||||
item_id = uuid4()
|
||||
session = AsyncMock()
|
||||
service = ScheduleItemService(
|
||||
repository=cast(
|
||||
ScheduleItemRepository,
|
||||
ShareRepo(_build_item(item_id=item_id, owner_id=owner_id)),
|
||||
),
|
||||
session=session,
|
||||
current_user=CurrentUser(id=owner_id),
|
||||
auth_gateway=AuthGatewayInvalidIdStub(),
|
||||
)
|
||||
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
await service.share(
|
||||
item_id,
|
||||
ScheduleItemShareRequest(
|
||||
email="friend@example.com",
|
||||
permission_view=True,
|
||||
permission_edit=False,
|
||||
permission_invite=False,
|
||||
),
|
||||
)
|
||||
|
||||
assert exc_info.value.status_code == 503
|
||||
assert exc_info.value.detail == "Auth lookup unavailable"
|
||||
session.rollback.assert_awaited_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_share_sqlalchemy_error_rolls_back() -> None:
|
||||
owner_id = UUID("00000000-0000-0000-0000-000000000001")
|
||||
item_id = uuid4()
|
||||
session = AsyncMock()
|
||||
session.add = MagicMock(side_effect=SQLAlchemyError("db error"))
|
||||
service = ScheduleItemService(
|
||||
repository=cast(
|
||||
ScheduleItemRepository,
|
||||
ShareRepo(_build_item(item_id=item_id, owner_id=owner_id)),
|
||||
),
|
||||
session=session,
|
||||
current_user=CurrentUser(id=owner_id),
|
||||
auth_gateway=AuthGatewayStub(),
|
||||
)
|
||||
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
await service.share(
|
||||
item_id,
|
||||
ScheduleItemShareRequest(
|
||||
email="friend@example.com",
|
||||
permission_view=True,
|
||||
permission_edit=False,
|
||||
permission_invite=False,
|
||||
),
|
||||
)
|
||||
|
||||
assert exc_info.value.status_code == 503
|
||||
assert exc_info.value.detail == "Schedule item store unavailable"
|
||||
session.rollback.assert_awaited_once()
|
||||
|
||||
Reference in New Issue
Block a user