From c2b726e7bdcb137ffc9c75c605aa5f0191da374e Mon Sep 17 00:00:00 2001 From: qzl Date: Wed, 15 Apr 2026 18:18:39 +0800 Subject: [PATCH] feat(agent): session deletion anonymization for iOS compliance Replace soft-delete with anonymize + hard-delete to meet iOS App Store data retention requirements. Non-PII fields are preserved in anonymous_session_snapshots for analytics. - Add anonymous_session_snapshots table and ORM model - Implement anonymizer to extract non-PII fields before deletion - Remove points_ledger.biz_id FK constraint (snapshot-style reference) - Preserve transaction history while allowing session deletion - Add 14 unit tests + 1 integration test --- ...260415_0001_anonymous_session_snapshots.py | 111 +++++++++ ...415_0002_drop_points_ledger_biz_id_fkey.py | 40 ++++ backend/src/models/__init__.py | 2 + .../src/models/anonymous_session_snapshot.py | 46 ++++ backend/src/v1/agent/anonymizer.py | 162 +++++++++++++ backend/src/v1/agent/repository.py | 50 +++- backend/src/v1/agent/schemas.py | 4 +- backend/src/v1/agent/service.py | 22 +- .../test_session_delete_anonymization.py | 183 +++++++++++++++ backend/tests/unit/test_anonymizer.py | 216 ++++++++++++++++++ 10 files changed, 829 insertions(+), 7 deletions(-) create mode 100644 backend/alembic/versions/20260415_0001_anonymous_session_snapshots.py create mode 100644 backend/alembic/versions/20260415_0002_drop_points_ledger_biz_id_fkey.py create mode 100644 backend/src/models/anonymous_session_snapshot.py create mode 100644 backend/src/v1/agent/anonymizer.py create mode 100644 backend/tests/integration/test_session_delete_anonymization.py create mode 100644 backend/tests/unit/test_anonymizer.py diff --git a/backend/alembic/versions/20260415_0001_anonymous_session_snapshots.py b/backend/alembic/versions/20260415_0001_anonymous_session_snapshots.py new file mode 100644 index 0000000..54b5bef --- /dev/null +++ b/backend/alembic/versions/20260415_0001_anonymous_session_snapshots.py @@ -0,0 +1,111 @@ +"""add 
revision: str = "20260415_0001"
down_revision: Union[str, Sequence[str], None] = "20260413_0004"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None

# Table owned by this revision.
_SNAPSHOT_TABLE = "anonymous_session_snapshots"

# Indexed columns in creation order; downgrade walks them in reverse.
_INDEXED_COLUMNS = ("anonymous_id", "created_at", "question_type")

# Role/action pairs whose per-table policies are dropped if present.
_CLIENT_ROLES = ("anon", "authenticated")
_POLICY_ACTIONS = ("select", "insert", "update", "delete")


def _index_name(column: str) -> str:
    """Return the name of the snapshot-table index covering *column*."""
    return f"ix_{_SNAPSHOT_TABLE}_{column}"


def _drop_client_policies(table_name: str) -> None:
    """Drop every ``{role}_{action}_{table}`` policy on *table_name*, if it exists."""
    for role in _CLIENT_ROLES:
        for action in _POLICY_ACTIONS:
            op.execute(
                f"DROP POLICY IF EXISTS {role}_{action}_{table_name} ON {table_name}"
            )


def upgrade() -> None:
    """Create the snapshot table, its three indexes, and service-role-only RLS."""
    columns = [
        sa.Column("id", sa.UUID(), nullable=False),
        sa.Column("anonymous_id", sa.UUID(), nullable=False),
        sa.Column("session_type", sa.String(length=20), nullable=False),
        sa.Column("message_count", sa.Integer(), nullable=True),
        sa.Column("status", sa.String(length=20), nullable=True),
        sa.Column("question_type", sa.String(length=50), nullable=True),
        sa.Column("tool_name", sa.String(length=100), nullable=True),
        sa.Column("gua_name", sa.String(length=50), nullable=True),
        sa.Column("gua_name_hant", sa.String(length=50), nullable=True),
        sa.Column("target_gua_name", sa.String(length=50), nullable=True),
        sa.Column("has_changing_yao", sa.Boolean(), nullable=True),
        sa.Column("sign_level", sa.String(length=20), nullable=True),
        sa.Column("keywords", postgresql.ARRAY(sa.Text()), nullable=True),
        sa.Column("model_code", sa.String(length=50), nullable=True),
        sa.Column("total_tokens", sa.Integer(), nullable=True),
        sa.Column("total_cost", sa.Numeric(12, 6), nullable=True),
        sa.Column("total_latency_ms", sa.Integer(), nullable=True),
        sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
        sa.Column("last_activity_at", sa.DateTime(timezone=True), nullable=True),
        sa.Column(
            "anonymized_at",
            sa.DateTime(timezone=True),
            server_default=sa.text("now()"),
            nullable=False,
        ),
    ]
    op.create_table(_SNAPSHOT_TABLE, *columns, sa.PrimaryKeyConstraint("id"))

    for column in _INDEXED_COLUMNS:
        op.create_index(_index_name(column), _SNAPSHOT_TABLE, [column], unique=False)

    _enable_service_role_only_rls(_SNAPSHOT_TABLE)


def downgrade() -> None:
    """Undo ``upgrade``: remove RLS, then the indexes (reverse order), then the table."""
    _drop_rls(_SNAPSHOT_TABLE)
    for column in reversed(_INDEXED_COLUMNS):
        op.drop_index(_index_name(column), table_name=_SNAPSHOT_TABLE)
    op.drop_table(_SNAPSHOT_TABLE)


def _enable_service_role_only_rls(table_name: str) -> None:
    """Enable row level security on *table_name* with a single ``service_role`` policy.

    Any existing ``anon``/``authenticated`` per-action policies are dropped
    first, so the only policy left is the ``FOR ALL TO service_role`` one.
    """
    _drop_client_policies(table_name)
    op.execute(f"ALTER TABLE {table_name} ENABLE ROW LEVEL SECURITY")
    op.execute(
        f"CREATE POLICY service_role_all_{table_name} ON {table_name} FOR ALL TO service_role USING (true) WITH CHECK (true)"
    )


def _drop_rls(table_name: str) -> None:
    """Drop all policies this migration may have touched and disable RLS."""
    _drop_client_policies(table_name)
    op.execute(f"DROP POLICY IF EXISTS service_role_all_{table_name} ON {table_name}")
    op.execute(f"ALTER TABLE {table_name} DISABLE ROW LEVEL SECURITY")
revision: str = "20260415_0002"
down_revision: Union[str, Sequence[str], None] = "20260415_0001"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Drop the ``points_ledger.biz_id -> sessions.id`` foreign key.

    After this, ``biz_id`` is a plain value: it survives deletion of the
    session it points at, so ledger rows keep their reference for
    user-facing transaction history.
    """
    op.drop_constraint("points_ledger_biz_id_fkey", "points_ledger", type_="foreignkey")


