refactor: 重构 schemas 结构,统一枚举定义
This commit is contained in:
@@ -1,87 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timezone
|
||||
from uuid import UUID, uuid4
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import Column, String, Table
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
||||
from sqlalchemy.orm import Mapped, mapped_column
|
||||
|
||||
from core.db.base import Base, SoftDeleteMixin
|
||||
from core.db.base_repository import BaseRepository
|
||||
|
||||
|
||||
class Widget(SoftDeleteMixin, Base):
|
||||
__tablename__ = "widgets"
|
||||
|
||||
id: Mapped[UUID] = mapped_column(primary_key=True)
|
||||
name: Mapped[str] = mapped_column(String(50), nullable=False)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def db_engine():
|
||||
auth_users = Table(
|
||||
"users",
|
||||
Base.metadata,
|
||||
Column("id", String, primary_key=True),
|
||||
schema="auth",
|
||||
extend_existing=True,
|
||||
)
|
||||
engine = create_async_engine("sqlite+aiosqlite:///:memory:", echo=False)
|
||||
async with engine.begin() as conn:
|
||||
await conn.exec_driver_sql("ATTACH DATABASE ':memory:' AS auth")
|
||||
await conn.run_sync(Base.metadata.create_all)
|
||||
yield engine
|
||||
Base.metadata.remove(auth_users)
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def db_session(db_engine):
|
||||
async_session = async_sessionmaker(
|
||||
bind=db_engine,
|
||||
class_=AsyncSession,
|
||||
expire_on_commit=False,
|
||||
)
|
||||
async with async_session() as session:
|
||||
yield session
|
||||
await session.rollback()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_by_id_filters_soft_deleted(db_session: AsyncSession) -> None:
|
||||
repository = BaseRepository(db_session, Widget)
|
||||
widget_id = uuid4()
|
||||
|
||||
widget = Widget(id=widget_id, name="widget")
|
||||
db_session.add(widget)
|
||||
await db_session.commit()
|
||||
|
||||
found = await repository.get_by_id(widget_id)
|
||||
assert found is not None
|
||||
|
||||
deleted = await repository.soft_delete_by_id(widget_id)
|
||||
assert deleted is not None
|
||||
assert deleted.deleted_at is not None
|
||||
|
||||
missing = await repository.get_by_id(widget_id)
|
||||
assert missing is None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_soft_delete_sets_timestamp(db_session: AsyncSession) -> None:
|
||||
repository = BaseRepository(db_session, Widget)
|
||||
widget_id = uuid4()
|
||||
|
||||
widget = Widget(id=widget_id, name="widget")
|
||||
db_session.add(widget)
|
||||
await db_session.commit()
|
||||
|
||||
deleted = await repository.soft_delete_by_id(widget_id)
|
||||
assert deleted is not None
|
||||
assert isinstance(deleted.deleted_at, datetime)
|
||||
deleted_at = deleted.deleted_at
|
||||
if deleted_at.tzinfo is None:
|
||||
deleted_at = deleted_at.replace(tzinfo=timezone.utc)
|
||||
assert deleted_at <= datetime.now(timezone.utc)
|
||||
@@ -1,134 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from uuid import uuid4
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import Column, String, Table, select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
||||
|
||||
from core.db.base import Base
|
||||
from models.profile import Profile
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def db_engine():
|
||||
"""Create in-memory SQLite engine for testing."""
|
||||
users_table = Table(
|
||||
"users",
|
||||
Base.metadata,
|
||||
Column("id", String, primary_key=True),
|
||||
schema="auth",
|
||||
extend_existing=True,
|
||||
)
|
||||
engine = create_async_engine(
|
||||
"sqlite+aiosqlite:///:memory:",
|
||||
echo=False,
|
||||
)
|
||||
async with engine.begin() as conn:
|
||||
await conn.exec_driver_sql("ATTACH DATABASE ':memory:' AS auth")
|
||||
await conn.run_sync(Base.metadata.create_all)
|
||||
yield engine
|
||||
Base.metadata.remove(users_table)
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def db_session(db_engine):
|
||||
"""Create a database session for testing."""
|
||||
async_session = async_sessionmaker(
|
||||
bind=db_engine,
|
||||
class_=AsyncSession,
|
||||
expire_on_commit=False,
|
||||
)
|
||||
async with async_session() as session:
|
||||
yield session
|
||||
await session.rollback()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_profile_model_create(db_session: AsyncSession) -> None:
|
||||
"""Test creating a Profile model."""
|
||||
profile_id = uuid4()
|
||||
profile = Profile(
|
||||
id=profile_id,
|
||||
username="testuser",
|
||||
)
|
||||
db_session.add(profile)
|
||||
await db_session.commit()
|
||||
await db_session.refresh(profile)
|
||||
|
||||
assert profile.id == profile_id
|
||||
assert profile.username == "testuser"
|
||||
assert profile.created_at is not None
|
||||
assert profile.updated_at is not None
|
||||
assert profile.deleted_at is None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_profile_model_get_by_id(db_session: AsyncSession) -> None:
|
||||
"""Test retrieving a Profile by ID."""
|
||||
profile_id = uuid4()
|
||||
profile = Profile(
|
||||
id=profile_id,
|
||||
username="testuser",
|
||||
)
|
||||
db_session.add(profile)
|
||||
await db_session.commit()
|
||||
|
||||
result = await db_session.get(Profile, profile_id)
|
||||
assert result is not None
|
||||
assert result.username == "testuser"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_profile_model_get_by_username(db_session: AsyncSession) -> None:
|
||||
"""Test retrieving a Profile by username."""
|
||||
profile = Profile(
|
||||
id=uuid4(),
|
||||
username="testuser",
|
||||
)
|
||||
db_session.add(profile)
|
||||
await db_session.commit()
|
||||
|
||||
result = await db_session.execute(
|
||||
select(Profile).where(Profile.username == "testuser")
|
||||
)
|
||||
found = result.scalar_one()
|
||||
assert found is not None
|
||||
assert found.username == "testuser"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_profile_model_update(db_session: AsyncSession) -> None:
|
||||
"""Test updating a Profile."""
|
||||
profile = Profile(
|
||||
id=uuid4(),
|
||||
username="testuser",
|
||||
bio="Old bio",
|
||||
)
|
||||
db_session.add(profile)
|
||||
await db_session.commit()
|
||||
|
||||
profile.bio = "New bio"
|
||||
await db_session.commit()
|
||||
await db_session.refresh(profile)
|
||||
|
||||
assert profile.bio == "New bio"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_profile_model_allows_duplicate_usernames(
|
||||
db_session: AsyncSession,
|
||||
) -> None:
|
||||
first = Profile(id=uuid4(), username="same_name")
|
||||
second = Profile(id=uuid4(), username="same_name")
|
||||
|
||||
db_session.add(first)
|
||||
db_session.add(second)
|
||||
await db_session.commit()
|
||||
|
||||
result = await db_session.execute(
|
||||
select(Profile).where(Profile.username == "same_name")
|
||||
)
|
||||
found = result.scalars().all()
|
||||
assert len(found) == 2
|
||||
Reference in New Issue
Block a user