Files
social-app/backend/tests/e2e/test_agent_live_flow.py
T
zl-q 3ac09475ad feat(agent): add voice input capability and standardize tool naming
- Add voice recording with transcribe endpoint (ASR) for multimodal input
- Android: add RECORD_AUDIO and INTERNET permissions
- Refactor tool naming: frontend tools use 'front.' prefix, backend tools use 'back.'
- Migrate calendar tools: create_calendar_event -> back.mutate/list/delete events
- Add calendar_event_list.v1 and calendar_operation.v1 UI card types
- Update all Flutter and Python tests to match new tool naming conventions
- Add record package dependency for voice recording
2026-03-09 00:10:09 +08:00

563 lines
20 KiB
Python

from __future__ import annotations
import base64
import json
import os
import uuid
from decimal import Decimal
from pathlib import Path
import pytest
from sqlalchemy import delete, select
from core.agent.application.resume_service import ResumeService
from core.agent.application.run_service import RunService
from core.agent.infrastructure.queue.tasks import run_agent_task
from core.agent.infrastructure.storage.tool_result_storage import (
create_tool_result_storage,
)
from core.db import AsyncSessionLocal, engine
from models.agent_chat_message import AgentChatMessage, AgentChatMessageRole
from models.agent_chat_session import AgentChatSession, AgentChatSessionStatus
from models.llm import Llm
from models.llm_factory import LlmFactory
from models.profile import Profile
from models.schedule_items import ScheduleItem
from models.system_agents import SystemAgents
from services.base.supabase import supabase_service
# Absolute path of the PNG fed to the image-based test: starting from the
# directory two levels above this file, then fixtures/images/.
IMAGE_FIXTURE = Path(__file__).resolve().parent.parent.joinpath(
    "fixtures", "images", "calendar_text_cn.png"
)
def _live_enabled() -> bool:
    """Return True only when the AGENT_LIVE_E2E environment variable is exactly "1"."""
    flag = os.environ.get("AGENT_LIVE_E2E")
    return flag == "1"
async def _init_supabase_admin_client():
    """Initialise the shared Supabase service and hand back its admin client.

    The calling test is skipped (via ``pytest.skip``) when initialisation
    reports a falsy result.
    """
    if await supabase_service.initialize():
        return supabase_service.get_admin_client()
    # pytest.skip raises, so nothing is returned on this path.
    pytest.skip("Supabase service unavailable")
async def _create_owner_profile(admin_client) -> tuple[uuid.UUID, str]:
    """Create a throw-away auth user through the admin client.

    The e-mail address embeds eight random hex characters so repeated runs do
    not collide.

    Returns:
        ``(owner_id, user_id)``: the new user's id as a ``uuid.UUID`` and as
        the plain string it was parsed from.
    """
    random_tag = uuid.uuid4().hex[:8]
    attributes = {
        "email": f"agent-live-{random_tag}@example.com",
        "password": "Passw0rd!123",
        "email_confirm": True,
    }
    response = admin_client.auth.admin.create_user(attributes)
    raw_id = str(response.user.id)
    return uuid.UUID(raw_id), raw_id
