# Schedule Items API Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Implement the backend CRUD API for calendar schedule items, so that users can create, query, update, and delete them.

**Architecture:** Follow the project's layered `schemas / repository / service / router` pattern, consistent with the existing `/users` API.

**Tech Stack:** FastAPI, SQLAlchemy, Pydantic, PostgreSQL

---
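To make the layering concrete before the tasks begin, here is a minimal, standard-library-only sketch of how a repository contract, an implementation, and a service could fit together. Every name in it (`Item`, `ItemRepository`, `InMemoryItemRepository`, `ItemService`) is hypothetical and stands in for the real classes defined in the tasks below; it illustrates the pattern and is not the project's code.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field
from typing import Protocol
from uuid import UUID, uuid4


@dataclass
class Item:
    """Hypothetical stand-in for the ORM model."""

    owner_id: UUID
    title: str
    id: UUID = field(default_factory=uuid4)


class ItemRepository(Protocol):
    """Storage contract that the service depends on."""

    async def create(self, data: dict) -> Item: ...
    async def get_by_id(self, item_id: UUID, owner_id: UUID) -> Item | None: ...


class InMemoryItemRepository:
    """Satisfies ItemRepository structurally; no inheritance is needed."""

    def __init__(self) -> None:
        self._items: dict[UUID, Item] = {}

    async def create(self, data: dict) -> Item:
        item = Item(**data)
        self._items[item.id] = item
        return item

    async def get_by_id(self, item_id: UUID, owner_id: UUID) -> Item | None:
        item = self._items.get(item_id)
        # Scope every lookup to the owner so users cannot read each other's items.
        return item if item and item.owner_id == owner_id else None


class ItemService:
    """Business rules live here; a router would only translate HTTP into calls."""

    def __init__(self, repository: ItemRepository, user_id: UUID) -> None:
        self._repository = repository
        self._user_id = user_id

    async def create(self, title: str) -> Item:
        return await self._repository.create({"owner_id": self._user_id, "title": title})

    async def get(self, item_id: UUID) -> Item | None:
        return await self._repository.get_by_id(item_id, self._user_id)


async def demo() -> tuple[Item | None, Item | None]:
    repo = InMemoryItemRepository()
    alice, bob = uuid4(), uuid4()
    created = await ItemService(repo, alice).create("Standup")
    seen_by_alice = await ItemService(repo, alice).get(created.id)
    seen_by_bob = await ItemService(repo, bob).get(created.id)
    return seen_by_alice, seen_by_bob


seen_by_alice, seen_by_bob = asyncio.run(demo())
```

Because the service only knows the `Protocol`, the unit tests in Task 9 can pass in a fake repository in the same way this sketch passes in an in-memory one.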
## Task 1: Create the Directory Structure and `__init__.py`

**Files:**
- Create: `backend/src/v1/schedule_items/__init__.py`

**Step 1: Create the directory and `__init__.py`**
```python
# backend/src/v1/schedule_items/__init__.py
```
Run: `mkdir -p backend/src/v1/schedule_items && touch backend/src/v1/schedule_items/__init__.py`

---

## Task 2: Create Pydantic Schemas

**Files:**
- Create: `backend/src/v1/schedule_items/schemas.py`

**Step 1: Write `schemas.py`**
```python
from __future__ import annotations

from datetime import datetime
from enum import Enum
from typing import ClassVar
from uuid import UUID

from pydantic import BaseModel, ConfigDict, Field


class AttachmentType(str, Enum):
    DOCUMENT = "document"
    REMINDER = "reminder"


class ScheduleItemMetadataAttachment(BaseModel):
    name: str
    type: AttachmentType
    visible_to: list[UUID] = Field(default_factory=list)
    url: str | None = None
    note: str | None = None
    content: str | None = None


class ScheduleItemMetadata(BaseModel):
    color: str | None = None
    location: str | None = None
    notes: str | None = None
    attachments: list[ScheduleItemMetadataAttachment] = Field(default_factory=list)
    version: int = 1


class ScheduleItemStatus(str, Enum):
    ACTIVE = "active"
    COMPLETED = "completed"
    CANCELED = "canceled"
    ARCHIVED = "archived"


class ScheduleItemSourceType(str, Enum):
    MANUAL = "manual"
    IMPORTED = "imported"
    AGENT_GENERATED = "agent_generated"


class ScheduleItemCreateRequest(BaseModel):
    # Reject unknown fields instead of silently ignoring them.
    model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")

    title: str = Field(min_length=1, max_length=255)
    description: str | None = Field(default=None, max_length=2000)
    start_at: datetime
    end_at: datetime | None = None
    timezone: str = "UTC"
    metadata: ScheduleItemMetadata | None = None


class ScheduleItemUpdateRequest(BaseModel):
    model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")

    # Every field is optional so that PATCH can send a partial update.
    # The end_at/start_at ordering check lives in the service, which can
    # compare against the stored values; the original no-op validator
    # on end_at has been dropped.
    title: str | None = Field(default=None, min_length=1, max_length=255)
    description: str | None = Field(default=None, max_length=2000)
    start_at: datetime | None = None
    end_at: datetime | None = None
    timezone: str | None = None
    metadata: ScheduleItemMetadata | None = None
    status: ScheduleItemStatus | None = None


class ScheduleItemResponse(BaseModel):
    model_config: ClassVar[ConfigDict] = ConfigDict(from_attributes=True)

    id: UUID
    title: str
    description: str | None = None
    start_at: datetime
    end_at: datetime | None = None
    timezone: str
    metadata: ScheduleItemMetadata | None = None
    status: ScheduleItemStatus
    source_type: ScheduleItemSourceType
    created_at: datetime
    updated_at: datetime


class ScheduleItemListItem(BaseModel):
    """Trimmed-down representation used by the list endpoint."""

    model_config: ClassVar[ConfigDict] = ConfigDict(from_attributes=True)

    id: UUID
    title: str
    start_at: datetime
    end_at: datetime | None = None
    timezone: str
    status: ScheduleItemStatus


class ScheduleItemListRequest(BaseModel):
    start_at: datetime
    end_at: datetime
```
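The schemas accept any `datetime`, so a request can mix a timezone-aware `start_at` with a naive `end_at`, and comparing the two raises `TypeError` in Python. The following is a small sketch of a reusable range check that could back the `end_at` rule; the function name `validate_time_range` is an assumption for illustration and does not exist in the plan.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def validate_time_range(start_at: datetime, end_at: datetime | None) -> None:
    """Raise ValueError unless end_at is absent or strictly after start_at."""
    if end_at is None:
        return
    # Comparing a naive datetime with an aware one raises TypeError,
    # so reject the mix explicitly with a clearer message.
    # (Simplified: only tzinfo presence is checked.)
    if (start_at.tzinfo is None) != (end_at.tzinfo is None):
        raise ValueError("start_at and end_at must both be timezone-aware or both naive")
    if end_at <= start_at:
        raise ValueError("end_at must be after start_at")


start = datetime(2026, 2, 28, 16, 0, tzinfo=timezone.utc)
validate_time_range(start, None)  # open-ended item: accepted
validate_time_range(start, start + timedelta(minutes=90))  # accepted

try:
    validate_time_range(start, start)  # zero-length item
except ValueError as exc:
    rejected_reason = str(exc)

try:
    validate_time_range(start, datetime(2026, 2, 28, 18, 0))  # naive end_at
except ValueError as exc:
    mixed_reason = str(exc)
```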
---
## Task 3: Create the Repository

**Files:**
- Create: `backend/src/v1/schedule_items/repository.py`

**Step 1: Write `repository.py`**
```python
from __future__ import annotations

from datetime import datetime
from typing import TYPE_CHECKING, Protocol
from uuid import UUID

from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError

from core.db.base_repository import BaseRepository
from core.logging import get_logger
from models.schedule_items import ScheduleItem

if TYPE_CHECKING:
    from sqlalchemy.ext.asyncio import AsyncSession

logger = get_logger("v1.schedule_items.repository")


class ScheduleItemRepository(Protocol):
    """Storage contract used by the service and by test fakes."""

    async def get_by_id(self, item_id: UUID, owner_id: UUID) -> ScheduleItem | None: ...
    async def create(self, data: dict) -> ScheduleItem: ...
    async def update_by_id(self, item_id: UUID, owner_id: UUID, data: dict) -> ScheduleItem | None: ...
    async def delete_by_id(self, item_id: UUID, owner_id: UUID) -> ScheduleItem | None: ...
    async def list_by_date_range(self, owner_id: UUID, start_at: datetime, end_at: datetime) -> list[ScheduleItem]: ...


class SQLAlchemyScheduleItemRepository(BaseRepository[ScheduleItem]):
    def __init__(self, session: AsyncSession) -> None:
        super().__init__(session, ScheduleItem)

    async def get_by_id(self, item_id: UUID, owner_id: UUID) -> ScheduleItem | None:
        try:
            stmt = (
                select(ScheduleItem)
                .where(ScheduleItem.id == item_id)
                .where(ScheduleItem.owner_id == owner_id)
                .where(ScheduleItem.deleted_at.is_(None))
            )
            result = await self._session.execute(stmt)
            return result.scalar_one_or_none()
        except SQLAlchemyError:
            logger.exception("Schedule item lookup failed", item_id=str(item_id), owner_id=str(owner_id))
            raise

    async def create(self, data: dict) -> ScheduleItem:
        try:
            item = ScheduleItem(**data)
            self._session.add(item)
            # Flush so generated columns are populated; the service commits.
            await self._session.flush()
            return item
        except SQLAlchemyError:
            logger.exception("Schedule item creation failed")
            raise

    async def update_by_id(
        self, item_id: UUID, owner_id: UUID, data: dict
    ) -> ScheduleItem | None:
        # Look the item up with the owner filter first so that one user
        # cannot modify another user's item.
        existing = await self.get_by_id(item_id, owner_id)
        if existing is None or not data:
            return existing
        try:
            # Delegate to the base class. Calling self.update_by_id here would
            # re-enter this override with the wrong arguments.
            # Assumes BaseRepository provides update_by_id(item_id, data).
            return await super().update_by_id(item_id, data)
        except SQLAlchemyError:
            logger.exception("Schedule item update failed", item_id=str(item_id))
            raise

    async def delete_by_id(self, item_id: UUID, owner_id: UUID) -> ScheduleItem | None:
        # Same ownership guard as update_by_id.
        existing = await self.get_by_id(item_id, owner_id)
        if existing is None:
            return None
        try:
            return await self.soft_delete_by_id(item_id)
        except SQLAlchemyError:
            logger.exception("Schedule item delete failed", item_id=str(item_id))
            raise

    async def list_by_date_range(
        self, owner_id: UUID, start_at: datetime, end_at: datetime
    ) -> list[ScheduleItem]:
        try:
            # Selects items whose start_at falls inside [start_at, end_at].
            stmt = (
                select(ScheduleItem)
                .where(ScheduleItem.owner_id == owner_id)
                .where(ScheduleItem.deleted_at.is_(None))
                .where(ScheduleItem.start_at >= start_at)
                .where(ScheduleItem.start_at <= end_at)
                .order_by(ScheduleItem.start_at.asc())
            )
            result = await self._session.execute(stmt)
            return list(result.scalars().all())
        except SQLAlchemyError:
            logger.exception("Schedule item list failed", owner_id=str(owner_id))
            raise
```
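`list_by_date_range` filters on `start_at` only, so an item that begins before the requested window but is still running inside it is not returned. Whether that is the intended calendar behavior is a product decision; the sketch below just contrasts the two rules in plain Python. The function names are hypothetical, and treating an item without `end_at` as a point in time is an assumption.

```python
from __future__ import annotations

from datetime import datetime, timezone


def starts_in_window(start_at: datetime, window_start: datetime, window_end: datetime) -> bool:
    """Mirror of the query above: only the item's start time is tested."""
    return window_start <= start_at <= window_end


def overlaps_window(
    start_at: datetime,
    end_at: datetime | None,
    window_start: datetime,
    window_end: datetime,
) -> bool:
    """Alternative rule: the item counts if any part of it falls inside the window."""
    effective_end = end_at if end_at is not None else start_at  # point-in-time item
    return start_at <= window_end and effective_end >= window_start


def utc(day: int, hour: int) -> datetime:
    return datetime(2026, 2, day, hour, tzinfo=timezone.utc)


# An overnight item: starts 27 Feb 22:00, ends 28 Feb 02:00.
item_start, item_end = utc(27, 22), utc(28, 2)
# The client asks for 28 Feb only.
window_start, window_end = utc(28, 0), utc(28, 23)

by_start = starts_in_window(item_start, window_start, window_end)
by_overlap = overlaps_window(item_start, item_end, window_start, window_end)
```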
---
## Task 4: Create the Service

**Files:**
- Create: `backend/src/v1/schedule_items/service.py`

**Step 1: Write `service.py`**
```python
from __future__ import annotations

from typing import TYPE_CHECKING
from uuid import UUID

from fastapi import HTTPException
from sqlalchemy.exc import SQLAlchemyError

from core.auth.models import CurrentUser
from core.db.base_service import BaseService
from core.logging import get_logger
from models.schedule_items import ScheduleItem, ScheduleItemSourceType, ScheduleItemStatus
from v1.schedule_items.repository import ScheduleItemRepository
from v1.schedule_items.schemas import (
    ScheduleItemCreateRequest,
    ScheduleItemListRequest,
    ScheduleItemResponse,
    ScheduleItemUpdateRequest,
)

if TYPE_CHECKING:
    from sqlalchemy.ext.asyncio import AsyncSession

logger = get_logger("v1.schedule_items.service")


class ScheduleItemService(BaseService):
    _repository: ScheduleItemRepository
    _session: AsyncSession

    def __init__(
        self,
        repository: ScheduleItemRepository,
        session: AsyncSession,
        current_user: CurrentUser | None,
    ) -> None:
        super().__init__(current_user=current_user)
        self._repository = repository
        self._session = session

    async def create(self, request: ScheduleItemCreateRequest) -> ScheduleItemResponse:
        user_id = self.require_user_id()
        if request.end_at and request.end_at <= request.start_at:
            raise HTTPException(status_code=400, detail="end_at must be after start_at")
        data = {
            "owner_id": user_id,
            "title": request.title,
            "description": request.description,
            "start_at": request.start_at,
            "end_at": request.end_at,
            "timezone": request.timezone,
            # SQLAlchemy reserves `metadata` on declarative models, and
            # _to_response reads `extra_metadata`, so that attribute name is
            # assumed here as well.
            "extra_metadata": request.metadata.model_dump() if request.metadata else {},
            "source_type": ScheduleItemSourceType.MANUAL,
            "status": ScheduleItemStatus.ACTIVE,
            "created_by": user_id,
        }
        try:
            item = await self._repository.create(data)
            await self._session.commit()
        except SQLAlchemyError:
            await self._session.rollback()
            logger.exception("Failed to create schedule item")
            raise HTTPException(status_code=503, detail="Schedule item store unavailable")
        return self._to_response(item)

    async def get_by_id(self, item_id: UUID) -> ScheduleItemResponse:
        user_id = self.require_user_id()
        return self._to_response(await self._get_item_or_404(item_id, user_id))

    async def update(
        self, item_id: UUID, request: ScheduleItemUpdateRequest
    ) -> ScheduleItemResponse:
        user_id = self.require_user_id()
        existing = await self._get_item_or_404(item_id, user_id)

        # Only non-null fields are applied, so a client cannot clear a field
        # by sending null.
        update_data: dict = {}
        for name in ("title", "description", "start_at", "end_at", "timezone", "status"):
            value = getattr(request, name)
            if value is not None:
                update_data[name] = value
        if request.metadata is not None:
            update_data["extra_metadata"] = request.metadata.model_dump()

        # Validate the range against the values the item will have after the
        # update, so changing only one bound cannot produce an inverted range.
        new_start = update_data.get("start_at", existing.start_at)
        new_end = update_data.get("end_at", existing.end_at)
        if new_end is not None and new_end <= new_start:
            raise HTTPException(status_code=400, detail="end_at must be after start_at")

        if not update_data:
            return self._to_response(existing)
        try:
            item = await self._repository.update_by_id(item_id, user_id, update_data)
            await self._session.commit()
        except SQLAlchemyError:
            await self._session.rollback()
            logger.exception("Failed to update schedule item", item_id=str(item_id))
            raise HTTPException(status_code=503, detail="Schedule item store unavailable")
        if item is None:
            raise HTTPException(status_code=404, detail="Schedule item not found")
        return self._to_response(item)

    async def delete(self, item_id: UUID) -> None:
        user_id = self.require_user_id()
        await self._get_item_or_404(item_id, user_id)
        try:
            await self._repository.delete_by_id(item_id, user_id)
            await self._session.commit()
        except SQLAlchemyError:
            await self._session.rollback()
            logger.exception("Failed to delete schedule item", item_id=str(item_id))
            raise HTTPException(status_code=503, detail="Schedule item store unavailable")

    async def list_by_date_range(
        self, request: ScheduleItemListRequest
    ) -> list[ScheduleItemResponse]:
        user_id = self.require_user_id()
        if request.end_at <= request.start_at:
            raise HTTPException(status_code=400, detail="end_at must be after start_at")
        try:
            items = await self._repository.list_by_date_range(
                user_id, request.start_at, request.end_at
            )
        except SQLAlchemyError:
            logger.exception("Failed to list schedule items")
            raise HTTPException(status_code=503, detail="Schedule item store unavailable")
        return [self._to_response(item) for item in items]

    async def _get_item_or_404(self, item_id: UUID, user_id: UUID) -> ScheduleItem:
        """Load an item owned by the user; map store errors to 503 and misses to 404."""
        try:
            item = await self._repository.get_by_id(item_id, user_id)
        except SQLAlchemyError:
            logger.exception("Failed to get schedule item", item_id=str(item_id))
            raise HTTPException(status_code=503, detail="Schedule item store unavailable")
        if item is None:
            raise HTTPException(status_code=404, detail="Schedule item not found")
        return item

    def _to_response(self, item: ScheduleItem) -> ScheduleItemResponse:
        return ScheduleItemResponse(
            id=item.id,
            title=item.title,
            description=item.description,
            start_at=item.start_at,
            end_at=item.end_at,
            timezone=item.timezone,
            metadata=item.extra_metadata,
            status=item.status,
            source_type=item.source_type,
            created_at=item.created_at,
            updated_at=item.updated_at,
        )
```
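The `update` method skips every field whose value is `None`, which means a PATCH request has no way to clear `description` or `end_at`. If clearing should be supported, the service needs to tell "field omitted" apart from "field sent as null". Pydantic v2 can provide this through `model_dump(exclude_unset=True)`; the standard-library sketch below shows the same idea with a sentinel. The names `UNSET` and `build_update_data` are hypothetical.

```python
from __future__ import annotations

from typing import Any

# Hypothetical sentinel meaning "the client did not send this field".
UNSET: Any = object()


def build_update_data(**fields: Any) -> dict[str, Any]:
    """Keep every field the client actually sent, including an explicit None."""
    return {name: value for name, value in fields.items() if value is not UNSET}


# The client sends {"title": "Renamed", "end_at": null}; description was omitted.
update_data = build_update_data(title="Renamed", description=UNSET, end_at=None)
```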
---
## Task 5: Create the Dependencies

**Files:**
- Create: `backend/src/v1/schedule_items/dependencies.py`

**Step 1: Write `dependencies.py`**
```python
from __future__ import annotations

from typing import Annotated

from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession

from core.auth.models import CurrentUser
from core.db import get_db
from v1.schedule_items.repository import SQLAlchemyScheduleItemRepository
from v1.schedule_items.service import ScheduleItemService
from v1.users.dependencies import get_current_user


async def get_schedule_item_repository(
    session: Annotated[AsyncSession, Depends(get_db)],
) -> SQLAlchemyScheduleItemRepository:
    return SQLAlchemyScheduleItemRepository(session)


def get_schedule_item_service(
    session: Annotated[AsyncSession, Depends(get_db)],
    user: Annotated[CurrentUser, Depends(get_current_user)],
) -> ScheduleItemService:
    # The repository and the service share one session so that the
    # service's commit/rollback covers the repository's writes.
    repository = SQLAlchemyScheduleItemRepository(session)
    return ScheduleItemService(
        repository=repository,
        session=session,
        current_user=user,
    )
```
---
## Task 6: Create the Router

**Files:**
- Create: `backend/src/v1/schedule_items/router.py`

**Step 1: Write `router.py`**
```python
from __future__ import annotations

# datetime must be importable at module level: FastAPI resolves the string
# annotations produced by `from __future__ import annotations` at runtime.
from datetime import datetime
from typing import Annotated
from uuid import UUID

from fastapi import APIRouter, Depends, Query

from v1.schedule_items.dependencies import get_schedule_item_service
from v1.schedule_items.schemas import (
    ScheduleItemCreateRequest,
    ScheduleItemListItem,
    ScheduleItemListRequest,
    ScheduleItemResponse,
    ScheduleItemUpdateRequest,
)
from v1.schedule_items.service import ScheduleItemService

router = APIRouter(prefix="/schedule-items", tags=["schedule-items"])


@router.post("", response_model=ScheduleItemResponse, status_code=201)
async def create_schedule_item(
    request: ScheduleItemCreateRequest,
    service: Annotated[ScheduleItemService, Depends(get_schedule_item_service)],
) -> ScheduleItemResponse:
    return await service.create(request)


# response_model trims each full response down to the list representation.
@router.get("", response_model=list[ScheduleItemListItem])
async def list_schedule_items(
    service: Annotated[ScheduleItemService, Depends(get_schedule_item_service)],
    # Annotated keeps both query parameters required without default values,
    # which avoids a non-default parameter following defaulted ones.
    start_at: Annotated[datetime, Query(description="Start date/time for range query")],
    end_at: Annotated[datetime, Query(description="End date/time for range query")],
) -> list[ScheduleItemResponse]:
    request = ScheduleItemListRequest(start_at=start_at, end_at=end_at)
    return await service.list_by_date_range(request)


@router.get("/{item_id}", response_model=ScheduleItemResponse)
async def get_schedule_item(
    item_id: UUID,
    service: Annotated[ScheduleItemService, Depends(get_schedule_item_service)],
) -> ScheduleItemResponse:
    return await service.get_by_id(item_id)


@router.patch("/{item_id}", response_model=ScheduleItemResponse)
async def update_schedule_item(
    item_id: UUID,
    request: ScheduleItemUpdateRequest,
    service: Annotated[ScheduleItemService, Depends(get_schedule_item_service)],
) -> ScheduleItemResponse:
    return await service.update(item_id, request)


@router.delete("/{item_id}", status_code=204)
async def delete_schedule_item(
    item_id: UUID,
    service: Annotated[ScheduleItemService, Depends(get_schedule_item_service)],
) -> None:
    await service.delete(item_id)
```
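The list endpoint takes ISO 8601 datetimes as query parameters, and a UTC offset such as `+08:00` contains a `+` that is decoded as a space unless it is percent-encoded. The sketch below shows one way a client could build the query string safely with the standard library. The `/api/v1` prefix is taken from the integration tests in Task 10; the sample dates are made up.

```python
from datetime import datetime, timedelta, timezone
from urllib.parse import parse_qs, urlencode

# A range expressed in UTC+08:00; the "+" in the offset is the part that needs care.
tz = timezone(timedelta(hours=8))
params = {
    "start_at": datetime(2026, 2, 1, 0, 0, tzinfo=tz).isoformat(),
    "end_at": datetime(2026, 2, 28, 23, 59, 59, tzinfo=tz).isoformat(),
}

# urlencode percent-encodes "+" as %2B and ":" as %3A.
query = urlencode(params)
url = f"/api/v1/schedule-items?{query}"

# Pasting the raw value into the URL lets "+" be decoded as a space.
raw_query = f"start_at={params['start_at']}"
decoded_raw = parse_qs(raw_query)["start_at"][0]
decoded_encoded = parse_qs(query)["start_at"][0]
```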
---
## Task 7: Register the Router

**Files:**
- Modify: `backend/src/v1/router.py:9-16`

**Step 1: Import and register the router**

Add the import to `backend/src/v1/router.py`:
```python
from v1.schedule_items.router import router as schedule_items_router
```
Add the registration next to the existing `router.include_router` calls:
```python
router.include_router(schedule_items_router)
```
---
## Task 8: Create Schema Unit Tests

**Files:**
- Create: `backend/tests/unit/v1/schedule_items/test_schemas.py`

**Step 1: Write `test_schemas.py`**
```python
from datetime import datetime, timezone

import pytest
from pydantic import ValidationError

from v1.schedule_items.schemas import (
    AttachmentType,
    ScheduleItemCreateRequest,
    ScheduleItemMetadata,
    ScheduleItemMetadataAttachment,
    ScheduleItemUpdateRequest,
)


def test_create_request_valid() -> None:
    request = ScheduleItemCreateRequest(
        title="Test Event",
        start_at=datetime(2026, 2, 28, 16, 0, 0, tzinfo=timezone.utc),
    )
    assert request.title == "Test Event"
    assert request.timezone == "UTC"


def test_create_request_with_end_at() -> None:
    request = ScheduleItemCreateRequest(
        title="Test Event",
        start_at=datetime(2026, 2, 28, 16, 0, 0, tzinfo=timezone.utc),
        end_at=datetime(2026, 2, 28, 17, 30, 0, tzinfo=timezone.utc),
    )
    assert request.end_at is not None


def test_create_request_invalid_title_empty() -> None:
    with pytest.raises(ValidationError):
        ScheduleItemCreateRequest(
            title="",
            start_at=datetime(2026, 2, 28, 16, 0, 0, tzinfo=timezone.utc),
        )


def test_create_request_invalid_title_too_long() -> None:
    with pytest.raises(ValidationError):
        ScheduleItemCreateRequest(
            title="x" * 256,
            start_at=datetime(2026, 2, 28, 16, 0, 0, tzinfo=timezone.utc),
        )


def test_create_request_with_metadata() -> None:
    metadata = ScheduleItemMetadata(
        color="#FF6B6B",
        location="Meeting Room A",
        notes="Bring documents",
        attachments=[],
    )
    request = ScheduleItemCreateRequest(
        title="Test Event",
        start_at=datetime(2026, 2, 28, 16, 0, 0, tzinfo=timezone.utc),
        metadata=metadata,
    )
    assert request.metadata is not None
    assert request.metadata.color == "#FF6B6B"


def test_update_request_partial() -> None:
    request = ScheduleItemUpdateRequest(title="Updated Title")
    assert request.title == "Updated Title"
    assert request.description is None


def test_metadata_attachment_document() -> None:
    attachment = ScheduleItemMetadataAttachment(
        name="document.pdf",
        type=AttachmentType.DOCUMENT,
        url="https://example.com/doc.pdf",
    )
    assert attachment.type == AttachmentType.DOCUMENT
    assert attachment.url == "https://example.com/doc.pdf"


def test_metadata_attachment_reminder() -> None:
    attachment = ScheduleItemMetadataAttachment(
        name="reminder",
        type=AttachmentType.REMINDER,
        content="Don't forget!",
    )
    assert attachment.type == AttachmentType.REMINDER
    assert attachment.content == "Don't forget!"
```
**Step 2: Run the tests**

Run: `cd backend && uv run pytest tests/unit/v1/schedule_items/test_schemas.py -v`

Expected: PASS

---

## Task 9: Create Service Unit Tests

**Files:**
- Create: `backend/tests/unit/v1/schedule_items/test_service.py`

**Step 1: Write `test_service.py`**
```python
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock
from uuid import UUID, uuid4

import pytest
from fastapi import HTTPException

from core.auth.models import CurrentUser
from models.schedule_items import ScheduleItem, ScheduleItemSourceType, ScheduleItemStatus
from v1.schedule_items.schemas import ScheduleItemCreateRequest, ScheduleItemUpdateRequest
from v1.schedule_items.service import ScheduleItemService

USER_ID = UUID("00000000-0000-0000-0000-000000000001")


def _create_mock_schedule_item(
    item_id: UUID | None = None,
    owner_id: UUID = USER_ID,
    title: str = "Test Event",
) -> ScheduleItem:
    item = MagicMock(spec=ScheduleItem)
    # Generate the id per call; a uuid4() default argument would be
    # evaluated once and shared by every item.
    item.id = item_id or uuid4()
    item.owner_id = owner_id
    item.title = title
    item.description = None
    item.start_at = datetime(2026, 2, 28, 16, 0, 0, tzinfo=timezone.utc)
    item.end_at = datetime(2026, 2, 28, 17, 0, 0, tzinfo=timezone.utc)
    item.timezone = "UTC"
    # The service reads `extra_metadata`, not `metadata`.
    item.extra_metadata = {}
    item.source_type = ScheduleItemSourceType.MANUAL
    item.status = ScheduleItemStatus.ACTIVE
    item.created_at = datetime(2026, 2, 27, 10, 0, 0, tzinfo=timezone.utc)
    item.updated_at = datetime(2026, 2, 27, 10, 0, 0, tzinfo=timezone.utc)
    return item


class FakeRepo:
    def __init__(self, item: ScheduleItem | None) -> None:
        self._item = item

    async def get_by_id(self, item_id: UUID, owner_id: UUID) -> ScheduleItem | None:
        if self._item and item_id == self._item.id:
            return self._item
        return None

    async def create(self, data: dict) -> ScheduleItem:
        # Start from a fully populated mock, then apply what the service sent.
        item = _create_mock_schedule_item()
        for key, value in data.items():
            setattr(item, key, value)
        return item

    async def update_by_id(self, item_id: UUID, owner_id: UUID, data: dict) -> ScheduleItem | None:
        if not self._item or item_id != self._item.id:
            return None
        # Apply the changes so that tests can assert on the updated values.
        for key, value in data.items():
            setattr(self._item, key, value)
        return self._item

    async def delete_by_id(self, item_id: UUID, owner_id: UUID) -> ScheduleItem | None:
        if not self._item or item_id != self._item.id:
            return None
        return self._item

    async def list_by_date_range(
        self, owner_id: UUID, start_at: datetime, end_at: datetime
    ) -> list[ScheduleItem]:
        return [self._item] if self._item else []


@pytest.fixture
def mock_session() -> AsyncMock:
    session = AsyncMock()
    session.commit = AsyncMock()
    session.rollback = AsyncMock()
    return session


def _make_service(item: ScheduleItem | None, session: AsyncMock) -> ScheduleItemService:
    return ScheduleItemService(
        repository=FakeRepo(item),
        session=session,
        current_user=CurrentUser(id=USER_ID),
    )


@pytest.mark.asyncio
async def test_create_success(mock_session: AsyncMock) -> None:
    request = ScheduleItemCreateRequest(
        title="Test Event",
        start_at=datetime(2026, 2, 28, 16, 0, 0, tzinfo=timezone.utc),
    )
    result = await _make_service(None, mock_session).create(request)
    assert result.title == "Test Event"
    mock_session.commit.assert_awaited_once()


@pytest.mark.asyncio
async def test_create_invalid_end_at(mock_session: AsyncMock) -> None:
    request = ScheduleItemCreateRequest(
        title="Test Event",
        start_at=datetime(2026, 2, 28, 17, 0, 0, tzinfo=timezone.utc),
        end_at=datetime(2026, 2, 28, 16, 0, 0, tzinfo=timezone.utc),
    )
    with pytest.raises(HTTPException) as exc_info:
        await _make_service(None, mock_session).create(request)
    assert exc_info.value.status_code == 400


@pytest.mark.asyncio
async def test_get_by_id_success(mock_session: AsyncMock) -> None:
    item = _create_mock_schedule_item()
    result = await _make_service(item, mock_session).get_by_id(item.id)
    assert result.id == item.id


@pytest.mark.asyncio
async def test_get_by_id_not_found(mock_session: AsyncMock) -> None:
    with pytest.raises(HTTPException) as exc_info:
        await _make_service(None, mock_session).get_by_id(uuid4())
    assert exc_info.value.status_code == 404


@pytest.mark.asyncio
async def test_update_success(mock_session: AsyncMock) -> None:
    item = _create_mock_schedule_item()
    service = _make_service(item, mock_session)
    result = await service.update(item.id, ScheduleItemUpdateRequest(title="Updated"))
    assert result.title == "Updated"


@pytest.mark.asyncio
async def test_delete_success(mock_session: AsyncMock) -> None:
    item = _create_mock_schedule_item()
    await _make_service(item, mock_session).delete(item.id)
    mock_session.commit.assert_awaited_once()
```
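The tests above cover the success paths only; the rollback branch that runs when the store fails is not exercised. As a hedged illustration of how that branch could be tested, the sketch below reduces the commit-or-rollback shape to a standalone function and drives it with `AsyncMock`. `StoreError` and `save` are hypothetical stand-ins (for `SQLAlchemyError` and a service method), so the example runs without the project's dependencies.

```python
import asyncio
from unittest.mock import AsyncMock


class StoreError(Exception):
    """Hypothetical stand-in for sqlalchemy.exc.SQLAlchemyError."""


async def save(repository, session, data: dict) -> bool:
    """Same commit-or-rollback shape as the service methods, reduced to the core."""
    try:
        await repository.create(data)
        await session.commit()
    except StoreError:
        await session.rollback()
        return False
    return True


# Make the fake repository fail the way a lost database connection would.
repository = AsyncMock()
repository.create.side_effect = StoreError("connection lost")
session = AsyncMock()

saved = asyncio.run(save(repository, session, {"title": "Test Event"}))
```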
**Step 2: Run the tests**

Run: `cd backend && uv run pytest tests/unit/v1/schedule_items/test_service.py -v`

Expected: PASS

---

## Task 10: Create Integration Tests

**Files:**
- Create: `backend/tests/integration/test_schedule_items_routes.py`

**Step 1: Write `test_schedule_items_routes.py`**
```python
from collections.abc import Callable, Iterator
from datetime import datetime, timezone
from uuid import UUID, uuid4

import pytest
from fastapi import HTTPException
from fastapi.testclient import TestClient

from app import app
from v1.schedule_items.dependencies import get_schedule_item_service
from v1.schedule_items.schemas import (
    ScheduleItemCreateRequest,
    ScheduleItemListRequest,
    ScheduleItemResponse,
    ScheduleItemSourceType,
    ScheduleItemStatus,
    ScheduleItemUpdateRequest,
)
from v1.schedule_items.service import ScheduleItemService


class FakeScheduleItemService:
    def __init__(self, item: ScheduleItemResponse | None) -> None:
        self._item = item

    async def create(self, request: ScheduleItemCreateRequest) -> ScheduleItemResponse:
        if not self._item:
            raise HTTPException(status_code=503, detail="Store unavailable")
        return self._item

    async def get_by_id(self, item_id: UUID) -> ScheduleItemResponse:
        return self._require(item_id)

    async def update(
        self, item_id: UUID, request: ScheduleItemUpdateRequest
    ) -> ScheduleItemResponse:
        return self._require(item_id)

    async def delete(self, item_id: UUID) -> None:
        self._require(item_id)

    async def list_by_date_range(
        self, request: ScheduleItemListRequest
    ) -> list[ScheduleItemResponse]:
        return [self._item] if self._item else []

    def _require(self, item_id: UUID) -> ScheduleItemResponse:
        if not self._item or self._item.id != item_id:
            raise HTTPException(status_code=404, detail="Schedule item not found")
        return self._item


def _override_schedule_item_service(
    service: FakeScheduleItemService,
) -> Callable[[], ScheduleItemService]:
    def _get_service() -> ScheduleItemService:
        return service  # type: ignore[return-value]

    return _get_service


@pytest.fixture
def item() -> ScheduleItemResponse:
    return ScheduleItemResponse(
        id=uuid4(),
        title="Test Event",
        start_at=datetime(2026, 2, 28, 16, 0, 0, tzinfo=timezone.utc),
        timezone="UTC",
        status=ScheduleItemStatus.ACTIVE,
        source_type=ScheduleItemSourceType.MANUAL,
        created_at=datetime(2026, 2, 27, 10, 0, 0, tzinfo=timezone.utc),
        updated_at=datetime(2026, 2, 27, 10, 0, 0, tzinfo=timezone.utc),
    )


@pytest.fixture
def client(item: ScheduleItemResponse) -> Iterator[TestClient]:
    # Overriding the service dependency also bypasses its auth dependency,
    # so no current-user override is needed.
    app.dependency_overrides[get_schedule_item_service] = _override_schedule_item_service(
        FakeScheduleItemService(item)
    )
    try:
        yield TestClient(app)
    finally:
        app.dependency_overrides.clear()


def test_create_schedule_item_returns_201(client: TestClient) -> None:
    response = client.post(
        "/api/v1/schedule-items",
        json={"title": "Test Event", "start_at": "2026-02-28T16:00:00Z"},
    )
    assert response.status_code == 201


def test_list_schedule_items_returns_200(client: TestClient) -> None:
    response = client.get(
        "/api/v1/schedule-items",
        params={
            "start_at": "2026-02-01T00:00:00Z",
            "end_at": "2026-02-28T23:59:59Z",
        },
    )
    assert response.status_code == 200
    assert isinstance(response.json(), list)


def test_get_schedule_item_returns_200(client: TestClient, item: ScheduleItemResponse) -> None:
    response = client.get(f"/api/v1/schedule-items/{item.id}")
    assert response.status_code == 200


def test_update_schedule_item_returns_200(client: TestClient, item: ScheduleItemResponse) -> None:
    response = client.patch(
        f"/api/v1/schedule-items/{item.id}",
        json={"title": "Updated Event"},
    )
    assert response.status_code == 200


def test_delete_schedule_item_returns_204(client: TestClient, item: ScheduleItemResponse) -> None:
    response = client.delete(f"/api/v1/schedule-items/{item.id}")
    assert response.status_code == 204
```
**Step 2: Run the tests**

Run: `cd backend && uv run pytest tests/integration/test_schedule_items_routes.py -v`

Expected: PASS

---

## Task 11: Run Lint and Type Check

**Step 1: Run ruff**

Run: `cd backend && uv run ruff check src/v1/schedule_items/`

Expected: No errors

**Step 2: Run the type checker**

Run: `cd backend && uv run basedpyright src/v1/schedule_items/`

Expected: No errors

---

## Task 12: Update the API Documentation

**Files:**
- Modify: `docs/runtime/runtime-route.md`

Add the following after the `## Auth` section:
````markdown
## Schedule Items

### POST /schedule-items

Create a schedule item (authentication required).

**Request:**
```json
{
  "title": "string (1-255 chars, required)",
  "description": "string? (max 2000 chars)",
  "start_at": "string (ISO 8601 datetime, required)",
  "end_at": "string? (ISO 8601 datetime)",
  "timezone": "string? (default: UTC)",
  "metadata": {
    "color": "#FF6B6B",
    "location": "Meeting Room A",
    "notes": "Remember to bring your ID card",
    "attachments": [],
    "version": 1
  }
}
```

**Response:** 201 Created
```json
{
  "id": "uuid",
  "title": "string",
  "description": "string?",
  "start_at": "string",
  "end_at": "string?",
  "timezone": "string",
  "metadata": {},
  "status": "active",
  "source_type": "manual",
  "created_at": "string",
  "updated_at": "string"
}
```

**Errors:**
- 400: `end_at` is not after `start_at`
- 401: Not authenticated
- 503: Service unavailable

---

### GET /schedule-items

List schedule items within a time range (authentication required).

**Query Parameters:**
- `start_at`: ISO 8601 date/datetime (start of the range)
- `end_at`: ISO 8601 date/datetime (end of the range)

**Response:** 200 OK
```json
[
  {
    "id": "uuid",
    "title": "string",
    "start_at": "string",
    "end_at": "string?",
    "timezone": "string",
    "status": "active"
  }
]
```

**Errors:**
- 400: `end_at` is not after `start_at`
- 401: Not authenticated

---

### GET /schedule-items/{id}

Get the details of a single schedule item (authentication required).

**Response:** 200 OK

**Errors:**
- 401: Not authenticated
- 404: Schedule item not found

---

### PATCH /schedule-items/{id}

Update a schedule item (authentication required).

**Request:** Partial update of `title`/`description`/`start_at`/`end_at`/`timezone`/`metadata`/`status`

**Response:** 200 OK

**Errors:**
- 400: `end_at` is not after `start_at`
- 401: Not authenticated
- 404: Schedule item not found

---

### DELETE /schedule-items/{id}

Delete a schedule item (soft delete, authentication required).

**Response:** 204 No Content

**Errors:**
- 401: Not authenticated
- 404: Schedule item not found

---
````
---
## Task 13: Commit the Code

**Step 1: Commit all changes**
Run: `git add -A && git commit -m "feat: add schedule items CRUD API
- Add ScheduleItem Pydantic schemas with metadata support
- Add repository layer with CRUD operations
- Add service layer with authorization
- Add FastAPI router with all endpoints
- Add unit and integration tests
- Update API documentation"`
Expected: Commit created successfully