def downgrade() -> None:
    """Re-create the foreign key with ``ON DELETE SET NULL``.

    While the constraint was absent, sessions may have been hard-deleted,
    leaving ``biz_id`` values that reference no row. Adding the constraint
    over such rows would fail validation, so orphaned references are nulled
    first. That is the state ``ON DELETE SET NULL`` would have produced had
    the constraint been in place. This permanently discards the snapshot
    reference on those ledger rows.
    """
    # NOTE(review): assumes points_ledger.biz_id is nullable (implied by the
    # SET NULL action of the constraint being restored) — confirm.
    op.execute(
        """
        UPDATE points_ledger
        SET biz_id = NULL
        WHERE biz_id IS NOT NULL
          AND NOT EXISTS (
              SELECT 1 FROM sessions WHERE sessions.id = points_ledger.biz_id
          )
        """
    )
    op.create_foreign_key(
        "points_ledger_biz_id_fkey",
        "points_ledger",
        "sessions",
        ["biz_id"],
        ["id"],
        ondelete="SET NULL",
    )
__all__ = ["AnonymousSessionSnapshot"]


class AnonymousSessionSnapshot(Base):
    """ORM mapping for the ``anonymous_session_snapshots`` table.

    One row holds the fields kept from a chat session after the session and
    its messages are hard-deleted. The row carries no foreign key to users or
    sessions; ``anonymous_id`` is a standalone UUID.
    """

    __tablename__: str = "anonymous_session_snapshots"

    # Primary key; generated client-side with uuid4 when not supplied.
    id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
    )
    # Required UUID with no default here: the creator must supply it.
    anonymous_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), nullable=False)
    session_type: Mapped[str] = mapped_column(String(20), nullable=False)
    message_count: Mapped[int | None] = mapped_column(Integer, nullable=True)
    status: Mapped[str | None] = mapped_column(String(20), nullable=True)
    question_type: Mapped[str | None] = mapped_column(String(50), nullable=True)
    tool_name: Mapped[str | None] = mapped_column(String(100), nullable=True)
    # Divination result fields; all optional.
    gua_name: Mapped[str | None] = mapped_column(String(50), nullable=True)
    gua_name_hant: Mapped[str | None] = mapped_column(String(50), nullable=True)
    target_gua_name: Mapped[str | None] = mapped_column(String(50), nullable=True)
    has_changing_yao: Mapped[bool | None] = mapped_column(Boolean, nullable=True)
    sign_level: Mapped[str | None] = mapped_column(String(20), nullable=True)
    # PostgreSQL text[]; every element must be a string.
    keywords: Mapped[list[str] | None] = mapped_column(ARRAY(Text()), nullable=True)
    model_code: Mapped[str | None] = mapped_column(String(50), nullable=True)
    # Usage aggregates.
    total_tokens: Mapped[int | None] = mapped_column(Integer, nullable=True)
    total_cost: Mapped[Decimal | None] = mapped_column(Numeric(12, 6), nullable=True)
    total_latency_ms: Mapped[int | None] = mapped_column(Integer, nullable=True)
    # Timezone-aware timestamps.
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), nullable=False
    )
    last_activity_at: Mapped[datetime | None] = mapped_column(
        DateTime(timezone=True), nullable=True
    )
    # No Python-side default: the value must be supplied on construction
    # (the table's migration additionally declares a now() server default).
    anonymized_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), nullable=False
    )