async def _resolve_llm_id(
    *,
    target_model_code: str = "deepseek-chat",
    target_factory_name: str = "deepseek",
) -> tuple[uuid.UUID, uuid.UUID | None, uuid.UUID | None]:
    """Find an ``Llm`` row for the model code, creating one if none exists.

    Args:
        target_model_code: ``Llm.model_code`` value to look up or insert.
        target_factory_name: ``LlmFactory.name`` to reuse or insert when a new
            ``Llm`` row has to be created.

    Returns:
        ``(llm_id, llm_id_to_cleanup, factory_id_to_cleanup)``. The two
        cleanup ids are ``None`` for rows that already existed; they carry the
        id only for rows this call inserted.
    """
    await engine.dispose()

    # Fast path: a row with this model code is already present.
    llm_lookup = select(Llm.id).where(Llm.model_code == target_model_code).limit(1)
    async with AsyncSessionLocal() as lookup_session:
        found_llm_id = (await lookup_session.execute(llm_lookup)).scalar_one_or_none()
    if found_llm_id is not None:
        return found_llm_id, None, None

    factory_id = uuid.uuid4()
    new_llm_id = uuid.uuid4()

    # Reuse the factory with the requested name, or insert a placeholder one.
    factory_lookup = (
        select(LlmFactory.id).where(LlmFactory.name == target_factory_name).limit(1)
    )
    async with AsyncSessionLocal() as factory_session:
        found_factory_id = (
            await factory_session.execute(factory_lookup)
        ).scalar_one_or_none()
        created_factory = found_factory_id is None
        if created_factory:
            factory_session.add(
                LlmFactory(
                    id=factory_id,
                    name=target_factory_name,
                    request_url=f"https://{target_factory_name}.example",
                )
            )
            await factory_session.commit()
        else:
            factory_id = found_factory_id

    async with AsyncSessionLocal() as llm_session:
        llm_session.add(
            Llm(
                id=new_llm_id,
                factory_id=factory_id,
                model_code=target_model_code,
            )
        )
        await llm_session.commit()

    factory_to_cleanup = factory_id if created_factory else None
    return new_llm_id, new_llm_id, factory_to_cleanup
async def _seed_session_with_active_agent(
    *,
    session_id: uuid.UUID,
    owner_id: uuid.UUID,
    agent_type: str,
    llm_id: uuid.UUID,
) -> None:
    """Insert an active ``SystemAgents`` row and a chat session for the owner.

    Both rows are added in one database session and committed together, after
    the engine has been disposed.
    """
    await engine.dispose()
    agent_row = SystemAgents(agent_type=agent_type, llm_id=llm_id, status="active")
    chat_row = AgentChatSession(id=session_id, user_id=owner_id)
    async with AsyncSessionLocal() as db:
        for row in (agent_row, chat_row):
            db.add(row)
        await db.commit()
async def _cleanup_session_and_agent(
    *,
    session_id: uuid.UUID,
    agent_type: str,
    owner_id: uuid.UUID,
    llm_id_to_cleanup: uuid.UUID | None,
    factory_id_to_cleanup: uuid.UUID | None,
) -> None:
    """Delete the rows a test seeded, all within one committed transaction.

    The chat session, system agent and profile are always targeted; the
    ``Llm`` and ``LlmFactory`` rows only when their cleanup id is not ``None``.
    """
    statements = [
        delete(AgentChatSession).where(AgentChatSession.id == session_id),
        delete(SystemAgents).where(SystemAgents.agent_type == agent_type),
        delete(Profile).where(Profile.id == owner_id),
    ]
    if llm_id_to_cleanup is not None:
        statements.append(delete(Llm).where(Llm.id == llm_id_to_cleanup))
    if factory_id_to_cleanup is not None:
        statements.append(
            delete(LlmFactory).where(LlmFactory.id == factory_id_to_cleanup)
        )

    async with AsyncSessionLocal() as db:
        # Executed in list order, so the Llm row goes before its factory.
        for statement in statements:
            await db.execute(statement)
        await db.commit()
async def _cleanup_auth_user(*, admin_client, user_id: str | None) -> None:
    """Best-effort removal of the auth user created for a test.

    Does nothing when ``user_id`` is ``None``. A failing delete call never
    propagates (cleanup must not mask the test outcome), but it is logged as a
    warning with the traceback so a leaked account does not go unnoticed.

    Args:
        admin_client: Client exposing ``auth.admin.delete_user``.
        user_id: Id of the user to delete, or ``None`` if none was created.
    """
    import logging

    if user_id is None:
        return
    try:
        admin_client.auth.admin.delete_user(user_id)
    except Exception:
        logging.getLogger(__name__).warning(
            "Failed to delete auth user %s during test cleanup",
            user_id,
            exc_info=True,
        )
def _encode_fixture_image_base64() -> str:
    """Return the bytes of ``IMAGE_FIXTURE`` as base64-encoded ASCII text."""
    encoded = base64.b64encode(IMAGE_FIXTURE.read_bytes())
    return str(encoded, "ascii")
