Files
social-app/backend/src/v1/todo/service.py
T

378 lines
12 KiB
Python

from __future__ import annotations
from datetime import datetime, timezone
from typing import TYPE_CHECKING
from uuid import UUID
from fastapi import HTTPException
from sqlalchemy.exc import SQLAlchemyError
from core.auth.models import CurrentUser
from core.db.base_service import BaseService
from core.logging import get_logger
from models.todos import Todo, TodoStatus
from v1.schedule_items.repository import SQLAlchemyScheduleItemRepository
from v1.todo.repository import TodoRepository
from v1.todo.schemas import (
ScheduleItemBasic,
TodoCreate,
TodoReorderRequest,
TodoResponse,
TodoUpdate,
)
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncSession
# Module-level logger shared by every TodoService method; the name mirrors
# this module's dotted path (v1/todo/service.py).
logger = get_logger("v1.todo.service")
class TodoService(BaseService):
    """Todo service handling todo CRUD operations.

    Responsibilities:
    - Authorization checks
    - Validation (ownership, status transitions)
    - Transaction boundary (commit/rollback)
    - Converting ORM models to response schemas

    Error mapping used by every public method:
    - ``SQLAlchemyError`` from a repository/session call -> HTTP 503
    - todo not found -> HTTP 404
    - todo owned by a different user -> HTTP 403
    - malformed status/priority/duplicate id -> HTTP 400
    """

    # Inclusive bounds accepted by ``list_todos`` for the ``priority`` filter.
    _MIN_PRIORITY = 1
    _MAX_PRIORITY = 4

    _repository: TodoRepository
    _schedule_item_repository: SQLAlchemyScheduleItemRepository
    _session: AsyncSession

    def __init__(
        self,
        repository: TodoRepository,
        schedule_item_repository: SQLAlchemyScheduleItemRepository,
        session: AsyncSession,
        current_user: CurrentUser | None,
    ) -> None:
        """Store the collaborators used by the service.

        Args:
            repository: Data access for todos and their schedule-item links.
            schedule_item_repository: Data access used to resolve linked
                schedule items when building responses.
            session: Session on which this service commits and rolls back.
            current_user: Authenticated user, forwarded to ``BaseService``.
        """
        super().__init__(current_user=current_user)
        self._repository = repository
        self._schedule_item_repository = schedule_item_repository
        self._session = session

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _db_unavailable(operation: str, exc: SQLAlchemyError) -> HTTPException:
        """Log a database failure and build the 503 error that represents it.

        Args:
            operation: Short name of the service operation that failed.
            exc: The database error that was caught.

        Returns:
            The ``HTTPException`` (503) for the caller to raise.
        """
        # Without this the root cause of every 503 would be lost entirely.
        logger.error(
            "todo_db_error",
            exc_info=exc,
            extra={"operation": operation},
        )
        return HTTPException(status_code=503, detail="Todo service unavailable")

    async def _get_owned_todo(self, todo_id: UUID, user_id: UUID, action: str) -> Todo:
        """Load a todo and verify that ``user_id`` owns it.

        Args:
            todo_id: Identifier of the todo to load.
            user_id: Value returned by ``require_user_id()`` (presumably a
                UUID - confirm against ``BaseService``).
            action: Verb used in the log event name and in the 403 message
                (``access``, ``update``, ``complete``, ``delete``, ``reorder``).

        Returns:
            The todo entity owned by ``user_id``.

        Raises:
            HTTPException: 503 on database failure, 404 when the todo does
                not exist, 403 when it belongs to another user.
        """
        try:
            todo = await self._repository.get_by_id(todo_id)
        except SQLAlchemyError as exc:
            raise self._db_unavailable(action, exc) from exc
        if todo is None:
            raise HTTPException(status_code=404, detail="Todo not found")
        if todo.owner_id != user_id:
            logger.warning(
                f"todo_{action}_unauthorized",
                extra={
                    "actor_id": str(user_id),
                    "todo_id": str(todo_id),
                },
            )
            raise HTTPException(
                status_code=403, detail=f"Not authorized to {action} this todo"
            )
        return todo

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def create(self, request: TodoCreate) -> TodoResponse:
        """Create a todo owned by the current user.

        When ``request.order`` is ``None`` the new todo is placed after the
        todos the user already has in the same priority (order = their count).

        Args:
            request: Fields of the todo to create.

        Returns:
            The created todo as a response schema.

        Raises:
            HTTPException: 503 when a database operation fails.
        """
        user_id = self.require_user_id()
        try:
            order_value = request.order
            if order_value is None:
                todos_in_priority = await self._repository.list_by_owner(
                    owner_id=user_id,
                    priority=request.priority,
                )
                order_value = len(todos_in_priority)
            todo = await self._repository.create(
                owner_id=user_id,
                title=request.title,
                description=request.description,
                priority=request.priority,
                order=order_value,
                created_by=user_id,
            )
            if request.schedule_item_ids:
                await self._repository.set_schedule_items(
                    todo.id, request.schedule_item_ids
                )
            await self._session.commit()
            # Reload after commit, as update()/complete() do, so the
            # attributes read below are populated on the entity.
            await self._session.refresh(todo)
        except SQLAlchemyError as exc:
            await self._session.rollback()
            raise self._db_unavailable("create", exc) from exc
        logger.info(
            "todo_created",
            extra={
                "user_id": str(user_id),
                "todo_id": str(todo.id),
            },
        )
        return await self._to_response(todo)

    async def get_by_id(self, todo_id: UUID) -> TodoResponse:
        """Return one todo owned by the current user.

        Args:
            todo_id: Identifier of the todo to fetch.

        Returns:
            The todo as a response schema.

        Raises:
            HTTPException: 503, 404 or 403 (see the class docstring).
        """
        user_id = self.require_user_id()
        todo = await self._get_owned_todo(todo_id, user_id, "access")
        return await self._to_response(todo)

    async def update(self, todo_id: UUID, request: TodoUpdate) -> TodoResponse:
        """Apply a partial update to a todo owned by the current user.

        ``completed_at`` follows the status transition: it is stamped when
        the todo moves to DONE and cleared when a DONE todo is moved to any
        other status. A request that does not carry a status leaves it alone.

        Args:
            todo_id: Identifier of the todo to update.
            request: Fields to change.

        Returns:
            The updated todo as a response schema.

        Raises:
            HTTPException: 503, 404 or 403 (see the class docstring); 400
                when ``request.status`` is not a valid ``TodoStatus``.
        """
        user_id = self.require_user_id()
        todo = await self._get_owned_todo(todo_id, user_id, "update")

        # Convert first so the transition checks below compare enum members,
        # and so a bad value is a 400 (as in list_todos) instead of a 500.
        status_enum: TodoStatus | None = None
        if request.status is not None:
            try:
                status_enum = TodoStatus(request.status)
            except ValueError:
                raise HTTPException(
                    status_code=400, detail="Invalid status value"
                ) from None

        was_done = todo.status == TodoStatus.DONE
        becomes_done = status_enum == TodoStatus.DONE
        completed_at = (
            datetime.now(timezone.utc) if becomes_done and not was_done else None
        )
        # Only an explicit move away from DONE re-opens the todo.
        reopened = status_enum is not None and was_done and not becomes_done

        try:
            todo = await self._repository.update(
                todo,
                title=request.title,
                description=request.description,
                priority=request.priority,
                order=request.order,
                status=status_enum,
                completed_at=completed_at,
            )
            if reopened:
                # None is also what is passed for fields that must stay
                # untouched (presumably the repository skips None - confirm),
                # so the timestamp is cleared on the entity itself, the same
                # way delete() writes deleted_at.
                todo.completed_at = None
            if request.schedule_item_ids is not None:
                await self._repository.set_schedule_items(
                    todo.id, request.schedule_item_ids
                )
            await self._session.commit()
            await self._session.refresh(todo)
        except SQLAlchemyError as exc:
            await self._session.rollback()
            raise self._db_unavailable("update", exc) from exc
        logger.info(
            "todo_updated",
            extra={
                "user_id": str(user_id),
                "todo_id": str(todo_id),
            },
        )
        return await self._to_response(todo)

    async def complete(self, todo_id: UUID) -> TodoResponse:
        """Mark a todo owned by the current user as DONE.

        Completing a todo that is already DONE is a no-op: the original
        ``completed_at`` is kept, matching the transition rule in ``update``.

        Args:
            todo_id: Identifier of the todo to complete.

        Returns:
            The todo as a response schema.

        Raises:
            HTTPException: 503, 404 or 403 (see the class docstring).
        """
        user_id = self.require_user_id()
        todo = await self._get_owned_todo(todo_id, user_id, "complete")
        if todo.status != TodoStatus.DONE:
            try:
                todo = await self._repository.update(
                    todo,
                    status=TodoStatus.DONE,
                    completed_at=datetime.now(timezone.utc),
                )
                await self._session.commit()
                await self._session.refresh(todo)
            except SQLAlchemyError as exc:
                await self._session.rollback()
                raise self._db_unavailable("complete", exc) from exc
            logger.info(
                "todo_completed",
                extra={
                    "user_id": str(user_id),
                    "todo_id": str(todo_id),
                },
            )
        return await self._to_response(todo)

    async def delete(self, todo_id: UUID) -> None:
        """Soft-delete a todo owned by the current user.

        The row is kept; only its ``deleted_at`` timestamp is set.

        Args:
            todo_id: Identifier of the todo to delete.

        Raises:
            HTTPException: 503, 404 or 403 (see the class docstring).
        """
        user_id = self.require_user_id()
        todo = await self._get_owned_todo(todo_id, user_id, "delete")
        try:
            todo.deleted_at = datetime.now(timezone.utc)
            await self._session.commit()
        except SQLAlchemyError as exc:
            await self._session.rollback()
            raise self._db_unavailable("delete", exc) from exc
        logger.info(
            "todo_deleted",
            extra={
                "user_id": str(user_id),
                "todo_id": str(todo_id),
            },
        )

    async def reorder(self, request: TodoReorderRequest) -> None:
        """Move todos to new priority/order slots and renumber the lanes.

        Every requested item is validated before anything is written, so a
        400/404/403 never leaves earlier items half-applied in the session.
        Afterwards the PENDING todos of each touched priority are renumbered
        0..n-1, ordered by ``(order, created_at)``.

        Args:
            request: Items carrying ``id``, ``priority`` and ``order``.

        Raises:
            HTTPException: 400 on a duplicate id; 503, 404 or 403 (see the
                class docstring).
        """
        user_id = self.require_user_id()

        # Phase 1: validation only - no writes yet.
        seen_ids: set[UUID] = set()
        planned = []
        for item in request.items:
            if item.id in seen_ids:
                raise HTTPException(status_code=400, detail="Duplicate todo id")
            seen_ids.add(item.id)
            todo = await self._get_owned_todo(item.id, user_id, "reorder")
            planned.append((todo, item))

        # Phase 2: apply the moves, then close gaps in every affected lane.
        affected_priorities: set[int] = set()
        try:
            for todo, item in planned:
                # Record the source lane before the update overwrites it.
                affected_priorities.add(todo.priority)
                affected_priorities.add(item.priority)
                await self._repository.update(
                    todo,
                    priority=item.priority,
                    order=item.order,
                )
            for priority in affected_priorities:
                lane = await self._repository.list_by_owner(
                    owner_id=user_id,
                    status=TodoStatus.PENDING,
                    priority=priority,
                )
                ordered = sorted(
                    lane, key=lambda current: (current.order, current.created_at)
                )
                for index, todo in enumerate(ordered):
                    if todo.order != index:
                        await self._repository.update(todo, order=index)
            await self._session.commit()
        except SQLAlchemyError as exc:
            await self._session.rollback()
            raise self._db_unavailable("reorder", exc) from exc

    async def list_todos(
        self,
        status: str | None = None,
        priority: int | None = None,
    ) -> list[TodoResponse]:
        """List the current user's todos, optionally filtered.

        Args:
            status: Optional status filter; must be a ``TodoStatus`` value.
            priority: Optional priority filter; must lie within
                ``_MIN_PRIORITY``..``_MAX_PRIORITY`` inclusive.

        Returns:
            Response schemas for the matching todos.

        Raises:
            HTTPException: 400 for an invalid status or priority; 503 when
                a database operation fails.
        """
        user_id = self.require_user_id()
        status_enum = None
        if status is not None:
            try:
                status_enum = TodoStatus(status)
            except ValueError:
                raise HTTPException(
                    status_code=400, detail="Invalid status value"
                ) from None
        if priority is not None and not (
            self._MIN_PRIORITY <= priority <= self._MAX_PRIORITY
        ):
            raise HTTPException(status_code=400, detail="Invalid priority value")
        try:
            todos = await self._repository.list_by_owner(
                owner_id=user_id,
                status=status_enum,
                priority=priority,
            )
        except SQLAlchemyError as exc:
            raise self._db_unavailable("list", exc) from exc
        return [await self._to_response(todo) for todo in todos]

    async def _to_response(self, todo: Todo) -> TodoResponse:
        """Convert a todo entity into its response schema.

        Linked schedule items are resolved one by one; ids that no longer
        resolve to an item are skipped.

        Args:
            todo: Entity to convert.

        Returns:
            The populated ``TodoResponse``.

        Raises:
            HTTPException: 503 when loading the schedule items fails.
        """
        # status may be an enum member or an already-plain value.
        status_value = (
            todo.status.value if hasattr(todo.status, "value") else str(todo.status)
        )
        schedule_items: list[ScheduleItemBasic] = []
        try:
            schedule_item_ids = await self._repository.get_schedule_items(todo.id)
            # NOTE(review): one lookup per linked item (and per todo when
            # called from list_todos); batching would need a repository
            # method that is not visible from this file.
            for item_id in schedule_item_ids:
                item = await self._schedule_item_repository.get_by_id(item_id)
                if item is not None:
                    schedule_items.append(
                        ScheduleItemBasic(
                            id=item.id,
                            title=item.title,
                            start_at=item.start_at,
                            end_at=item.end_at,
                        )
                    )
        except SQLAlchemyError as exc:
            # Same 503 mapping as every other database call in this service.
            raise self._db_unavailable("load_schedule_items", exc) from exc
        return TodoResponse(
            id=todo.id,
            owner_id=todo.owner_id,
            title=todo.title,
            description=todo.description,
            priority=todo.priority,
            order=todo.order,
            status=status_value,
            completed_at=todo.completed_at,
            created_at=todo.created_at,
            updated_at=todo.updated_at,
            schedule_items=schedule_items,
        )