def _truncate_to_day(dt: datetime) -> datetime:
    """Return *dt* with its time-of-day zeroed; date and tzinfo are kept as-is."""
    return dt.replace(hour=0, minute=0, second=0, microsecond=0)


def _agent_outputs(messages: list[AgentChatMessage]) -> list[dict[str, Any]]:
    """Return each message's ``metadata_json["agent_output"]`` dict, in message order.

    Messages whose metadata is not a dict, or whose ``agent_output`` is
    missing or not a dict, are skipped.
    """
    outputs: list[dict[str, Any]] = []
    for message in messages:
        metadata = message.metadata_json
        if not isinstance(metadata, dict):
            continue
        agent_output = metadata.get("agent_output")
        if isinstance(agent_output, dict):
            outputs.append(agent_output)
    return outputs


def _first_non_empty_str(outputs: list[dict[str, Any]], key: str) -> str | None:
    """Return the first non-empty string stored under *key* in *outputs*, else None."""
    for output in outputs:
        value = output.get(key)
        if isinstance(value, str) and value:
            return value
    return None


def _extract_derived_fields(
    messages: list[AgentChatMessage],
) -> dict[str, Any]:
    """Return the first non-empty ``divination_derived`` dict, or ``{}`` if none."""
    for agent_output in _agent_outputs(messages):
        derived = agent_output.get("divination_derived")
        if isinstance(derived, dict) and derived:
            return derived
    return {}


def _extract_sign_level(
    messages: list[AgentChatMessage],
) -> str | None:
    """Return the first non-empty ``sign_level`` string found in agent output."""
    return _first_non_empty_str(_agent_outputs(messages), "sign_level")


