"""Repository for user point balances and the points ledger."""

from __future__ import annotations

from uuid import UUID

from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.asyncio import AsyncSession

from models.points_ledger import PointsLedger
from models.user_points import UserPoints
from schemas.domain.points import ApplyPointsChangeCommand


class PointsRepository:
    """Data-access helper for ``UserPoints`` rows and ``PointsLedger`` entries.

    Every method runs on the ``AsyncSession`` supplied at construction time.
    Nothing in this class commits or rolls back; transaction boundaries are
    left to the caller.
    """

    def __init__(self, session: AsyncSession) -> None:
        """Store the session used for all subsequent queries."""
        self._session = session

    async def _ensure_user_points_row(self, *, user_id: UUID) -> None:
        """Insert a ``UserPoints`` row for *user_id* unless one already exists.

        Uses PostgreSQL ``INSERT ... ON CONFLICT DO NOTHING`` keyed on
        ``user_id``, so an existing row is left untouched and concurrent
        callers do not fail with a uniqueness error. Only ``user_id`` is
        supplied; all other columns take whatever defaults the model defines.
        """
        insert_stmt = (
            insert(UserPoints)
            .values(user_id=user_id)
            .on_conflict_do_nothing(index_elements=[UserPoints.user_id])
        )
        await self._session.execute(insert_stmt)

    async def get_or_create_user_points_for_update(
        self, *, user_id: UUID
    ) -> UserPoints:
        """Return the user's ``UserPoints`` row selected ``FOR UPDATE``.

        The row is created first when it does not exist yet, then re-read
        with ``SELECT ... FOR UPDATE`` so the caller holds a row lock while
        it modifies the balance.

        Args:
            user_id: Owner of the points row.

        Returns:
            The ``UserPoints`` entity for *user_id*.

        Raises:
            sqlalchemy.exc.NoResultFound: If the row is still missing after
                the insert attempt.
            sqlalchemy.exc.MultipleResultsFound: If more than one row matches.
        """
        await self._ensure_user_points_row(user_id=user_id)
        stmt = (
            select(UserPoints)
            .where(UserPoints.user_id == user_id)
            .with_for_update()
        )
        return (await self._session.execute(stmt)).scalar_one()

    async def has_ledger_event(self, *, user_id: UUID, event_id: str) -> bool:
        """Tell whether a ledger entry exists for this user and event id.

        Args:
            user_id: Owner of the ledger entries to search.
            event_id: Event identifier to look for.

        Returns:
            ``True`` when at least one matching ``PointsLedger`` row exists,
            otherwise ``False``.
        """
        # LIMIT 1 + first(): only existence matters here, so the check must
        # not raise MultipleResultsFound if several rows happen to match.
        stmt = (
            select(PointsLedger.id)
            .where(
                PointsLedger.user_id == user_id,
                PointsLedger.event_id == event_id,
            )
            .limit(1)
        )
        result = await self._session.execute(stmt)
        return result.first() is not None

    async def append_ledger(
        self,
        *,
        command: ApplyPointsChangeCommand,
        balance_after: int,
    ) -> None:
        """Add one ``PointsLedger`` entry built from *command* and flush it.

        Args:
            command: The points change being recorded; its fields are copied
                onto the ledger entry.
            balance_after: Value stored in the entry's ``balance_after``
                column.
        """
        # biz_type is optional on the command; only unwrap .value when set.
        biz_type = command.biz_type.value if command.biz_type is not None else None
        entry = PointsLedger(
            user_id=command.user_id,
            direction=command.direction,
            amount=command.amount,
            balance_after=balance_after,
            change_type=command.change_type.value,
            biz_type=biz_type,
            biz_id=command.biz_id,
            event_id=command.event_id,
            operator_id=command.operator_id,
            # Metadata is dumped in JSON mode with None-valued fields omitted.
            metadata_json=command.metadata.model_dump(
                mode="json", exclude_none=True
            ),
        )
        self._session.add(entry)
        # Flush so the INSERT is sent to the database now rather than at
        # commit time, surfacing any database error to this caller.
        await self._session.flush()

    async def get_user_points(self, *, user_id: UUID) -> UserPoints:
        """Return the user's ``UserPoints`` row without locking it.

        NOTE: despite the name this is not a pure read. The row is inserted
        first when it does not exist, so the lookup always finds one.

        Args:
            user_id: Owner of the points row.

        Returns:
            The ``UserPoints`` entity for *user_id*.

        Raises:
            sqlalchemy.exc.NoResultFound: If the row is still missing after
                the insert attempt.
            sqlalchemy.exc.MultipleResultsFound: If more than one row matches.
        """
        await self._ensure_user_points_row(user_id=user_id)
        stmt = select(UserPoints).where(UserPoints.user_id == user_id)
        return (await self._session.execute(stmt)).scalar_one()