from __future__ import annotations

from datetime import datetime, timezone
from typing import cast
from unittest.mock import AsyncMock, MagicMock
from uuid import UUID, uuid4

import pytest
from fastapi import HTTPException

from core.auth.models import CurrentUser
from models.friendships import Friendship, FriendshipStatus
from models.inbox_messages import InboxMessage, InboxMessageStatus, InboxMessageType
from models.profile import Profile
from v1.friendships.repository import FriendshipRepository
from v1.friendships.schemas import (
    FriendRequestCreate,
)
from v1.friendships.service import FriendshipService
from v1.users.repository import UserRepository


def _create_mock_profile(
    user_id: UUID = UUID("00000000-0000-0000-0000-000000000001"),
    username: str = "testuser",
    avatar_url: str | None = None,
) -> Profile:
    """Create a mock Profile ORM object.

    The returned object is a plain ``MagicMock`` carrying ``id``, ``username``,
    ``avatar_url`` and ``bio`` (always ``None``); it is cast to ``Profile`` only
    to satisfy static typing.
    """
    profile = MagicMock()
    profile.id = user_id
    profile.username = username
    profile.avatar_url = avatar_url
    profile.bio = None
    return cast(Profile, profile)


class FakeFriendshipRepo:
    """Fake repository for testing that conforms to FriendshipRepository protocol.

    State is held in two in-memory lists (``_friendships`` and
    ``_inbox_messages``) which tests may seed or inspect directly.
    """

    def __init__(
        self,
        friendships: list[Friendship] | None = None,
        inbox_messages: list[InboxMessage] | None = None,
    ) -> None:
        # Compare against None rather than relying on truthiness: a caller that
        # passes its own (still empty) list must keep sharing that list with
        # the fake, which ``friendships or []`` would silently break.
        self._friendships = friendships if friendships is not None else []
        self._inbox_messages = inbox_messages if inbox_messages is not None else []

    def _new_request_inbox(
        self,
        friendship_id: UUID,
        sender_id: UUID,
        recipient_id: UUID,
        content: str | None,
    ) -> InboxMessage:
        """Build a pending friend-request inbox mock, store it and return it."""
        inbox = MagicMock(spec=InboxMessage)
        inbox.id = uuid4()
        inbox.recipient_id = recipient_id
        inbox.sender_id = sender_id
        inbox.status = InboxMessageStatus.PENDING
        inbox.message_type = InboxMessageType.FRIEND_REQUEST
        inbox.friendship_id = friendship_id
        inbox.content = {"type": "request", "message": content}
        self._inbox_messages.append(inbox)
        return inbox

    async def create_request(
        self,
        initiator_id: UUID,
        recipient_id: UUID,
        content: str | None = None,
    ) -> tuple[Friendship, InboxMessage]:
        """Record a new pending friendship and its friend-request inbox message.

        Returns:
            The ``(friendship, inbox_message)`` pair that was stored.
        """
        friendship = MagicMock(spec=Friendship)
        friendship.id = uuid4()
        friendship.initiator_id = initiator_id
        friendship.user_low_id = min(initiator_id, recipient_id)
        friendship.user_high_id = max(initiator_id, recipient_id)
        friendship.status = FriendshipStatus.PENDING
        # Naive UTC timestamp, identical in value to the deprecated
        # ``datetime.utcnow()`` this replaces.
        friendship.created_at = datetime.now(timezone.utc).replace(tzinfo=None)
        self._friendships.append(friendship)

        inbox = self._new_request_inbox(
            friendship.id, initiator_id, recipient_id, content
        )
        return friendship, inbox

    async def reactivate_request(
        self,
        friendship: Friendship,
        initiator_id: UUID,
        content: str | None = None,
    ) -> tuple[Friendship, InboxMessage]:
        """Reset an existing friendship to pending and add a fresh inbox message.

        The recipient is whichever of the friendship's two users is not
        ``initiator_id``.

        Returns:
            The mutated ``friendship`` and the newly stored inbox message.
        """
        friendship.status = FriendshipStatus.PENDING
        friendship.initiator_id = initiator_id
        recipient_id = (
            friendship.user_low_id
            if initiator_id == friendship.user_high_id
            else friendship.user_high_id
        )
        inbox = self._new_request_inbox(
            friendship.id, initiator_id, recipient_id, content
        )
        return friendship, inbox

    async def get_friendship_between_users(
        self, user_id_1: UUID, user_id_2: UUID
    ) -> Friendship | None:
        """Return the stored friendship for the two users, in either order, or None."""
        # The normalised (low, high) pair does not depend on the friendship
        # being inspected, so compute it once outside the loop.
        user_low = min(user_id_1, user_id_2)
        user_high = max(user_id_1, user_id_2)
        for f in self._friendships:
            if f.user_low_id == user_low and f.user_high_id == user_high:
                return f
        return None

    async def get_pending_inbox_for_recipient(
        self, recipient_id: UUID, friendship_id: UUID
    ) -> InboxMessage | None:
        """Return the first pending inbox message for the recipient/friendship pair.

        Returns None when no stored message matches.
        """
        for msg in self._inbox_messages:
            if (
                msg.recipient_id == recipient_id
                and msg.friendship_id == friendship_id
                # Only pending messages qualify; without this check an already
                # handled message would be returned by a "pending" lookup.
                and msg.status == InboxMessageStatus.PENDING
            ):
                return msg
        return None

    async def get_friendship_by_id(self, friendship_id: UUID) -> Friendship | None:
        """Return the stored friendship with the given id, or None."""
        for f in self._friendships:
            if f.id == friendship_id:
                return f
        return None

    async def get_friendships_by_ids(
        self, friendship_ids: list[UUID]
    ) -> dict[UUID, Friendship]:
        """Return stored friendships whose id is in ``friendship_ids``, keyed by id."""
        friendship_set = set(friendship_ids)
        return {
            f.id: f
            for f in self._friendships
            if getattr(f, "id", None) in friendship_set
        }

    async def get_inbox_messages_for_user(
        self, user_id: UUID, status: InboxMessageStatus | None = None
    ) -> list[InboxMessage]:
        """Return inbox messages addressed to ``user_id``.

        When ``status`` is given, only messages with that status are returned.
        """
        result = [msg for msg in self._inbox_messages if msg.recipient_id == user_id]
        # Explicit None check: the truthiness of an enum member depends on its
        # mixed-in type, so ``if status:`` could skip a legitimate filter.
        if status is not None:
            result = [msg for msg in result if msg.status == status]
        return result

    async def get_outgoing_requests(self, user_id: UUID) -> list[Friendship]:
        """Return pending friendships that were initiated by ``user_id``."""
        return [
            f
            for f in self._friendships
            if f.initiator_id == user_id and f.status == FriendshipStatus.PENDING
        ]

    async def get_friends_list(self, user_id: UUID) -> list[Friendship]:
        """Return accepted friendships in which ``user_id`` is either participant."""
        return [
            f
            for f in self._friendships
            if f.status == FriendshipStatus.ACCEPTED
            and (f.user_low_id == user_id or f.user_high_id == user_id)
        ]