def _extract_keywords(
    messages: list[AgentChatMessage],
) -> list[str] | None:
    """Return the first usable ``keywords`` list found in agent output.

    Only non-empty string elements are kept, because the target column is a
    text array and metadata is free-form JSON. A list with no usable element
    is skipped in favour of a later message. Returns None when nothing is
    usable.
    """
    for agent_output in _agent_outputs(messages):
        keywords = agent_output.get("keywords")
        if not isinstance(keywords, list):
            continue
        cleaned = [item for item in keywords if isinstance(item, str) and item]
        if cleaned:
            return cleaned
    return None


def _extract_question_type(
    messages: list[AgentChatMessage],
) -> str | None:
    """Return the session's question type.

    ``divination_derived.questionType`` wins when it is a non-empty string.
    Otherwise the first non-empty ``agent_output.questionType`` is used, also
    when a derived dict exists but carries no usable question type.
    """
    derived_type = _extract_derived_fields(messages).get("questionType")
    if isinstance(derived_type, str) and derived_type:
        return derived_type
    return _first_non_empty_str(_agent_outputs(messages), "questionType")


def _extract_model_code(
    messages: list[AgentChatMessage],
) -> str | None:
    """Return the first truthy ``model_code`` among *messages*, else None."""
    return next((m.model_code for m in messages if m.model_code), None)


def _extract_tool_name(
    messages: list[AgentChatMessage],
) -> str | None:
    """Return the first truthy ``tool_name`` among *messages*, else None."""
    return next((m.tool_name for m in messages if m.tool_name), None)


def _aggregate_latency(
    messages: list[AgentChatMessage],
) -> int | None:
    """Sum ``latency_ms`` over messages that report one; None if none do."""
    latencies = [m.latency_ms for m in messages if m.latency_ms is not None]
    return sum(latencies) if latencies else None


def _enum_to_str(value: Any) -> str:
    """Return ``value.value`` for enum-like objects, ``str(value)`` otherwise."""
    return value.value if hasattr(value, "value") else str(value)


def anonymize(
    session: AgentChatSession,
    messages: list[AgentChatMessage],
) -> AnonymousSessionSnapshot:
    """Build the snapshot row that outlives *session* after it is deleted.

    The snapshot gets fresh random ``id`` and ``anonymous_id`` values, so it
    cannot be joined back to the session. ``created_at`` and
    ``last_activity_at`` are truncated to day precision. The remaining fields
    are copied from the session or extracted from message metadata. Values of
    an unexpected type are stored as None.

    Args:
        session: The session about to be deleted.
        messages: The session's messages; the first match in list order wins
            for every extracted field.

    Returns:
        An unsaved ``AnonymousSessionSnapshot``.
    """
    derived = _extract_derived_fields(messages)  # {} when absent, so .get is safe

    gua_name = derived.get("guaName")
    gua_name_hant = derived.get("guaNameHant")
    target_gua_name = derived.get("targetGuaName")
    has_changing_yao = derived.get("hasChangingYao")

    created_at = _truncate_to_day(session.created_at)
    last_activity_at = (
        _truncate_to_day(session.last_activity_at) if session.last_activity_at else None
    )

    # A missing status must stay NULL rather than become the string "None".
    status = _enum_to_str(session.status) if session.status is not None else None

    return AnonymousSessionSnapshot(
        id=uuid4(),
        anonymous_id=uuid4(),
        session_type=_enum_to_str(session.session_type),
        message_count=session.message_count,
        status=status,
        question_type=_extract_question_type(messages),
        tool_name=_extract_tool_name(messages),
        gua_name=gua_name if isinstance(gua_name, str) else None,
        gua_name_hant=gua_name_hant if isinstance(gua_name_hant, str) else None,
        target_gua_name=target_gua_name if isinstance(target_gua_name, str) else None,
        has_changing_yao=has_changing_yao
        if isinstance(has_changing_yao, bool)
        else None,
        sign_level=_extract_sign_level(messages),
        keywords=_extract_keywords(messages),
        model_code=_extract_model_code(messages),
        total_tokens=session.total_tokens,
        total_cost=session.total_cost,
        total_latency_ms=_aggregate_latency(messages),
        created_at=created_at,
        last_activity_at=last_activity_at,
        anonymized_at=datetime.now(timezone.utc),
    )
