feat(agent): redesign project_cli with module/method/input protocol

- Replace command/subcommand/args with module/method/input envelope
- Calendar handler uses discriminated union (mode) for read operations
- Strict Pydantic models with extra='forbid' for all calendar methods
- Worker max_iters=7, router prompt simplified (removed project_cli_defaults)
- Skill index cards + per-action files for progressive disclosure
- Frontend/AG-UI aligned to module/method dispatch
- Protocol docs updated to module/method/input contract

WIP: action cards need envelope fix, 2 tests need update, memory
handler needs Pydantic models.
This commit is contained in:
qzl
2026-04-24 13:24:13 +08:00
parent ab526af2c4
commit d060962a5f
62 changed files with 4802 additions and 805 deletions
@@ -32,7 +32,9 @@ def test_react_agent_sys_prompt_includes_registered_skill_prompt() -> None:
assert "# Agent Skills" in prompt
assert "## calendar" in prompt
assert "## contacts" in prompt
assert "SKILL.md" in prompt
assert "view_skill_file" in prompt
assert 'file_path="calendar/SKILL.md"' in prompt
assert 'file_path="contacts/SKILL.md"' in prompt
def test_view_skill_file_tool_reads_registered_skill_content() -> None:
@@ -47,3 +49,18 @@ def test_view_skill_file_tool_reads_registered_skill_content() -> None:
block = response.content[0]
text = block["text"] if isinstance(block, dict) else block.text
assert "Calendar Skill" in text or "name: calendar" in text
def test_view_skill_file_tool_reads_calendar_action_card() -> None:
    """``view_skill_file`` returns the calendar ``create_event`` action card."""
    view_skill_file = build_toolkit(enabled_skill_names={"calendar"}).tools[
        "view_skill_file"
    ].original_func

    response = asyncio.run(
        view_skill_file(file_path="calendar/actions/create_event.md", ranges=[1, 20]),
    )

    assert response.content
    first_block = response.content[0]
    # Content blocks are either plain dicts or objects exposing ``.text``.
    if isinstance(first_block, dict):
        card_text = first_block["text"]
    else:
        card_text = first_block.text
    assert "create_event" in card_text
    assert "input.title" in card_text