class FakeUserRepo:
    """Fake user repository for testing, backed by a dict of profiles keyed by user id."""

    def __init__(self, profiles: dict[UUID, Profile] | None = None) -> None:
        # ``is not None`` keeps a caller-supplied empty dict shared with the fake.
        self._profiles = profiles if profiles is not None else {}

    async def get_by_user_id(self, user_id: UUID) -> Profile | None:
        """Return the profile stored under ``user_id``, or None."""
        return self._profiles.get(user_id)

    async def get_by_user_ids(self, user_ids: list[UUID]) -> dict[UUID, Profile]:
        """Return the stored profiles whose id is in ``user_ids``, keyed by id."""
        user_id_set = set(user_ids)
        return {
            uid: profile
            for uid, profile in self._profiles.items()
            if uid in user_id_set
        }

    async def get_by_username(self, username: str) -> Profile | None:
        """Return the first stored profile with exactly this username, or None."""
        for profile in self._profiles.values():
            if profile.username == username:
                return profile
        return None

    async def update_by_user_id(
        self, user_id: UUID, update_data: dict[str, str | None]
    ) -> Profile | None:
        """Return the stored profile unchanged; ``update_data`` is ignored by this fake."""
        del update_data
        return self._profiles.get(user_id)

    async def search_users(self, query: str, limit: int = 20) -> list[Profile]:
        """Return profiles whose username contains ``query``, case-insensitively.

        ``limit`` is accepted for signature compatibility but not applied.
        """
        del limit
        query_lower = query.lower()
        return [
            profile
            for profile in self._profiles.values()
            if query_lower in profile.username.lower()
        ]