@staticmethod
def _collect_attachment_paths(
    messages: list[AgentChatMessage],
) -> list[dict[str, str]]:
    """Gather ``{"bucket", "path"}`` pairs for every attachment on *messages*.

    Attachments are read from ``metadata_json["user_message_attachments"]``.
    Anything malformed is ignored: non-dict metadata, a non-list attachment
    field, non-dict entries, and entries whose bucket or path is not a
    string. Message and attachment order is preserved.
    """

    def _entries_of(message: AgentChatMessage) -> list[Any]:
        # Normalise "no usable attachment list" to an empty list.
        metadata = message.metadata_json
        entries = (
            metadata.get("user_message_attachments")
            if isinstance(metadata, dict)
            else None
        )
        return entries if isinstance(entries, list) else []

    return [
        {"bucket": entry["bucket"], "path": entry["path"]}
        for message in messages
        for entry in _entries_of(message)
        if isinstance(entry, dict)
        and isinstance(entry.get("bucket"), str)
        and isinstance(entry.get("path"), str)
    ]
async def _cleanup_attachments(
    self, attachment_paths: list[dict[str, str]]
) -> None:
    """Best-effort removal of stored attachments after a session is deleted.

    Runs after the database commit, so a storage failure is logged and
    skipped, never raised: the session is already gone. Each distinct
    (bucket, path) pair is deleted once, even when several messages
    referenced the same object.

    Args:
        attachment_paths: ``{"bucket": ..., "path": ...}`` dicts; entries
            with a missing or empty bucket or path are ignored.
    """
    if not attachment_paths or self._attachment_storage is None:
        return
    handled: set[tuple[str, str]] = set()
    for attachment in attachment_paths:
        bucket = attachment.get("bucket")
        path = attachment.get("path")
        if not bucket or not path:
            continue
        if (bucket, path) in handled:
            continue  # same object referenced more than once
        handled.add((bucket, path))
        try:
            # NOTE(review): the full object path is passed as a *prefix*;
            # confirm the storage backend cannot match sibling objects whose
            # keys merely start with the same string.
            await self._attachment_storage.delete_prefix(bucket=bucket, prefix=path)
        except Exception as exc:
            # Record the cause so a leaked object can be diagnosed from logs.
            logger.warning(
                "attachment_cleanup_failed",
                bucket=bucket,
                path=path,
                error_type=type(exc).__name__,
                error=str(exc),
            )
class IdentityData(TypedDict):
    """Shape of the ``test_identity`` fixture: login email plus one-time code."""

    email: str
    code: str


async def _create_email_session(
    client: httpx.AsyncClient,
    *,
    email: str,
    code: str,
) -> dict[str, object]:
    """POST to the email-session endpoint and return the decoded JSON body.

    Raises ``httpx.HTTPStatusError`` on a non-2xx response.
    """
    resp = await client.post(
        "/api/v1/auth/email-session",
        json={"email": email, "token": code},
    )
    resp.raise_for_status()
    return resp.json()


async def _wait_terminal_event(
    client: httpx.AsyncClient,
    *,
    access_token: str,
    thread_id: str,
    run_id: str,
    timeout_s: int = 180,
) -> str:
    """Follow the run's SSE stream until ``RUN_FINISHED`` or ``RUN_ERROR``.

    Returns the terminal event type. Raises ``TimeoutError`` when a line
    arrives after *timeout_s* seconds have elapsed, and ``RuntimeError`` when
    the stream ends without a terminal event.
    """
    headers = {"Authorization": f"Bearer {access_token}"}
    params = {"runId": run_id, "idle_limit": 120}
    started = time.time()

    async with client.stream(
        "GET",
        f"/api/v1/agent/runs/{thread_id}/events",
        headers=headers,
        params=params,
    ) as resp:
        resp.raise_for_status()
        async for line in resp.aiter_lines():
            # The deadline is only evaluated when a line arrives.
            if time.time() - started > timeout_s:
                raise TimeoutError("SSE timed out")
            if not line or not line.startswith("data: "):
                continue
            event = json.loads(line[6:])  # strip the "data: " prefix
            event_type = event.get("type")
            if event_type in {"RUN_FINISHED", "RUN_ERROR"}:
                return str(event_type)

    raise RuntimeError("No terminal SSE event")


