feat(agent): session deletion anonymization for iOS compliance

Replace soft-delete with anonymize + hard-delete to meet iOS App Store
data retention requirements. Non-PII fields are preserved in
anonymous_session_snapshots for analytics.

- Add anonymous_session_snapshots table and ORM model
- Implement anonymizer to extract non-PII fields before deletion
- Remove points_ledger.biz_id FK constraint (snapshot-style reference)
- Preserve transaction history while allowing session deletion
- Add 14 unit tests + 1 integration test
This commit is contained in:
qzl
2026-04-15 18:18:39 +08:00
parent a244eaa666
commit c2b726e7bd
10 changed files with 829 additions and 7 deletions
@@ -0,0 +1,111 @@
"""add anonymous_session_snapshots table for iOS compliance
Revision ID: 20260415_0001
Revises: 20260413_0004
Create Date: 2026-04-15 00:10:00
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
revision: str = "20260415_0001"
down_revision: Union[str, Sequence[str], None] = "20260413_0004"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table(
"anonymous_session_snapshots",
sa.Column("id", sa.UUID(), nullable=False),
sa.Column("anonymous_id", sa.UUID(), nullable=False),
sa.Column("session_type", sa.String(length=20), nullable=False),
sa.Column("message_count", sa.Integer(), nullable=True),
sa.Column("status", sa.String(length=20), nullable=True),
sa.Column("question_type", sa.String(length=50), nullable=True),
sa.Column("tool_name", sa.String(length=100), nullable=True),
sa.Column("gua_name", sa.String(length=50), nullable=True),
sa.Column("gua_name_hant", sa.String(length=50), nullable=True),
sa.Column("target_gua_name", sa.String(length=50), nullable=True),
sa.Column("has_changing_yao", sa.Boolean(), nullable=True),
sa.Column("sign_level", sa.String(length=20), nullable=True),
sa.Column("keywords", postgresql.ARRAY(sa.Text()), nullable=True),
sa.Column("model_code", sa.String(length=50), nullable=True),
sa.Column("total_tokens", sa.Integer(), nullable=True),
sa.Column("total_cost", sa.Numeric(12, 6), nullable=True),
sa.Column("total_latency_ms", sa.Integer(), nullable=True),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
nullable=False,
),
sa.Column("last_activity_at", sa.DateTime(timezone=True), nullable=True),
sa.Column(
"anonymized_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(
"ix_anonymous_session_snapshots_anonymous_id",
"anonymous_session_snapshots",
["anonymous_id"],
unique=False,
)
op.create_index(
"ix_anonymous_session_snapshots_created_at",
"anonymous_session_snapshots",
["created_at"],
unique=False,
)
op.create_index(
"ix_anonymous_session_snapshots_question_type",
"anonymous_session_snapshots",
["question_type"],
unique=False,
)
_enable_service_role_only_rls("anonymous_session_snapshots")
def downgrade() -> None:
_drop_rls("anonymous_session_snapshots")
op.drop_index(
"ix_anonymous_session_snapshots_question_type",
table_name="anonymous_session_snapshots",
)
op.drop_index(
"ix_anonymous_session_snapshots_created_at",
table_name="anonymous_session_snapshots",
)
op.drop_index(
"ix_anonymous_session_snapshots_anonymous_id",
table_name="anonymous_session_snapshots",
)
op.drop_table("anonymous_session_snapshots")
def _enable_service_role_only_rls(table_name: str) -> None:
for role in ["anon", "authenticated"]:
for action in ["select", "insert", "update", "delete"]:
op.execute(
f"DROP POLICY IF EXISTS {role}_{action}_{table_name} ON {table_name}"
)
op.execute(f"ALTER TABLE {table_name} ENABLE ROW LEVEL SECURITY")
op.execute(
f"CREATE POLICY service_role_all_{table_name} ON {table_name} FOR ALL TO service_role USING (true) WITH CHECK (true)"
)
def _drop_rls(table_name: str) -> None:
for role in ["anon", "authenticated"]:
for action in ["select", "insert", "update", "delete"]:
op.execute(
f"DROP POLICY IF EXISTS {role}_{action}_{table_name} ON {table_name}"
)
op.execute(f"DROP POLICY IF EXISTS service_role_all_{table_name} ON {table_name}")
op.execute(f"ALTER TABLE {table_name} DISABLE ROW LEVEL SECURITY")
@@ -0,0 +1,40 @@
"""drop points_ledger.biz_id foreign key for snapshot-style reference
Revision ID: 20260415_0002
Revises: 20260415_0001
Create Date: 2026-04-15 10:00:00
points_ledger.biz_id stores a snapshot reference to sessions.id for audit purposes.
This allows sessions to be deleted while preserving the biz_id value in points_ledger
for user-facing transaction history.
The FK constraint is removed because:
1. Users need to see their points transaction history even after session deletion
2. Session deletion (anonymization for iOS compliance) should not cascade delete
points_ledger records
3. biz_id becomes a "snapshot" reference - the value is kept but no FK enforcement
"""
from typing import Sequence, Union
from alembic import op
revision: str = "20260415_0002"
down_revision: Union[str, Sequence[str], None] = "20260415_0001"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.drop_constraint("points_ledger_biz_id_fkey", "points_ledger", type_="foreignkey")
def downgrade() -> None:
op.create_foreign_key(
"points_ledger_biz_id_fkey",
"points_ledger",
"sessions",
["biz_id"],
["id"],
ondelete="SET NULL",
)
+2
View File
@@ -2,6 +2,7 @@ from __future__ import annotations
from .agent_chat_message import AgentChatMessage
from .agent_chat_session import AgentChatSession
from .anonymous_session_snapshot import AnonymousSessionSnapshot
from .auth_user import AuthUser
from .invite_code import InviteCode
from .llm import Llm
@@ -18,6 +19,7 @@ from .user_points import UserPoints
__all__ = [
"AgentChatMessage",
"AgentChatSession",
"AnonymousSessionSnapshot",
"AuthUser",
"InviteCode",
"Llm",
@@ -0,0 +1,46 @@
from __future__ import annotations
from datetime import datetime
from decimal import Decimal
import uuid
from sqlalchemy import Boolean, DateTime, Integer, Numeric, String, Text
from sqlalchemy.dialects.postgresql import ARRAY, UUID
from sqlalchemy.orm import Mapped, mapped_column
from core.db.base import Base
__all__ = ["AnonymousSessionSnapshot"]
class AnonymousSessionSnapshot(Base):
__tablename__: str = "anonymous_session_snapshots"
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
)
anonymous_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), nullable=False)
session_type: Mapped[str] = mapped_column(String(20), nullable=False)
message_count: Mapped[int | None] = mapped_column(Integer, nullable=True)
status: Mapped[str | None] = mapped_column(String(20), nullable=True)
question_type: Mapped[str | None] = mapped_column(String(50), nullable=True)
tool_name: Mapped[str | None] = mapped_column(String(100), nullable=True)
gua_name: Mapped[str | None] = mapped_column(String(50), nullable=True)
gua_name_hant: Mapped[str | None] = mapped_column(String(50), nullable=True)
target_gua_name: Mapped[str | None] = mapped_column(String(50), nullable=True)
has_changing_yao: Mapped[bool | None] = mapped_column(Boolean, nullable=True)
sign_level: Mapped[str | None] = mapped_column(String(20), nullable=True)
keywords: Mapped[list[str] | None] = mapped_column(ARRAY(Text()), nullable=True)
model_code: Mapped[str | None] = mapped_column(String(50), nullable=True)
total_tokens: Mapped[int | None] = mapped_column(Integer, nullable=True)
total_cost: Mapped[Decimal | None] = mapped_column(Numeric(12, 6), nullable=True)
total_latency_ms: Mapped[int | None] = mapped_column(Integer, nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False
)
last_activity_at: Mapped[datetime | None] = mapped_column(
DateTime(timezone=True), nullable=True
)
anonymized_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False
)
+162
View File
@@ -0,0 +1,162 @@
from __future__ import annotations
from datetime import datetime, timezone
from typing import Any
from uuid import uuid4
from models.agent_chat_message import AgentChatMessage
from models.agent_chat_session import AgentChatSession
from models.anonymous_session_snapshot import AnonymousSessionSnapshot
from core.logging import get_logger
logger = get_logger(__name__)
def _truncate_to_day(dt: datetime) -> datetime:
return dt.replace(hour=0, minute=0, second=0, microsecond=0)
def _extract_derived_fields(
messages: list[AgentChatMessage],
) -> dict[str, Any]:
for message in messages:
metadata_raw = message.metadata_json
if not isinstance(metadata_raw, dict):
continue
agent_output = metadata_raw.get("agent_output")
if not isinstance(agent_output, dict):
continue
derived = agent_output.get("divination_derived")
if isinstance(derived, dict) and derived:
return derived
return {}
def _extract_sign_level(
messages: list[AgentChatMessage],
) -> str | None:
for message in messages:
metadata_raw = message.metadata_json
if not isinstance(metadata_raw, dict):
continue
agent_output = metadata_raw.get("agent_output")
if not isinstance(agent_output, dict):
continue
sign_level = agent_output.get("sign_level")
if isinstance(sign_level, str) and sign_level:
return sign_level
return None
def _extract_keywords(
messages: list[AgentChatMessage],
) -> list[str] | None:
for message in messages:
metadata_raw = message.metadata_json
if not isinstance(metadata_raw, dict):
continue
agent_output = metadata_raw.get("agent_output")
if not isinstance(agent_output, dict):
continue
keywords = agent_output.get("keywords")
if isinstance(keywords, list) and keywords:
return keywords
return None
def _extract_question_type(
messages: list[AgentChatMessage],
) -> str | None:
derived = _extract_derived_fields(messages)
if not derived:
for message in messages:
metadata_raw = message.metadata_json
if not isinstance(metadata_raw, dict):
continue
agent_output = metadata_raw.get("agent_output")
if not isinstance(agent_output, dict):
continue
question_type = agent_output.get("questionType")
if isinstance(question_type, str) and question_type:
return question_type
return None
question_type = derived.get("questionType")
if isinstance(question_type, str) and question_type:
return question_type
return None
def _extract_model_code(
messages: list[AgentChatMessage],
) -> str | None:
for message in messages:
if message.model_code:
return message.model_code
return None
def _extract_tool_name(
messages: list[AgentChatMessage],
) -> str | None:
for message in messages:
if message.tool_name:
return message.tool_name
return None
def _aggregate_latency(
messages: list[AgentChatMessage],
) -> int | None:
total = 0
found = False
for message in messages:
if message.latency_ms is not None:
total += message.latency_ms
found = True
return total if found else None
def anonymize(
session: AgentChatSession,
messages: list[AgentChatMessage],
) -> AnonymousSessionSnapshot:
derived = _extract_derived_fields(messages)
gua_name = derived.get("guaName") if derived else None
gua_name_hant = derived.get("guaNameHant") if derived else None
target_gua_name = derived.get("targetGuaName") if derived else None
has_changing_yao = derived.get("hasChangingYao") if derived else None
created_at = _truncate_to_day(session.created_at)
last_activity_at = (
_truncate_to_day(session.last_activity_at) if session.last_activity_at else None
)
return AnonymousSessionSnapshot(
id=uuid4(),
anonymous_id=uuid4(),
session_type=session.session_type.value
if hasattr(session.session_type, "value")
else str(session.session_type),
message_count=session.message_count,
status=session.status.value
if hasattr(session.status, "value")
else str(session.status),
question_type=_extract_question_type(messages),
tool_name=_extract_tool_name(messages),
gua_name=gua_name if isinstance(gua_name, str) else None,
gua_name_hant=gua_name_hant if isinstance(gua_name_hant, str) else None,
target_gua_name=target_gua_name if isinstance(target_gua_name, str) else None,
has_changing_yao=has_changing_yao
if isinstance(has_changing_yao, bool)
else None,
sign_level=_extract_sign_level(messages),
keywords=_extract_keywords(messages),
model_code=_extract_model_code(messages),
total_tokens=session.total_tokens,
total_cost=session.total_cost,
total_latency_ms=_aggregate_latency(messages),
created_at=created_at,
last_activity_at=last_activity_at,
anonymized_at=datetime.now(timezone.utc),
)
+45 -5
View File
@@ -5,7 +5,7 @@ from decimal import Decimal
from typing import Any, Protocol
from uuid import UUID, uuid4
from sqlalchemy import Select, func, select
from sqlalchemy import Select, delete, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from core.http.errors import ApiProblemError
@@ -17,6 +17,7 @@ from schemas.domain.chat_message import (
AgentChatMessage as AgentChatMessageSchema,
AgentChatMessageMetadata,
)
from v1.agent.anonymizer import anonymize
class ToolResultPayloadStorage(Protocol):
@@ -96,7 +97,7 @@ class AgentRepository:
async def rollback(self) -> None:
await self._session.rollback()
async def delete_session(self, *, session_id: str) -> None:
async def delete_session(self, *, session_id: str) -> list[dict[str, str]]:
try:
session_uuid = UUID(session_id)
except ValueError as exc:
@@ -112,11 +113,50 @@ class AgentRepository:
)
session = (await self._session.execute(stmt)).scalar_one_or_none()
if session is None:
return
return []
if session.deleted_at is not None:
return
session.deleted_at = datetime.now(timezone.utc)
return []
messages_stmt = (
select(AgentChatMessage)
.where(AgentChatMessage.session_id == session_uuid)
.order_by(AgentChatMessage.seq)
)
messages = list((await self._session.execute(messages_stmt)).scalars().all())
attachment_paths = self._collect_attachment_paths(messages)
snapshot = anonymize(session=session, messages=messages)
self._session.add(snapshot)
await self._session.flush()
stmt_delete_messages = delete(AgentChatMessage).where(
AgentChatMessage.session_id == session_uuid
)
await self._session.execute(stmt_delete_messages)
stmt_delete_session = delete(AgentChatSession).where(
AgentChatSession.id == session_uuid
)
await self._session.execute(stmt_delete_session)
await self._session.flush()
return attachment_paths
@staticmethod
def _collect_attachment_paths(
messages: list[AgentChatMessage],
) -> list[dict[str, str]]:
paths: list[dict[str, str]] = []
for message in messages:
metadata_raw = message.metadata_json
if not isinstance(metadata_raw, dict):
continue
attachments_raw = metadata_raw.get("user_message_attachments")
if not isinstance(attachments_raw, list):
continue
for attachment in attachments_raw:
if not isinstance(attachment, dict):
continue
bucket = attachment.get("bucket")
path = attachment.get("path")
if isinstance(bucket, str) and isinstance(path, str):
paths.append({"bucket": bucket, "path": path})
return paths
async def persist_user_message(
self,
+3 -1
View File
@@ -23,7 +23,7 @@ class AgentRepositoryLike(Protocol):
async def rollback(self) -> None: ...
async def delete_session(self, *, session_id: str) -> None: ...
async def delete_session(self, *, session_id: str) -> list[dict[str, str]]: ...
async def get_history_day(
self,
@@ -126,6 +126,8 @@ class AttachmentStorageLike(Protocol):
expires_in_seconds: int,
) -> str: ...
async def delete_prefix(self, *, bucket: str, prefix: str) -> int: ...
def parse_signed_url(self, url: str) -> tuple[str, str]: ...
+21 -1
View File
@@ -235,8 +235,28 @@ class AgentService:
return
raise
ensure_session_owner(owner_id=owner, current_user=current_user)
await self._repository.delete_session(session_id=thread_id)
attachment_paths = await self._repository.delete_session(session_id=thread_id)
await self._repository.commit()
await self._cleanup_attachments(attachment_paths)
async def _cleanup_attachments(
self, attachment_paths: list[dict[str, str]]
) -> None:
if not attachment_paths or self._attachment_storage is None:
return
for attachment in attachment_paths:
bucket = attachment.get("bucket")
path = attachment.get("path")
if not bucket or not path:
continue
try:
await self._attachment_storage.delete_prefix(bucket=bucket, prefix=path)
except Exception:
logger.warning(
"attachment_cleanup_failed",
bucket=bucket,
path=path,
)
async def _append_context_cache_user_message(
self,
@@ -0,0 +1,183 @@
from __future__ import annotations
import json
import time
import uuid
from typing import TypedDict
import httpx
import pytest
from sqlalchemy import select
from core.db.session import AsyncSessionLocal
from models.agent_chat_session import AgentChatSession
from models.agent_chat_message import AgentChatMessage
from models.anonymous_session_snapshot import AnonymousSessionSnapshot
class IdentityData(TypedDict):
email: str
code: str
async def _create_email_session(
client: httpx.AsyncClient,
*,
email: str,
code: str,
) -> dict[str, object]:
resp = await client.post(
"/api/v1/auth/email-session",
json={"email": email, "token": code},
)
resp.raise_for_status()
return resp.json()
async def _wait_terminal_event(
client: httpx.AsyncClient,
*,
access_token: str,
thread_id: str,
run_id: str,
timeout_s: int = 180,
) -> str:
headers = {"Authorization": f"Bearer {access_token}"}
params = {"runId": run_id, "idle_limit": 120}
started = time.time()
async with client.stream(
"GET",
f"/api/v1/agent/runs/{thread_id}/events",
headers=headers,
params=params,
) as resp:
resp.raise_for_status()
async for line in resp.aiter_lines():
if time.time() - started > timeout_s:
raise TimeoutError("SSE timed out")
if not line or not line.startswith("data: "):
continue
event = json.loads(line[6:])
event_type = event.get("type")
if event_type in {"RUN_FINISHED", "RUN_ERROR"}:
return str(event_type)
raise RuntimeError("No terminal SSE event")
def _build_run_payload(*, thread_id: str, run_id: str) -> dict[str, object]:
now = int(time.time() * 1000)
return {
"threadId": thread_id,
"runId": run_id,
"state": {},
"messages": [
{
"id": f"msg_{run_id}_user_0",
"role": "user",
"content": "今天事业运如何?",
}
],
"tools": [],
"context": [],
"forwardedProps": {
"runtime_mode": "chat",
"client_time": {
"device_timezone": "Asia/Shanghai",
"client_now_iso": "2026-04-15T12:00:00Z",
"client_epoch_ms": now,
},
"divinationPayload": {
"divinationMethod": "自动起卦",
"questionType": "事业",
"question": "今天事业运如何?",
"divinationTimeIso": "2026-04-15T12:00:00Z",
"yaoLines": ["少阳", "少阴", "老阳", "少阳", "老阴", "少阴"],
},
},
}
@pytest.mark.asyncio
async def test_session_delete_anonymizes_and_hard_deletes(
api_client: httpx.AsyncClient,
test_identity: IdentityData,
db_cleanup: list[str],
) -> None:
email = str(test_identity["email"]).strip().lower()
db_cleanup.append(email)
auth_resp = await _create_email_session(
api_client,
email=email,
code=str(test_identity["code"]),
)
user = auth_resp.get("user")
assert isinstance(user, dict)
access_token = str(auth_resp["access_token"])
headers = {"Authorization": f"Bearer {access_token}"}
thread_id = str(uuid.uuid4())
run_id = f"run_{int(time.time() * 1000)}"
enqueue = await api_client.post(
"/api/v1/agent/runs",
headers=headers,
json=_build_run_payload(thread_id=thread_id, run_id=run_id),
)
assert enqueue.status_code == 202
terminal = await _wait_terminal_event(
api_client,
access_token=access_token,
thread_id=thread_id,
run_id=run_id,
)
assert terminal in {"RUN_FINISHED", "RUN_ERROR"}
async with AsyncSessionLocal() as session:
session_result = await session.execute(
select(AgentChatSession).where(AgentChatSession.id == uuid.UUID(thread_id))
)
session_obj = session_result.scalar_one_or_none()
assert session_obj is not None, "Session should exist before deletion"
delete_resp = await api_client.delete(
f"/api/v1/agent/sessions/{thread_id}",
headers=headers,
)
assert delete_resp.status_code == 204
async with AsyncSessionLocal() as session:
session_result = await session.execute(
select(AgentChatSession).where(AgentChatSession.id == uuid.UUID(thread_id))
)
deleted_session = session_result.scalar_one_or_none()
assert deleted_session is None, (
"Session should be hard-deleted, not soft-deleted"
)
msg_result = await session.execute(
select(AgentChatMessage).where(
AgentChatMessage.session_id == uuid.UUID(thread_id)
)
)
remaining_messages = msg_result.scalars().all()
assert len(remaining_messages) == 0, (
"Messages should be hard-deleted along with session"
)
snapshot_result = await session.execute(
select(AnonymousSessionSnapshot).order_by(
AnonymousSessionSnapshot.anonymized_at.desc()
)
)
snapshots = snapshot_result.scalars().all()
assert len(snapshots) >= 1, "At least one anonymous snapshot should exist"
snapshot = snapshots[0]
assert snapshot.session_type == "chat"
assert snapshot.anonymous_id is not None
assert snapshot.id is not None
assert snapshot.anonymized_at is not None
+216
View File
@@ -0,0 +1,216 @@
from __future__ import annotations
from datetime import datetime, timezone
from decimal import Decimal
from uuid import uuid4
from schemas.enums import AgentChatMessageRole, AgentChatSessionStatus, SessionType
from models.agent_chat_message import AgentChatMessage
from models.agent_chat_session import AgentChatSession
from v1.agent.anonymizer import (
_aggregate_latency,
_extract_derived_fields,
_extract_keywords,
_extract_model_code,
_extract_question_type,
_extract_sign_level,
_extract_tool_name,
_truncate_to_day,
anonymize,
)
def _make_session(**overrides: object) -> AgentChatSession:
defaults: dict[str, object] = {
"id": uuid4(),
"user_id": uuid4(),
"session_type": SessionType.CHAT,
"status": AgentChatSessionStatus.COMPLETED,
"message_count": 3,
"total_tokens": 1500,
"total_cost": Decimal("0.05"),
"created_at": datetime(2026, 4, 15, 14, 32, 0, tzinfo=timezone.utc),
"last_activity_at": datetime(2026, 4, 15, 14, 45, 0, tzinfo=timezone.utc),
"job_id": None,
"title": "Will I get the job?",
"state_snapshot": None,
"updated_at": datetime(2026, 4, 15, 14, 45, 0, tzinfo=timezone.utc),
"deleted_at": None,
}
defaults.update(overrides)
return AgentChatSession(**defaults)
def _make_message(
*,
session_id: object | None = None,
role: AgentChatMessageRole = AgentChatMessageRole.ASSISTANT,
metadata_json: dict[str, object] | None = None,
model_code: str | None = None,
tool_name: str | None = None,
latency_ms: int | None = None,
) -> AgentChatMessage:
return AgentChatMessage(
id=uuid4(),
session_id=session_id or uuid4(),
seq=1,
role=role,
content="some content",
model_code=model_code,
tool_name=tool_name,
input_tokens=100,
output_tokens=200,
cost=Decimal("0.02"),
latency_ms=latency_ms,
visibility_mask=0,
metadata_json=metadata_json,
created_at=datetime(2026, 4, 15, 14, 33, 0, tzinfo=timezone.utc),
updated_at=datetime(2026, 4, 15, 14, 33, 0, tzinfo=timezone.utc),
deleted_at=None,
)
def test_truncate_to_day() -> None:
dt = datetime(2026, 4, 15, 14, 32, 45, 123456, tzinfo=timezone.utc)
result = _truncate_to_day(dt)
assert result == datetime(2026, 4, 15, 0, 0, 0, 0, tzinfo=timezone.utc)
def test_extract_derived_fields_found() -> None:
msg = _make_message(
metadata_json={
"agent_output": {
"divination_derived": {
"guaName": "",
"questionType": "career",
"hasChangingYao": True,
}
}
}
)
derived = _extract_derived_fields([msg])
assert derived.get("guaName") == ""
assert derived.get("questionType") == "career"
def test_extract_derived_fields_missing() -> None:
msg = _make_message(metadata_json={"run_id": "abc"})
derived = _extract_derived_fields([msg])
assert derived == {}
def test_extract_sign_level() -> None:
msg = _make_message(metadata_json={"agent_output": {"sign_level": "中上签"}})
assert _extract_sign_level([msg]) == "中上签"
def test_extract_sign_level_none() -> None:
msg = _make_message(metadata_json={"agent_output": {}})
assert _extract_sign_level([msg]) is None
def test_extract_keywords() -> None:
msg = _make_message(metadata_json={"agent_output": {"keywords": ["事业", "贵人"]}})
assert _extract_keywords([msg]) == ["事业", "贵人"]
def test_extract_question_type_from_derived() -> None:
msg = _make_message(
metadata_json={
"agent_output": {
"divination_derived": {"questionType": "career"},
}
}
)
assert _extract_question_type([msg]) == "career"
def test_extract_question_type_from_agent_output() -> None:
msg = _make_message(metadata_json={"agent_output": {"questionType": "love"}})
assert _extract_question_type([msg]) == "love"
def test_extract_model_code() -> None:
msg = _make_message(model_code="qwen3.5-flash")
assert _extract_model_code([msg]) == "qwen3.5-flash"
def test_extract_tool_name() -> None:
msg = _make_message(tool_name="liuyao")
assert _extract_tool_name([msg]) == "liuyao"
def test_aggregate_latency() -> None:
msg1 = _make_message(latency_ms=500)
msg2 = _make_message(latency_ms=300)
assert _aggregate_latency([msg1, msg2]) == 800
def test_aggregate_latency_none() -> None:
msg = _make_message(latency_ms=None)
assert _aggregate_latency([msg]) is None
def test_anonymize_full_snapshot() -> None:
session = _make_session()
msg = _make_message(
session_id=session.id,
role=AgentChatMessageRole.ASSISTANT,
model_code="qwen3.5-flash",
tool_name="liuyao",
latency_ms=1200,
metadata_json={
"agent_output": {
"sign_level": "上上签",
"keywords": ["事业", "贵人"],
"divination_derived": {
"questionType": "career",
"guaName": "",
"guaNameHant": "",
"targetGuaName": "",
"hasChangingYao": True,
},
}
},
)
user_msg = _make_message(
session_id=session.id,
role=AgentChatMessageRole.USER,
latency_ms=None,
)
snapshot = anonymize(session=session, messages=[msg, user_msg])
assert snapshot.session_type == "chat"
assert snapshot.message_count == 3
assert snapshot.status == "completed"
assert snapshot.question_type == "career"
assert snapshot.tool_name == "liuyao"
assert snapshot.model_code == "qwen3.5-flash"
assert snapshot.gua_name == ""
assert snapshot.gua_name_hant == ""
assert snapshot.target_gua_name == ""
assert snapshot.has_changing_yao is True
assert snapshot.sign_level == "上上签"
assert snapshot.keywords == ["事业", "贵人"]
assert snapshot.total_tokens == 1500
assert snapshot.total_cost == Decimal("0.05")
assert snapshot.total_latency_ms == 1200
assert snapshot.created_at == datetime(2026, 4, 15, 0, 0, 0, tzinfo=timezone.utc)
assert snapshot.last_activity_at == datetime(
2026, 4, 15, 0, 0, 0, tzinfo=timezone.utc
)
assert snapshot.anonymous_id is not None
assert snapshot.id is not None
def test_anonymize_no_metadata() -> None:
session = _make_session()
msg = _make_message(session_id=session.id, metadata_json=None)
snapshot = anonymize(session=session, messages=[msg])
assert snapshot.question_type is None
assert snapshot.gua_name is None
assert snapshot.sign_level is None
assert snapshot.keywords is None
assert snapshot.has_changing_yao is None