from __future__ import annotations import re from typing import TYPE_CHECKING, Protocol, cast from uuid import UUID from fastapi import HTTPException from sqlalchemy.exc import SQLAlchemyError from core.auth.models import CurrentUser from core.agentscope.persistence.user_context_cache import ( create_user_context_cache, ) from core.db.base_service import BaseService from core.logging import get_logger from schemas.user.context import UserContext, parse_profile_settings from v1.users.repository import UserRepository from v1.users.schemas import UserSearchRequest, UserUpdateRequest if TYPE_CHECKING: from sqlalchemy.ext.asyncio import AsyncSession from schemas.user.context import UserContext from v1.auth.schemas import UserByEmailResponse logger = get_logger("v1.users.service") _EMAIL_PATTERN = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$") class AuthLookupGateway(Protocol): async def get_user_id_by_email(self, email: str) -> str | None: ... class AuthByEmailGateway(Protocol): async def get_user_by_email(self, email: str) -> "UserByEmailResponse": ... class UserContextInvalidator(Protocol): async def invalidate_user(self, *, user_id: UUID) -> int: ... class AuthLookupAdapter: def __init__(self, gateway: AuthByEmailGateway) -> None: self._gateway = gateway async def get_user_id_by_email(self, email: str) -> str | None: try: response = await self._gateway.get_user_by_email(email) return response.id except HTTPException: return None class UserService(BaseService): """User service handling business logic and transactions. Responsibilities: - Authorization checks - Transaction boundary (commit/rollback) - Converting ORM models to response schemas """ _repository: UserRepository _session: AsyncSession _auth_gateway: AuthLookupGateway | None _user_context_cache: UserContextInvalidator def __init__( self, repository: UserRepository, session: AsyncSession, current_user: CurrentUser | None, auth_gateway: AuthLookupGateway | None = None, user_context_cache: UserContextInvalidator | None = None, ) -> None: super().__init__(current_user=current_user) self._repository = repository self._session = session self._auth_gateway = auth_gateway self._user_context_cache = cast( UserContextInvalidator, user_context_cache or create_user_context_cache(), ) async def get_me(self) -> UserContext: user_id = self.require_user_id() try: user = await self._repository.get_by_user_id(user_id) except SQLAlchemyError: raise HTTPException(status_code=503, detail="User store unavailable") if user is None: raise HTTPException(status_code=404, detail="User not found") email = self._current_user.email if self._current_user else None return UserContext( id=str(user.id), username=user.username, email=email, avatar_url=user.avatar_url, bio=user.bio, settings=parse_profile_settings(user.settings), ) async def get_user_by_id(self, user_id: UUID) -> "UserContext": from schemas.user.context import UserContext try: profile = await self._repository.get_by_user_id(user_id) except SQLAlchemyError: raise HTTPException(status_code=503, detail="User store unavailable") if profile is None: raise HTTPException(status_code=404, detail="User not found") return UserContext( id=str(profile.id), username=profile.username, avatar_url=profile.avatar_url, ) async def update_me(self, update: UserUpdateRequest) -> UserContext: user_id = self.require_user_id() update_data: dict[str, str | None] = { key: value for key, value in { "username": update.username, "avatar_url": update.avatar_url, "bio": update.bio, }.items() if value is not None } if not update_data: raise HTTPException(status_code=400, detail="No fields to update") try: user = await self._repository.update_by_user_id(user_id, update_data) await self._session.commit() except SQLAlchemyError: await self._session.rollback() raise HTTPException(status_code=503, detail="User store unavailable") if user is None: raise HTTPException(status_code=404, detail="User not found") try: await self._user_context_cache.invalidate_user(user_id=user_id) except Exception as exc: logger.warning( "Failed to invalidate user context cache after profile update", user_id=str(user_id), error=str(exc), ) email = self._current_user.email if self._current_user else None return UserContext( id=str(user.id), username=user.username, email=email, avatar_url=user.avatar_url, bio=user.bio, settings=parse_profile_settings(user.settings), ) async def get_by_username(self, username: str) -> UserContext: try: user = await self._repository.get_by_username(username) except SQLAlchemyError: raise HTTPException(status_code=503, detail="User store unavailable") if user is None: raise HTTPException(status_code=404, detail="User not found") return UserContext( id=str(user.id), username=user.username, avatar_url=user.avatar_url, bio=user.bio, settings=parse_profile_settings(user.settings), ) async def search_users(self, request: UserSearchRequest) -> list[UserContext]: query = request.query.strip() if _EMAIL_PATTERN.match(query): return await self._search_by_email(query) return await self._search_by_username(query) async def _search_by_email(self, email: str) -> list[UserContext]: if self._auth_gateway is None: raise HTTPException(status_code=503, detail="Auth lookup unavailable") user_id_str = await self._auth_gateway.get_user_id_by_email(email) if user_id_str is None: return [] try: user = await self._repository.get_by_user_id(UUID(user_id_str)) except SQLAlchemyError: raise HTTPException(status_code=503, detail="User store unavailable") if user is None: return [] return [ UserContext( id=str(user.id), username=user.username, avatar_url=user.avatar_url, bio=user.bio, settings=parse_profile_settings(user.settings), ) ] async def _search_by_username(self, query: str) -> list[UserContext]: try: users = await self._repository.search_users(query, limit=20) except SQLAlchemyError: raise HTTPException(status_code=503, detail="User store unavailable") return [ UserContext( id=str(user.id), username=user.username, avatar_url=user.avatar_url, bio=user.bio, settings=parse_profile_settings(user.settings), ) for user in users ]