def _build_run_payload(*, thread_id: str, run_id: str) -> dict[str, object]:
    """Return a chat-mode run request carrying one user message and a divination payload."""
    now = int(time.time() * 1000)
    return {
        "threadId": thread_id,
        "runId": run_id,
        "state": {},
        "messages": [
            {
                "id": f"msg_{run_id}_user_0",
                "role": "user",
                "content": "今天事业运如何?",
            }
        ],
        "tools": [],
        "context": [],
        "forwardedProps": {
            "runtime_mode": "chat",
            "client_time": {
                "device_timezone": "Asia/Shanghai",
                "client_now_iso": "2026-04-15T12:00:00Z",
                "client_epoch_ms": now,
            },
            "divinationPayload": {
                "divinationMethod": "自动起卦",
                "questionType": "事业",
                "question": "今天事业运如何?",
                "divinationTimeIso": "2026-04-15T12:00:00Z",
                "yaoLines": ["少阳", "少阴", "老阳", "少阳", "老阴", "少阴"],
            },
        },
    }


@pytest.mark.asyncio
async def test_session_delete_anonymizes_and_hard_deletes(
    api_client: httpx.AsyncClient,
    test_identity: IdentityData,
    db_cleanup: list[str],
) -> None:
    """Deleting a session removes it and its messages and leaves a new snapshot.

    The snapshot cannot be looked up by session id, since it is unlinkable by
    design. The test therefore records which snapshot ids exist before the
    delete and requires at least one new chat snapshot afterwards.
    """
    email = str(test_identity["email"]).strip().lower()
    db_cleanup.append(email)

    auth_resp = await _create_email_session(
        api_client,
        email=email,
        code=str(test_identity["code"]),
    )
    user = auth_resp.get("user")
    assert isinstance(user, dict)
    access_token = str(auth_resp["access_token"])
    headers = {"Authorization": f"Bearer {access_token}"}

    thread_id = str(uuid.uuid4())
    run_id = f"run_{int(time.time() * 1000)}"

    enqueue = await api_client.post(
        "/api/v1/agent/runs",
        headers=headers,
        json=_build_run_payload(thread_id=thread_id, run_id=run_id),
    )
    assert enqueue.status_code == 202

    terminal = await _wait_terminal_event(
        api_client,
        access_token=access_token,
        thread_id=thread_id,
        run_id=run_id,
    )
    # Either outcome is accepted: the session row exists in both cases.
    assert terminal in {"RUN_FINISHED", "RUN_ERROR"}

    async with AsyncSessionLocal() as session:
        session_result = await session.execute(
            select(AgentChatSession).where(AgentChatSession.id == uuid.UUID(thread_id))
        )
        session_obj = session_result.scalar_one_or_none()
        assert session_obj is not None, "Session should exist before deletion"

        # Baseline, so snapshots left by earlier runs cannot satisfy the check.
        baseline_result = await session.execute(select(AnonymousSessionSnapshot.id))
        known_snapshot_ids = set(baseline_result.scalars().all())

    delete_resp = await api_client.delete(
        f"/api/v1/agent/sessions/{thread_id}",
        headers=headers,
    )
    assert delete_resp.status_code == 204

    async with AsyncSessionLocal() as session:
        session_result = await session.execute(
            select(AgentChatSession).where(AgentChatSession.id == uuid.UUID(thread_id))
        )
        deleted_session = session_result.scalar_one_or_none()
        assert deleted_session is None, (
            "Session should be hard-deleted, not soft-deleted"
        )

        msg_result = await session.execute(
            select(AgentChatMessage).where(
                AgentChatMessage.session_id == uuid.UUID(thread_id)
            )
        )
        remaining_messages = msg_result.scalars().all()
        assert len(remaining_messages) == 0, (
            "Messages should be hard-deleted along with session"
        )

        snapshot_result = await session.execute(
            select(AnonymousSessionSnapshot).order_by(
                AnonymousSessionSnapshot.anonymized_at.desc()
            )
        )
        snapshots = snapshot_result.scalars().all()
        new_snapshots = [s for s in snapshots if s.id not in known_snapshot_ids]
        assert len(new_snapshots) >= 1, (
            "Deleting the session should create a new anonymous snapshot"
        )

        # Other deletions may run concurrently against the same database, so
        # only require that one of the new rows is the chat snapshot.
        chat_snapshots = [s for s in new_snapshots if s.session_type == "chat"]
        assert chat_snapshots, "New snapshot should record session_type 'chat'"
        snapshot = chat_snapshots[0]
        assert snapshot.anonymous_id is not None
        assert snapshot.id is not None
        assert snapshot.anonymized_at is not None