@pytest.mark.asyncio
@pytest.mark.live
async def test_agent_live_intent_only_no_tool() -> None:
    """Run a plain text prompt with no tools and expect a completed session.

    Skipped unless live tests are enabled. Asserts that no tool call is left
    pending, that the session ends as COMPLETED, and that exactly one user
    message followed by one assistant message was stored.

    All setup that creates external state runs inside the ``try`` so the
    auth user is removed and the Supabase service is closed even when setup
    itself fails; each cleanup step runs regardless of the previous one.
    """
    if not _live_enabled():
        pytest.skip("Live test disabled")

    session_id = uuid.uuid4()
    agent_type = f"LIVE_E2E_{uuid.uuid4().hex[:8]}"
    admin_client = await _init_supabase_admin_client()

    # Pre-declared so the cleanup can tell how far setup progressed.
    owner_id: uuid.UUID | None = None
    test_user_id: str | None = None
    llm_cleanup_id: uuid.UUID | None = None
    factory_cleanup_id: uuid.UUID | None = None
    try:
        owner_id, test_user_id = await _create_owner_profile(admin_client)
        llm_id, llm_cleanup_id, factory_cleanup_id = await _resolve_llm_id()
        await _seed_session_with_active_agent(
            session_id=session_id,
            owner_id=owner_id,
            agent_type=agent_type,
            llm_id=llm_id,
        )
        result = await run_agent_task(
            {
                "command": "run",
                "run_input": {
                    "threadId": str(session_id),
                    "runId": "run-live-intent-1",
                    "state": {},
                    "messages": [
                        {
                            "id": "u1",
                            "role": "user",
                            "content": "请用一句话介绍你是谁。",
                        }
                    ],
                    "tools": [],
                    "context": [],
                    "forwardedProps": {},
                },
            },
            run_service=RunService(),
            resume_service=ResumeService(),
        )
        assert result["pending_tool_call_id"] is None

        await engine.dispose()
        async with AsyncSessionLocal() as session:
            chat_session = await session.get(AgentChatSession, session_id)
            assert chat_session is not None
            assert chat_session.status == AgentChatSessionStatus.COMPLETED
            rows = await session.execute(
                select(AgentChatMessage)
                .where(AgentChatMessage.session_id == session_id)
                .order_by(AgentChatMessage.seq.asc())
            )
            messages = list(rows.scalars().all())
            assert [m.role for m in messages] == [
                AgentChatMessageRole.USER,
                AgentChatMessageRole.ASSISTANT,
            ]
    finally:
        try:
            # No owner means setup failed before any database row was seeded.
            if owner_id is not None:
                await _cleanup_session_and_agent(
                    session_id=session_id,
                    agent_type=agent_type,
                    owner_id=owner_id,
                    llm_id_to_cleanup=llm_cleanup_id,
                    factory_id_to_cleanup=factory_cleanup_id,
                )
        finally:
            try:
                await _cleanup_auth_user(
                    admin_client=admin_client, user_id=test_user_id
                )
            finally:
                await supabase_service.close()
