Files
social-app/backend/tests/e2e/test_infra_health_e2e.py
T
qzl 105cf82d21 fix: 恢复Celery配置 + 修复测试文件
- 恢复 CelerySettings 和相关计算属性
- 修复 celery/app.py 调用 configure_celery_app 参数
- 创建 core/initialization/init_data.py stub
- 删除不完整的 test_auth_supabase_gateway.py
2026-02-24 16:38:30 +08:00

78 lines
2.2 KiB
Python

from __future__ import annotations
import socket
import threading
import time
from playwright.sync_api import sync_playwright
import uvicorn
from app import app
from v1.infra.dependencies import get_redis_service
class _FakeService:
    """Minimal always-healthy service double.

    Used in this module as the replacement returned for the
    ``get_redis_service`` dependency, so the health endpoint can be
    exercised without a real backing service.
    """

    def __init__(self) -> None:
        # The fake is considered ready as soon as it is constructed.
        self._initialized = True

    @property
    def is_initialized(self) -> bool:
        """Report the stored initialisation flag (``True`` after construction)."""
        return self._initialized

    async def initialize(self) -> bool:
        """Pretend to initialise; there is nothing to set up, so always succeed."""
        return True

    async def health_check(self) -> dict[str, object]:
        """Return a fixed "healthy" payload with an empty details mapping."""
        payload: dict[str, object] = {"status": "healthy", "details": {}}
        return payload
def _find_free_port() -> int:
    """Return a TCP port number the OS currently reports as free on 127.0.0.1.

    A throwaway socket is bound to port 0 so the operating system assigns
    a port; that port number is read back and the socket is closed
    before returning.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        probe.bind(("127.0.0.1", 0))
        # For AF_INET sockets getsockname() yields a (host, port) pair.
        _bound_host, assigned_port = probe.getsockname()
        return assigned_port
    finally:
        probe.close()
def _wait_for_port(host: str, port: int, timeout: float = 5.0) -> None:
    """Block until a TCP connection to ``host:port`` succeeds.

    The port is probed repeatedly, pausing 50 ms between attempts, until a
    connection is accepted or the deadline passes.

    Args:
        host: Address to connect to.
        port: TCP port to probe.
        timeout: Maximum number of seconds to keep probing.

    Raises:
        RuntimeError: If no connection succeeded before the deadline.
    """
    # A monotonic clock keeps the deadline correct even if the wall clock
    # is adjusted while we are waiting.
    deadline = time.monotonic() + timeout
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            # Bound each attempt by the time left so a single slow connect
            # cannot overshoot the caller's timeout.
            sock.settimeout(remaining)
            if sock.connect_ex((host, port)) == 0:
                return
        time.sleep(0.05)
    raise RuntimeError("Server did not start in time")
def _start_server(host: str, port: int) -> tuple[uvicorn.Server, threading.Thread]:
    """Run the application under uvicorn in a background thread.

    The server is started on a daemon thread and this function returns
    only once ``host:port`` accepts TCP connections.

    Args:
        host: Interface for uvicorn to bind.
        port: TCP port for uvicorn to bind.

    Returns:
        The ``uvicorn.Server`` instance and the thread running it. The
        caller stops the server by setting ``server.should_exit = True``
        and joining the thread.

    Raises:
        RuntimeError: Propagated from ``_wait_for_port`` when the server
            does not start accepting connections in time.
    """
    config = uvicorn.Config(app, host=host, port=port, log_level="info")
    server = uvicorn.Server(config)
    thread = threading.Thread(target=server.run, daemon=True)
    thread.start()
    try:
        _wait_for_port(host, port)
    except Exception:
        # Startup failed and the caller never receives the handles, so
        # request shutdown here rather than leaving the thread running.
        server.should_exit = True
        thread.join(timeout=5)
        raise
    return server, thread
def test_infra_health_e2e() -> None:
    """End-to-end check of ``GET /api/v1/infra/health`` over real HTTP.

    The app is served by uvicorn on a free local port with the
    ``get_redis_service`` dependency replaced by ``_FakeService``. The
    endpoint is requested through Playwright's API request context and
    must answer 200 with ``status == "healthy"`` and a ``"redis"`` entry
    under ``services``.
    """
    host = "127.0.0.1"
    # Snapshot existing overrides so teardown restores exactly what was
    # there instead of wiping overrides installed elsewhere.
    saved_overrides = dict(app.dependency_overrides)
    app.dependency_overrides[get_redis_service] = lambda: _FakeService()
    try:
        # Server start-up happens inside the guarded region so a start-up
        # failure still removes the override.
        port = _find_free_port()
        server, thread = _start_server(host, port)
        try:
            with sync_playwright() as playwright:
                request_context = playwright.request.new_context(
                    base_url=f"http://{host}:{port}"
                )
                try:
                    response = request_context.get("/api/v1/infra/health")
                    assert response.status == 200
                    body = response.json()
                    assert body["status"] == "healthy"
                    assert "redis" in body["services"]
                finally:
                    request_context.dispose()
        finally:
            # Ask uvicorn to stop and give the thread a bounded time to exit.
            server.should_exit = True
            thread.join(timeout=5)
    finally:
        app.dependency_overrides.clear()
        app.dependency_overrides.update(saved_overrides)