from __future__ import annotations

from typing import TYPE_CHECKING, Protocol
from uuid import UUID

from sqlalchemy import or_, select
from sqlalchemy.exc import SQLAlchemyError

from core.db.base_repository import BaseRepository
from core.logging import get_logger
from models.profile import Profile

if TYPE_CHECKING:
    from sqlalchemy.ext.asyncio import AsyncSession

logger = get_logger("v1.users.repository")

# Character used to escape LIKE metacharacters in user-supplied search text.
# A forward slash is used instead of a backslash so the escape character is
# not itself subject to string-literal escaping rules on some backends.
_LIKE_ESCAPE_CHAR = "/"


def _escape_like(value: str) -> str:
    """Return *value* with LIKE metacharacters escaped for a literal match.

    The escape character itself is escaped first so that escapes introduced
    for ``%`` and ``_`` are not doubled afterwards.
    """
    return (
        value.replace(_LIKE_ESCAPE_CHAR, _LIKE_ESCAPE_CHAR * 2)
        .replace("%", f"{_LIKE_ESCAPE_CHAR}%")
        .replace("_", f"{_LIKE_ESCAPE_CHAR}_")
    )


class UserRepository(Protocol):
    """Protocol defining the user repository interface."""

    async def get_by_user_id(self, user_id: UUID) -> Profile | None:
        """Get user by user ID."""
        ...

    async def get_by_username(self, username: str) -> Profile | None:
        """Get user by username."""
        ...

    async def update_by_user_id(
        self, user_id: UUID, update_data: dict[str, str | None]
    ) -> Profile | None:
        """Update user by user ID. Returns updated user or None if not found."""
        ...

    async def search_users(self, query: str, limit: int = 20) -> list[Profile]:
        """Search users by username (ilike) or email (exact match)."""
        ...


class SQLAlchemyUserRepository(BaseRepository[Profile]):
    """SQLAlchemy implementation of UserRepository.

    Note: This repository only performs CRUD operations.
    - No commit (only flush) - service layer handles transactions
    - No auth logic - service layer handles authorization
    - No HTTP exceptions - returns None or raises SQLAlchemyError
    """

    def __init__(self, session: AsyncSession) -> None:
        """Bind the repository to *session* and the ``Profile`` model."""
        super().__init__(session, Profile)

    async def get_by_user_id(self, user_id: UUID) -> Profile | None:
        """Return the profile for *user_id* by delegating to ``get_by_id``.

        Raises:
            SQLAlchemyError: Logged with the user ID, then re-raised.
        """
        try:
            return await self.get_by_id(user_id)
        except SQLAlchemyError:
            logger.exception("User lookup failed", user_id=str(user_id))
            raise

    async def get_by_username(self, username: str) -> Profile | None:
        """Return the oldest non-deleted profile whose username equals *username*.

        Rows are ordered by ``created_at`` ascending and limited to one, so
        at most a single row is ever returned.

        Returns:
            The matching profile, or ``None`` when there is no match.

        Raises:
            SQLAlchemyError: Logged with the username, then re-raised.
        """
        try:
            stmt = (
                select(Profile)
                .where(Profile.username == username)
                .where(Profile.deleted_at.is_(None))
                .order_by(Profile.created_at.asc())
                .limit(1)
            )
            result = await self._session.execute(stmt)
            return result.scalar_one_or_none()
        except SQLAlchemyError:
            logger.exception("User lookup failed", username=username)
            raise

    async def update_by_user_id(
        self, user_id: UUID, update_data: dict[str, str | None]
    ) -> Profile | None:
        """Apply *update_data* to the profile identified by *user_id*.

        An empty *update_data* performs no write and simply returns the
        result of ``get_by_user_id``.

        Returns:
            Whatever ``update_by_id`` returns (the protocol documents this as
            the updated profile, or ``None`` if not found).

        Raises:
            SQLAlchemyError: Logged with the user ID, then re-raised.
        """
        if not update_data:
            # Nothing to change: skip the UPDATE and just read the row.
            return await self.get_by_user_id(user_id)
        try:
            return await self.update_by_id(user_id, update_data)
        except SQLAlchemyError:
            # Same structured-log key as get_by_user_id, so both failure
            # paths can be filtered on one field.
            logger.exception("User update failed", user_id=str(user_id))
            raise

    async def search_users(self, query: str, limit: int = 20) -> list[Profile]:
        """Return non-deleted profiles whose username contains *query*.

        Matching is case-insensitive (``ILIKE``). ``%`` and ``_`` in *query*
        are escaped so they match literally instead of acting as wildcards.
        Results are ordered by ``created_at`` ascending and capped at *limit*.

        NOTE(review): the protocol docstring also mentions an exact email
        match; only the username is matched here - confirm whether an email
        clause is still intended.

        Raises:
            SQLAlchemyError: Logged with the query, then re-raised.
        """
        try:
            pattern = f"%{_escape_like(query)}%"
            stmt = (
                select(Profile)
                .where(Profile.deleted_at.is_(None))
                .where(
                    or_(
                        Profile.username.ilike(pattern, escape=_LIKE_ESCAPE_CHAR),
                    )
                )
                .order_by(Profile.created_at.asc())
                .limit(limit)
            )
            result = await self._session.execute(stmt)
            return list(result.scalars().all())
        except SQLAlchemyError:
            logger.exception("User search failed", query=query)
            raise