# Static-typing checks: these assignments make a type checker verify that the
# fakes satisfy the annotated repository types.
_repo_check: FriendshipRepository = FakeFriendshipRepo()
_user_repo_check: UserRepository = FakeUserRepo()

USER_A = UUID("00000000-0000-0000-0000-000000000001")
USER_B = UUID("00000000-0000-0000-0000-000000000002")
USER_C = UUID("00000000-0000-0000-0000-000000000003")


@pytest.fixture
def mock_session() -> AsyncMock:
    """Provide an AsyncMock session with awaitable ``commit`` and ``rollback``."""
    session = AsyncMock()
    session.commit = AsyncMock()
    session.rollback = AsyncMock()
    return session


@pytest.fixture
def current_user() -> CurrentUser:
    """Provide the acting user for the tests (USER_A)."""
    return CurrentUser(id=USER_A)
@pytest.fixture
def mock_friendship_repo() -> FakeFriendshipRepo:
    """Provide an empty in-memory friendship repository."""
    return FakeFriendshipRepo()


@pytest.fixture
def mock_user_repo() -> FakeUserRepo:
    """Provide a user repository holding profiles for USER_A, USER_B and USER_C."""
    return FakeUserRepo(
        {
            USER_A: _create_mock_profile(USER_A, "user_a"),
            USER_B: _create_mock_profile(USER_B, "user_b"),
            USER_C: _create_mock_profile(USER_C, "user_c"),
        }
    )


def _naive_utc_now() -> datetime:
    """Return the current UTC time as a naive datetime.

    Same value as the deprecated ``datetime.utcnow()``.
    """
    # Function-scope import so this helper does not depend on the module's
    # top-level import list.
    from datetime import timezone

    return datetime.now(timezone.utc).replace(tzinfo=None)


def _make_friendship(**attrs: object) -> MagicMock:
    """Build a ``Friendship``-spec'd mock with a random id and the given attributes.

    Only the attributes passed in are set explicitly, so each test controls
    exactly which fields carry real values.
    """
    friendship = MagicMock(spec=Friendship)
    friendship.id = uuid4()
    for attr_name, value in attrs.items():
        setattr(friendship, attr_name, value)
    return friendship


def _make_inbox(**attrs: object) -> MagicMock:
    """Build an ``InboxMessage``-spec'd mock with a random id and the given attributes."""
    inbox = MagicMock(spec=InboxMessage)
    inbox.id = uuid4()
    for attr_name, value in attrs.items():
        setattr(inbox, attr_name, value)
    return inbox


def _build_service(
    repo: FakeFriendshipRepo,
    user_repo: FakeUserRepo,
    session: AsyncMock,
    current_user: CurrentUser,
) -> FriendshipService:
    """Construct the FriendshipService under test from the supplied fakes."""
    return FriendshipService(
        repository=repo,
        user_repository=user_repo,
        session=session,
        current_user=current_user,
    )