@pytest.mark.asyncio
@pytest.mark.live
async def test_agent_live_image_calendar_tool_persistence() -> None:
    """Send an image prompt and check the calendar tool result is persisted.

    Skipped unless live tests are enabled, and also when tool-result storage
    or a writable "private" bucket is unavailable. Asserts that the session
    completes, that a schedule item owned by the test user exists with a
    title, timezone and start time, and that the latest tool message points
    at an offloaded payload in the "private" bucket whose ``toolName`` is
    ``back.mutate_calendar_event``.

    Every step after Supabase initialisation runs inside the ``try`` so that
    skips and setup failures still close the Supabase service and remove
    whatever was created; each cleanup step runs regardless of the others.
    """
    if not _live_enabled():
        pytest.skip("Live test disabled")

    admin_client = await _init_supabase_admin_client()

    # Pre-declared so the cleanup can tell how far setup progressed.
    owner_id: uuid.UUID | None = None
    test_user_id: str | None = None
    llm_cleanup_id: uuid.UUID | None = None
    factory_cleanup_id: uuid.UUID | None = None
    session_id = uuid.uuid4()
    agent_type = f"LIVE_E2E_{uuid.uuid4().hex[:8]}"
    uploaded_paths: list[str] = []
    try:
        tool_result_storage = create_tool_result_storage()
        if tool_result_storage is None:
            pytest.skip("Tool result storage unavailable")

        storage = admin_client.storage
        try:
            storage.get_bucket("private")
        except Exception:
            # Lookup failed; try to create the bucket instead.
            storage.create_bucket("private", "private", {"public": False})

        # Write and remove a probe object to confirm the bucket is writable.
        probe_path = f"tool-results/probe/{uuid.uuid4().hex}.json"
        try:
            storage.from_("private").upload(probe_path, b"{}")
            storage.from_("private").remove([probe_path])
        except Exception:
            pytest.skip("Supabase private storage bucket is not writable")

        owner_id, test_user_id = await _create_owner_profile(admin_client)
        llm_id, llm_cleanup_id, factory_cleanup_id = await _resolve_llm_id(
            target_model_code="qwen3.5-flash",
            target_factory_name="dashscope",
        )
        await _seed_session_with_active_agent(
            session_id=session_id,
            owner_id=owner_id,
            agent_type=agent_type,
            llm_id=llm_id,
        )
        image_b64 = _encode_fixture_image_base64()
        result = await run_agent_task(
            {
                "command": "run",
                "run_input": {
                    "threadId": str(session_id),
                    "runId": "run-live-image-1",
                    "state": {},
                    "messages": [
                        {
                            "id": "u1",
                            "role": "user",
                            "content": [
                                {
                                    "type": "text",
                                    "text": (
                                        "请先识别图片中的日程文字,然后调用后端日历工具创建事件。"
                                        "返回时请确保标题和开始时间不为空。"
                                    ),
                                },
                                {
                                    "type": "binary",
                                    "mimeType": "image/png",
                                    "data": image_b64,
                                },
                            ],
                        }
                    ],
                    "tools": [],
                    "context": [],
                    "forwardedProps": {},
                },
            },
            run_service=RunService(
                tool_result_storage=tool_result_storage,
                # A 1-byte threshold so any tool result is offloaded to storage.
                tool_result_offload_threshold_bytes=1,
                tool_result_bucket="private",
                tool_result_prefix="tool-results",
            ),
            resume_service=ResumeService(),
        )
        assert result["pending_tool_call_id"] is None

        await engine.dispose()
        async with AsyncSessionLocal() as session:
            chat_session = await session.get(AgentChatSession, session_id)
            assert chat_session is not None
            assert chat_session.status == AgentChatSessionStatus.COMPLETED

            schedule_rows = await session.execute(
                select(ScheduleItem)
                .where(ScheduleItem.owner_id == owner_id)
                .order_by(ScheduleItem.created_at.desc())
            )
            created_items = list(schedule_rows.scalars().all())
            assert created_items, (
                "Expected schedule item created by backend calendar tool"
            )
            created_item = created_items[0]
            assert created_item.title
            assert created_item.timezone
            assert created_item.start_at is not None

            # Highest-seq tool message of the session.
            tool_rows = await session.execute(
                select(AgentChatMessage)
                .where(AgentChatMessage.session_id == session_id)
                .where(AgentChatMessage.role == AgentChatMessageRole.TOOL)
                .order_by(AgentChatMessage.seq.desc())
            )
            tool_message = tool_rows.scalars().first()
            assert tool_message is not None
            metadata = tool_message.metadata_json or {}
            storage_bucket = metadata.get("storage_bucket")
            storage_path = metadata.get("storage_path")
            assert storage_bucket == "private"
            assert isinstance(storage_path, str)
            assert storage_path.startswith("tool-results/")
            uploaded_paths.append(storage_path)

            downloaded = storage.from_("private").download(uploaded_paths[0])
            if isinstance(downloaded, bytes):
                payload = json.loads(downloaded.decode("utf-8"))
            else:
                payload = json.loads(str(downloaded))
            assert payload["toolName"] == "back.mutate_calendar_event"
    finally:
        try:
            # uploaded_paths is only filled after `storage` has been bound.
            if uploaded_paths:
                try:
                    storage.from_("private").remove(uploaded_paths)
                except Exception:
                    # Deliberately best-effort: a stale object must not fail the test.
                    pass
            # No owner means setup stopped before any database row was seeded.
            if owner_id is not None:
                try:
                    async with AsyncSessionLocal() as cleanup_session:
                        await cleanup_session.execute(
                            delete(ScheduleItem).where(
                                ScheduleItem.owner_id == owner_id
                            )
                        )
                        await cleanup_session.commit()
                finally:
                    await _cleanup_session_and_agent(
                        session_id=session_id,
                        agent_type=agent_type,
                        owner_id=owner_id,
                        llm_id_to_cleanup=llm_cleanup_id,
                        factory_id_to_cleanup=factory_cleanup_id,
                    )
        finally:
            try:
                await _cleanup_auth_user(
                    admin_client=admin_client, user_id=test_user_id
                )
            finally:
                await supabase_service.close()
