Files
qzl ad06fe7de4 refactor: align backend layout and supabase infra
Consolidate backend modules/tests under the backend package while syncing Supabase compose/env config and related plans.
2026-02-05 15:13:06 +08:00

116 lines
3.7 KiB
Python

from __future__ import annotations
import json
import socket
import threading
import time
from uuid import UUID
from playwright.sync_api import sync_playwright
import uvicorn
from app import app
from core.auth.models import CurrentUser
from v1.profile.dependencies import get_current_user, get_profile_service
from v1.profile.schemas import ProfileResponse, ProfileUpdateRequest
class FakeProfileService:
    """In-memory stand-in for the profile service used by the E2E test.

    Every query is answered from the single profile handed to the
    constructor; nothing is persisted between calls.
    """

    def __init__(self, profile: ProfileResponse) -> None:
        self._profile = profile

    async def get_me(self) -> ProfileResponse:
        """Return the stored profile as-is."""
        return self._profile

    async def update_me(self, update: ProfileUpdateRequest) -> ProfileResponse:
        """Build a new profile with the non-``None`` fields of *update* applied.

        ``id`` and ``username`` are always copied from the stored profile.
        The stored profile itself is left untouched.
        """
        current = self._profile

        if update.display_name is not None:
            display_name = update.display_name
        else:
            display_name = current.display_name

        if update.avatar_url is not None:
            avatar_url = update.avatar_url
        else:
            avatar_url = current.avatar_url

        if update.bio is not None:
            bio = update.bio
        else:
            bio = current.bio

        return ProfileResponse(
            id=current.id,
            username=current.username,
            display_name=display_name,
            avatar_url=avatar_url,
            bio=bio,
        )

    async def get_by_username(self, username: str) -> ProfileResponse:
        """Return the stored profile regardless of the *username* requested."""
        return self._profile
def _find_free_port() -> int:
    """Ask the OS for a currently unused TCP port on 127.0.0.1.

    Binding to port 0 lets the kernel choose the port. The probing socket is
    closed before returning, so the caller is expected to bind the port
    promptly afterwards.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        probe.bind(("127.0.0.1", 0))
        _address, chosen_port = probe.getsockname()
        return chosen_port
    finally:
        probe.close()
def _wait_for_port(host: str, port: int, timeout: float = 5.0) -> None:
    """Block until a TCP connection to ``host:port`` succeeds.

    A fresh socket is opened for every attempt, with a 50 ms pause between
    failed attempts.

    Args:
        host: Address to connect to.
        port: TCP port to probe.
        timeout: Maximum number of seconds to keep trying.

    Raises:
        RuntimeError: If no connection attempt succeeds before the deadline.
    """
    # Monotonic clock: a wall-clock adjustment must neither cut short nor
    # stretch the wait.
    deadline = time.monotonic() + timeout
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            # Cap the attempt so a connect that hangs cannot overrun the
            # overall deadline.
            sock.settimeout(remaining)
            if sock.connect_ex((host, port)) == 0:
                return
        time.sleep(0.05)
    raise RuntimeError("Server did not start in time")
def _start_server(host: str, port: int) -> tuple[uvicorn.Server, threading.Thread]:
    """Run ``app`` under uvicorn on a background daemon thread.

    Returns only after ``host:port`` accepts TCP connections.

    Args:
        host: Interface for uvicorn to bind.
        port: TCP port for uvicorn to bind.

    Returns:
        The ``(server, thread)`` pair. To shut down, set
        ``server.should_exit = True`` and join ``thread``.

    Raises:
        RuntimeError: Propagated from ``_wait_for_port`` when the server does
            not accept connections in time.
    """
    config = uvicorn.Config(app, host=host, port=port, log_level="info")
    server = uvicorn.Server(config)
    thread = threading.Thread(target=server.run, daemon=True)
    thread.start()
    try:
        _wait_for_port(host, port)
    except Exception:
        # On failure the caller never receives the handles, so ask the
        # server to stop here instead of leaving the thread running.
        server.should_exit = True
        thread.join(timeout=5)
        raise
    return server, thread
def test_profile_flow_e2e() -> None:
    """Exercise the profile endpoints over real HTTP against a live server.

    The profile-service and current-user dependencies are overridden with
    fakes, the app is served on a free loopback port, and Playwright's
    request context drives ``GET /api/v1/profile/me``,
    ``PATCH /api/v1/profile/me`` and ``GET /api/v1/profile/demo``.
    """
    user_id = UUID("00000000-0000-0000-0000-000000000001")
    profile = ProfileResponse(
        id=str(user_id),
        username="demo",
        display_name="Demo User",
        avatar_url=None,
        bio=None,
    )

    host = "127.0.0.1"
    server = None
    thread = None
    # Snapshot the current overrides so cleanup restores them instead of
    # wiping overrides this test did not install.
    previous_overrides = dict(app.dependency_overrides)
    app.dependency_overrides[get_profile_service] = lambda: FakeProfileService(profile)  # type: ignore[return-value]
    app.dependency_overrides[get_current_user] = lambda: CurrentUser(id=user_id)
    try:
        # Port selection and server start-up sit inside the try block so a
        # start-up failure still runs the cleanup below.
        port = _find_free_port()
        server, thread = _start_server(host, port)
        with sync_playwright() as playwright:
            request_context = playwright.request.new_context(
                base_url=f"http://{host}:{port}"
            )
            try:
                me = request_context.get("/api/v1/profile/me")
                assert me.status == 200
                assert me.json()["username"] == "demo"

                updated = request_context.patch(
                    "/api/v1/profile/me",
                    data=json.dumps({"display_name": "Updated"}),
                    headers={"Content-Type": "application/json"},
                )
                assert updated.status == 200
                assert updated.json()["display_name"] == "Updated"

                public = request_context.get("/api/v1/profile/demo")
                assert public.status == 200
                assert public.json()["username"] == "demo"
            finally:
                request_context.dispose()
    finally:
        app.dependency_overrides = previous_overrides
        # The server handles are only set once start-up succeeded.
        if server is not None:
            server.should_exit = True
        if thread is not None:
            thread.join(timeout=5)