class TestSendRequest:
    """Tests for ``FriendshipService.send_request``."""

    @pytest.mark.asyncio
    async def test_send_request_creates_friendship_and_inbox(
        self,
        mock_session: AsyncMock,
        mock_friendship_repo: FakeFriendshipRepo,
        mock_user_repo: FakeUserRepo,
        current_user: CurrentUser,
    ) -> None:
        service = _build_service(
            mock_friendship_repo, mock_user_repo, mock_session, current_user
        )

        result = await service.send_request(
            FriendRequestCreate(target_user_id=USER_B, content=None)
        )

        assert result is not None
        # Verify what the test name promises: one friendship and one inbox
        # message were actually recorded in the repository.
        assert len(mock_friendship_repo._friendships) == 1
        assert len(mock_friendship_repo._inbox_messages) == 1
        mock_session.commit.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_send_request_persists_content_to_inbox(
        self,
        mock_session: AsyncMock,
        mock_friendship_repo: FakeFriendshipRepo,
        mock_user_repo: FakeUserRepo,
        current_user: CurrentUser,
    ) -> None:
        service = _build_service(
            mock_friendship_repo, mock_user_repo, mock_session, current_user
        )
        # Non-ASCII message text exercises content pass-through.
        content = "你好,我是张三"

        result = await service.send_request(
            FriendRequestCreate(target_user_id=USER_B, content=content)
        )

        assert result.content == {"type": "request", "message": content}

    @pytest.mark.asyncio
    async def test_send_request_to_self_raises_400(
        self,
        mock_session: AsyncMock,
        mock_friendship_repo: FakeFriendshipRepo,
        mock_user_repo: FakeUserRepo,
        current_user: CurrentUser,
    ) -> None:
        service = _build_service(
            mock_friendship_repo, mock_user_repo, mock_session, current_user
        )

        with pytest.raises(HTTPException) as exc_info:
            await service.send_request(
                FriendRequestCreate(target_user_id=current_user.id, content=None)
            )

        assert exc_info.value.status_code == 400

    @pytest.mark.asyncio
    async def test_send_request_to_existing_friend_raises_400(
        self,
        mock_session: AsyncMock,
        mock_friendship_repo: FakeFriendshipRepo,
        mock_user_repo: FakeUserRepo,
        current_user: CurrentUser,
    ) -> None:
        existing_friendship = _make_friendship(
            user_low_id=min(USER_A, USER_B),
            user_high_id=max(USER_A, USER_B),
            status=FriendshipStatus.ACCEPTED,
        )
        mock_friendship_repo._friendships.append(existing_friendship)
        service = _build_service(
            mock_friendship_repo, mock_user_repo, mock_session, current_user
        )

        with pytest.raises(HTTPException) as exc_info:
            await service.send_request(
                FriendRequestCreate(target_user_id=USER_B, content=None)
            )

        assert exc_info.value.status_code == 400

    @pytest.mark.asyncio
    async def test_send_request_when_blocked_raises_400(
        self,
        mock_session: AsyncMock,
        mock_friendship_repo: FakeFriendshipRepo,
        mock_user_repo: FakeUserRepo,
        current_user: CurrentUser,
    ) -> None:
        blocked_friendship = _make_friendship(
            user_low_id=min(USER_A, USER_B),
            user_high_id=max(USER_A, USER_B),
            status=FriendshipStatus.BLOCKED,
        )
        mock_friendship_repo._friendships.append(blocked_friendship)
        service = _build_service(
            mock_friendship_repo, mock_user_repo, mock_session, current_user
        )

        with pytest.raises(HTTPException) as exc_info:
            await service.send_request(
                FriendRequestCreate(target_user_id=USER_B, content=None)
            )

        assert exc_info.value.status_code == 400