@pytest.mark.asyncio
@pytest.mark.live
async def test_agent_live_front_tool_interrupt_resume_continue() -> None:
    """Exercise the frontend-tool flow: run, resume with a result, follow-up.

    Skipped unless live tests are enabled. The first run must stop with a
    pending ``front.navigate_to_route`` call; the test then resumes with a
    tool result carrying the nonce from the state snapshot, executes the
    single follow-up command that was enqueued, and asserts the session
    completed, contains a tool message, has a non-negative total cost, and
    that RUN_STARTED and RUN_FINISHED events were published.

    Setup that creates external state runs inside the ``try`` so the auth
    user is removed and the Supabase service is closed even when setup
    fails; each cleanup step runs regardless of the previous one.
    """
    if not _live_enabled():
        pytest.skip("Live test disabled")

    admin_client = await _init_supabase_admin_client()

    # Pre-declared so the cleanup can tell how far setup progressed.
    owner_id: uuid.UUID | None = None
    test_user_id: str | None = None
    llm_cleanup_id: uuid.UUID | None = None
    factory_cleanup_id: uuid.UUID | None = None
    session_id = uuid.uuid4()
    agent_type = f"LIVE_E2E_{uuid.uuid4().hex[:8]}"
    queued_commands: list[dict[str, object]] = []
    published_events: list[str] = []

    async def _publish(event: dict[str, object]) -> None:
        """Record the type of each published event (string types only)."""
        event_type = event.get("type")
        if isinstance(event_type, str):
            published_events.append(event_type)

    async def _enqueue(command: dict[str, object]) -> str:
        """Capture an enqueued command instead of dispatching it."""
        queued_commands.append(command)
        return "task-followup-live"

    try:
        owner_id, test_user_id = await _create_owner_profile(admin_client)
        llm_id, llm_cleanup_id, factory_cleanup_id = await _resolve_llm_id()
        await _seed_session_with_active_agent(
            session_id=session_id,
            owner_id=owner_id,
            agent_type=agent_type,
            llm_id=llm_id,
        )
        run_result = await run_agent_task(
            {
                "command": "run",
                "run_input": {
                    "threadId": str(session_id),
                    "runId": "run-live-front-1",
                    "state": {},
                    "messages": [
                        {
                            "id": "u1",
                            "role": "user",
                            "content": "你必须调用 front.navigate_to_route 工具跳转到 /calendar/dayweek。",
                        }
                    ],
                    "tools": [
                        {
                            "name": "front.navigate_to_route",
                            "description": "Navigate frontend route; runtime raises approval interrupt when called.",
                            "parameters": {
                                "type": "object",
                                "properties": {
                                    "target": {"type": "string"},
                                    "replace": {"type": "boolean"},
                                },
                                "required": ["target"],
                            },
                        }
                    ],
                    "context": [],
                    "forwardedProps": {},
                },
            },
            publish_event=_publish,
            enqueue_command=_enqueue,
            run_service=RunService(),
            resume_service=ResumeService(),
        )
        pending_tool_call_id = run_result["pending_tool_call_id"]
        assert isinstance(pending_tool_call_id, str), (
            f"Expected pending tool call, got result: {json.dumps(run_result, ensure_ascii=False)}"
        )
        snapshot = run_result["state_snapshot"]
        assert isinstance(snapshot, dict)
        pending_tool_nonce = snapshot.get("pending_tool_nonce")
        assert isinstance(pending_tool_nonce, str)

        # Recover the tool arguments from the first TOOL_CALL_ARGS event for the
        # pending call whose delta parses as a JSON object.
        guarded_tool_args: dict[str, object] | None = None
        has_matching_tool_args_event = False
        events = run_result.get("events")
        if isinstance(events, list):
            for event in events:
                if not isinstance(event, dict):
                    continue
                if event.get("type") != "TOOL_CALL_ARGS":
                    continue
                if event.get("toolCallId") != pending_tool_call_id:
                    continue
                has_matching_tool_args_event = True
                delta = event.get("delta")
                if not isinstance(delta, str):
                    continue
                try:
                    parsed_delta = json.loads(delta)
                except (TypeError, ValueError):
                    continue
                if isinstance(parsed_delta, dict):
                    guarded_tool_args = parsed_delta
                    break
        if has_matching_tool_args_event:
            assert guarded_tool_args is not None
        if guarded_tool_args is None:
            # No args event was returned: fall back to the arguments the prompt
            # asked for, tagged with the nonce from the snapshot.
            guarded_tool_args = {
                "target": "/calendar/dayweek",
                "replace": False,
                "__nonce": pending_tool_nonce,
            }
        assert guarded_tool_args.get("__nonce") == pending_tool_nonce

        await run_agent_task(
            {
                "command": "resume",
                "run_input": {
                    "threadId": str(session_id),
                    "runId": "run-live-front-2",
                    "state": {},
                    "messages": [
                        {
                            "id": "tool-1",
                            "role": "tool",
                            "toolCallId": pending_tool_call_id,
                            "content": json.dumps(
                                {
                                    "toolName": "front.navigate_to_route",
                                    "toolArgs": guarded_tool_args,
                                    "nonce": pending_tool_nonce,
                                    "result": {
                                        "ok": True,
                                        "route": "/calendar/dayweek",
                                    },
                                },
                                ensure_ascii=True,
                                separators=(",", ":"),
                            ),
                        }
                    ],
                    "tools": [],
                    "context": [],
                    "forwardedProps": {},
                },
            },
            publish_event=_publish,
            enqueue_command=_enqueue,
            run_service=RunService(),
            resume_service=ResumeService(),
        )
        # The resume is expected to enqueue exactly one follow-up command,
        # which is executed here in place of a queue worker.
        assert len(queued_commands) == 1
        await run_agent_task(
            queued_commands[0],
            publish_event=_publish,
            enqueue_command=_enqueue,
            run_service=RunService(),
            resume_service=ResumeService(),
        )

        await engine.dispose()
        async with AsyncSessionLocal() as session:
            chat_session = await session.get(AgentChatSession, session_id)
            assert chat_session is not None
            assert chat_session.status == AgentChatSessionStatus.COMPLETED
            rows = await session.execute(
                select(AgentChatMessage)
                .where(AgentChatMessage.session_id == session_id)
                .order_by(AgentChatMessage.seq.asc())
            )
            messages = list(rows.scalars().all())
            assert any(m.role == AgentChatMessageRole.TOOL for m in messages)
            assert chat_session.total_cost >= Decimal("0")
            assert "RUN_STARTED" in published_events
            assert "RUN_FINISHED" in published_events
    finally:
        try:
            # No owner means setup failed before any database row was seeded.
            if owner_id is not None:
                await _cleanup_session_and_agent(
                    session_id=session_id,
                    agent_type=agent_type,
                    owner_id=owner_id,
                    llm_id_to_cleanup=llm_cleanup_id,
                    factory_id_to_cleanup=factory_cleanup_id,
                )
        finally:
            try:
                await _cleanup_auth_user(
                    admin_client=admin_client, user_id=test_user_id
                )
            finally:
                await supabase_service.close()