from __future__ import annotations import re from typing import TYPE_CHECKING, Protocol, cast from uuid import UUID from fastapi import HTTPException from sqlalchemy.exc import SQLAlchemyError from core.agentscope.caches.user_context_cache import ( create_user_context_cache, ) from core.auth.models import CurrentUser from core.config.settings import config from core.db.base_service import BaseService from core.logging import get_logger from schemas.shared.user import UserContext, parse_profile_settings from services.base.supabase import supabase_service from v1.users.repository import UserRepository from v1.users.schemas import AvatarUploadResponse, UserSearchRequest, UserUpdateRequest if TYPE_CHECKING: from sqlalchemy.ext.asyncio import AsyncSession from schemas.shared.user import UserContext logger = get_logger("v1.users.service") _PHONE_QUERY_PATTERN = re.compile(r"^[+()\-\s\d]{4,32}$") def _mime_to_suffix(mime_type: str) -> str: """Convert MIME type to file suffix.""" mapping = { "image/jpeg": "jpg", "image/png": "png", "image/webp": "webp", } return mapping.get(mime_type, "bin") class AuthLookupGateway(Protocol): async def search_user_ids_by_phone( self, query: str, limit: int = 20 ) -> list[str]: ... class AuthByPhoneGateway(Protocol): async def search_user_ids_by_phone( self, query: str, limit: int = 20 ) -> list[str]: ... class UserContextInvalidator(Protocol): async def invalidate_user(self, *, user_id: UUID) -> int: ... class AuthLookupAdapter: def __init__(self, gateway: AuthByPhoneGateway) -> None: self._gateway = gateway async def search_user_ids_by_phone(self, query: str, limit: int = 20) -> list[str]: try: return await self._gateway.search_user_ids_by_phone(query, limit=limit) except HTTPException: return [] class UserService(BaseService): """User service handling business logic and transactions. Responsibilities: - Authorization checks - Transaction boundary (commit/rollback) - Converting ORM models to response schemas """ _repository: UserRepository _session: AsyncSession _auth_gateway: AuthLookupGateway | None _user_context_cache: UserContextInvalidator def __init__( self, repository: UserRepository, session: AsyncSession, current_user: CurrentUser | None, auth_gateway: AuthLookupGateway | None = None, user_context_cache: UserContextInvalidator | None = None, ) -> None: super().__init__(current_user=current_user) self._repository = repository self._session = session self._auth_gateway = auth_gateway self._user_context_cache = cast( UserContextInvalidator, user_context_cache or create_user_context_cache(), ) async def get_me(self) -> UserContext: user_id = self.require_user_id() try: user = await self._repository.get_by_user_id(user_id) except SQLAlchemyError: raise HTTPException(status_code=503, detail="User store unavailable") if user is None: raise HTTPException(status_code=404, detail="User not found") phone = self._current_user.phone if self._current_user else None return UserContext( id=str(user.id), username=user.username, phone=phone, avatar_url=user.avatar_url, bio=user.bio, settings=parse_profile_settings(user.settings), ) async def get_user_by_id(self, user_id: UUID) -> "UserContext": from schemas.shared.user import UserContext try: profile = await self._repository.get_by_user_id(user_id) except SQLAlchemyError: raise HTTPException(status_code=503, detail="User store unavailable") if profile is None: raise HTTPException(status_code=404, detail="User not found") return UserContext( id=str(profile.id), username=profile.username, avatar_url=profile.avatar_url, ) async def update_me(self, update: UserUpdateRequest) -> UserContext: user_id = self.require_user_id() update_data: dict[str, str | None] = { key: value for key, value in { "username": update.username, "avatar_url": update.avatar_url, "bio": update.bio, }.items() if value is not None } if not update_data: raise HTTPException(status_code=400, detail="No fields to update") try: user = await self._repository.update_by_user_id(user_id, update_data) await self._session.commit() except SQLAlchemyError: await self._session.rollback() raise HTTPException(status_code=503, detail="User store unavailable") if user is None: raise HTTPException(status_code=404, detail="User not found") try: await self._user_context_cache.invalidate_user(user_id=user_id) except Exception as exc: logger.warning( "Failed to invalidate user context cache after profile update", user_id=str(user_id), error=str(exc), ) phone = self._current_user.phone if self._current_user else None return UserContext( id=str(user.id), username=user.username, phone=phone, avatar_url=user.avatar_url, bio=user.bio, settings=parse_profile_settings(user.settings), ) async def upload_avatar( self, *, filename: str | None, content_type: str | None, payload: bytes, ) -> AvatarUploadResponse: user_id = self.require_user_id() if not isinstance(content_type, str): raise HTTPException(status_code=422, detail="Unsupported image type") mime_type = content_type.lower() allowed_types = {"image/jpeg", "image/png", "image/webp"} if mime_type not in allowed_types: raise HTTPException( status_code=422, detail="Unsupported image type. Allowed: JPEG, PNG, WebP", ) max_size_bytes = config.storage.avatar.max_size_mb * 1024 * 1024 if len(payload) > max_size_bytes: raise HTTPException( status_code=413, detail=f"Image too large. Maximum size: {config.storage.avatar.max_size_mb}MB", ) if not payload: raise HTTPException(status_code=422, detail="Empty image") suffix = _mime_to_suffix(mime_type) path = f"{user_id}/avatar.{suffix}" bucket_name = config.storage.avatar.bucket try: stored_path = await supabase_service.upload_bytes( bucket=bucket_name, path=path, content=payload, content_type=mime_type, ) except Exception: # noqa: BLE001 logger.exception( "Avatar upload failed", extra={ "bucket": bucket_name, "path": path, "mime_type": mime_type, "user_id": str(user_id), }, ) raise HTTPException(status_code=502, detail="Failed to upload avatar") public_url = f"{config.supabase.public_url}/storage/v1/object/public/{bucket_name}/{stored_path}" update_data: dict[str, str | None] = {"avatar_url": public_url} try: user = await self._repository.update_by_user_id(user_id, update_data) await self._session.commit() except SQLAlchemyError: await self._session.rollback() raise HTTPException(status_code=503, detail="User store unavailable") if user is None: raise HTTPException(status_code=404, detail="User not found") try: await self._user_context_cache.invalidate_user(user_id=user_id) except Exception as exc: logger.warning( "Failed to invalidate user context cache after avatar upload", user_id=str(user_id), error=str(exc), ) return AvatarUploadResponse(url=public_url) async def get_by_username(self, username: str) -> UserContext: try: user = await self._repository.get_by_username(username) except SQLAlchemyError: raise HTTPException(status_code=503, detail="User store unavailable") if user is None: raise HTTPException(status_code=404, detail="User not found") return UserContext( id=str(user.id), username=user.username, avatar_url=user.avatar_url, bio=user.bio, settings=parse_profile_settings(user.settings), ) async def search_users(self, request: UserSearchRequest) -> list[UserContext]: query = request.query.strip() if _looks_like_phone_query(query): phone_results = await self._search_by_phone(query) if not query.isdigit(): return phone_results username_results = await self._search_by_username(query) if not phone_results: return username_results merged_by_id = {result.id: result for result in phone_results} for result in username_results: merged_by_id.setdefault(result.id, result) return list(merged_by_id.values()) return await self._search_by_username(query) async def _search_by_phone(self, phone: str) -> list[UserContext]: if self._auth_gateway is None: raise HTTPException(status_code=503, detail="Auth lookup unavailable") user_id_values = await self._auth_gateway.search_user_ids_by_phone( phone, limit=20 ) if not user_id_values: return [] user_ids: list[UUID] = [] for raw_id in user_id_values: try: user_ids.append(UUID(raw_id)) except ValueError: continue if not user_ids: return [] try: users_by_id = await self._repository.get_by_user_ids(user_ids) except SQLAlchemyError: raise HTTPException(status_code=503, detail="User store unavailable") results: list[UserContext] = [] for user_id in user_ids: user = users_by_id.get(user_id) if user is None: continue results.append( UserContext( id=str(user.id), username=user.username, avatar_url=user.avatar_url, bio=user.bio, settings=parse_profile_settings(user.settings), ) ) return results async def _search_by_username(self, query: str) -> list[UserContext]: try: users = await self._repository.search_users(query, limit=20) except SQLAlchemyError: raise HTTPException(status_code=503, detail="User store unavailable") return [ UserContext( id=str(user.id), username=user.username, avatar_url=user.avatar_url, bio=user.bio, settings=parse_profile_settings(user.settings), ) for user in users ] def _looks_like_phone_query(query: str) -> bool: if not _PHONE_QUERY_PATTERN.fullmatch(query): return False digits_count = sum(char.isdigit() for char in query) return digits_count >= 4