Files
social-app/backend/src/v1/auth/service.py
T

74 lines
2.4 KiB
Python
Raw Normal View History

from __future__ import annotations
from typing import Protocol
from v1.auth.schemas import (
SessionCreateRequest,
SessionRefreshRequest,
SessionResponse,
UserByEmailResponse,
VerificationCreateRequest,
VerificationCreateResponse,
VerificationResendRequest,
VerificationVerifyRequest,
)
class AuthServiceGateway(Protocol):
async def create_verification(
self, request: VerificationCreateRequest
) -> VerificationCreateResponse:
raise NotImplementedError
async def verify_verification(
self, request: VerificationVerifyRequest
) -> SessionResponse:
raise NotImplementedError
async def resend_verification(self, request: VerificationResendRequest) -> None:
raise NotImplementedError
async def create_session(self, request: SessionCreateRequest) -> SessionResponse:
raise NotImplementedError
async def refresh_session(self, request: SessionRefreshRequest) -> SessionResponse:
raise NotImplementedError
async def delete_session(self, refresh_token: str | None) -> None:
raise NotImplementedError
async def get_user_by_email(self, email: str) -> UserByEmailResponse:
raise NotImplementedError
class AuthService:
_gateway: AuthServiceGateway
def __init__(self, gateway: AuthServiceGateway) -> None:
self._gateway = gateway
async def create_verification(
self, request: VerificationCreateRequest
) -> VerificationCreateResponse:
return await self._gateway.create_verification(request)
async def verify_verification(
self, request: VerificationVerifyRequest
) -> SessionResponse:
return await self._gateway.verify_verification(request)
async def resend_verification(self, request: VerificationResendRequest) -> None:
await self._gateway.resend_verification(request)
async def create_session(self, request: SessionCreateRequest) -> SessionResponse:
return await self._gateway.create_session(request)
async def refresh_session(self, request: SessionRefreshRequest) -> SessionResponse:
return await self._gateway.refresh_session(request)
async def delete_session(self, refresh_token: str | None) -> None:
await self._gateway.delete_session(refresh_token)
async def get_user_by_email(self, email: str) -> UserByEmailResponse:
return await self._gateway.get_user_by_email(email)