class TestAcceptRequest:
    """Tests for ``FriendshipService.accept_request``."""

    @pytest.mark.asyncio
    async def test_accept_request_updates_status(
        self,
        mock_session: AsyncMock,
        mock_friendship_repo: FakeFriendshipRepo,
        mock_user_repo: FakeUserRepo,
        current_user: CurrentUser,
    ) -> None:
        friendship = _make_friendship(
            initiator_id=USER_B,
            user_low_id=min(USER_A, USER_B),
            user_high_id=max(USER_A, USER_B),
            status=FriendshipStatus.PENDING,
        )
        mock_friendship_repo._friendships.append(friendship)
        inbox = _make_inbox(
            recipient_id=USER_A,  # current_user is the recipient
            content=None,
            status=InboxMessageStatus.PENDING,
            friendship_id=friendship.id,
        )
        mock_friendship_repo._inbox_messages.append(inbox)
        service = _build_service(
            mock_friendship_repo, mock_user_repo, mock_session, current_user
        )

        result = await service.accept_request(friendship.id)

        assert result.status == "accepted"
        mock_session.commit.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_accept_nonexistent_request_raises_404(
        self,
        mock_session: AsyncMock,
        mock_friendship_repo: FakeFriendshipRepo,
        mock_user_repo: FakeUserRepo,
        current_user: CurrentUser,
    ) -> None:
        service = _build_service(
            mock_friendship_repo, mock_user_repo, mock_session, current_user
        )

        with pytest.raises(HTTPException) as exc_info:
            await service.accept_request(uuid4())

        assert exc_info.value.status_code == 404

    @pytest.mark.asyncio
    async def test_accept_request_not_recipient_raises_403(
        self,
        mock_session: AsyncMock,
        mock_friendship_repo: FakeFriendshipRepo,
        mock_user_repo: FakeUserRepo,
        current_user: CurrentUser,
    ) -> None:
        friendship = _make_friendship(
            initiator_id=USER_A,  # current user is the initiator
            user_low_id=min(USER_A, USER_B),
            user_high_id=max(USER_A, USER_B),
            status=FriendshipStatus.PENDING,
        )
        mock_friendship_repo._friendships.append(friendship)
        inbox = _make_inbox(
            recipient_id=USER_B,  # the other user is the recipient
            content=None,
            status=InboxMessageStatus.PENDING,
            friendship_id=friendship.id,
        )
        mock_friendship_repo._inbox_messages.append(inbox)
        service = _build_service(
            mock_friendship_repo, mock_user_repo, mock_session, current_user
        )

        with pytest.raises(HTTPException) as exc_info:
            await service.accept_request(friendship.id)

        assert exc_info.value.status_code == 403


class TestDeclineRequest:
    """Tests for ``FriendshipService.decline_request``."""

    @pytest.mark.asyncio
    async def test_decline_request_updates_status(
        self,
        mock_session: AsyncMock,
        mock_friendship_repo: FakeFriendshipRepo,
        mock_user_repo: FakeUserRepo,
        current_user: CurrentUser,
    ) -> None:
        friendship = _make_friendship(
            initiator_id=USER_B,
            user_low_id=min(USER_A, USER_B),
            user_high_id=max(USER_A, USER_B),
            status=FriendshipStatus.PENDING,
        )
        mock_friendship_repo._friendships.append(friendship)
        inbox = _make_inbox(
            recipient_id=USER_A,
            content=None,
            status=InboxMessageStatus.PENDING,
            friendship_id=friendship.id,
        )
        mock_friendship_repo._inbox_messages.append(inbox)
        service = _build_service(
            mock_friendship_repo, mock_user_repo, mock_session, current_user
        )

        result = await service.decline_request(friendship.id)

        assert result.status == "rejected"
        mock_session.commit.assert_awaited_once()


class TestCancelRequest:
    """Tests for ``FriendshipService.cancel_request``."""

    @pytest.mark.asyncio
    async def test_cancel_request_removes_request(
        self,
        mock_session: AsyncMock,
        mock_friendship_repo: FakeFriendshipRepo,
        mock_user_repo: FakeUserRepo,
        current_user: CurrentUser,
    ) -> None:
        friendship = _make_friendship(
            initiator_id=current_user.id,
            user_low_id=min(USER_A, USER_B),
            user_high_id=max(USER_A, USER_B),
            status=FriendshipStatus.PENDING,
        )
        mock_friendship_repo._friendships.append(friendship)
        inbox = _make_inbox(
            recipient_id=USER_B,  # The other user
            content=None,
            status=InboxMessageStatus.PENDING,
            friendship_id=friendship.id,
        )
        mock_friendship_repo._inbox_messages.append(inbox)
        service = _build_service(
            mock_friendship_repo, mock_user_repo, mock_session, current_user
        )

        result = await service.cancel_request(friendship.id)

        assert result.status == "canceled"
        mock_session.commit.assert_awaited_once()


