# Calendar Sharing Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Implement calendar event sharing: a user can share a calendar event with another person by email, the invitee receives a pending inbox message, and the invitee can accept or ignore the invitation.

**Architecture:** Use the existing schemas/repository/service/router layering. Add a new inbox_messages module to handle invitation messages. Reuse the auth gateway's get_user_by_email to look up users by email.

**Tech Stack:** FastAPI, SQLAlchemy (async), Pydantic, Supabase Auth

---

## Permission Bits (from design doc)

| Permission | Value | Binary |
|------------|-------|--------|
| view | 1 | 001 |
| invite | 2 | 010 |
| edit | 4 | 100 |

- Owner has all permissions: 7 (111)
- Check permission: `(permission & 2) == 2` (has invite). The parentheses are required in Python because `==` binds tighter than `&`.
- Add permission: `permission | 2`

---

## Task 1: Add inbox_messages module (schemas, repository, service, router)

**Files:**
- Create: `backend/src/v1/inbox_messages/__init__.py`
- Create: `backend/src/v1/inbox_messages/schemas.py`
- Create: `backend/src/v1/inbox_messages/repository.py`
- Create: `backend/src/v1/inbox_messages/service.py`
- Create: `backend/src/v1/inbox_messages/dependencies.py`
- Create: `backend/src/v1/inbox_messages/router.py`
- Modify: `backend/src/v1/router.py` - include inbox_messages router

**Step 1: Write the failing test**

```python
# backend/tests/unit/v1/inbox_messages/test_schemas.py
import pytest
from datetime import datetime, timezone
from uuid import uuid4

from v1.inbox_messages.schemas import (
    InboxMessageResponse,
    InboxMessageListRequest,
    InboxMessageAcceptRequest,
)


def test_inbox_message_response_schema():
    msg_id = uuid4()
    response = InboxMessageResponse(
        id=msg_id,
        recipient_id=uuid4(),
        sender_id=uuid4(),
        message_type="calendar",
        schedule_item_id=uuid4(),
        content="Join my calendar",
        is_read=False,
        status="pending",
        created_at=datetime.now(timezone.utc),  # required field on the schema
    )
    assert response.message_type == "calendar"
    assert response.status == "pending"


def test_inbox_message_accept_request_schema():
    request = InboxMessageAcceptRequest(
        permission_view=True,
        permission_edit=False,
        permission_invite=False,
    )
    assert request.permission_view is True
    assert request.permission_edit is False
```

**Step 2: Run test to verify it fails**

Run: `cd backend && uv run pytest tests/unit/v1/inbox_messages/test_schemas.py -v`
Expected: FAIL with "ModuleNotFoundError: No module named 'v1.inbox_messages'"

**Step 3: Write minimal implementation**

Create `backend/src/v1/inbox_messages/__init__.py`:

```python
```

Create `backend/src/v1/inbox_messages/schemas.py`:

```python
from __future__ import annotations

from datetime import datetime
from enum import Enum
from typing import Optional
from uuid import UUID

from pydantic import BaseModel


class InboxMessageType(str, Enum):
    FRIEND_REQUEST = "friend_request"
    CALENDAR = "calendar"
    SYSTEM = "system"
    GROUP = "group"


class InboxMessageStatus(str, Enum):
    PENDING = "pending"
    ACCEPTED = "accepted"
    REJECTED = "rejected"
    DISMISSED = "dismissed"


class InboxMessageResponse(BaseModel):
    id: UUID
    recipient_id: UUID
    sender_id: Optional[UUID] = None
    message_type: InboxMessageType
    schedule_item_id: Optional[UUID] = None
    content: Optional[str] = None
    is_read: bool = False
    status: InboxMessageStatus = InboxMessageStatus.PENDING
    created_at: datetime


class InboxMessageListRequest(BaseModel):
    status: Optional[InboxMessageStatus] = None


class InboxMessageAcceptRequest(BaseModel):
    permission_view: bool = True
    permission_edit: bool = False
    permission_invite: bool = False
```

Create `backend/src/v1/inbox_messages/repository.py`:

```python
from __future__ import annotations

from typing import Optional
from uuid import UUID

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from models.inbox_messages import InboxMessage, InboxMessageStatus


class InboxMessageRepository:
    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    async def create(self, data: dict) -> InboxMessage:
        msg = InboxMessage(**data)
        self._session.add(msg)
        await self._session.flush()
        return msg

    async def get_by_id(self, message_id: UUID, recipient_id: UUID) -> Optional[InboxMessage]:
        # Scoping by recipient_id ensures users can only read their own messages.
        result = await self._session.execute(
            select(InboxMessage).where(
                InboxMessage.id == message_id,
                InboxMessage.recipient_id == recipient_id,
            )
        )
        return result.scalar_one_or_none()

    async def list_by_recipient(
        self, recipient_id: UUID, status: Optional[InboxMessageStatus] = None
    ) -> list[InboxMessage]:
        query = select(InboxMessage).where(InboxMessage.recipient_id == recipient_id)
        if status:
            query = query.where(InboxMessage.status == status)
        query = query.order_by(InboxMessage.created_at.desc())
        result = await self._session.execute(query)
        return list(result.scalars().all())

    async def update_status(
        self, message_id: UUID, recipient_id: UUID, status: InboxMessageStatus
    ) -> Optional[InboxMessage]:
        msg = await self.get_by_id(message_id, recipient_id)
        if msg:
            msg.status = status
            await self._session.flush()
        return msg
```

Create `backend/src/v1/inbox_messages/service.py`:

```python
from __future__ import annotations

from typing import TYPE_CHECKING, Optional
from uuid import UUID

from fastapi import HTTPException
from sqlalchemy.exc import SQLAlchemyError

from core.auth.models import CurrentUser
from core.db.base_service import BaseService
from core.logging import get_logger
from models.inbox_messages import InboxMessageStatus
from v1.inbox_messages.repository import InboxMessageRepository
from v1.inbox_messages.schemas import (
    InboxMessageAcceptRequest,
    InboxMessageListRequest,
    InboxMessageResponse,
)

if TYPE_CHECKING:
    from sqlalchemy.ext.asyncio import AsyncSession

logger = get_logger("v1.inbox_messages.service")


class InboxMessageService(BaseService):
    _repository: InboxMessageRepository
    _session: AsyncSession

    def __init__(
        self,
        repository: InboxMessageRepository,
        session: AsyncSession,
        current_user: Optional[CurrentUser] = None,
    ) -> None:
        super().__init__(current_user=current_user)
        self._repository = repository
        self._session = session

    async def list_messages(
        self, request: InboxMessageListRequest
    ) -> list[InboxMessageResponse]:
        user_id = self.require_user_id()
        try:
            messages = await self._repository.list_by_recipient(
                user_id,
                request.status
            )
        except SQLAlchemyError:
            logger.exception("Failed to list inbox messages")
            raise HTTPException(status_code=503, detail="Inbox unavailable")
        return [
            InboxMessageResponse(
                id=m.id,
                recipient_id=m.recipient_id,
                sender_id=m.sender_id,
                message_type=m.message_type,
                schedule_item_id=m.schedule_item_id,
                content=m.content,
                is_read=m.is_read,
                status=m.status,
                created_at=m.created_at,
            )
            for m in messages
        ]

    async def accept_invitation(
        self, message_id: UUID, request: InboxMessageAcceptRequest
    ) -> None:
        user_id = self.require_user_id()
        try:
            message = await self._repository.get_by_id(message_id, user_id)
        except SQLAlchemyError:
            logger.exception("Failed to get inbox message", message_id=str(message_id))
            raise HTTPException(status_code=503, detail="Inbox unavailable")
        if message is None:
            raise HTTPException(status_code=404, detail="Message not found")
        # NOTE: this compares message_type against a status value, so it is
        # always unequal. Task 4 corrects the check to use message.status.
        if message.message_type != InboxMessageStatus.PENDING:
            raise HTTPException(status_code=400, detail="Message already processed")
        message.status = InboxMessageStatus.ACCEPTED
        await self._session.flush()
        await self._session.commit()

    async def dismiss_invitation(self, message_id: UUID) -> None:
        user_id = self.require_user_id()
        try:
            message = await self._repository.get_by_id(message_id, user_id)
        except SQLAlchemyError:
            logger.exception("Failed to get inbox message", message_id=str(message_id))
            raise HTTPException(status_code=503, detail="Inbox unavailable")
        if message is None:
            raise HTTPException(status_code=404, detail="Message not found")
        message.status = InboxMessageStatus.DISMISSED
        await self._session.flush()
        await self._session.commit()
```

Create `backend/src/v1/inbox_messages/dependencies.py`:

```python
from typing import Annotated

from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession

from core.auth.dependencies import get_current_user
from core.auth.models import CurrentUser  # same import path as service.py
from core.db.session import get_db
from v1.inbox_messages.repository import InboxMessageRepository
from v1.inbox_messages.service import InboxMessageService


def get_inbox_message_repository(
    session: Annotated[AsyncSession, Depends(get_db)]
) -> InboxMessageRepository:
    return InboxMessageRepository(session)


def get_inbox_message_service(
    repository: Annotated[InboxMessageRepository, Depends(get_inbox_message_repository)],
    current_user: Annotated[CurrentUser | None, Depends(get_current_user)],
) -> InboxMessageService:
    return InboxMessageService(
        repository=repository,
        session=repository._session,
        current_user=current_user,
    )
```

Create `backend/src/v1/inbox_messages/router.py`:

```python
from __future__ import annotations

from typing import Annotated
from uuid import UUID

from fastapi import APIRouter, Depends, Query

from v1.inbox_messages.dependencies import get_inbox_message_service
from v1.inbox_messages.schemas import (
    InboxMessageAcceptRequest,
    InboxMessageListRequest,
    InboxMessageResponse,
    InboxMessageStatus,
)
from v1.inbox_messages.service import InboxMessageService

router = APIRouter(prefix="/inbox", tags=["inbox"])


@router.get("/messages", response_model=list[InboxMessageResponse])
async def list_inbox_messages(
    service: Annotated[InboxMessageService, Depends(get_inbox_message_service)],
    status: InboxMessageStatus | None = Query(None, description="Filter by status"),
) -> list[InboxMessageResponse]:
    request = InboxMessageListRequest(status=status)
    return await service.list_messages(request)


@router.post("/messages/{message_id}/accept", status_code=204)
async def accept_invitation(
    message_id: UUID,
    request: InboxMessageAcceptRequest,
    service: Annotated[InboxMessageService, Depends(get_inbox_message_service)],
) -> None:
    await service.accept_invitation(message_id, request)


@router.post("/messages/{message_id}/dismiss", status_code=204)
async def dismiss_invitation(
    message_id: UUID,
    service: Annotated[InboxMessageService, Depends(get_inbox_message_service)],
) -> None:
    await service.dismiss_invitation(message_id)
```

Modify `backend/src/v1/router.py`:
```python
from fastapi import APIRouter

from core.http.models import HealthResponse
from v1.agent_chat.router import router as agent_chat_router
from v1.auth.router import router as auth_router
from v1.infra.router import router as infra_router
from v1.inbox_messages.router import router as inbox_messages_router
from v1.schedule_items.router import router as schedule_items_router
from v1.users.router import router as users_router

router = APIRouter(prefix="/api/v1")
router.include_router(auth_router)
router.include_router(infra_router)
router.include_router(users_router)
router.include_router(agent_chat_router)
router.include_router(schedule_items_router)
router.include_router(inbox_messages_router)


@router.get("/health", response_model=HealthResponse)
async def health() -> HealthResponse:
    return HealthResponse(status="ok")
```

**Step 4: Run test to verify it passes**

Run: `cd backend && uv run pytest tests/unit/v1/inbox_messages/test_schemas.py -v`
Expected: PASS

**Step 5: Commit**

```bash
git add backend/src/v1/inbox_messages/ backend/src/v1/router.py backend/tests/unit/v1/inbox_messages/
git commit -m "feat: add inbox messages module for calendar invitations"
```

---

## Task 2: Add share calendar API to schedule_items

**Files:**
- Modify: `backend/src/v1/schedule_items/schemas.py` - add share schemas
- Modify: `backend/src/v1/schedule_items/repository.py` - add subscription create
- Modify: `backend/src/v1/schedule_items/service.py` - add share method
- Modify: `backend/src/v1/schedule_items/router.py` - add share endpoint
- Modify: `backend/src/v1/schedule_items/dependencies.py` - add dependencies

**Step 1: Write the failing test**

```python
# backend/tests/unit/v1/schedule_items/test_share.py
import pytest
from uuid import uuid4

from v1.schedule_items.schemas import ScheduleItemShareRequest


def test_share_request_schema():
    request = ScheduleItemShareRequest(
        email="friend@example.com",
        permission_view=True,
        permission_edit=True,
        permission_invite=False,
    )
    assert request.email == "friend@example.com"
    assert request.permission_view is True


def test_permission_bits_calculation():
    request = ScheduleItemShareRequest(
        email="friend@example.com",
        permission_view=True,
        permission_edit=True,
        permission_invite=False,
    )
    # view=1, edit=4, invite=0 -> 1|4 = 5
    assert request._permission_value() == 5
```

**Step 2: Run test to verify it fails**

Run: `cd backend && uv run pytest tests/unit/v1/schedule_items/test_share.py -v`
Expected: FAIL with "ImportError: cannot import name 'ScheduleItemShareRequest'"

**Step 3: Write minimal implementation**

Add to `backend/src/v1/schedule_items/schemas.py`:

```python
class ScheduleItemShareRequest(BaseModel):
    email: str = Field(..., description="Email of user to share with")
    permission_view: bool = Field(True, description="Grant view permission")
    permission_edit: bool = Field(False, description="Grant edit permission")
    permission_invite: bool = Field(False, description="Grant invite permission")

    def _permission_value(self) -> int:
        # Combine the boolean flags into the bitmask from the Permission Bits table.
        value = 0
        if self.permission_view:
            value |= 1  # 001
        if self.permission_edit:
            value |= 4  # 100
        if self.permission_invite:
            value |= 2  # 010
        return value


class ScheduleItemShareResponse(BaseModel):
    message: str
```

Add to `backend/src/v1/schedule_items/repository.py`:

```python
from models.schedule_subscriptions import ScheduleSubscription


class ScheduleItemRepository:
    # ... existing code ...

    async def create_subscription(self, data: dict) -> ScheduleSubscription:
        sub = ScheduleSubscription(**data)
        self._session.add(sub)
        await self._session.flush()
        return sub
```

Add to `backend/src/v1/schedule_items/service.py`:

```python
from uuid import UUID

from core.auth.models import CurrentUser
from v1.auth.gateway import SupabaseAuthGateway
from models.schedule_subscriptions import ScheduleSubscription


class ScheduleItemService:
    # ... existing code ...
    async def share(
        self, item_id: UUID, request: ScheduleItemShareRequest
    ) -> ScheduleItemShareResponse:
        user_id = self.require_user_id()

        # Check item exists and user is owner
        try:
            item = await self._repository.get_by_item_id(item_id, user_id)
        except SQLAlchemyError:
            logger.exception("Failed to get schedule item", item_id=str(item_id))
            raise HTTPException(status_code=503, detail="Schedule item store unavailable")
        if item is None:
            raise HTTPException(status_code=404, detail="Schedule item not found")
        if item.owner_id != user_id:
            raise HTTPException(status_code=403, detail="Only owner can share")

        # Lookup user by email
        auth_gateway = SupabaseAuthGateway()
        try:
            target_user = await auth_gateway.get_user_by_email(request.email)
        except HTTPException as exc:
            if exc.status_code == 404:
                raise HTTPException(status_code=404, detail="User not found")
            raise
        target_user_id = UUID(target_user.id)

        # Create inbox message
        from models.inbox_messages import InboxMessage, InboxMessageType

        inbox_data = {
            "recipient_id": target_user_id,
            "sender_id": user_id,
            "message_type": InboxMessageType.CALENDAR,
            "schedule_item_id": item_id,
            "content": f"{item.title} shared with you",
            "created_by": user_id,
        }
        try:
            inbox_msg = InboxMessage(**inbox_data)
            self._session.add(inbox_msg)
            await self._session.flush()
        except SQLAlchemyError:
            logger.exception("Failed to create inbox message")
            raise HTTPException(status_code=503, detail="Failed to send invitation")

        await self._session.commit()
        return ScheduleItemShareResponse(
            message=f"Invitation sent to {request.email}"
        )
```

Add to `backend/src/v1/schedule_items/router.py`:

```python
from v1.schedule_items.schemas import (
    ScheduleItemCreateRequest,
    ScheduleItemListItem,
    ScheduleItemListRequest,
    ScheduleItemResponse,
    ScheduleItemShareRequest,
    ScheduleItemShareResponse,
    ScheduleItemUpdateRequest,
)


@router.post("/{item_id}/share", response_model=ScheduleItemShareResponse)
async def share_schedule_item(
    item_id: UUID,
    request: ScheduleItemShareRequest,
    service: Annotated[ScheduleItemService, Depends(get_schedule_item_service)],
) -> ScheduleItemShareResponse:
    return await service.share(item_id, request)
```

**Step 4: Run test to verify it passes**

Run: `cd backend && uv run pytest tests/unit/v1/schedule_items/test_share.py -v`
Expected: PASS

**Step 5: Commit**

```bash
git add backend/src/v1/schedule_items/ backend/tests/unit/v1/schedule_items/
git commit -m "feat: add share calendar API"
```

---

## Task 3: Add accept invitation - create subscription

**Files:**
- Modify: `backend/src/v1/inbox_messages/service.py` - add subscription creation on accept

**Step 1: Write the failing test**

```python
# backend/tests/unit/v1/inbox_messages/test_accept_invitation.py
import pytest
from unittest.mock import AsyncMock, MagicMock
from uuid import uuid4

from v1.inbox_messages.schemas import InboxMessageAcceptRequest
from v1.inbox_messages.service import InboxMessageService


@pytest.mark.asyncio
async def test_accept_creates_subscription():
    # Setup mocks; flush/commit are awaited by the service, so they must be AsyncMock
    mock_repo = MagicMock()
    mock_session = MagicMock()
    mock_session.flush = AsyncMock()
    mock_session.commit = AsyncMock()
    mock_message = MagicMock()
    mock_message.id = uuid4()
    mock_message.message_type = "calendar"
    mock_message.status = "pending"
    mock_message.schedule_item_id = uuid4()

    mock_repo.get_by_id = AsyncMock(return_value=mock_message)
    mock_repo._session = mock_session

    service = InboxMessageService(
        repository=mock_repo,
        session=mock_session,
        current_user=MagicMock(user_id=uuid4()),
    )

    await service.accept_invitation(
        mock_message.id, InboxMessageAcceptRequest(permission_view=True)
    )
    # TODO: assert that a subscription was created. The exact assertion depends
    # on how Step 3 wires in the subscription repository.
```

**Step 2: Run test to verify it fails**

Run: `cd backend && uv run pytest tests/unit/v1/inbox_messages/test_accept_invitation.py -v`
Expected: FAIL (test will fail because accept doesn't create subscription yet)

**Step 3: Write implementation**

Modify `backend/src/v1/inbox_messages/service.py` to import ScheduleSubscriptionRepository and create a subscription on accept.

**Step 4: Run test to verify it passes**

Run: `cd backend && uv run pytest tests/unit/v1/inbox_messages/test_accept_invitation.py -v`
Expected: PASS
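Step 3 leaves the subscription-creation logic unspecified. The following is a minimal sketch of one piece of it, under stated assumptions: it converts the three accept-request flags into the bitmask from the Permission Bits table and builds a payload for `create_subscription`. `Permission`, `AcceptRequest`, `permission_value`, and `build_subscription_data` are hypothetical names, and the dictionary keys are guesses, since this plan does not show the `ScheduleSubscription` columns.

```python
from dataclasses import dataclass
from enum import IntFlag
from uuid import UUID, uuid4


class Permission(IntFlag):
    """Bit values taken from the Permission Bits table."""

    VIEW = 1    # 001
    INVITE = 2  # 010
    EDIT = 4    # 100


@dataclass
class AcceptRequest:
    """Stand-in for InboxMessageAcceptRequest (same three flags and defaults)."""

    permission_view: bool = True
    permission_edit: bool = False
    permission_invite: bool = False


def permission_value(request: AcceptRequest) -> int:
    """Fold the boolean flags into a single integer bitmask."""
    value = Permission(0)
    if request.permission_view:
        value |= Permission.VIEW
    if request.permission_edit:
        value |= Permission.EDIT
    if request.permission_invite:
        value |= Permission.INVITE
    return int(value)


def build_subscription_data(
    schedule_item_id: UUID, user_id: UUID, request: AcceptRequest
) -> dict:
    """Build the dict passed to create_subscription.

    The key names are assumptions; adjust them to the real model columns.
    """
    return {
        "schedule_item_id": schedule_item_id,
        "user_id": user_id,
        "permission": permission_value(request),
    }


if __name__ == "__main__":
    req = AcceptRequest(permission_view=True, permission_edit=True)
    print(build_subscription_data(uuid4(), uuid4(), req))
```

In the service, `accept_invitation` could call a helper like this after the pending check and before the commit, so the status update and the new subscription land in the same transaction.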
**Step 5: Commit**

---

## Task 4: Fix enum reference bug in accept_invitation

**Files:**
- Modify: `backend/src/v1/inbox_messages/service.py` - fix InboxMessageStatus reference

**Bug:** In Task 1, `accept_invitation` compares `message.message_type` against `InboxMessageStatus.PENDING`. A message type never equals a status value, so every accept request is rejected with 400 "Message already processed". The check should compare `message.status` instead.

**Step 1: Write test to verify bug**

```python
def test_accept_checks_message_type_not_status():
    # Current code incorrectly checks message_type != PENDING.
    # It should check status != PENDING.
    raise NotImplementedError(
        "assert that a pending calendar message is accepted, not rejected with 400"
    )
```

**Step 2: Fix the implementation**

---

## Task 5: Write unit tests

**Files:**
- Create: `backend/tests/unit/v1/inbox_messages/test_service.py`
- Modify: `backend/tests/unit/v1/schedule_items/test_share.py` (created in Task 2)

---

## Task 6: Write integration tests

**Files:**
- Create: `backend/tests/integration/test_inbox_messages_routes.py`
- Create: `backend/tests/integration/test_schedule_share_routes.py`

---

## Task 7: Update API documentation

**Files:**
- Modify: `docs/runtime/runtime-route.md` - add share/inbox endpoints

---

## Task 8: Run all tests and fix issues

Run the full test suite and fix any issues.

---

## Task 9: Run lint and typecheck

Run:
```bash
cd backend && uv run ruff check src/v1/schedule_items/ src/v1/inbox_messages/
cd backend && uv run basedpyright src/v1/schedule_items/ src/v1/inbox_messages/
```

---

## Task 10: Final commit and create PR

```bash
git add .
git commit -m "feat: add calendar sharing with invitations"
git push -u origin feature-calendar-sharing
gh pr create --title "feat: add calendar sharing" --body "..."
```
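As a reference for the Task 4 fix, the sketch below isolates the comparison that goes wrong. It is a standalone illustration, not the service code: the two enums mirror the ones in `schemas.py`, while `is_processed_buggy` and `is_processed_fixed` are hypothetical helper names introduced only for this example.

```python
from enum import Enum


class InboxMessageType(str, Enum):
    CALENDAR = "calendar"


class InboxMessageStatus(str, Enum):
    PENDING = "pending"
    ACCEPTED = "accepted"


def is_processed_buggy(message_type: str, status: str) -> bool:
    """Task 1 behavior: compares the message type against a status value."""
    return message_type != InboxMessageStatus.PENDING


def is_processed_fixed(message_type: str, status: str) -> bool:
    """Task 4 behavior: compares the status field, as intended."""
    return status != InboxMessageStatus.PENDING


if __name__ == "__main__":
    # A fresh calendar invitation that nobody has acted on yet.
    msg_type = InboxMessageType.CALENDAR
    msg_status = InboxMessageStatus.PENDING
    print("buggy check says processed:", is_processed_buggy(msg_type, msg_status))
    print("fixed check says processed:", is_processed_fixed(msg_type, msg_status))
```

Because both enums subclass `str`, the buggy comparison raises no type error. It simply evaluates to "not equal" for every message, which is why the bug only shows up as a 400 response at runtime.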