feat: add schedule items CRUD API

- Add ScheduleItem Pydantic schemas with metadata support
- Add repository layer with CRUD operations
- Add service layer with authorization
- Add FastAPI router with all endpoints
- Add unit and integration tests
- Update API documentation
This commit is contained in:
qzl
2026-02-28 11:03:29 +08:00
parent dbd3f68dd4
commit 50b38de488
12 changed files with 1114 additions and 0 deletions
+119
View File
@@ -0,0 +1,119 @@
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING, Protocol
from uuid import UUID
from sqlalchemy import select, update
from sqlalchemy.exc import SQLAlchemyError
from core.db.base_repository import BaseRepository
from core.logging import get_logger
from models.schedule_items import ScheduleItem
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncSession
logger = get_logger("v1.schedule_items.repository")
class ScheduleItemRepository(Protocol):
async def get_by_item_id(
self, item_id: UUID, owner_id: UUID
) -> ScheduleItem | None: ...
async def create(self, data: dict) -> ScheduleItem: ...
async def update_by_item_id(
self, item_id: UUID, owner_id: UUID, data: dict
) -> ScheduleItem | None: ...
async def delete_by_item_id(
self, item_id: UUID, owner_id: UUID
) -> ScheduleItem | None: ...
async def list_by_date_range(
self, owner_id: UUID, start_at: datetime, end_at: datetime
) -> list[ScheduleItem]: ...
class SQLAlchemyScheduleItemRepository(BaseRepository[ScheduleItem]):
def __init__(self, session: AsyncSession) -> None:
super().__init__(session, ScheduleItem)
async def get_by_item_id(
self, item_id: UUID, owner_id: UUID
) -> ScheduleItem | None:
try:
stmt = (
select(ScheduleItem)
.where(ScheduleItem.id == item_id)
.where(ScheduleItem.owner_id == owner_id)
.where(ScheduleItem.deleted_at.is_(None))
)
result = await self._session.execute(stmt)
return result.scalar_one_or_none()
except SQLAlchemyError:
logger.exception(
"Schedule item lookup failed",
item_id=str(item_id),
owner_id=str(owner_id),
)
raise
async def create(self, data: dict) -> ScheduleItem:
try:
item = ScheduleItem(**data)
self._session.add(item)
await self._session.flush()
return item
except SQLAlchemyError:
logger.exception("Schedule item creation failed")
raise
async def update_by_item_id(
self, item_id: UUID, owner_id: UUID, data: dict
) -> ScheduleItem | None:
if not data:
return await self.get_by_item_id(item_id, owner_id)
try:
existing = await self.get_by_item_id(item_id, owner_id)
if existing is None:
return None
stmt = (
update(ScheduleItem)
.where(ScheduleItem.id == item_id)
.where(ScheduleItem.owner_id == owner_id)
.where(ScheduleItem.deleted_at.is_(None))
.values(**data)
.returning(ScheduleItem)
)
result = await self._session.execute(stmt)
await self._session.flush()
return result.scalar_one_or_none()
except SQLAlchemyError:
logger.exception("Schedule item update failed", item_id=str(item_id))
raise
async def delete_by_item_id(
self, item_id: UUID, owner_id: UUID
) -> ScheduleItem | None:
try:
return await self.soft_delete_by_id(item_id)
except SQLAlchemyError:
logger.exception("Schedule item delete failed", item_id=str(item_id))
raise
async def list_by_date_range(
self, owner_id: UUID, start_at: datetime, end_at: datetime
) -> list[ScheduleItem]:
try:
stmt = (
select(ScheduleItem)
.where(ScheduleItem.owner_id == owner_id)
.where(ScheduleItem.deleted_at.is_(None))
.where(ScheduleItem.start_at >= start_at)
.where(ScheduleItem.start_at <= end_at)
.order_by(ScheduleItem.start_at.asc())
)
result = await self._session.execute(stmt)
return list(result.scalars().all())
except SQLAlchemyError:
logger.exception("Schedule item list failed", owner_id=str(owner_id))
raise