class TestGetInbox:
    """Tests for ``FriendshipService.get_inbox``."""

    @pytest.mark.asyncio
    async def test_get_inbox_returns_pending_messages(
        self,
        mock_session: AsyncMock,
        mock_friendship_repo: FakeFriendshipRepo,
        mock_user_repo: FakeUserRepo,
        current_user: CurrentUser,
    ) -> None:
        friendship = _make_friendship(
            initiator_id=USER_B,
            status=FriendshipStatus.PENDING,
        )
        mock_friendship_repo._friendships.append(friendship)
        inbox = _make_inbox(
            recipient_id=current_user.id,
            sender_id=USER_B,
            content=None,
            status=InboxMessageStatus.PENDING,
            message_type=InboxMessageType.FRIEND_REQUEST,
            friendship_id=friendship.id,
            created_at=_naive_utc_now(),
        )
        mock_friendship_repo._inbox_messages.append(inbox)
        service = _build_service(
            mock_friendship_repo, mock_user_repo, mock_session, current_user
        )

        result = await service.get_inbox()

        assert len(result) == 1


class TestGetOutgoingRequests:
    """Tests for ``FriendshipService.get_outgoing_requests``."""

    @pytest.mark.asyncio
    async def test_get_outgoing_requests_returns_sent_requests(
        self,
        mock_session: AsyncMock,
        mock_friendship_repo: FakeFriendshipRepo,
        mock_user_repo: FakeUserRepo,
        current_user: CurrentUser,
    ) -> None:
        friendship = _make_friendship(
            initiator_id=current_user.id,
            status=FriendshipStatus.PENDING,
        )
        mock_friendship_repo._friendships.append(friendship)
        service = _build_service(
            mock_friendship_repo, mock_user_repo, mock_session, current_user
        )

        result = await service.get_outgoing_requests()

        assert len(result) == 1


class TestGetFriendsList:
    """Tests for ``FriendshipService.get_friends_list``."""

    @pytest.mark.asyncio
    async def test_get_friends_list_returns_accepted_friends(
        self,
        mock_session: AsyncMock,
        mock_friendship_repo: FakeFriendshipRepo,
        mock_user_repo: FakeUserRepo,
        current_user: CurrentUser,
    ) -> None:
        friendship = _make_friendship(
            user_low_id=current_user.id,
            user_high_id=USER_B,
            status=FriendshipStatus.ACCEPTED,
            created_at=_naive_utc_now(),
            updated_at=_naive_utc_now(),
        )
        mock_friendship_repo._friendships.append(friendship)
        service = _build_service(
            mock_friendship_repo, mock_user_repo, mock_session, current_user
        )

        result = await service.get_friends_list()

        assert len(result) == 1


class TestRemoveFriend:
    """Tests for ``FriendshipService.remove_friend``."""

    @pytest.mark.asyncio
    async def test_remove_friend_soft_deletes(
        self,
        mock_session: AsyncMock,
        mock_friendship_repo: FakeFriendshipRepo,
        mock_user_repo: FakeUserRepo,
        current_user: CurrentUser,
    ) -> None:
        friendship = _make_friendship(
            user_low_id=min(current_user.id, USER_B),
            user_high_id=max(current_user.id, USER_B),
            status=FriendshipStatus.ACCEPTED,
            created_at=_naive_utc_now(),
            updated_at=_naive_utc_now(),
        )
        mock_friendship_repo._friendships.append(friendship)
        service = _build_service(
            mock_friendship_repo, mock_user_repo, mock_session, current_user
        )

        result = await service.remove_friend(USER_B)

        assert result.status == "active"
        mock_session.commit.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_remove_nonexistent_friend_raises_404(
        self,
        mock_session: AsyncMock,
        mock_friendship_repo: FakeFriendshipRepo,
        mock_user_repo: FakeUserRepo,
        current_user: CurrentUser,
    ) -> None:
        service = _build_service(
            mock_friendship_repo, mock_user_repo, mock_session, current_user
        )

        with pytest.raises(HTTPException) as exc_info:
            await service.remove_friend(uuid4())

        assert exc_info.value.status_code == 404