from __future__ import annotations import json import time import uuid from typing import TypedDict import httpx import pytest from sqlalchemy import select from core.db.session import AsyncSessionLocal from models.agent_chat_session import AgentChatSession from models.agent_chat_message import AgentChatMessage from models.anonymous_session_snapshot import AnonymousSessionSnapshot class IdentityData(TypedDict): email: str code: str async def _create_email_session( client: httpx.AsyncClient, *, email: str, code: str, ) -> dict[str, object]: resp = await client.post( "/api/v1/auth/email-session", json={"email": email, "token": code}, ) resp.raise_for_status() return resp.json() async def _wait_terminal_event( client: httpx.AsyncClient, *, access_token: str, thread_id: str, run_id: str, timeout_s: int = 180, ) -> str: headers = {"Authorization": f"Bearer {access_token}"} params = {"runId": run_id, "idle_limit": 120} started = time.time() async with client.stream( "GET", f"/api/v1/agent/runs/{thread_id}/events", headers=headers, params=params, ) as resp: resp.raise_for_status() async for line in resp.aiter_lines(): if time.time() - started > timeout_s: raise TimeoutError("SSE timed out") if not line or not line.startswith("data: "): continue event = json.loads(line[6:]) event_type = event.get("type") if event_type in {"RUN_FINISHED", "RUN_ERROR"}: return str(event_type) raise RuntimeError("No terminal SSE event") def _build_run_payload(*, thread_id: str, run_id: str) -> dict[str, object]: now = int(time.time() * 1000) return { "threadId": thread_id, "runId": run_id, "state": {}, "messages": [ { "id": f"msg_{run_id}_user_0", "role": "user", "content": "今天事业运如何?", } ], "tools": [], "context": [], "forwardedProps": { "runtime_mode": "chat", "client_time": { "device_timezone": "Asia/Shanghai", "client_now_iso": "2026-04-15T12:00:00Z", "client_epoch_ms": now, }, "divinationPayload": { "divinationMethod": "自动起卦", "questionType": "事业", "question": "今天事业运如何?", "divinationTimeIso": "2026-04-15T12:00:00Z", "yaoLines": ["少阳", "少阴", "老阳", "少阳", "老阴", "少阴"], }, }, } @pytest.mark.asyncio async def test_session_delete_anonymizes_and_hard_deletes( api_client: httpx.AsyncClient, test_identity: IdentityData, db_cleanup: list[str], ) -> None: email = str(test_identity["email"]).strip().lower() db_cleanup.append(email) auth_resp = await _create_email_session( api_client, email=email, code=str(test_identity["code"]), ) user = auth_resp.get("user") assert isinstance(user, dict) access_token = str(auth_resp["access_token"]) headers = {"Authorization": f"Bearer {access_token}"} thread_id = str(uuid.uuid4()) run_id = f"run_{int(time.time() * 1000)}" enqueue = await api_client.post( "/api/v1/agent/runs", headers=headers, json=_build_run_payload(thread_id=thread_id, run_id=run_id), ) assert enqueue.status_code == 202 terminal = await _wait_terminal_event( api_client, access_token=access_token, thread_id=thread_id, run_id=run_id, ) assert terminal in {"RUN_FINISHED", "RUN_ERROR"} async with AsyncSessionLocal() as session: session_result = await session.execute( select(AgentChatSession).where(AgentChatSession.id == uuid.UUID(thread_id)) ) session_obj = session_result.scalar_one_or_none() assert session_obj is not None, "Session should exist before deletion" delete_resp = await api_client.delete( f"/api/v1/agent/sessions/{thread_id}", headers=headers, ) assert delete_resp.status_code == 204 async with AsyncSessionLocal() as session: session_result = await session.execute( select(AgentChatSession).where(AgentChatSession.id == uuid.UUID(thread_id)) ) deleted_session = session_result.scalar_one_or_none() assert deleted_session is None, ( "Session should be hard-deleted, not soft-deleted" ) msg_result = await session.execute( select(AgentChatMessage).where( AgentChatMessage.session_id == uuid.UUID(thread_id) ) ) remaining_messages = msg_result.scalars().all() assert len(remaining_messages) == 0, ( "Messages should be hard-deleted along with session" ) snapshot_result = await session.execute( select(AnonymousSessionSnapshot).order_by( AnonymousSessionSnapshot.anonymized_at.desc() ) ) snapshots = snapshot_result.scalars().all() assert len(snapshots) >= 1, "At least one anonymous snapshot should exist" snapshot = snapshots[0] assert snapshot.session_type == "chat" assert snapshot.anonymous_id is not None assert snapshot.id is not None assert snapshot.anonymized_at is not None