@@ -252,8 +252,8 @@ async def test_calendar_create_skill_creates_db_record() -> None:
assert cli_result.get("status") == "success", f"Tool call failed: {cli_result}"
args = cli_result.get("tool_call_args", {})
assert args.get("command") == "calendar"
assert args.get("subcommand") == "create"
assert args.get("module") == "calendar"
assert args.get("method") == "create"
result_payload = cli_result.get("result")
assert isinstance(result_payload, dict), f"Unexpected result payload: {cli_result}"
@@ -317,8 +317,8 @@ async def test_calendar_read_skill_queries_db() -> None:
assert cli_result.get("status") in {"success", "partial"}, f"Tool call failed: {cli_result}"
args = cli_result.get("tool_call_args", {})
assert args.get("command") == "calendar"
assert args.get("subcommand") == "read"
assert args.get("module") == "calendar"
assert args.get("method") in {"read"}
@pytest.mark.asyncio
@@ -355,8 +355,8 @@ async def test_contacts_read_skill_queries_db() -> None:
assert cli_result.get("status") in {"success", "partial"}, f"Tool call failed: {cli_result}"
args = cli_result.get("tool_call_args", {})
assert args.get("command") == "contacts"
assert args.get("subcommand") == "read"
assert args.get("module") == "contacts"
assert args.get("method") == "read"
@pytest.mark.asyncio
@@ -398,8 +398,8 @@ async def test_memory_update_skill_via_automation() -> None:
assert cli_result.get("status") in {"success", "partial"}, f"Tool call failed: {cli_result}"
args = cli_result.get("tool_call_args", {})
assert args.get("command") == "memory"
assert args.get("subcommand") == "update"
assert args.get("module") == "memory"
assert args.get("method") == "update"
if user_id:
time.sleep(1)
+16 -10
View File
@@ -183,7 +183,6 @@ async def test_agent_calendar_read_via_cli() -> None:
tool_names = [result.get("tool_name") for result in tool_call_results]
assert "view_skill_file" in tool_names
assert "project_cli" in tool_names
assert tool_names.index("view_skill_file") < tool_names.index("project_cli")
view_result = next(
result for result in tool_call_results if result.get("tool_name") == "view_skill_file"
@@ -193,22 +192,27 @@ async def test_agent_calendar_read_via_cli() -> None:
assert isinstance(view_args, dict)
assert view_args.get("file_path") == "calendar/SKILL.md"
result = next(
result for result in tool_call_results if result.get("tool_name") == "project_cli"
)
successful_project_cli_results = [
result
for result in tool_call_results
if result.get("tool_name") == "project_cli"
and result.get("status") in {"success", "partial"}
]
assert successful_project_cli_results, "expected at least one successful project_cli result"
result = successful_project_cli_results[-1]
assert result.get("status") in {"success", "failure", "partial"}
tool_call_args = result.get("tool_call_args")
assert isinstance(tool_call_args, dict)
assert tool_call_args.get("command") == "calendar"
assert tool_call_args.get("subcommand") == "read"
assert tool_call_args.get("module") == "calendar"
assert tool_call_args.get("method") in {"read"}
raw_result = result.get("result")
if isinstance(raw_result, str):
raw_result = json.loads(raw_result)
assert isinstance(raw_result, dict), f"result should be dict, got {type(raw_result)}"
assert raw_result.get("command") == "calendar"
assert raw_result.get("subcommand") == "read"
assert raw_result.get("module") == "calendar"
assert raw_result.get("method") in {"read"}
if "ui_schema" in result:
ui_schema = result["ui_schema"]
@@ -285,8 +289,10 @@ async def test_tool_ui_schema_in_history() -> None:
except (json.JSONDecodeError, ValueError):
pass
assert isinstance(result, dict), f"result in DB should be dict, got {type(result)}: {result!r}"
assert result.get("command") == "calendar"
assert result.get("subcommand") == "read"
if tool_agent_output.get("status") == "failure":
continue
assert result.get("module") == "calendar"
assert result.get("method") in {"read"}
ui_hints = tool_agent_output.get("ui_hints")
assert isinstance(ui_hints, dict), f"ui_hints should be dict, got {type(ui_hints)}"
View File
+196
View File
@@ -0,0 +1,196 @@
from __future__ import annotations
import os
import time
from pathlib import Path
from uuid import uuid4
import httpx
import jwt
def _load_env() -> None:
    """Copy ``KEY=VALUE`` pairs from the repository ``.env`` file into ``os.environ``.

    The file is looked up at ``parents[3]`` of this module's resolved path.
    Blank lines, ``#`` comment lines and lines without ``=`` are ignored.
    Surrounding whitespace and quote characters are stripped from values.
    Variables that already exist in the environment are never overwritten.
    """
    env_path = Path(__file__).resolve().parents[3] / ".env"
    if not env_path.exists():
        return
    # Decode explicitly as UTF-8 so parsing does not depend on the platform's
    # locale encoding.
    for raw_line in env_path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        key = key.strip()
        value = value.strip().strip('"').strip("'")
        if key and key not in os.environ:
            os.environ[key] = value


# Load .env first so the settings read below can come from the file.
_load_env()

BASE_URL = os.getenv("AGENT_LIVE_BASE_URL", "http://localhost:5775")
def get_jwt_secret() -> str:
    """Return the JWT signing secret from the first non-empty known variable.

    Raises:
        RuntimeError: If none of the candidate variables holds a value.
    """
    for env_name in (
        "SOCIAL_SUPABASE__JWT_SECRET",
        "SUPABASE_JWT_SECRET",
        "JWT_SECRET",
    ):
        candidate = os.getenv(env_name)
        if candidate:
            return candidate
    raise RuntimeError("JWT_SECRET not found in environment")
def get_supabase_url() -> str:
    """Return the Supabase URL from the environment, or the local default."""
    for env_name in ("SOCIAL_SUPABASE__URL", "SUPABASE_URL"):
        configured = os.getenv(env_name)
        if configured:
            return configured
    return "http://localhost:54321"
def get_test_user_id() -> str:
    """Return ``TEST_USER_ID`` from the environment.

    Raises:
        RuntimeError: If the variable is unset or empty.
    """
    configured = os.getenv("TEST_USER_ID")
    if not configured:
        raise RuntimeError("TEST_USER_ID not set")
    return configured
def create_test_jwt(user_id: str) -> str:
    """Sign an HS256 token for ``user_id`` that expires one hour after issue.

    The issuer claim is the configured Supabase URL and the signing key is
    the configured JWT secret.
    """
    issued_at = int(time.time())
    claims = {
        "sub": user_id,
        "role": "authenticated",
        "aud": "authenticated",
        "iss": get_supabase_url(),
        "iat": issued_at,
        "exp": issued_at + 3600,
    }
    signing_key = get_jwt_secret()
    return jwt.encode(claims, signing_key, algorithm="HS256")
async def run_agent_and_collect(
    *,
    user_message: str,
    client: httpx.AsyncClient,
    headers: dict,
    run_id: str | None = None,
    thread_id: str | None = None,
    timeout: float = 120.0,
) -> AgentRunResult:
    """Start an agent run over HTTP and collect the events it streams back.

    Posts ``user_message`` to ``/api/v1/agent/runs``, then reads server-sent
    events from the run's ``/events`` endpoint until a terminal event
    (``RUN_FINISHED`` or ``RUN_ERROR``) arrives or the stream ends.

    Args:
        user_message: Text sent as the single user message of the run.
        client: HTTP client used for both the POST and the event stream.
        headers: Headers sent with both requests.
        run_id: Run identifier; defaults to ``quality-`` plus the first eight
            characters of the thread id.
        thread_id: Thread identifier; a random UUID when omitted.
        timeout: Timeout passed to the event-stream request.

    Returns:
        An ``AgentRunResult`` with the collected events, tool results, final
        answer and the latency measured from just before the POST until the
        stream was done.

    Raises:
        httpx.HTTPStatusError: If the run-creation request returns a 4xx/5xx
            response.
    """
    # Local import: this module has no top-level ``json`` import.
    import json

    if thread_id is None:
        thread_id = str(uuid4())
    if run_id is None:
        run_id = f"quality-{thread_id[:8]}"

    t_start = time.monotonic()

    run_resp = await client.post(
        f"{BASE_URL}/api/v1/agent/runs",
        headers=headers,
        json={
            "threadId": thread_id,
            "runId": run_id,
            "state": {},
            "messages": [
                {"id": "u1", "role": "user", "content": user_message}
            ],
            "tools": [],
            "context": [],
            "forwardedProps": {"runtime_mode": "chat"},
        },
    )
    # Fail fast: a rejected run would otherwise surface later as an empty,
    # unfinished result that looks like an agent failure.
    run_resp.raise_for_status()
    run_data = run_resp.json()
    effective_thread_id = str(run_data.get("threadId", thread_id))
    effective_run_id = run_data.get("runId", run_id)

    events_url = (
        f"{BASE_URL}/api/v1/agent/runs/{effective_thread_id}/events"
        f"?runId={effective_run_id}"
    )

    tool_results: list[dict] = []
    all_events: list[dict] = []
    run_finished = False
    final_answer = ""

    async with client.stream(
        "GET", events_url, headers=headers, timeout=timeout
    ) as sse_resp:
        buffer = ""
        async for line in sse_resp.aiter_lines():
            if line.startswith("data:"):
                data_str = line.split(":", 1)[1].strip()
                if data_str:
                    buffer = data_str
                continue
            # Only a blank line following buffered data dispatches an event.
            if line != "" or not buffer:
                continue
            payload, buffer = buffer, ""
            try:
                event_data = json.loads(payload)
            except json.JSONDecodeError:
                # Best effort: skip payloads that are not JSON.
                continue
            if not isinstance(event_data, dict):
                # Valid JSON that is not an object cannot carry an event type.
                continue
            all_events.append(event_data)
            event_type = event_data.get("type")
            if event_type == "TOOL_CALL_RESULT":
                tool_results.append(event_data)
            elif event_type == "TEXT_MESSAGE_END":
                final_answer = event_data.get("answer", "") or event_data.get("text", "")
            elif event_type in {"RUN_FINISHED", "RUN_ERROR"}:
                run_finished = True
                # Stop at the terminal event rather than waiting for the
                # server to close the connection (or for the read timeout).
                break

    t_end = time.monotonic()

    return AgentRunResult(
        thread_id=effective_thread_id,
        run_id=effective_run_id,
        user_message=user_message,
        final_answer=final_answer,
        tool_results=tool_results,
        all_events=all_events,
        run_finished=run_finished,
        latency_ms=round((t_end - t_start) * 1000),
    )
class AgentRunResult:
    """Everything collected from one agent run: ids, answer, events, timing."""

    def __init__(
        self,
        *,
        thread_id: str,
        run_id: str,
        user_message: str,
        final_answer: str,
        tool_results: list[dict],
        all_events: list[dict],
        run_finished: bool,
        latency_ms: int,
    ) -> None:
        # Identity of the run.
        self.thread_id = thread_id
        self.run_id = run_id
        # What was asked and what came back.
        self.user_message = user_message
        self.final_answer = final_answer
        # Raw event payloads.
        self.tool_results = tool_results
        self.all_events = all_events
        # Completion flag and timing.
        self.run_finished = run_finished
        self.latency_ms = latency_ms

    @staticmethod
    def _name_of(tool_result: dict) -> str:
        """Read the tool name, accepting snake_case or camelCase keys."""
        return tool_result.get("tool_name", "") or tool_result.get("toolName", "")

    @property
    def tool_names_called(self) -> list[str]:
        """Names of all tools that produced a result, in event order."""
        return list(map(self._name_of, self.tool_results))

    @property
    def successful_tool_names(self) -> list[str]:
        """Names of tools whose result status is ``success`` or ``partial``."""
        names: list[str] = []
        for tool_result in self.tool_results:
            if tool_result.get("status") in ("success", "partial"):
                names.append(self._name_of(tool_result))
        return names

    @property
    def has_tool_success(self) -> bool:
        """True when at least one tool result succeeded fully or partially."""
        return bool(self.successful_tool_names)
@@ -0,0 +1,99 @@
from __future__ import annotations
from pydantic import BaseModel
class ScoreDetail(BaseModel):
    """Outcome of checking one quality criterion for a scenario run."""

    # Text of the criterion that was checked.
    criterion: str
    # Whether the criterion was judged as satisfied.
    passed: bool
    # Optional free-form explanation; empty by default.
    note: str = ""
class ScenarioScore(BaseModel):
    """Measurements and derived score for one model on one scenario."""

    scenario_id: str
    model_code: str
    latency_ms: int
    input_tokens: int = 0
    output_tokens: int = 0
    cost_usd: float = 0.0
    tool_called: bool
    tool_succeeded: bool
    answer_quality: float
    details: list[ScoreDetail]
    raw_answer: str = ""
    run_finished: bool = True

    @property
    def overall_score(self) -> float:
        """Blend of tool correctness (0.3), answer quality (0.5) and latency (0.2)."""
        # A tool that was called but failed still earns half credit.
        if self.tool_succeeded:
            tool_score = 1.0
        elif self.tool_called:
            tool_score = 0.5
        else:
            tool_score = 0.0
        tool_part = 0.3 * tool_score
        answer_part = 0.5 * self.answer_quality
        latency_part = 0.2 * self._latency_score()
        return tool_part + answer_part + latency_part

    def _latency_score(self) -> float:
        """Map latency to a step score: <=5s 1.0, <=15s 0.7, <=30s 0.4, else 0.1."""
        for upper_bound_ms, step_score in ((5000, 1.0), (15000, 0.7), (30000, 0.4)):
            if self.latency_ms <= upper_bound_ms:
                return step_score
        return 0.1
class ModelScorecard(BaseModel):
    """All scenario scores for one model, with aggregate views over them."""

    model_code: str
    scenario_scores: list[ScenarioScore]

    @property
    def avg_overall(self) -> float:
        """Mean ``overall_score`` across scenarios; 0.0 when there are none."""
        scores = self.scenario_scores
        return sum(s.overall_score for s in scores) / len(scores) if scores else 0.0

    @property
    def avg_latency_ms(self) -> float:
        """Mean latency in milliseconds; 0.0 when there are no scenarios."""
        scores = self.scenario_scores
        return sum(s.latency_ms for s in scores) / len(scores) if scores else 0.0

    @property
    def avg_cost_usd(self) -> float:
        """Mean cost in USD; 0.0 when there are no scenarios."""
        scores = self.scenario_scores
        return sum(s.cost_usd for s in scores) / len(scores) if scores else 0.0

    @property
    def tool_success_rate(self) -> float:
        """Fraction of scenarios with ``tool_succeeded`` set; 0.0 when empty."""
        scores = self.scenario_scores
        if not scores:
            return 0.0
        succeeded = [s for s in scores if s.tool_succeeded]
        return len(succeeded) / len(scores)

    def summary_table(self) -> str:
        """Render a plain-text report: aggregate header plus one row per scenario."""
        heavy_rule = "=" * 60
        light_rule = "-" * 60
        rows = [
            "\n" + heavy_rule,
            f"Model Scorecard: {self.model_code}",
            heavy_rule,
            f" Avg Overall Score : {self.avg_overall:.2f}",
            f" Avg Latency : {self.avg_latency_ms:.0f}ms",
            f" Avg Cost : ${self.avg_cost_usd:.6f}",
            f" Tool Success Rate : {self.tool_success_rate:.0%}",
            light_rule,
        ]
        for score in self.scenario_scores:
            verdict = "PASS" if score.tool_succeeded else "FAIL"
            rows.append(
                f" [{verdict}] {score.scenario_id:<25} "
                f"score={score.overall_score:.2f} "
                f"lat={score.latency_ms}ms "
                f"cost=${score.cost_usd:.6f}"
            )
        rows.append(heavy_rule)
        return "\n".join(rows)
@@ -0,0 +1,82 @@
from __future__ import annotations
from pydantic import BaseModel
class EvalScenario(BaseModel):
    """One evaluation scenario: a user prompt plus the expectations to check."""

    # Unique scenario identifier, e.g. "calendar-read-today".
    id: str
    # User message sent to the agent.
    prompt: str
    # Scenario group label, e.g. "calendar" or "general".
    category: str
    # Whether the agent is expected to call a tool for this prompt.
    expect_tool_use: bool
    # Whether a tool call is expected to succeed.
    expect_tool_success: bool
    # Human-readable criteria the run is checked against.
    quality_criteria: list[str]
# Calendar scenarios: each one expects a successful tool call.
CALENDAR_SCENARIOS: list[EvalScenario] = [
    # Read today's schedule.
    EvalScenario(
        id="calendar-read-today",
        prompt="请查询我今天的日程安排",
        category="calendar",
        expect_tool_use=True,
        expect_tool_success=True,
        quality_criteria=[
            "应调用 project_cli 的 calendar.read 方法",
            "input 应包含 mode=day 和具体日期",
            "回答应基于工具返回的实际数据",
            "如果无日程,应明确告知无日程",
        ],
    ),
    # Create a two-hour meeting tomorrow at 3 pm.
    EvalScenario(
        id="calendar-create-event",
        prompt="帮我创建一个明天下午3点两小时的会议,标题是项目周会",
        category="calendar",
        expect_tool_use=True,
        expect_tool_success=True,
        quality_criteria=[
            "应调用 project_cli 的 calendar.create 方法",
            "input 应包含 title、start_at、timezone",
            "start_at 应为具体的时间戳而非自然语言",
            "应返回创建结果(包含 event_id)",
        ],
    ),
    # Read the schedule for Monday through Friday of this week.
    EvalScenario(
        id="calendar-read-range",
        prompt="这周一到周五我有哪些日程?",
        category="calendar",
        expect_tool_use=True,
        expect_tool_success=True,
        quality_criteria=[
            "应调用 project_cli 的 calendar.read 方法",
            "input 应使用 mode=range 或多次 mode=day",
            "应提供完整时间范围",
        ],
    ),
]
# Small-talk scenarios: no tool call is expected.
GENERAL_SCENARIOS: list[EvalScenario] = [
    # Greeting that asks the assistant who it is.
    EvalScenario(
        id="general-greeting",
        prompt="你好,你是谁?",
        category="general",
        expect_tool_use=False,
        expect_tool_success=False,
        quality_criteria=[
            "应简短自我介绍",
            "不应调用任何工具",
            "回答简洁不啰嗦",
        ],
    ),
    # Thanks and goodbye.
    EvalScenario(
        id="general-farewell",
        prompt="好的谢谢,再见",
        category="general",
        expect_tool_use=False,
        expect_tool_success=False,
        quality_criteria=[
            "应礼貌告别",
            "不应调用任何工具",
        ],
    ),
]

# Full suite, calendar scenarios first.
ALL_SCENARIOS = CALENDAR_SCENARIOS + GENERAL_SCENARIOS
+440
View File
@@ -0,0 +1,440 @@
from __future__ import annotations
import json
import os
import time
from uuid import uuid4
import httpx
import jwt
import pytest
from backend.tests.quality.evaluators import ModelScorecard, ScoreDetail, ScenarioScore
from backend.tests.quality.scenarios import ALL_SCENARIOS
# Model codes compared by the A/B test, in run order.
CANDIDATE_MODELS = ["qwen3.5-flash", "deepseek-chat"]

# ``llm_id`` written to ``system_agents`` for each candidate model code.
MODEL_LLM_IDS = {
    "qwen3.5-flash": "c625bce4-970e-4a76-bebe-cb8840fed854",
    "deepseek-chat": "12bc1963-4b67-404b-b952-5948bea0f690",
}


def _load_env() -> None:
    """Copy ``KEY=VALUE`` pairs from the repository ``.env`` file into ``os.environ``.

    The file is looked up at ``parents[3]`` of this module's resolved path.
    Blank lines, ``#`` comment lines and lines without ``=`` are ignored.
    Surrounding whitespace and quote characters are stripped from values.
    Variables that already exist in the environment are never overwritten.
    """
    from pathlib import Path

    env_path = Path(__file__).resolve().parents[3] / ".env"
    if not env_path.exists():
        return
    # Decode explicitly as UTF-8 so parsing does not depend on the platform's
    # locale encoding.
    for raw_line in env_path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        key = key.strip()
        value = value.strip().strip('"').strip("'")
        if key and key not in os.environ:
            os.environ[key] = value


# Load .env BEFORE reading BASE_URL so AGENT_LIVE_BASE_URL defined in the
# file takes effect.
_load_env()

BASE_URL = os.getenv("AGENT_LIVE_BASE_URL", "http://localhost:5775")
def _get_jwt_secret() -> str:
    """Return the JWT signing secret from the first non-empty known variable.

    Raises:
        RuntimeError: If none of the candidate variables holds a value.
    """
    candidates = (
        "SOCIAL_SUPABASE__JWT_SECRET",
        "SUPABASE_JWT_SECRET",
        "JWT_SECRET",
    )
    found = next((value for value in map(os.getenv, candidates) if value), None)
    if found is None:
        raise RuntimeError("JWT_SECRET not found in environment")
    return found
def _get_supabase_url() -> str:
    """Return the Supabase URL: public URL first, then internal, then the local default."""
    for env_name in (
        "SOCIAL_SUPABASE__PUBLIC_URL",
        "SOCIAL_SUPABASE__URL",
        "SUPABASE_URL",
    ):
        configured = os.getenv(env_name)
        if configured:
            return configured
    return "http://localhost:54321"
def _get_supabase_key() -> str:
    """Return the Supabase service-role key, preferring the environment over settings."""
    # Imported lazily, and before the environment lookup, as in the original flow.
    from core.config.settings import config

    return (
        os.getenv("SOCIAL_SUPABASE__SERVICE_ROLE_KEY", "")
        or config.supabase.service_role_key
    )
def _get_test_user_id() -> str:
    """Return ``TEST_USER_ID`` from the environment.

    Raises:
        RuntimeError: If the variable is unset or empty.
    """
    configured = os.getenv("TEST_USER_ID")
    if not configured:
        raise RuntimeError("TEST_USER_ID not set")
    return configured
def _create_jwt(user_id: str) -> str:
    """Sign an HS256 token for ``user_id`` that expires one hour after issue."""
    issued_at = int(time.time())
    claims = {
        "sub": user_id,
        "role": "authenticated",
        "aud": "authenticated",
        "iss": _get_supabase_url(),
        "iat": issued_at,
        "exp": issued_at + 3600,
    }
    signing_key = _get_jwt_secret()
    return jwt.encode(claims, signing_key, algorithm="HS256")
async def _run_via_http(
    *,
    user_message: str,
    token: str,
    timeout: float = 120.0,
) -> dict:
    """Run the agent once over HTTP and summarise the streamed events.

    Posts ``user_message`` to ``/api/v1/agent/runs`` with a bearer ``token``,
    then reads server-sent events from the run's ``/events`` endpoint until a
    terminal event (``RUN_FINISHED`` or ``RUN_ERROR``) arrives or the stream
    ends.

    Args:
        user_message: Text sent as the single user message of the run.
        token: Bearer token placed in the ``Authorization`` header.
        timeout: Timeout used for the client and for the event stream.

    Returns:
        A dict with ``final_answer``, ``tool_results``, ``tool_names``,
        ``successful_tool_names``, ``run_finished``, ``latency_ms`` (measured
        from after the POST until the stream was done) and ``token_usage``
        (taken from the ``TEXT_MESSAGE_END`` event; empty if none was seen).

    Raises:
        httpx.HTTPStatusError: If the run-creation request returns a 4xx/5xx
            response.
    """
    thread_id = str(uuid4())
    run_id = f"q-{uuid4().hex[:12]}"

    async with httpx.AsyncClient(timeout=httpx.Timeout(timeout)) as client:
        headers = {"Authorization": f"Bearer {token}"}

        run_resp = await client.post(
            f"{BASE_URL}/api/v1/agent/runs",
            headers=headers,
            json={
                "threadId": thread_id,
                "runId": run_id,
                "state": {},
                "messages": [
                    {"id": "u1", "role": "user", "content": user_message}
                ],
                "tools": [],
                "context": [],
                "forwardedProps": {"runtime_mode": "chat"},
            },
        )
        # Fail fast: a rejected run would otherwise be scored as a model
        # failure instead of being reported as an infrastructure error.
        run_resp.raise_for_status()
        run_data = run_resp.json()
        eff_thread = str(run_data.get("threadId", thread_id))
        eff_run = run_data.get("runId", run_id)

        events_url = (
            f"{BASE_URL}/api/v1/agent/runs/{eff_thread}/events"
            f"?runId={eff_run}"
        )

        t_start = time.monotonic()
        tool_results: list[dict] = []
        all_events: list[dict] = []
        final_answer = ""
        run_finished = False
        token_usage: dict = {}

        async with client.stream(
            "GET", events_url, headers=headers, timeout=timeout
        ) as sse:
            buffer = ""
            async for line in sse.aiter_lines():
                if line.startswith("data:"):
                    data_str = line.split(":", 1)[1].strip()
                    if data_str:
                        buffer = data_str
                    continue
                # Only a blank line following buffered data dispatches an event.
                if line != "" or not buffer:
                    continue
                payload, buffer = buffer, ""
                try:
                    ev = json.loads(payload)
                except json.JSONDecodeError:
                    # Best effort: skip payloads that are not JSON.
                    continue
                if not isinstance(ev, dict):
                    # Valid JSON that is not an object cannot carry an event type.
                    continue
                all_events.append(ev)
                etype = ev.get("type")
                if etype == "TOOL_CALL_RESULT":
                    tool_results.append(ev)
                elif etype == "TEXT_MESSAGE_END":
                    final_answer = ev.get("answer", "") or ev.get("text", "")
                    token_usage = {
                        "totalTokens": ev.get("totalTokens", 0),
                        "inputTokens": ev.get("inputTokens", 0),
                        "outputTokens": ev.get("outputTokens", 0),
                        "promptCacheMissTokens": ev.get(
                            "promptCacheMissTokens", 0
                        ),
                        "promptCacheHitTokens": ev.get(
                            "promptCacheHitTokens", 0
                        ),
                    }
                elif etype in {"RUN_FINISHED", "RUN_ERROR"}:
                    run_finished = True
                    # Stop at the terminal event rather than waiting for the
                    # server to close the connection (or for the read timeout).
                    break

        t_end = time.monotonic()

    tool_names = [
        tr.get("tool_name", "") or tr.get("toolName", "")
        for tr in tool_results
    ]
    successful_tool_names = [
        tr.get("tool_name", "") or tr.get("toolName", "")
        for tr in tool_results
        if tr.get("status") in ("success", "partial")
    ]

    return {
        "final_answer": final_answer,
        "tool_results": tool_results,
        "tool_names": tool_names,
        "successful_tool_names": successful_tool_names,
        "run_finished": run_finished,
        "latency_ms": round((t_end - t_start) * 1000),
        "token_usage": token_usage,
    }
def _switch_model(model_code: str) -> None:
    """Point the router and worker ``system_agents`` rows at ``model_code``'s LLM.

    Raises:
        KeyError: If ``model_code`` has no entry in ``MODEL_LLM_IDS``.
    """
    from supabase import create_client

    supabase = create_client(_get_supabase_url(), _get_supabase_key())
    new_values = {"llm_id": MODEL_LLM_IDS[model_code]}
    for agent_type in ("router", "worker"):
        query = supabase.table("system_agents").update(new_values)
        query.eq("agent_type", agent_type).execute()
def _save_original_models() -> list[dict]:
    """Fetch ``agent_type`` and ``llm_id`` for every ``system_agents`` row."""
    from supabase import create_client

    supabase = create_client(_get_supabase_url(), _get_supabase_key())
    response = supabase.table("system_agents").select("agent_type, llm_id").execute()
    return response.data
def _restore_models(original_rows: list[dict]) -> None:
    """Write each saved ``llm_id`` back to the row with the matching ``agent_type``."""
    from supabase import create_client

    supabase = create_client(_get_supabase_url(), _get_supabase_key())
    for saved in original_rows:
        query = supabase.table("system_agents").update({"llm_id": saved["llm_id"]})
        query.eq("agent_type", saved["agent_type"]).execute()
def _evaluate_answer_quality(
    *,
    answer: str,
    run_finished: bool,
    expect_tool_use: bool,
    has_tool_success: bool,
    tool_names: list[str],
) -> float:
    """Heuristic answer-quality score in ``[0.0, 1.0]``.

    Returns 0.0 for an unfinished run or a blank answer. Otherwise starts at
    0.6 and adjusts for tool behaviour versus expectation, answer length, and
    failure wording in answers that were expected to use a tool.
    """
    if not (run_finished and answer and answer.strip()):
        return 0.0

    # Reward or penalise tool behaviour relative to what the scenario expects.
    if expect_tool_use:
        if has_tool_success:
            tool_adjustment = 0.2
        elif tool_names:
            tool_adjustment = 0.1
        else:
            tool_adjustment = -0.3
    else:
        tool_adjustment = -0.1 if tool_names else 0.2
    score = 0.6 + tool_adjustment

    if len(answer) > 10:
        score += 0.1

    # Failure wording only counts against scenarios that needed a tool.
    mentions_failure = any(marker in answer for marker in ("无法", "失败", "错误"))
    if mentions_failure and expect_tool_use:
        score -= 0.1

    return max(0.0, min(1.0, score))
def _evaluate_criteria(
    *,
    answer: str,
    run_finished: bool,
    tool_names: list[str],
    has_tool_success: bool,
    tool_results: list[dict],
    scenario: object,
) -> list[ScoreDetail]:
    """Check each of the scenario's quality criteria with keyword heuristics.

    Each criterion string is matched against keywords to pick a check; the
    first matching rule wins. Criteria that match no rule pass when the run
    finished with a non-empty answer.

    Args:
        answer: Final answer text produced by the agent.
        run_finished: Whether a terminal run event was observed.
        tool_names: Names of all tools that produced a result.
        has_tool_success: Whether at least one tool result succeeded.
        tool_results: Raw ``TOOL_CALL_RESULT`` event payloads.
        scenario: Object exposing ``quality_criteria``; a missing attribute
            is treated as no criteria.

    Returns:
        One ``ScoreDetail`` per criterion, in the scenario's order.
    """
    details: list[ScoreDetail] = []
    for criterion in getattr(scenario, "quality_criteria", []):
        passed = False
        note = ""

        if "不应调用" in criterion:
            # Negative criterion ("must not call any tool"). This has to be
            # tested before the generic "调用" rule, which would otherwise
            # pass only when project_cli WAS called.
            passed = not tool_names
            note = f"tools: {tool_names}" if not passed else ""
        elif "调用" in criterion or "project_cli" in criterion:
            passed = any("project_cli" in tn for tn in tool_names)
            note = f"tools: {tool_names}" if not passed else ""
        elif "mode" in criterion and "day" in criterion:
            # Accept every mode the criterion names (e.g. "mode=range 或多次
            # mode=day"); fall back to "day" when none is spelled out.
            accepted_modes = tuple(
                mode
                for mode in ("day", "range", "event")
                if f"mode={mode}" in criterion
            ) or ("day",)
            for tr in tool_results:
                args = tr.get("tool_call_args", {}) or tr.get("toolCallArgs", {})
                # Arguments may arrive in a non-dict form; treat that as no input.
                inp = args.get("input", {}) if isinstance(args, dict) else {}
                if isinstance(inp, dict) and inp.get("mode") in accepted_modes:
                    passed = True
                    break
        elif "具体" in criterion or "时间戳" in criterion:
            passed = has_tool_success
        elif "基于工具" in criterion or "返回" in criterion:
            passed = has_tool_success
        elif "无日程" in criterion:
            # The previous check used `"" in answer`, which is true for every
            # string; look for explicit "nothing scheduled" wording instead.
            passed = any(
                marker in answer
                for marker in ("没有", "暂无", "无日程", "无安排")
            )
        elif "简短" in criterion or "简洁" in criterion:
            passed = 0 < len(answer) < 200
        elif "自我介绍" in criterion:
            passed = "Linksy" in answer or "助手" in answer
        elif "礼貌" in criterion:
            passed = len(answer) > 0
        else:
            passed = run_finished and len(answer) > 0

        details.append(ScoreDetail(criterion=criterion, passed=passed, note=note))
    return details
async def _run_model_scenarios(model_code: str, user_id: str) -> ModelScorecard:
    """Run every scenario once for ``model_code`` and score the outcomes.

    Prints a one-line summary per scenario and returns the collected scores.
    Cost falls back to 0.0 when the pricing service raises ``ValueError``.
    """
    from services.llm_pricing.service import LlmPricingService

    pricing = LlmPricingService()
    token = _create_jwt(user_id)
    scenario_scores: list[ScenarioScore] = []

    for scenario in ALL_SCENARIOS:
        run = await _run_via_http(user_message=scenario.prompt, token=token)

        answer = run["final_answer"]
        tool_names = run["tool_names"]
        finished = run["run_finished"]
        latency_ms = run["latency_ms"]
        has_tool_success = len(run["successful_tool_names"]) > 0

        # Token counts: prefer explicit fields, then derive from what is available.
        usage = run["token_usage"]
        total_tokens = usage.get("totalTokens", 0)
        input_tokens = usage.get("inputTokens", 0) or usage.get("promptCacheMissTokens", 0)
        output_tokens = usage.get("outputTokens", 0) or max(total_tokens - input_tokens, 0)

        try:
            cost_usd = pricing.calculate_cost(
                model=model_code,
                prompt_tokens=input_tokens,
                completion_tokens=output_tokens,
                cached_prompt_tokens=usage.get("promptCacheHitTokens", 0),
            )
        except ValueError:
            cost_usd = 0.0
        cost_usd = round(cost_usd, 8)

        tool_called = any("project_cli" in name for name in tool_names)
        # Scenarios that do not expect a tool count as tool-successful.
        if scenario.expect_tool_use:
            tool_succeeded = has_tool_success
        else:
            tool_succeeded = True

        answer_quality = _evaluate_answer_quality(
            answer=answer,
            run_finished=finished,
            expect_tool_use=scenario.expect_tool_use,
            has_tool_success=has_tool_success,
            tool_names=tool_names,
        )
        details = _evaluate_criteria(
            answer=answer,
            run_finished=finished,
            tool_names=tool_names,
            has_tool_success=has_tool_success,
            tool_results=run["tool_results"],
            scenario=scenario,
        )

        tool_flag = "OK" if has_tool_success else "FAIL"
        print(
            f" [{model_code}] {scenario.id:<25} "
            f"lat={latency_ms}ms "
            f"tokens={total_tokens} "
            f"cost=${cost_usd:.6f} "
            f"tool={tool_flag} "
            f"answer={answer[:60]}"
        )

        scenario_scores.append(
            ScenarioScore(
                scenario_id=scenario.id,
                model_code=model_code,
                latency_ms=latency_ms,
                input_tokens=input_tokens,
                output_tokens=output_tokens,
                cost_usd=cost_usd,
                tool_called=tool_called,
                tool_succeeded=tool_succeeded,
                answer_quality=answer_quality,
                details=details,
                raw_answer=answer[:500],
                run_finished=finished,
            )
        )

    return ModelScorecard(model_code=model_code, scenario_scores=scenario_scores)
@pytest.fixture(autouse=True)
def _check_env():
    """Skip every test in this module unless ``QUALITY_TEST=1`` is set."""
    if os.getenv("QUALITY_TEST") != "1":
        pytest.skip("set QUALITY_TEST=1 to run quality tests")


@pytest.fixture(autouse=True)
def _require_test_user_id(_check_env):
    """Fail early when ``TEST_USER_ID`` is missing.

    Requests ``_check_env`` explicitly so the opt-in skip always runs first;
    two unrelated autouse fixtures have no guaranteed order, and a missing
    ``TEST_USER_ID`` must not turn a skipped test into an error.
    """
    _get_test_user_id()
@pytest.mark.asyncio
@pytest.mark.quality
@pytest.mark.live
async def test_model_ab_comparison():
    """Run all scenarios per candidate model and print a side-by-side comparison.

    The ``system_agents`` model assignment is saved up front and restored in
    ``finally`` even when a run fails.
    """
    user_id = _get_test_user_id()
    saved_rows = _save_original_models()

    scorecards: list[ModelScorecard] = []
    try:
        for model_code in CANDIDATE_MODELS:
            _switch_model(model_code)
            scorecard = await _run_model_scenarios(model_code, user_id)
            scorecards.append(scorecard)
            print(scorecard.summary_table())
    finally:
        _restore_models(saved_rows)

    banner = "=" * 60
    print("\n" + banner)
    print("COMPARISON")
    print(banner)
    for scorecard in scorecards:
        print(
            f" {scorecard.model_code:<20} "
            f"overall={scorecard.avg_overall:.2f} "
            f"latency={scorecard.avg_latency_ms:.0f}ms "
            f"cost=${scorecard.avg_cost_usd:.6f} "
            f"tool_success={scorecard.tool_success_rate:.0%}"
        )

    # A winner is only declared for a head-to-head of exactly two models;
    # ties go to the first one.
    if len(scorecards) == 2:
        first, second = scorecards
        leader = first if first.avg_overall >= second.avg_overall else second
        print(f"\n Winner: {leader.model_code} (by overall score)")
@@ -7,6 +7,7 @@ from ag_ui.core import RunAgentInput
import core.agentscope.runtime.runner as runner_module
from core.agentscope.runtime.runner import AgentScopeRunner
from schemas.agent.runtime_models import (
RunStatus,
RouterAgentOutput,
WorkerAgentOutputLite,
)
@@ -60,6 +61,31 @@ def test_build_worker_input_messages_only_contains_router_contract() -> None:
assert "[RouterAgentOutput]" in str(input_messages[0].content)
def test_build_agent_sets_worker_max_iters(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """``_build_agent`` constructs the worker agent with ``max_iters=7``."""
    seen_kwargs: dict[str, object] = {}

    class _FakeJsonReActAgent:
        """Stand-in that records the keyword arguments it is built with."""

        def __init__(self, **kwargs: object) -> None:
            seen_kwargs.update(kwargs)

    monkeypatch.setattr(runner_module, "JsonReActAgent", _FakeJsonReActAgent)

    built_agent = AgentScopeRunner()._build_agent(
        agent_name="worker",
        system_prompt="test",
        toolkit=object(),
        model=runner_module.TrackingChatModel(object()),
    )

    assert isinstance(built_agent, _FakeJsonReActAgent)
    assert seen_kwargs["max_iters"] == 7
def test_build_router_messages_injects_user_input_when_context_last_not_user() -> None:
runner = AgentScopeRunner()
run_input = _run_input()
@@ -119,6 +145,45 @@ def test_build_router_messages_appends_user_input_to_context_tail() -> None:
assert messages[0].content == "上一轮回复"
def test_enforce_tool_evidence_contract_keeps_success_when_tool_succeeds() -> None:
    """A successful tool result leaves the worker output untouched."""
    original_output = WorkerAgentOutputLite(
        status=RunStatus.SUCCESS,
        answer="今天没有日程",
        suggested_actions=["查明天"],
    )

    enforced = AgentScopeRunner()._enforce_tool_evidence_contract(
        worker_output=original_output,
        requires_tool_evidence=True,
        has_successful_tool_result=True,
    )

    assert enforced.status == RunStatus.SUCCESS
    assert enforced.answer == "今天没有日程"
    assert enforced.suggested_actions == ["查明天"]
    assert enforced.error is None
def test_enforce_tool_evidence_contract_forces_failure_without_successful_tool() -> None:
    """Without a successful tool result, a claimed success is downgraded to failure."""
    claimed_success = WorkerAgentOutputLite(
        status=RunStatus.SUCCESS,
        answer="今天没有日程",
        suggested_actions=["查明天"],
    )

    enforced = AgentScopeRunner()._enforce_tool_evidence_contract(
        worker_output=claimed_success,
        requires_tool_evidence=True,
        has_successful_tool_result=False,
    )

    assert enforced.status == RunStatus.FAILED
    assert enforced.answer == "无法确认结果:所需工具调用未成功完成。"
    assert enforced.suggested_actions == []
    assert enforced.error is not None
    assert enforced.error.code == "TOOL_EVIDENCE_MISSING"
def test_build_model_omits_none_generate_kwargs(
monkeypatch: pytest.MonkeyPatch,
) -> None:
@@ -1,6 +1,10 @@
from __future__ import annotations
from core.agentscope.prompts.agent_prompt import build_agent_prompt
from core.agentscope.prompts.agent_prompt import (
build_agent_prompt,
build_worker_contract_prompt,
)
from schemas.agent.runtime_models import RouterAgentOutput
from schemas.agent.system_agent import AgentType, SystemAgentLLMConfig
@@ -18,9 +22,12 @@ def test_build_agent_prompt_for_worker_contains_runtime_config() -> None:
assert "<!-- AGENT_START -->" in prompt
assert "- type: worker" in prompt
assert "context_messages.mode=number" in prompt
assert "context_messages.count=20" in prompt
assert "enabled_skills=calendar,contacts" in prompt
assert "Use objective plus context_summary as the primary execution guide from the router." in prompt
assert "When requires_tool_evidence=true, do not finalize an answer from failed tool calls; either recover with a corrected tool call or explicitly surface that execution failed." in prompt
assert "If all tool calls fail under requires_tool_evidence=true, set status=failed and populate error; do not present a factual answer as confirmed." in prompt
assert "context_messages.mode=number" not in prompt
assert "context_messages.count=20" not in prompt
def test_build_agent_prompt_for_router_contains_identity_and_config() -> None:
@@ -35,5 +42,20 @@ def test_build_agent_prompt_for_router_contains_identity_and_config() -> None:
assert "- type: router" in prompt
assert "[Router Agent]" in prompt
assert "When the task will require project_cli, include canonical tool input defaults in context_summary using the exact shape `project_cli_defaults={\"module\":...,\"method\":...,\"input\":{...}}` whenever they can be determined safely." in prompt
assert "Standardize every time value mentioned in context_summary to the exact project_cli input format that would be required downstream: dates as `YYYY-MM-DD`, local datetimes as RFC3339 with timezone offset, and event ids as raw UUID strings." in prompt
assert "For relative time requests like today, tomorrow, or next Monday, resolve them using system_time_local and place the resolved standardized value into project_cli_defaults.input instead of leaving natural-language time phrases." in prompt
assert "context_messages.mode=day" in prompt
assert "context_messages.count=2" in prompt
def test_build_worker_contract_prompt_prefers_resolved_dates_from_context_summary() -> None:
    """The worker contract prompt tells the worker to reuse ``project_cli_defaults``."""
    router_output = RouterAgentOutput(
        objective="查询今天日程",
        context_summary="目标日期: 2026-04-24",
        requires_tool_evidence=True,
    )

    contract_prompt = build_worker_contract_prompt(router_output=router_output)

    expected_rule = (
        "If context_summary contains project_cli_defaults, prefer using those "
        "exact module/method/input values directly."
    )
    assert expected_rule in contract_prompt
@@ -0,0 +1,84 @@
from __future__ import annotations
import json
import pytest
from core.agentscope.tools.cli.adapter import invoke_cli_tool
@pytest.mark.asyncio
async def test_project_cli_requires_module_and_method() -> None:
    """Omitting ``method`` yields an ``INVALID_ARGUMENT`` failure envelope."""
    call_args = {
        "module": "calendar",
        "input": {},
    }

    response = await invoke_cli_tool(
        tool_name="project_cli",
        tool_call_args=call_args,
        allowed_commands={"calendar"},
    )

    assert response.content
    first_block = response.content[0]
    # Content blocks are either plain dicts or objects exposing ``.text``.
    raw_text = first_block["text"] if isinstance(first_block, dict) else first_block.text
    envelope = json.loads(raw_text)
    assert envelope["ok"] is False
    assert envelope["module"] == "calendar"
    assert envelope["method"] == ""
    assert envelope["error"]["code"] == "INVALID_ARGUMENT"
@pytest.mark.asyncio
async def test_project_cli_failure_includes_method_contract_in_side_channel() -> None:
    """An invalid ``calendar.read`` input stores schema guidance in the tool side channel."""
    from core.agentscope.tools.tool_call_context import (
        peek_tool_agent_output,
        reset_current_tool_call_id,
        set_current_tool_call_id,
    )
    from core.auth.credential_issuer import create_credential_issuer
    from core.auth.tool_credential_context import reset_tool_credential, set_tool_credential

    call_id = "call-test-guidance"
    call_id_token = set_current_tool_call_id(call_id)
    credential = create_credential_issuer().issue(
        owner_id="00000000-0000-0000-0000-000000000001",
        mode="chat",
    )
    credential_token = set_tool_credential(credential)
    try:
        response = await invoke_cli_tool(
            tool_name="project_cli",
            tool_call_args={
                "module": "calendar",
                "method": "read",
                "input": {},
            },
            allowed_commands={"calendar"},
        )
    finally:
        # Always restore both context values, credential first.
        reset_tool_credential(credential_token)
        reset_current_tool_call_id(call_id_token)

    # The tool response itself carries the failure envelope.
    assert response.content
    first_block = response.content[0]
    raw_text = first_block["text"] if isinstance(first_block, dict) else first_block.text
    envelope = json.loads(raw_text)
    assert envelope["ok"] is False
    assert envelope["module"] == "calendar"
    assert envelope["method"] == "read"
    assert envelope["data"] is None
    assert envelope["error"]["code"] == "INVALID_ACTION_INPUT"

    # The side channel carries the detailed method contract.
    stored_output = peek_tool_agent_output(tool_call_id=call_id)
    assert stored_output is not None
    stored_error = stored_output.get("error")
    assert isinstance(stored_error, dict)
    assert stored_error["code"] == "INVALID_ACTION_INPUT"
    assert stored_error["details"]["input_schema"]["mode"] == "string enum(day|range|event)"
    assert stored_error["details"]["expected_input_examples"][0] == {
        "mode": "day",
        "date": "2026-04-24",
        "timezone": "Asia/Shanghai",
    }
    assert "resolve the day to a concrete input.date value" in stored_error["message"]
@@ -1,38 +1,96 @@
from __future__ import annotations
import pytest
from core.agentscope.tools.cli.handler_calendar import (
_resolve_read_range,
_day_input_to_range_input,
_CalendarReadDayInput,
handle_calendar_create_event,
handle_calendar_list_day,
)
from core.agentscope.tools.cli.models import CliCommand
# NOTE(review): unmarked diff hunk -- two versions of this test are interleaved.
# The variant built with command/subcommand/args (apparently the pre-change one)
# calls _resolve_read_range(request) and checks start/end instants at +00:00.
# The other variant validates a _CalendarReadDayInput and checks that
# _day_input_to_range_input returns a "range" payload with +08:00 offsets.
def test_resolve_read_range_supports_date_timezone_fallback() -> None:
request = CliCommand(
command="calendar",
subcommand="read",
owner_id="u1",
args={"date": "2026-04-23", "timezone": "Asia/Shanghai"},
def test_day_input_converts_to_tz_range() -> None:
payload = _CalendarReadDayInput.model_validate(
{"mode": "day", "date": "2026-04-23", "timezone": "Asia/Shanghai"}
)
start_at, end_at, error = _resolve_read_range(request)
result = _day_input_to_range_input(payload)
assert error is None
assert start_at is not None
assert end_at is not None
assert start_at.isoformat() == "2026-04-22T16:00:00+00:00"
assert end_at.isoformat() == "2026-04-23T16:00:00+00:00"
assert result == {
"mode": "range",
"start_at": "2026-04-23T00:00:00+08:00",
"end_at": "2026-04-24T00:00:00+08:00",
}
# NOTE(review): unmarked diff hunk -- two versions of the bad-date test are interleaved.
# The synchronous variant expects _resolve_read_range to yield
# (None, None, "date must be YYYY-MM-DD") for the slash-separated date.
# The async variant awaits handle_calendar_list_day and expects an
# INVALID_ACTION_INPUT error that lists "day.date" under invalid_fields.
def test_resolve_read_range_rejects_bad_date() -> None:
@pytest.mark.asyncio
async def test_calendar_read_rejects_bad_date_format() -> None:
request = CliCommand(
command="calendar",
subcommand="read",
module="calendar",
method="read",
owner_id="u1",
args={"date": "2026/04/23", "timezone": "Asia/Shanghai"},
input={"mode": "day", "date": "2026/04/23", "timezone": "Asia/Shanghai"},
)
start_at, end_at, error = _resolve_read_range(request)
result = await handle_calendar_list_day(request)
assert start_at is None
assert end_at is None
assert error == "date must be YYYY-MM-DD"
assert result.ok is False
assert result.error is not None
assert result.error.code == "INVALID_ACTION_INPUT"
assert result.error.details == {
"missing_fields": [],
"invalid_fields": ["day.date"],
}
@pytest.mark.asyncio
async def test_calendar_read_range_requires_timezone_aware_datetimes() -> None:
    """Range reads whose datetimes carry no UTC offset are rejected as invalid input."""
    # Both bounds deliberately omit an offset.
    naive_range = {
        "mode": "range",
        "start_at": "2026-04-23T00:00:00",
        "end_at": "2026-04-24T00:00:00",
    }
    request = CliCommand(
        module="calendar",
        method="read",
        owner_id="u1",
        input=naive_range,
    )

    result = await handle_calendar_list_day(request)

    assert result.ok is False
    assert result.error is not None
    assert result.error.code == "INVALID_ACTION_INPUT"
    # Sorted so the check does not depend on the order the handler reports fields in.
    flagged_fields = sorted(result.error.details["invalid_fields"])
    assert flagged_fields == ["range.end_at", "range.start_at"]
@pytest.mark.asyncio
async def test_create_event_rejects_legacy_field_aliases_with_corrections() -> None:
    """Legacy ``start_time``/``end_time``/``event_timezone`` keys fail with alias hints."""
    legacy_input = {
        "title": "Project sync",
        "start_time": "2026-04-23T10:00:00+08:00",
        "end_time": "2026-04-23T11:00:00+08:00",
        "event_timezone": "Asia/Shanghai",
    }
    # The error details are expected to name what is missing, what was
    # rejected, and which canonical field each legacy key maps to.
    expected_details = {
        "missing_fields": ["start_at", "timezone"],
        "invalid_fields": ["end_time", "event_timezone", "start_time"],
        "alias_corrections": {
            "start_time": "start_at",
            "end_time": "end_at",
            "event_timezone": "timezone",
        },
    }
    request = CliCommand(
        module="calendar",
        method="create",
        owner_id="u1",
        input=legacy_input,
    )

    result = await handle_calendar_create_event(request)

    assert result.ok is False
    assert result.error is not None
    assert result.error.code == "INVALID_ACTION_INPUT"
    assert result.error.details == expected_details
@@ -3,18 +3,21 @@ from __future__ import annotations
from core.agentscope.tools.cli.handlers import build_router
# NOTE(review): unmarked diff hunk -- two versions of the registration test are interleaved.
# Assertions against router.command_pairs appear to be the pre-change lines;
# assertions against router.module_methods appear to be their replacements.
def test_router_registers_only_new_canonical_subcommands() -> None:
def test_router_registers_only_new_canonical_actions() -> None:
router = build_router()
assert ("calendar", "create") in router.command_pairs
assert ("calendar", "read") in router.command_pairs
assert ("calendar", "update") in router.command_pairs
assert ("calendar", "delete") in router.command_pairs
assert ("calendar", "share") in router.command_pairs
assert ("contacts", "read") in router.command_pairs
assert ("memory", "update") in router.command_pairs
assert ("calendar", "read") in router.module_methods
assert ("calendar", "create") in router.module_methods
assert ("calendar", "update") in router.module_methods
assert ("calendar", "delete") in router.module_methods
assert ("calendar", "share") in router.module_methods
assert ("calendar", "accept_invite") in router.module_methods
assert ("calendar", "reject_invite") in router.module_methods
assert ("contacts", "read") in router.module_methods
assert ("memory", "update") in router.module_methods
assert ("calendar", "write") not in router.command_pairs
assert ("contacts", "lookup") not in router.command_pairs
assert ("memory", "write") not in router.command_pairs
assert ("memory", "forget") not in router.command_pairs
assert ("calendar", "list_day") not in router.module_methods
assert ("calendar", "get_event") not in router.module_methods
assert ("contacts", "lookup") not in router.module_methods
assert ("memory", "write") not in router.module_methods
assert ("memory", "forget") not in router.module_methods
@@ -11,13 +11,13 @@ async def test_router_register_and_dispatch() -> None:
router = CommandRouter()
async def mock_handler(request: CliCommand) -> CliCommandResult:
return CliCommandResult(ok=True, command=request.command, subcommand=request.subcommand, data={"name": request.args["name"]})
return CliCommandResult(ok=True, module=request.module, method=request.method, data={"name": request.input["name"]})
router.register(command="test", subcommand="run", handler=mock_handler)
router.register(module="test", method="run", handler=mock_handler)
assert ("test", "run") in router.command_pairs
assert ("test", "run") in router.module_methods
result = await router.dispatch(CliCommand(command="test", subcommand="run", args={"name": "demo"}, owner_id="u1"))
result = await router.dispatch(CliCommand(module="test", method="run", input={"name": "demo"}, owner_id="u1"))
assert result.ok is True
assert result.data == {"name": "demo"}
@@ -25,10 +25,10 @@ async def test_router_register_and_dispatch() -> None:
# Dispatching a pair that was never registered on a fresh CommandRouter must
# produce a failed result carrying an error code.
# NOTE(review): unmarked diff hunk -- the command/subcommand/args dispatch with
# UNKNOWN_COMMAND and the module/method/input dispatch with UNKNOWN_METHOD
# appear to be the old and new versions of the same two lines.
@pytest.mark.asyncio
async def test_router_unknown_command() -> None:
router = CommandRouter()
result = await router.dispatch(CliCommand(command="unknown", subcommand="run", args={}, owner_id="u1"))
result = await router.dispatch(CliCommand(module="unknown", method="run", input={}, owner_id="u1"))
assert result.ok is False
assert result.error is not None
assert result.error.code == "UNKNOWN_COMMAND"
assert result.error.code == "UNKNOWN_METHOD"
@pytest.mark.asyncio
@@ -39,9 +39,9 @@ async def test_router_handler_exception() -> None:
del request
raise ValueError("intentional error")
router.register(command="fail", subcommand="run", handler=failing_handler)
router.register(module="fail", method="run", handler=failing_handler)
result = await router.dispatch(CliCommand(command="fail", subcommand="run", args={}, owner_id="u1"))
result = await router.dispatch(CliCommand(module="fail", method="run", input={}, owner_id="u1"))
assert result.ok is False
assert result.error is not None
assert result.error.code == "HANDLER_ERROR"
@@ -51,12 +51,12 @@ def test_router_duplicate_register() -> None:
router = CommandRouter()
async def handler1(request: CliCommand) -> CliCommandResult:
return CliCommandResult(ok=True, command=request.command, subcommand=request.subcommand)
return CliCommandResult(ok=True, module=request.module, method=request.method)
async def handler2(request: CliCommand) -> CliCommandResult:
return CliCommandResult(ok=True, command=request.command, subcommand=request.subcommand)
return CliCommandResult(ok=True, module=request.module, method=request.method)
router.register(command="cmd", subcommand="one", handler=handler1)
router.register(module="cmd", method="one", handler=handler1)
with pytest.raises(ValueError, match="already registered"):
router.register(command="cmd", subcommand="one", handler=handler2)
router.register(module="cmd", method="one", handler=handler2)
@@ -6,31 +6,53 @@ from schemas.agent.runtime_models import ToolAgentOutput, ToolStatus
# Test helper: builds a "project_cli" ToolAgentOutput with a fixed
# tool_call_id ("test_call_id"), empty call input, no error and no ui_hints.
# A missing/empty `data` is stored as {} in the result payload (`data or {}`).
# NOTE(review): unmarked diff hunk -- the command/subcommand lines and the
# module/method lines appear to be old and new versions of the same
# parameters and dictionary entries.
def _make_tool_output(
*,
command: str,
subcommand: str,
module: str,
method: str,
status: ToolStatus,
data: dict | None = None,
) -> ToolAgentOutput:
return ToolAgentOutput(
tool_name="project_cli",
tool_call_id="test_call_id",
tool_call_args={"command": command, "subcommand": subcommand, "args": {}},
tool_call_args={"module": module, "method": method, "input": {}},
status=status,
result={"command": command, "subcommand": subcommand, "data": data or {}},
result={"module": module, "method": method, "data": data or {}},
error=None,
ui_hints=None,
)
# A successful calendar read with a total/items payload must be post-processed
# into ui_hints whose "intent" is "list".
# NOTE(review): unmarked diff hunk -- the one-line command/subcommand call and
# the multi-line module/method call appear to be old and new versions of the
# same `output = ...` statement.
def test_postprocess_calendar_read_has_ui_hints() -> None:
output = _make_tool_output(command="calendar", subcommand="read", status=ToolStatus.SUCCESS, data={"total": 5, "items": []})
output = _make_tool_output(
module="calendar",
method="read",
status=ToolStatus.SUCCESS,
data={"total": 5, "items": []},
)
processed = postprocess_tool_output(output)
assert processed.ui_hints is not None
assert processed.ui_hints["intent"] == "list"
def test_postprocess_calendar_read_event_detail_has_ui_hints() -> None:
    """A successful calendar read returning a single event gets the event-detail title hint."""
    # The payload is one event record (id/title/start_at), not a list result.
    event_detail = {
        "id": "evt_1",
        "title": "Project sync",
        "start_at": "2026-04-21T10:00:00+08:00",
    }
    raw_output = _make_tool_output(
        module="calendar",
        method="read",
        status=ToolStatus.SUCCESS,
        data=event_detail,
    )

    hints = postprocess_tool_output(raw_output).ui_hints

    assert hints is not None
    assert hints["title"] == "日程详情"
# A PARTIAL calendar create result must still be given ui_hints, with
# "intent" set to "status".
# NOTE(review): unmarked diff hunk -- the one-line command/subcommand call and
# the multi-line module/method call appear to be old and new versions of the
# same `output = ...` statement.
def test_postprocess_calendar_create_partial() -> None:
output = _make_tool_output(command="calendar", subcommand="create", status=ToolStatus.PARTIAL, data={"status": "partial", "success": 1, "failed": 1, "results": []})
output = _make_tool_output(
module="calendar",
method="create",
status=ToolStatus.PARTIAL,
data={"status": "partial", "success": 1, "failed": 1, "results": []},
)
processed = postprocess_tool_output(output)
assert processed.ui_hints is not None
assert processed.ui_hints["intent"] == "status"
@@ -39,8 +61,8 @@ def test_postprocess_calendar_create_partial() -> None:
def test_postprocess_calendar_share_has_ui_hints() -> None:
output = _make_tool_output(
command="calendar",
subcommand="share",
module="calendar",
method="share",
status=ToolStatus.SUCCESS,
data={
"status": "success",
@@ -60,7 +82,12 @@ def test_postprocess_calendar_share_has_ui_hints() -> None:
# A successful contacts read must be post-processed into ui_hints whose
# "intent" is "list".
# NOTE(review): unmarked diff hunk -- the one-line command/subcommand call and
# the multi-line module/method call appear to be old and new versions of the
# same `output = ...` statement.
def test_postprocess_contacts_read_has_ui_hints() -> None:
output = _make_tool_output(command="contacts", subcommand="read", status=ToolStatus.SUCCESS, data={"friends_count": 3, "friends": []})
output = _make_tool_output(
module="contacts",
method="read",
status=ToolStatus.SUCCESS,
data={"friends_count": 3, "friends": []},
)
processed = postprocess_tool_output(output)
assert processed.ui_hints is not None
assert processed.ui_hints["intent"] == "list"
@@ -69,8 +96,8 @@ def test_postprocess_contacts_read_has_ui_hints() -> None:
def test_postprocess_memory_update_has_ui_hints() -> None:
output = _make_tool_output(
command="memory",
subcommand="update",
module="memory",
method="update",
status=ToolStatus.SUCCESS,
data={
"status": "success",
@@ -95,19 +122,19 @@ def test_postprocess_memory_update_has_ui_hints() -> None:
# A FAILURE-status output must come back from post-processing without ui_hints.
# NOTE(review): unmarked diff hunk -- the two `output = ...` lines appear to be
# the old (command/subcommand) and new (module/method) versions of one statement.
def test_postprocess_failure_no_ui_hints() -> None:
output = _make_tool_output(command="calendar", subcommand="read", status=ToolStatus.FAILURE, data=None)
output = _make_tool_output(module="calendar", method="read", status=ToolStatus.FAILURE, data=None)
processed = postprocess_tool_output(output)
assert processed.ui_hints is None
# A successful output for the pair ("unknown", "run") must not be given ui_hints.
# NOTE(review): unmarked diff hunk -- the two `output = ...` lines appear to be
# the old (command/subcommand) and new (module/method) versions of one statement.
def test_postprocess_unknown_command_no_ui_hints() -> None:
output = _make_tool_output(command="unknown", subcommand="run", status=ToolStatus.SUCCESS, data={"data": "test"})
output = _make_tool_output(module="unknown", method="run", status=ToolStatus.SUCCESS, data={"data": "test"})
processed = postprocess_tool_output(output)
assert processed.ui_hints is None
def test_postprocess_preserves_existing_ui_hints() -> None:
output = _make_tool_output(command="calendar", subcommand="read", status=ToolStatus.SUCCESS, data={"total": 5})
output = _make_tool_output(module="calendar", method="read", status=ToolStatus.SUCCESS, data={"total": 5})
output = output.model_copy(update={"ui_hints": {"view": "custom_view", "custom": True}})
processed = postprocess_tool_output(output)
assert processed.ui_hints["view"] == "custom_view"
@@ -3,6 +3,7 @@ import asyncio
from core.agentscope.tools.internal.project_cli import PROJECT_CLI_TOOL_NAME
from core.agentscope.tools.internal.view_skill_file import VIEW_SKILL_FILE_TOOL_NAME
from core.agentscope.tools.internal import make_view_skill_file_wrapper
from core.agentscope.tools.skill_session import SkillSessionState
from core.agentscope.tools.toolkit import build_toolkit
from schemas.agent.skill_config import SkillName
@@ -48,8 +49,22 @@ def test_build_toolkit_registers_project_cli() -> None:
}
def test_build_toolkit_uses_custom_agent_skill_prompt_contract() -> None:
    """The skill prompt presents index entries with relative file paths only."""
    calendar_toolkit = build_toolkit(enabled_skill_names={"calendar"})
    prompt = calendar_toolkit.get_agent_skill_prompt()

    assert prompt is not None

    required_fragments = (
        "The entries below are skill indexes, not full execution instructions.",
        'file_path="calendar/SKILL.md"',
    )
    for fragment in required_fragments:
        assert fragment in prompt

    # No "/home/" path fragment may appear in the prompt.
    assert "/home/" not in prompt
def test_view_skill_file_rejects_path_outside_enabled_skill_dirs() -> None:
wrapper = make_view_skill_file_wrapper(enabled_skill_names={"calendar"})
wrapper = make_view_skill_file_wrapper(
enabled_skill_names={"calendar"},
skill_session=SkillSessionState(),
)
response = asyncio.run(
wrapper(file_path="/tmp/not-allowed.txt", ranges=None),
@@ -62,10 +77,48 @@ def test_view_skill_file_rejects_path_outside_enabled_skill_dirs() -> None:
# Reading calendar/SKILL.md through the wrapper must return the skill text and,
# in the variant that passes a SkillSessionState, mark "calendar" as read.
# NOTE(review): unmarked diff hunk -- the single-argument
# make_view_skill_file_wrapper call appears to be the pre-change line; the call
# that also passes skill_session appears to be its replacement.
def test_view_skill_file_reads_enabled_skill_file() -> None:
wrapper = make_view_skill_file_wrapper(enabled_skill_names={"calendar"})
skill_session = SkillSessionState()
wrapper = make_view_skill_file_wrapper(
enabled_skill_names={"calendar"},
skill_session=skill_session,
)
response = asyncio.run(wrapper(file_path="calendar/SKILL.md", ranges=[1, 10]))
assert response.content
block = response.content[0]
# Content blocks may be plain dicts or objects exposing a .text attribute.
text = block["text"] if isinstance(block, dict) else block.text
assert "Calendar Skill" in text or "name: calendar" in text
assert skill_session.has_read(skill_name="calendar") is True
def test_view_skill_file_reads_calendar_action_card() -> None:
    """Reading a calendar action card returns its text and marks the skill as read."""
    session = SkillSessionState()
    view_file = make_view_skill_file_wrapper(
        enabled_skill_names={"calendar"},
        skill_session=session,
    )

    pending_read = view_file(file_path="calendar/actions/get_event.md", ranges=[1, 20])
    response = asyncio.run(pending_read)

    assert response.content
    first_block = response.content[0]
    # Content blocks may be plain dicts or objects exposing a .text attribute.
    if isinstance(first_block, dict):
        text = first_block["text"]
    else:
        text = first_block.text

    assert "get_event" in text
    # NOTE(review): this expects an "action" key inside the card; confirm it
    # still matches the card contents after the module/method/input change.
    assert '"action": "get_event"' in text
    assert session.has_read(skill_name="calendar") is True
def test_view_skill_file_rejects_action_card_for_disabled_skill() -> None:
    """An action card of a skill that is not enabled is answered with ACCESS_DENIED."""
    # Only "contacts" is enabled, yet the requested card lives under "calendar".
    view_file = make_view_skill_file_wrapper(
        enabled_skill_names={"contacts"},
        skill_session=SkillSessionState(),
    )

    pending_read = view_file(file_path="calendar/actions/get_event.md", ranges=[1, 20])
    response = asyncio.run(pending_read)

    assert response.content
    first_block = response.content[0]
    # Content blocks may be plain dicts or objects exposing a .text attribute.
    if isinstance(first_block, dict):
        text = first_block["text"]
    else:
        text = first_block.text

    assert "ACCESS_DENIED" in text