feat(agent): add voice input capability and standardize tool naming

- Add voice recording with transcribe endpoint (ASR) for multimodal input
- Android: add RECORD_AUDIO and INTERNET permissions
- Refactor tool naming: frontend tools use 'front.' prefix, backend tools use 'back.'
- Migrate calendar tools: create_calendar_event -> back.mutate/list/delete events
- Add calendar_event_list.v1 and calendar_operation.v1 UI card types
- Update all Flutter and Python tests to match new tool naming conventions
- Add record package dependency for voice recording
This commit is contained in:
zl-q
2026-03-09 00:10:09 +08:00
parent 6c83e35a69
commit 3ac09475ad
30 changed files with 1593 additions and 438 deletions
@@ -141,11 +141,17 @@ class CrewAIRuntime:
@staticmethod
def _sanitize_backend_args(execution_data: dict[str, Any]) -> dict[str, object]:
dropped = {"event_id", "id", "message", "status", "result"}
dropped = {"event_id", "id", "message", "result"}
cleaned: dict[str, object] = {}
for key, value in execution_data.items():
if not isinstance(key, str) or key in dropped:
continue
if (
key == "status"
and isinstance(value, str)
and value.upper() in {"SUCCESS", "PARTIAL", "FAILED"}
):
continue
if isinstance(value, (str, int, float, bool)) or value is None:
cleaned[key] = value
return cleaned
@@ -170,7 +176,7 @@ class CrewAIRuntime:
):
return None
backend_names = self._backend_tool_names(execution_tools)
if len(backend_names) != 1:
if not backend_names:
return None
if not hasattr(execution_result, "status") or not hasattr(
execution_result, "execution_data"
@@ -190,7 +196,39 @@ class CrewAIRuntime:
args = self._sanitize_backend_args(raw_data)
if not args:
return None
tool_name = backend_names[0]
if len(backend_names) == 1:
tool_name = backend_names[0]
else:
mutate_name = "back.mutate_calendar_event"
list_name = "back.list_calendar_events"
write_keys = {
"operation",
"eventId",
"title",
"description",
"startAt",
"endAt",
"timezone",
"location",
"color",
"status",
}
list_keys = {"page", "pageSize"}
has_write_keys = any(key in args for key in write_keys)
has_event_id = "eventId" in args
if mutate_name in backend_names and has_write_keys:
tool_name = mutate_name
if "operation" not in args:
if has_event_id:
return None
args = {"operation": "create", **args}
elif list_name in backend_names and (
any(key in args for key in list_keys)
or not any(key in args for key in write_keys)
):
tool_name = list_name
else:
return None
result = self._backend_tool_handler(tool_name, args)
synthesized_call = {
"name": tool_name,
@@ -1,11 +1,13 @@
from __future__ import annotations
from core.agent.infrastructure.crewai.tools.create_calendar_event_tool import (
CREATE_CALENDAR_EVENT_TOOL,
LIST_CALENDAR_EVENTS_TOOL,
MUTATE_CALENDAR_EVENT_TOOL,
)
REGISTERED_TOOLS = {
CREATE_CALENDAR_EVENT_TOOL.name: CREATE_CALENDAR_EVENT_TOOL,
LIST_CALENDAR_EVENTS_TOOL.name: LIST_CALENDAR_EVENTS_TOOL,
MUTATE_CALENDAR_EVENT_TOOL.name: MUTATE_CALENDAR_EVENT_TOOL,
}
__all__ = ["REGISTERED_TOOLS"]
@@ -1,5 +1,6 @@
from __future__ import annotations
import re
from datetime import datetime, timedelta, timezone
from uuid import UUID
@@ -8,10 +9,18 @@ from sqlalchemy.ext.asyncio import AsyncSession
from core.auth.models import CurrentUser
from core.agent.infrastructure.crewai.tools.base import CrewAIToolSpec
from v1.schedule_items.repository import SQLAlchemyScheduleItemRepository
from v1.schedule_items.schemas import ScheduleItemCreateRequest, ScheduleItemMetadata
from v1.schedule_items.schemas import (
ScheduleItemCreateRequest,
ScheduleItemMetadata,
ScheduleItemStatus,
ScheduleItemUpdateRequest,
)
from v1.schedule_items.service import ScheduleItemService
_HEX_COLOR_PATTERN = re.compile(r"^#[0-9A-Fa-f]{6}$")
def _parse_datetime(value: object) -> datetime | None:
if not isinstance(value, str) or not value:
return None
@@ -24,10 +33,122 @@ def _parse_datetime(value: object) -> datetime | None:
return None
async def _execute_create_calendar_event(
def _parse_positive_int(
value: object,
*,
default: int,
minimum: int,
maximum: int,
) -> int:
if isinstance(value, bool):
return default
candidate: int | float | str
if isinstance(value, (int, float, str)):
candidate = value
else:
return default
if isinstance(candidate, str):
candidate = candidate.strip()
try:
parsed = int(candidate)
except (TypeError, ValueError):
return default
if parsed < minimum:
return minimum
if parsed > maximum:
return maximum
return parsed
def _parse_event_id(value: object) -> UUID:
if not isinstance(value, str) or not value.strip():
raise ValueError("eventId is required")
try:
return UUID(value)
except ValueError as exc:
raise ValueError("eventId must be a valid UUID") from exc
def _service(session: AsyncSession, owner_id: UUID) -> ScheduleItemService:
return ScheduleItemService(
repository=SQLAlchemyScheduleItemRepository(session),
session=session,
current_user=CurrentUser(id=owner_id),
)
def _resolve_metadata(tool_args: dict[str, object]) -> ScheduleItemMetadata:
location = tool_args.get("location")
location_value = location.strip() if isinstance(location, str) else None
color = tool_args.get("color")
raw_color = color.strip() if isinstance(color, str) and color.strip() else "#4F46E5"
color_value = raw_color if _HEX_COLOR_PATTERN.match(raw_color) else "#4F46E5"
return ScheduleItemMetadata(location=location_value, color=color_value)
def _event_payload(event: object) -> dict[str, object]:
event_id = str(getattr(event, "id"))
metadata = getattr(event, "metadata", None)
location_value = getattr(metadata, "location", None)
color_value = getattr(metadata, "color", None) or "#4F46E5"
return {
"id": event_id,
"title": getattr(event, "title"),
"description": getattr(event, "description"),
"startAt": getattr(event, "start_at").isoformat(),
"endAt": (
getattr(event, "end_at").isoformat()
if getattr(event, "end_at") is not None
else None
),
"timezone": getattr(event, "timezone"),
"location": location_value,
"color": color_value,
}
async def _execute_list_calendar_events(
session: AsyncSession,
owner_id: UUID,
tool_args: dict[str, object],
) -> dict[str, object]:
page = _parse_positive_int(
tool_args.get("page"),
default=1,
minimum=1,
maximum=100000,
)
page_size = _parse_positive_int(
tool_args.get("pageSize"),
default=20,
minimum=1,
maximum=100,
)
service = _service(session, owner_id)
items, total = await service.list_paginated(page=page, page_size=page_size)
total_pages = max(1, (total + page_size - 1) // page_size) if total else 0
return {
"type": "calendar_event_list.v1",
"version": "v1",
"data": {
"items": [_event_payload(item) for item in items],
"pagination": {
"page": page,
"pageSize": page_size,
"total": total,
"totalPages": total_pages,
},
"ok": True,
"message": "已获取日程列表",
},
"actions": [],
}
async def _execute_create(
*,
service: ScheduleItemService,
tool_args: dict[str, object],
) -> dict[str, object]:
title = str(tool_args.get("title", "新的日程")).strip() or "新的日程"
description = str(tool_args.get("description", "")).strip() or None
@@ -35,15 +156,8 @@ async def _execute_create_calendar_event(
if start_at is None:
start_at = datetime.now(timezone.utc) + timedelta(hours=1)
end_at = _parse_datetime(tool_args.get("endAt"))
timezone_value = str(tool_args.get("timezone", "Asia/Shanghai"))
location = tool_args.get("location")
location_value = str(location) if isinstance(location, str) else None
metadata = ScheduleItemMetadata(location=location_value, color="#4F46E5")
service = ScheduleItemService(
repository=SQLAlchemyScheduleItemRepository(session),
session=session,
current_user=CurrentUser(id=owner_id),
timezone_value = (
str(tool_args.get("timezone", "Asia/Shanghai")).strip() or "Asia/Shanghai"
)
created = await service.create_agent_generated(
ScheduleItemCreateRequest(
@@ -52,22 +166,16 @@ async def _execute_create_calendar_event(
start_at=start_at,
end_at=end_at,
timezone=timezone_value,
metadata=metadata,
metadata=_resolve_metadata(tool_args),
)
)
event_id = str(created.id)
event_data = _event_payload(created)
event_id = str(event_data["id"])
return {
"type": "calendar_card.v1",
"version": "v1",
"data": {
"id": event_id,
"title": created.title,
"description": created.description,
"startAt": created.start_at.isoformat(),
"endAt": created.end_at.isoformat() if created.end_at is not None else None,
"timezone": created.timezone,
"location": location_value,
"color": "#4F46E5",
**event_data,
"sourceType": "agent_generated",
"ok": True,
"message": "日程已创建",
@@ -82,8 +190,125 @@ async def _execute_create_calendar_event(
}
CREATE_CALENDAR_EVENT_TOOL = CrewAIToolSpec(
name="back.create_calendar_event",
async def _execute_update(
*,
service: ScheduleItemService,
tool_args: dict[str, object],
) -> dict[str, object]:
event_id = _parse_event_id(tool_args.get("eventId"))
update_data: dict[str, object] = {}
for source_key, target_key in (
("title", "title"),
("description", "description"),
("timezone", "timezone"),
):
value = tool_args.get(source_key)
if isinstance(value, str):
update_data[target_key] = value.strip()
start_at = _parse_datetime(tool_args.get("startAt"))
if start_at is not None:
update_data["start_at"] = start_at
end_at = _parse_datetime(tool_args.get("endAt"))
if end_at is not None:
update_data["end_at"] = end_at
status_value = tool_args.get("status")
if isinstance(status_value, str) and status_value.strip():
try:
update_data["status"] = ScheduleItemStatus(status_value.strip().lower())
except ValueError as exc:
raise ValueError(
"status must be one of: active, completed, canceled, archived"
) from exc
has_location = isinstance(tool_args.get("location"), str)
has_color = isinstance(tool_args.get("color"), str)
if has_location or has_color:
existing = await service.get_by_id(event_id)
metadata_dump = (
existing.metadata.model_dump() if existing.metadata is not None else {}
)
if has_location:
metadata_dump["location"] = str(tool_args.get("location")).strip() or None
if has_color:
color = str(tool_args.get("color")).strip()
if not color:
metadata_dump["color"] = None
elif _HEX_COLOR_PATTERN.match(color):
metadata_dump["color"] = color
else:
raise ValueError("color must be a hex string like #RRGGBB")
update_data["metadata"] = ScheduleItemMetadata.model_validate(metadata_dump)
updated = await service.update(
event_id,
ScheduleItemUpdateRequest.model_validate(update_data),
)
event_data = _event_payload(updated)
return {
"type": "calendar_card.v1",
"version": "v1",
"data": {
**event_data,
"sourceType": "agent_generated",
"ok": True,
"message": "日程已更新",
},
"actions": [
{
"type": "link",
"label": "查看详情",
"target": f"/calendar/events/{event_data['id']}",
}
],
}
async def _execute_delete(
*,
service: ScheduleItemService,
tool_args: dict[str, object],
) -> dict[str, object]:
event_id = _parse_event_id(tool_args.get("eventId"))
await service.delete(event_id)
return {
"type": "calendar_operation.v1",
"version": "v1",
"data": {
"operation": "delete",
"id": str(event_id),
"ok": True,
"message": "日程已删除",
},
"actions": [],
}
async def _execute_mutate_calendar_event(
session: AsyncSession,
owner_id: UUID,
tool_args: dict[str, object],
) -> dict[str, object]:
operation_raw = tool_args.get("operation")
if not isinstance(operation_raw, str) or not operation_raw.strip():
raise ValueError("operation is required")
operation = operation_raw.strip().lower()
service = _service(session, owner_id)
if operation == "create":
return await _execute_create(service=service, tool_args=tool_args)
if operation == "update":
return await _execute_update(service=service, tool_args=tool_args)
if operation == "delete":
return await _execute_delete(service=service, tool_args=tool_args)
raise ValueError("operation must be one of: create, update, delete")
LIST_CALENDAR_EVENTS_TOOL = CrewAIToolSpec(
name="back.list_calendar_events",
target="backend",
executor=_execute_create_calendar_event,
executor=_execute_list_calendar_events,
)
MUTATE_CALENDAR_EVENT_TOOL = CrewAIToolSpec(
name="back.mutate_calendar_event",
target="backend",
executor=_execute_mutate_calendar_event,
)
@@ -4,7 +4,10 @@ from core.agent.infrastructure.crewai.tools import REGISTERED_TOOLS
STAGE_TOOL_ALLOWLIST: dict[str, list[str]] = {
"intent": [],
"execution": ["back.create_calendar_event"],
"execution": [
"back.list_calendar_events",
"back.mutate_calendar_event",
],
"organization": [],
}
+46 -4
View File
@@ -4,7 +4,7 @@ from datetime import datetime, timezone
from typing import TYPE_CHECKING, Protocol
from uuid import UUID
from sqlalchemy import select, update
from sqlalchemy import func, select, update
from sqlalchemy.exc import SQLAlchemyError
from core.db.base_repository import BaseRepository
@@ -33,6 +33,13 @@ class ScheduleItemRepository(Protocol):
async def list_by_date_range(
self, owner_id: UUID, start_at: datetime, end_at: datetime
) -> list[ScheduleItem]: ...
async def list_paginated(
self,
owner_id: UUID,
*,
page: int,
page_size: int,
) -> tuple[list[ScheduleItem], int]: ...
async def create_subscription(self, data: dict) -> ScheduleSubscription: ...
@@ -131,11 +138,46 @@ class SQLAlchemyScheduleItemRepository(BaseRepository[ScheduleItem]):
logger.exception("Schedule item list failed", owner_id=str(owner_id))
raise
async def list_paginated(
self,
owner_id: UUID,
*,
page: int,
page_size: int,
) -> tuple[list[ScheduleItem], int]:
offset = (page - 1) * page_size
try:
count_stmt = (
select(func.count())
.select_from(ScheduleItem)
.where(ScheduleItem.owner_id == owner_id)
.where(ScheduleItem.deleted_at.is_(None))
)
count_result = await self._session.execute(count_stmt)
total = int(count_result.scalar_one() or 0)
items_stmt = (
select(ScheduleItem)
.where(ScheduleItem.owner_id == owner_id)
.where(ScheduleItem.deleted_at.is_(None))
.order_by(ScheduleItem.start_at.asc(), ScheduleItem.id.asc())
.offset(offset)
.limit(page_size)
)
items_result = await self._session.execute(items_stmt)
items = list(items_result.scalars().all())
return items, total
except SQLAlchemyError:
logger.exception(
"Schedule item paginated list failed",
owner_id=str(owner_id),
page=page,
page_size=page_size,
)
raise
async def create_subscription(self, data: dict) -> ScheduleSubscription:
sub = ScheduleSubscription(**data)
self._session.add(sub)
await self._session.flush()
return sub
async def get_by_id(self, entity_id: UUID) -> ScheduleItem | None:
return await super().get_by_id(entity_id)
+28
View File
@@ -202,6 +202,34 @@ class ScheduleItemService(BaseService):
return [self._to_response(item) for item in items]
async def list_paginated(
self,
*,
page: int,
page_size: int,
) -> tuple[list[ScheduleItemResponse], int]:
user_id = self.require_user_id()
if page < 1:
raise HTTPException(status_code=400, detail="page must be >= 1")
if page_size < 1 or page_size > 100:
raise HTTPException(status_code=400, detail="page_size must be 1..100")
try:
items, total = await self._repository.list_paginated(
user_id,
page=page,
page_size=page_size,
)
except SQLAlchemyError:
logger.exception(
"Failed to list schedule items with pagination",
page=page,
page_size=page_size,
)
raise HTTPException(
status_code=503, detail="Schedule item store unavailable"
)
return [self._to_response(item) for item in items], total
async def share(
self, item_id: UUID, request: ScheduleItemShareRequest
) -> ScheduleItemShareResponse:
+1 -1
View File
@@ -355,7 +355,7 @@ async def test_agent_live_image_calendar_tool_persistence() -> None:
else:
payload = json.loads(str(downloaded))
assert payload["toolName"] == "back.create_calendar_event"
assert payload["toolName"] == "back.mutate_calendar_event"
finally:
if uploaded_paths:
try:
@@ -1,65 +0,0 @@
from __future__ import annotations
from datetime import datetime, timezone
from types import SimpleNamespace
from typing import cast
from uuid import uuid4
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from core.agent.infrastructure.crewai.tools.backend.create_calendar_event_tool import (
_execute_create_calendar_event,
)
@pytest.mark.asyncio
async def test_create_calendar_event_tool_returns_ui_schema_v1_top_level(
monkeypatch: pytest.MonkeyPatch,
) -> None:
event_id = uuid4()
created = SimpleNamespace(
id=event_id,
title="晨会",
description="同步计划",
start_at=datetime(2026, 3, 8, 1, 0, tzinfo=timezone.utc),
end_at=datetime(2026, 3, 8, 2, 0, tzinfo=timezone.utc),
timezone="Asia/Shanghai",
)
class _FakeService:
def __init__(self, **kwargs) -> None:
del kwargs
async def create_agent_generated(self, payload):
del payload
return created
class _FakeRepository:
def __init__(self, session) -> None:
del session
monkeypatch.setattr(
"core.agent.infrastructure.crewai.tools.backend.create_calendar_event_tool.ScheduleItemService",
_FakeService,
)
monkeypatch.setattr(
"core.agent.infrastructure.crewai.tools.backend.create_calendar_event_tool.SQLAlchemyScheduleItemRepository",
_FakeRepository,
)
result = cast(
dict[str, object],
await _execute_create_calendar_event(
session=cast(AsyncSession, SimpleNamespace()),
owner_id=uuid4(),
tool_args={"title": "晨会"},
),
)
assert result["type"] == "calendar_card.v1"
assert result["version"] == "v1"
data = cast(dict[str, object], result["data"])
actions = cast(list[dict[str, object]], result["actions"])
assert data["id"] == str(event_id)
assert actions
@@ -119,7 +119,8 @@ def test_runtime_needs_execution_and_collects_front_tool_call() -> None:
assert isinstance(tools, list)
assert any(t.get("name") == "front.navigate_to_route" for t in tools)
execution_tools = cast(list[dict[str, object]], calls[1]["tools"])
assert any(t.get("name") == "back.create_calendar_event" for t in execution_tools)
assert any(t.get("name") == "back.list_calendar_events" for t in execution_tools)
assert any(t.get("name") == "back.mutate_calendar_event" for t in execution_tools)
assert result["assistant_text"] == "do it"
assert result["pending_front_tool"] == {
"name": "front.navigate_to_route",
@@ -191,7 +192,7 @@ def test_runtime_multimodal_intent_receives_execution_tool_awareness() -> None:
calls.append({"stage": stage, "tools": tools})
if stage == "intent":
return (
'{"route":"NEEDS_EXECUTION","intent_summary":"need tool","execution_brief":"call back.create_calendar_event","safety_flags":[]}',
'{"route":"NEEDS_EXECUTION","intent_summary":"need tool","execution_brief":"call back.mutate_calendar_event","safety_flags":[]}',
UsageCost(1, 1, 2, 0.01),
[],
None,
@@ -218,7 +219,8 @@ def test_runtime_multimodal_intent_receives_execution_tool_awareness() -> None:
)
intent_tools = cast(list[dict[str, object]], calls[0]["tools"])
assert any(t.get("name") == "back.create_calendar_event" for t in intent_tools)
assert any(t.get("name") == "back.list_calendar_events" for t in intent_tools)
assert any(t.get("name") == "back.mutate_calendar_event" for t in intent_tools)
def test_runtime_synthesizes_backend_call_when_model_skips_react_tool_call() -> None:
@@ -267,18 +269,78 @@ def test_runtime_synthesizes_backend_call_when_model_skips_react_tool_call() ->
assert backend_calls == [
(
"back.create_calendar_event",
{"title": "项目评审", "timezone": "Asia/Shanghai"},
"back.mutate_calendar_event",
{
"operation": "create",
"title": "项目评审",
"timezone": "Asia/Shanghai",
},
)
]
tool_calls = cast(list[dict[str, object]], result["tool_calls"])
assert any(
call.get("target") == "backend"
and call.get("name") == "back.create_calendar_event"
and call.get("name") == "back.mutate_calendar_event"
for call in tool_calls
)
def test_runtime_does_not_synthesize_mutate_create_when_event_id_without_operation() -> (
None
):
runtime = _build_runtime()
backend_calls: list[tuple[str, dict[str, object]]] = []
def _backend_handler(
tool_name: str, tool_args: dict[str, object]
) -> dict[str, object]:
backend_calls.append((tool_name, tool_args))
return {"type": "ok", "version": "v1", "data": {}, "actions": []}
runtime.set_backend_tool_handler(_backend_handler)
def _fake_run_stage(self, **kwargs):
stage = kwargs["stage"]
if stage == "intent":
return (
'{"route":"NEEDS_EXECUTION","intent_summary":"update event","execution_brief":"update via backend tool","safety_flags":[]}',
UsageCost(1, 1, 2, 0.01),
[],
None,
)
if stage == "execution":
return (
'{"status":"SUCCESS","execution_summary":"updated","execution_data":{"eventId":"1c7e85f6-a2b4-4da3-a143-7f9af8ea1a3d","title":"修正标题"},"report_brief":"done"}',
UsageCost(2, 2, 4, 0.02),
[],
None,
)
return (
'{"assistant_text":"ok","response_metadata":{}}',
UsageCost(1, 1, 2, 0.01),
[],
None,
)
runtime._run_stage_with_crewai = MethodType(_fake_run_stage, runtime) # type: ignore[method-assign]
runtime.execute(user_input="更新日程", tools=[])
assert backend_calls == []
def test_runtime_sanitize_backend_args_keeps_business_status() -> None:
payload = {
"status": "completed",
"title": "日程",
"result": "ignore",
"id": "ignore",
}
assert CrewAIRuntime._sanitize_backend_args(payload) == {
"status": "completed",
"title": "日程",
}
def test_runtime_extracts_pending_front_tool_from_approval_required_shape() -> None:
runtime = _build_runtime()
@@ -423,7 +485,8 @@ def test_run_stage_with_crewai_uses_output_pydantic_for_stage(
def test_runtime_backend_registry_check() -> None:
runtime = _build_runtime()
assert runtime.is_registered_backend_tool("back.create_calendar_event") is True
assert runtime.is_registered_backend_tool("back.list_calendar_events") is True
assert runtime.is_registered_backend_tool("back.mutate_calendar_event") is True
assert runtime.is_registered_backend_tool("back.unknown") is False
@@ -0,0 +1,128 @@
from __future__ import annotations
from datetime import datetime, timezone
from types import SimpleNamespace
from typing import cast
from uuid import uuid4
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from core.agent.infrastructure.crewai.tools.create_calendar_event_tool import (
_execute_list_calendar_events,
)
@pytest.mark.asyncio
async def test_list_calendar_events_tool_returns_paginated_payload_v1(
monkeypatch: pytest.MonkeyPatch,
) -> None:
first_id = uuid4()
second_id = uuid4()
items = [
SimpleNamespace(
id=first_id,
title="晨会",
description="同步",
start_at=datetime(2026, 3, 8, 1, 0, tzinfo=timezone.utc),
end_at=datetime(2026, 3, 8, 2, 0, tzinfo=timezone.utc),
timezone="Asia/Shanghai",
metadata=SimpleNamespace(location="会议室A", color="#4F46E5"),
),
SimpleNamespace(
id=second_id,
title="评审",
description=None,
start_at=datetime(2026, 3, 8, 3, 0, tzinfo=timezone.utc),
end_at=None,
timezone="Asia/Shanghai",
metadata=None,
),
]
class _FakeService:
def __init__(self, **kwargs) -> None:
del kwargs
async def list_paginated(self, *, page: int, page_size: int):
assert page == 2
assert page_size == 10
return items, 37
class _FakeRepository:
def __init__(self, session) -> None:
del session
monkeypatch.setattr(
"core.agent.infrastructure.crewai.tools.create_calendar_event_tool.ScheduleItemService",
_FakeService,
)
monkeypatch.setattr(
"core.agent.infrastructure.crewai.tools.create_calendar_event_tool.SQLAlchemyScheduleItemRepository",
_FakeRepository,
)
result = cast(
dict[str, object],
await _execute_list_calendar_events(
session=cast(AsyncSession, SimpleNamespace()),
owner_id=uuid4(),
tool_args={"page": 2, "pageSize": 10},
),
)
assert result["type"] == "calendar_event_list.v1"
assert result["version"] == "v1"
data = cast(dict[str, object], result["data"])
pagination = cast(dict[str, object], data["pagination"])
events = cast(list[dict[str, object]], data["items"])
assert pagination == {
"page": 2,
"pageSize": 10,
"total": 37,
"totalPages": 4,
}
assert events[0]["id"] == str(first_id)
assert events[0]["title"] == "晨会"
assert events[1]["id"] == str(second_id)
@pytest.mark.asyncio
async def test_list_calendar_events_tool_uses_default_pagination_when_missing(
monkeypatch: pytest.MonkeyPatch,
) -> None:
class _FakeService:
def __init__(self, **kwargs) -> None:
del kwargs
async def list_paginated(self, *, page: int, page_size: int):
assert page == 1
assert page_size == 20
return [], 0
class _FakeRepository:
def __init__(self, session) -> None:
del session
monkeypatch.setattr(
"core.agent.infrastructure.crewai.tools.create_calendar_event_tool.ScheduleItemService",
_FakeService,
)
monkeypatch.setattr(
"core.agent.infrastructure.crewai.tools.create_calendar_event_tool.SQLAlchemyScheduleItemRepository",
_FakeRepository,
)
result = cast(
dict[str, object],
await _execute_list_calendar_events(
session=cast(AsyncSession, SimpleNamespace()),
owner_id=uuid4(),
tool_args={},
),
)
data = cast(dict[str, object], result["data"])
pagination = cast(dict[str, object], data["pagination"])
assert pagination["page"] == 1
assert pagination["pageSize"] == 20
@@ -0,0 +1,173 @@
from __future__ import annotations
from datetime import datetime, timezone
from types import SimpleNamespace
from typing import cast
from uuid import uuid4
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from core.agent.infrastructure.crewai.tools.create_calendar_event_tool import (
_execute_mutate_calendar_event,
)
@pytest.mark.asyncio
async def test_mutate_calendar_event_create_returns_calendar_card_v1(
monkeypatch: pytest.MonkeyPatch,
) -> None:
created_id = uuid4()
class _FakeService:
def __init__(self, **kwargs) -> None:
del kwargs
async def create_agent_generated(self, payload):
assert payload.title == "晨会"
return SimpleNamespace(
id=created_id,
title="晨会",
description="同步计划",
start_at=datetime(2026, 3, 8, 1, 0, tzinfo=timezone.utc),
end_at=datetime(2026, 3, 8, 2, 0, tzinfo=timezone.utc),
timezone="Asia/Shanghai",
metadata=SimpleNamespace(location="会议室A", color="#4F46E5"),
)
class _FakeRepository:
def __init__(self, session) -> None:
del session
monkeypatch.setattr(
"core.agent.infrastructure.crewai.tools.create_calendar_event_tool.ScheduleItemService",
_FakeService,
)
monkeypatch.setattr(
"core.agent.infrastructure.crewai.tools.create_calendar_event_tool.SQLAlchemyScheduleItemRepository",
_FakeRepository,
)
result = cast(
dict[str, object],
await _execute_mutate_calendar_event(
session=cast(AsyncSession, SimpleNamespace()),
owner_id=uuid4(),
tool_args={
"operation": "create",
"title": "晨会",
"description": "同步计划",
"startAt": "2026-03-08T09:00:00+08:00",
"endAt": "2026-03-08T10:00:00+08:00",
"timezone": "Asia/Shanghai",
"location": "会议室A",
},
),
)
assert result["type"] == "calendar_card.v1"
data = cast(dict[str, object], result["data"])
assert data["id"] == str(created_id)
assert data["ok"] is True
@pytest.mark.asyncio
async def test_mutate_calendar_event_update_requires_event_id() -> None:
with pytest.raises(ValueError, match="eventId is required"):
await _execute_mutate_calendar_event(
session=cast(AsyncSession, SimpleNamespace()),
owner_id=uuid4(),
tool_args={"operation": "update", "title": "新标题"},
)
@pytest.mark.asyncio
async def test_mutate_calendar_event_delete_returns_ack(
monkeypatch: pytest.MonkeyPatch,
) -> None:
deleted_id = uuid4()
class _FakeService:
def __init__(self, **kwargs) -> None:
del kwargs
async def delete(self, item_id):
assert item_id == deleted_id
return None
class _FakeRepository:
def __init__(self, session) -> None:
del session
monkeypatch.setattr(
"core.agent.infrastructure.crewai.tools.create_calendar_event_tool.ScheduleItemService",
_FakeService,
)
monkeypatch.setattr(
"core.agent.infrastructure.crewai.tools.create_calendar_event_tool.SQLAlchemyScheduleItemRepository",
_FakeRepository,
)
result = cast(
dict[str, object],
await _execute_mutate_calendar_event(
session=cast(AsyncSession, SimpleNamespace()),
owner_id=uuid4(),
tool_args={"operation": "delete", "eventId": str(deleted_id)},
),
)
assert result["type"] == "calendar_operation.v1"
data = cast(dict[str, object], result["data"])
assert data["operation"] == "delete"
assert data["id"] == str(deleted_id)
assert data["ok"] is True
@pytest.mark.asyncio
async def test_mutate_calendar_event_rejects_invalid_operation() -> None:
with pytest.raises(ValueError, match="operation"):
await _execute_mutate_calendar_event(
session=cast(AsyncSession, SimpleNamespace()),
owner_id=uuid4(),
tool_args={"operation": "upsert"},
)
@pytest.mark.asyncio
async def test_mutate_calendar_event_update_rejects_invalid_color(
monkeypatch: pytest.MonkeyPatch,
) -> None:
event_id = uuid4()
class _FakeService:
def __init__(self, **kwargs) -> None:
del kwargs
async def get_by_id(self, item_id):
assert item_id == event_id
return SimpleNamespace(metadata=None)
class _FakeRepository:
def __init__(self, session) -> None:
del session
monkeypatch.setattr(
"core.agent.infrastructure.crewai.tools.create_calendar_event_tool.ScheduleItemService",
_FakeService,
)
monkeypatch.setattr(
"core.agent.infrastructure.crewai.tools.create_calendar_event_tool.SQLAlchemyScheduleItemRepository",
_FakeRepository,
)
with pytest.raises(ValueError, match="color"):
await _execute_mutate_calendar_event(
session=cast(AsyncSession, SimpleNamespace()),
owner_id=uuid4(),
tool_args={
"operation": "update",
"eventId": str(event_id),
"color": "blue",
},
)
@@ -646,7 +646,7 @@ async def test_run_service_passes_user_context_system_prompt_to_runtime(
tool_args,
):
del session, owner_id
assert tool_name == "back.create_calendar_event"
assert tool_name == "back.mutate_calendar_event"
assert "title" in tool_args
return {
"result": {"eventId": "evt-1", "ok": True},
@@ -788,7 +788,7 @@ async def test_run_service_emits_frontend_tool_pending_events(
class _FakeRuntime:
def is_registered_backend_tool(self, tool_name: str) -> bool:
return tool_name == "back.create_calendar_event"
return tool_name == "back.mutate_calendar_event"
async def execute_backend_tool(
self,
@@ -799,7 +799,7 @@ async def test_run_service_emits_frontend_tool_pending_events(
tool_args,
):
del session, owner_id
assert tool_name == "back.create_calendar_event"
assert tool_name == "back.mutate_calendar_event"
assert "title" in tool_args
return {
"result": {"eventId": "evt-1", "ok": True},
@@ -957,7 +957,7 @@ async def test_run_service_executes_backend_calendar_tool_and_emits_result(
class _FakeRuntime:
def is_registered_backend_tool(self, tool_name: str) -> bool:
return tool_name == "back.create_calendar_event"
return tool_name == "back.mutate_calendar_event"
async def execute_backend_tool(
self,
@@ -968,7 +968,7 @@ async def test_run_service_executes_backend_calendar_tool_and_emits_result(
tool_args,
):
del session, owner_id
assert tool_name == "back.create_calendar_event"
assert tool_name == "back.mutate_calendar_event"
assert "title" in tool_args
return {
"result": {"eventId": "evt-1", "ok": True},
@@ -1043,7 +1043,7 @@ async def test_run_service_executes_backend_calendar_tool_and_emits_result(
text="请安排一个明早会议",
tools=[
{
"name": "back.create_calendar_event",
"name": "back.mutate_calendar_event",
"description": "create calendar",
"parameters": {"type": "object"},
}
@@ -10,7 +10,10 @@ def test_load_crewai_stage_tools_returns_expected_defaults() -> None:
assert result == {
"intent": [],
"execution": ["back.create_calendar_event"],
"execution": [
"back.list_calendar_events",
"back.mutate_calendar_event",
],
"organization": [],
}