def _make_session(**overrides: object) -> AgentChatSession:
    """Build a completed chat session; keyword *overrides* replace the defaults."""
    last_touch = datetime(2026, 4, 15, 14, 45, 0, tzinfo=timezone.utc)
    fields: dict[str, object] = dict(
        id=uuid4(),
        user_id=uuid4(),
        session_type=SessionType.CHAT,
        status=AgentChatSessionStatus.COMPLETED,
        message_count=3,
        total_tokens=1500,
        total_cost=Decimal("0.05"),
        created_at=datetime(2026, 4, 15, 14, 32, 0, tzinfo=timezone.utc),
        last_activity_at=last_touch,
        job_id=None,
        title="Will I get the job?",
        state_snapshot=None,
        updated_at=last_touch,
        deleted_at=None,
    )
    return AgentChatSession(**{**fields, **overrides})


def _make_message(
    *,
    session_id: object | None = None,
    role: AgentChatMessageRole = AgentChatMessageRole.ASSISTANT,
    metadata_json: dict[str, object] | None = None,
    model_code: str | None = None,
    tool_name: str | None = None,
    latency_ms: int | None = None,
) -> AgentChatMessage:
    """Build a message with fixed content and usage; only the arguments vary."""
    stamp = datetime(2026, 4, 15, 14, 33, 0, tzinfo=timezone.utc)
    owner = session_id or uuid4()
    return AgentChatMessage(
        id=uuid4(),
        session_id=owner,
        seq=1,
        role=role,
        content="some content",
        model_code=model_code,
        tool_name=tool_name,
        input_tokens=100,
        output_tokens=200,
        cost=Decimal("0.02"),
        latency_ms=latency_ms,
        visibility_mask=0,
        metadata_json=metadata_json,
        created_at=stamp,
        updated_at=stamp,
        deleted_at=None,
    )


def _with_agent_output(agent_output: dict[str, object]) -> AgentChatMessage:
    """Shortcut: an assistant message whose metadata holds only *agent_output*."""
    return _make_message(metadata_json={"agent_output": agent_output})


def test_truncate_to_day() -> None:
    """Time-of-day is zeroed; date and tzinfo survive."""
    moment = datetime(2026, 4, 15, 14, 32, 45, 123456, tzinfo=timezone.utc)
    midnight = datetime(2026, 4, 15, 0, 0, 0, 0, tzinfo=timezone.utc)
    assert _truncate_to_day(moment) == midnight


def test_extract_derived_fields_found() -> None:
    """The divination_derived dict is returned when present."""
    message = _with_agent_output(
        {
            "divination_derived": {
                "guaName": "乾",
                "questionType": "career",
                "hasChangingYao": True,
            }
        }
    )
    found = _extract_derived_fields([message])
    assert found.get("guaName") == "乾"
    assert found.get("questionType") == "career"


def test_extract_derived_fields_missing() -> None:
    """Metadata without agent_output yields an empty dict."""
    message = _make_message(metadata_json={"run_id": "abc"})
    assert _extract_derived_fields([message]) == {}


def test_extract_sign_level() -> None:
    """sign_level is read from agent_output."""
    assert _extract_sign_level([_with_agent_output({"sign_level": "中上签"})]) == "中上签"


def test_extract_sign_level_none() -> None:
    """An agent_output without sign_level yields None."""
    assert _extract_sign_level([_with_agent_output({})]) is None


def test_extract_keywords() -> None:
    """keywords is read from agent_output."""
    message = _with_agent_output({"keywords": ["事业", "贵人"]})
    assert _extract_keywords([message]) == ["事业", "贵人"]


def test_extract_question_type_from_derived() -> None:
    """questionType inside divination_derived is picked up."""
    message = _with_agent_output({"divination_derived": {"questionType": "career"}})
    assert _extract_question_type([message]) == "career"


def test_extract_question_type_from_agent_output() -> None:
    """Without a derived dict, agent_output.questionType is used."""
    assert _extract_question_type([_with_agent_output({"questionType": "love"})]) == "love"


def test_extract_model_code() -> None:
    """model_code comes straight from the message attribute."""
    assert _extract_model_code([_make_message(model_code="qwen3.5-flash")]) == "qwen3.5-flash"


def test_extract_tool_name() -> None:
    """tool_name comes straight from the message attribute."""
    assert _extract_tool_name([_make_message(tool_name="liuyao")]) == "liuyao"


def test_aggregate_latency() -> None:
    """Latencies of all messages are summed."""
    batch = [_make_message(latency_ms=500), _make_message(latency_ms=300)]
    assert _aggregate_latency(batch) == 800


def test_aggregate_latency_none() -> None:
    """No reported latency at all yields None, not 0."""
    assert _aggregate_latency([_make_message(latency_ms=None)]) is None


def test_anonymize_full_snapshot() -> None:
    """Every snapshot field is populated from a fully-annotated session."""
    session = _make_session()
    assistant_msg = _make_message(
        session_id=session.id,
        role=AgentChatMessageRole.ASSISTANT,
        model_code="qwen3.5-flash",
        tool_name="liuyao",
        latency_ms=1200,
        metadata_json={
            "agent_output": {
                "sign_level": "上上签",
                "keywords": ["事业", "贵人"],
                "divination_derived": {
                    "questionType": "career",
                    "guaName": "乾",
                    "guaNameHant": "乾",
                    "targetGuaName": "姤",
                    "hasChangingYao": True,
                },
            }
        },
    )
    user_msg = _make_message(
        session_id=session.id,
        role=AgentChatMessageRole.USER,
        latency_ms=None,
    )
    snapshot = anonymize(session=session, messages=[assistant_msg, user_msg])

    day_start = datetime(2026, 4, 15, 0, 0, 0, tzinfo=timezone.utc)
    expected: dict[str, object] = {
        "session_type": "chat",
        "message_count": 3,
        "status": "completed",
        "question_type": "career",
        "tool_name": "liuyao",
        "model_code": "qwen3.5-flash",
        "gua_name": "乾",
        "gua_name_hant": "乾",
        "target_gua_name": "姤",
        "sign_level": "上上签",
        "keywords": ["事业", "贵人"],
        "total_tokens": 1500,
        "total_cost": Decimal("0.05"),
        "total_latency_ms": 1200,
        "created_at": day_start,
        "last_activity_at": day_start,
    }
    for field_name, wanted in expected.items():
        assert getattr(snapshot, field_name) == wanted

    # Identity checks, kept separate from the equality table above.
    assert snapshot.has_changing_yao is True
    assert snapshot.anonymous_id is not None
    assert snapshot.id is not None


def test_anonymize_no_metadata() -> None:
    """With no metadata, every extracted field is None."""
    session = _make_session()
    bare_msg = _make_message(session_id=session.id, metadata_json=None)
    snapshot = anonymize(session=session, messages=[bare_msg])

    for field_name in (
        "question_type",
        "gua_name",
        "sign_level",
        "keywords",
        "has_changing_yao",
    ):
        assert getattr(snapshot, field_name) is None