docs: 更新协议文档,删除废弃计划文档

- 更新 http-error-codes, user-points-chat-data-protocol
- 更新 divination-run-protocol, profile-protocol
- 删除废弃的后端和前端设计计划文档
This commit is contained in:
qzl
2026-04-08 17:23:02 +08:00
parent 49fc9a116f
commit e80a82bef4
57 changed files with 4117 additions and 2269 deletions
@@ -11,8 +11,6 @@ from ag_ui.core import (
StepStartedEvent,
StepFinishedEvent,
)
from core.agentscope.runtime.ui_compiler import compile as compile_ui_hints
from schemas.agent.ui_hints import UiHintsPayload
if TYPE_CHECKING:
pass
@@ -55,15 +53,6 @@ def _sanitize_agui_event(event: dict[str, Any]) -> dict[str, Any]:
payload = dict(event)
event_type = str(payload.get("type", "")).strip().upper()
if event_type == EventType.TEXT_MESSAGE_END.value:
ui_hints = payload.get("ui_hints")
if ui_hints is not None:
try:
ui_hints_payload = UiHintsPayload.model_validate(ui_hints)
ui_schema = compile_ui_hints(ui_hints_payload)
payload["ui_schema"] = ui_schema
except Exception:
pass
payload.pop("ui_hints", None)
for key in (
"inputTokens",
"outputTokens",
@@ -72,9 +61,6 @@ def _sanitize_agui_event(event: dict[str, Any]) -> dict[str, Any]:
"model",
):
payload.pop(key, None)
if event_type == EventType.TOOL_CALL_RESULT.value:
payload.pop("ui_hints", None)
payload.pop("ui_schema", None)
return payload
@@ -182,7 +168,7 @@ def to_agui_wire_event(event: dict[str, Any] | BaseEvent) -> dict[str, Any]:
tool_result_payload["threadId"] = thread_id
if isinstance(run_id, str) and run_id:
tool_result_payload["runId"] = run_id
reserved = {"type", "threadId", "runId", "ui_hints", "ui_schema"}
reserved = {"type", "threadId", "runId"}
tool_result_payload.update({k: v for k, v in data.items() if k not in reserved})
return tool_result_payload
+34 -43
View File
@@ -13,7 +13,7 @@ from core.logging import get_logger
from schemas.agent.forwarded_props import RuntimeMode
from schemas.enums import AgentChatMessageRole, AgentChatSessionStatus
from schemas.agent.system_agent import AgentType
from schemas.agent.runtime_models import AgentOutput, ToolAgentOutput
from schemas.agent.runtime_models import AgentOutput, FollowUpOutput, ToolAgentOutput
from schemas.agent.visibility import SystemVisibilityBit, bit_mask
from schemas.domain.chat_message import AgentChatMessageMetadata
@@ -103,8 +103,6 @@ class SqlAlchemyEventStore:
session_repo: SessionRepository,
message_repo: MessageRepository,
) -> None:
message_id_raw = self._event_value(event, "messageId")
message_id = message_id_raw if isinstance(message_id_raw, str) else ""
content_value = self._event_value(event, "answer")
content = content_value if isinstance(content_value, str) else ""
if not content:
@@ -122,17 +120,22 @@ class SqlAlchemyEventStore:
if run_id_value is None:
return
runtime_mode = self._resolve_runtime_mode(event=event)
worker_output_fields = (
"status",
"sign_level",
"conclusion",
"focus_points",
"advice",
"keywords",
"answer",
"error",
"divination_derived",
"ui_hints",
(
"status",
"sign_level",
"conclusion",
"focus_points",
"advice",
"keywords",
"answer",
"error",
"divination_derived",
)
if runtime_mode == RuntimeMode.CHAT.value
else ("status", "answer", "error")
)
worker_output_payload: dict[str, object] = {}
for field in worker_output_fields:
@@ -143,21 +146,16 @@ class SqlAlchemyEventStore:
if not worker_output_payload:
return
try:
if runtime_mode == RuntimeMode.CHAT.value:
worker_output = AgentOutput.model_validate(worker_output_payload)
agent_type = AgentType.WORKER
metadata_model = AgentChatMessageMetadata(
run_id=run_id_value,
agent_type=agent_type,
agent_output=worker_output,
)
except Exception:
self._logger.warning(
"invalid worker metadata payload",
run_id=run_id_value,
message_id=message_id,
)
return
else:
worker_output = FollowUpOutput.model_validate(worker_output_payload)
agent_type = AgentType.WORKER
metadata_model = AgentChatMessageMetadata(
run_id=run_id_value,
agent_type=agent_type,
agent_output=worker_output,
)
role_value = self._event_value(event, "role")
if not isinstance(role_value, str):
@@ -345,29 +343,22 @@ class SqlAlchemyEventStore:
if isinstance(metadata, dict):
message_payload["metadata"] = metadata
try:
context_cache = create_context_messages_cache()
await context_cache.append_message(
thread_id=str(session_id),
runtime_mode=self._resolve_runtime_mode(event=event),
visibility_mask=visibility_mask,
message=message_payload,
)
except Exception as exc:
self._logger.warning(
"Failed to append context cache message from event",
thread_id=str(session_id),
error=str(exc),
)
context_cache = create_context_messages_cache()
await context_cache.append_message(
thread_id=str(session_id),
runtime_mode=self._resolve_runtime_mode(event=event),
visibility_mask=visibility_mask,
message=message_payload,
)
@staticmethod
def _resolve_runtime_mode(*, event: dict[str, Any]) -> str:
raw = event.get("runtime_mode")
if isinstance(raw, str):
normalized = raw.strip().lower()
if normalized:
if normalized in (RuntimeMode.CHAT.value, RuntimeMode.FOLLOW_UP.value):
return normalized
return RuntimeMode.CHAT.value
raise ValueError("invalid runtime_mode in event payload")
@staticmethod
def _resolve_message_timestamp(message: Any) -> str:
+52 -34
View File
@@ -35,6 +35,7 @@ from schemas.agent.forwarded_props import (
)
from schemas.domain.divination import DerivedDivinationData
from schemas.agent.runtime_models import (
FollowUpOutput,
WorkerAgentOutputLite,
resolve_worker_output_model,
)
@@ -102,21 +103,23 @@ class AgentScopeRunner:
worker_toolkit = self._build_toolkit()
if cancel_checker is not None and await cancel_checker():
raise asyncio.CancelledError("run canceled by user")
derived_divination = self._resolve_derived_divination(
run_input=run_input
)
await self._emit_step_event(
pipeline=pipeline,
run_input=run_input,
step_name="divination",
event_type="DIVINATION_DERIVED",
runtime_mode=runtime_mode,
extra_event={
"divination": derived_divination.model_dump(
mode="json", by_alias=True, exclude_none=True
)
},
)
derived_divination: DerivedDivinationData | None = None
if runtime_mode == RuntimeMode.CHAT:
derived_divination = self._resolve_derived_divination(
run_input=run_input
)
await self._emit_step_event(
pipeline=pipeline,
run_input=run_input,
step_name="divination",
event_type="DIVINATION_DERIVED",
runtime_mode=runtime_mode,
extra_event={
"divination": derived_divination.model_dump(
mode="json", by_alias=True, exclude_none=True
)
},
)
worker_output = await self._execute_worker_step(
pipeline=pipeline,
run_input=run_input,
@@ -208,9 +211,11 @@ class AgentScopeRunner:
stage_config: SystemAgentRuntimeConfig,
runtime_client_time: ClientTimeContext | None,
runtime_mode: RuntimeMode,
derived_divination: DerivedDivinationData,
) -> WorkerAgentOutputLite:
worker_output_model = resolve_worker_output_model()
derived_divination: DerivedDivinationData | None,
) -> WorkerAgentOutputLite | FollowUpOutput:
worker_output_model = resolve_worker_output_model(
runtime_mode=runtime_mode.value
)
await self._emit_step_event(
pipeline=pipeline,
run_input=run_input,
@@ -252,11 +257,11 @@ class AgentScopeRunner:
toolkit: Any,
run_input: RunAgentInput,
stage_config: SystemAgentRuntimeConfig,
worker_output_model: type[WorkerAgentOutputLite],
worker_output_model: type[WorkerAgentOutputLite | FollowUpOutput],
pipeline: PipelineLike,
runtime_client_time: ClientTimeContext | None,
runtime_mode: RuntimeMode,
derived_divination: DerivedDivinationData,
derived_divination: DerivedDivinationData | None,
) -> StageExecutionResult:
tracking_model = self._build_model(stage_config=stage_config)
formatter = OpenAIChatFormatter()
@@ -292,12 +297,11 @@ class AgentScopeRunner:
usage_summary=tracking_model.usage_summary(),
)
await emitter.emit_final_text_end(
worker_output={
**worker_payload.model_dump(mode="json", exclude_none=True),
"divination_derived": derived_divination.model_dump(
mode="json", by_alias=True, exclude_none=True
),
},
worker_output=self._build_final_worker_output(
worker_payload=worker_payload,
runtime_mode=runtime_mode,
derived_divination=derived_divination,
),
response_metadata=response_metadata,
)
return StageExecutionResult(
@@ -316,15 +320,18 @@ class AgentScopeRunner:
*,
context_messages: list[Msg],
run_input: RunAgentInput,
derived_divination: DerivedDivinationData,
derived_divination: DerivedDivinationData | None,
) -> list[Msg]:
if context_messages:
last = context_messages[-1]
if last.role == "user":
return context_messages
_, _ = extract_latest_user_payload(run_input)
user_text = build_divination_user_prompt(derived=derived_divination)
_, latest_user_text = extract_latest_user_payload(run_input)
if derived_divination is None:
user_text = latest_user_text
else:
user_text = build_divination_user_prompt(derived=derived_divination)
user_blocks = [{"type": "text", "text": user_text}]
if (
user_blocks
@@ -422,12 +429,23 @@ class AgentScopeRunner:
@staticmethod
def _resolve_runtime_mode(*, run_input: RunAgentInput) -> RuntimeMode:
try:
return parse_forwarded_props_runtime_mode(
getattr(run_input, "forwarded_props", None)
return parse_forwarded_props_runtime_mode(
getattr(run_input, "forwarded_props", None)
)
@staticmethod
def _build_final_worker_output(
*,
worker_payload: WorkerAgentOutputLite | FollowUpOutput,
runtime_mode: RuntimeMode,
derived_divination: DerivedDivinationData | None,
) -> dict[str, Any]:
payload = worker_payload.model_dump(mode="json", exclude_none=True)
if runtime_mode == RuntimeMode.CHAT and derived_divination is not None:
payload["divination_derived"] = derived_divination.model_dump(
mode="json", by_alias=True, exclude_none=True
)
except ValueError:
return RuntimeMode.CHAT
return payload
@staticmethod
def _resolve_provider_api_key(*, factory_name: str) -> str:
@@ -57,19 +57,21 @@ class PipelineStageEmitter:
"role": "assistant",
"stage": self._stage,
"status": worker_output.get("status"),
"sign_level": worker_output.get("sign_level"),
"conclusion": worker_output.get("conclusion", []),
"focus_points": worker_output.get("focus_points", []),
"advice": worker_output.get("advice", []),
"keywords": worker_output.get("keywords", []),
"answer": worker_output.get("answer", ""),
"error": worker_output.get("error"),
"divination_derived": worker_output.get("divination_derived"),
**response_metadata,
}
ui_hints = worker_output.get("ui_hints")
if ui_hints is not None:
payload["ui_hints"] = ui_hints
if self._runtime_mode == "chat":
payload.update(
{
"sign_level": worker_output.get("sign_level"),
"conclusion": worker_output.get("conclusion", []),
"focus_points": worker_output.get("focus_points", []),
"advice": worker_output.get("advice", []),
"keywords": worker_output.get("keywords", []),
"divination_derived": worker_output.get("divination_derived"),
}
)
await self._emit("TEXT_MESSAGE_END", payload)
async def _emit_text_events_from_msg(self, msg: Msg) -> None:
+78 -7
View File
@@ -6,6 +6,7 @@ from typing import Any, cast
from uuid import UUID
from agentscope.message import Msg
from pydantic import TypeAdapter
from core.agentscope.caches import create_user_context_cache
from core.agentscope.caches.attachment_content_cache import (
create_attachment_content_cache,
@@ -31,6 +32,7 @@ from schemas.agent.forwarded_props import (
parse_forwarded_props_runtime_mode,
)
from schemas.agent.runtime_config import MessageContextConfig, RuntimeConfig
from schemas.agent.runtime_models import RuntimeAgentOutput
from schemas.domain.chat_message import (
AgentChatMessageMetadata,
extract_user_message_attachments,
@@ -46,6 +48,7 @@ from v1.points.service import PointsService
logger = get_logger("core.agentscope.runtime.tasks")
_MAX_CONTEXT_ATTACHMENTS = 3
_RUNTIME_AGENT_OUTPUT_ADAPTER = TypeAdapter(RuntimeAgentOutput)
def _serialize_tool_agent_output(
@@ -75,6 +78,72 @@ def _serialize_tool_agent_output(
)
def _serialize_assistant_context_from_metadata(
*,
metadata: AgentChatMessageMetadata | dict[str, object] | None,
fallback_content: str,
) -> str:
if metadata is None:
return fallback_content
try:
resolved_metadata = (
metadata
if isinstance(metadata, AgentChatMessageMetadata)
else AgentChatMessageMetadata.model_validate(metadata)
)
except Exception:
return fallback_content
agent_output = resolved_metadata.agent_output
if agent_output is None and isinstance(metadata, dict):
raw = metadata.get("agent_output")
if raw is not None:
try:
agent_output = _RUNTIME_AGENT_OUTPUT_ADAPTER.validate_python(raw)
except Exception:
return fallback_content
if agent_output is None:
return fallback_content
payload = agent_output.model_dump(mode="json", by_alias=True, exclude_none=True)
lines: list[str] = ["[assistant_context]"]
answer = payload.get("answer")
if isinstance(answer, str) and answer:
lines.append(f"answer: {answer}")
sign_level = payload.get("sign_level")
if isinstance(sign_level, str) and sign_level:
lines.append(f"sign_level: {sign_level}")
focus_points = payload.get("focus_points")
if isinstance(focus_points, list) and focus_points:
lines.append("focus_points:")
for item in focus_points:
if isinstance(item, str) and item:
lines.append(f"- {item}")
divination_derived = payload.get("divination_derived")
if isinstance(divination_derived, dict) and divination_derived:
lines.append("divination_derived:")
key_map = (
("guaName", "gua_name"),
("targetGuaName", "target_gua_name"),
("binaryCode", "binary_code"),
("changedBinaryCode", "changed_binary_code"),
)
for key, label in key_map:
value = divination_derived.get(key)
if isinstance(value, str) and value:
lines.append(f" {label}: {value}")
if len(lines) <= 1:
return fallback_content
return "\n".join(lines)
def _load_runtime() -> type[Any]:
return AgentScopeRuntimeOrchestrator
@@ -235,6 +304,12 @@ async def _build_recent_context_messages(
continue
content = tool_content
if role == "assistant":
content = _serialize_assistant_context_from_metadata(
metadata=metadata,
fallback_content=content,
)
converted.append(
Msg(
name=role or "user",
@@ -259,13 +334,9 @@ async def run_agentscope_task(command: dict[str, Any]) -> dict[str, object]:
raise ValueError("run_input is required")
run_input = parse_run_input(run_input_raw)
runtime_mode = RuntimeMode.CHAT
try:
runtime_mode = parse_forwarded_props_runtime_mode(
getattr(run_input, "forwarded_props", None)
)
except ValueError:
runtime_mode = RuntimeMode.CHAT
runtime_mode = parse_forwarded_props_runtime_mode(
getattr(run_input, "forwarded_props", None)
)
runtime_config = RuntimeConfig.model_validate(runtime_config_raw or {})
thread_id = run_input.thread_id
run_id = run_input.run_id
@@ -1,638 +0,0 @@
"""
UiCompiler - 将描述性 UiHints 编译为渲染性 UiSchemaRenderer
设计原则:
- 机械转换: 不依赖复杂语义理解
- 尽量无损: hints 中出现的主要内容字段尽量保留
- 弱模板: intent 只影响默认布局,不决定字段是否丢失
"""
from __future__ import annotations
from typing import Any
from schemas.agent.ui_hints import (
UiHintAction,
UiHintActionTarget,
UiHintIcon,
UiHintIntent,
UiHintKvItem,
UiHintListItem,
UiHintSection,
UiHintsPayload,
UiHintStatus,
UiHintTextFormat,
)
# ============================================================
# Helpers
# ============================================================
def _compact(value: Any) -> Any:
"""递归移除 dict/list 中的 None,保留 False/0/空字符串/空列表按原样存在。"""
if isinstance(value, dict):
return {k: _compact(v) for k, v in value.items() if v is not None}
if isinstance(value, list):
return [_compact(v) for v in value]
return value
def _non_empty(nodes: list[dict[str, Any] | None]) -> list[dict[str, Any]]:
return [node for node in nodes if node is not None]
def compile_status(status: UiHintStatus) -> str:
return status.value
def _status_badge_needed(intent: UiHintIntent, status: UiHintStatus) -> bool:
return intent == UiHintIntent.STATUS or status != UiHintStatus.INFO
def _status_label(status: str) -> str:
normalized = status.strip().lower()
if not normalized:
return "ui.status.info"
return f"ui.status.{normalized}"
# ============================================================
# Action Compilation
# ============================================================
def compile_action(action: UiHintActionTarget) -> dict[str, Any]:
"""
编译 action target。
关键修复:
- 使用 by_alias=True,避免 toolId/successMessage/submitTo 丢失
"""
action_dict = action.model_dump(by_alias=True, exclude_none=True)
action_type = action_dict["type"]
if action_type == "navigation":
result: dict[str, Any] = {
"type": "navigation",
"path": action_dict["path"],
}
if "params" in action_dict:
result["params"] = action_dict["params"]
return result
if action_type == "url":
return {
"type": "url",
"url": action_dict["url"],
"target": action_dict.get("target", "_blank"),
}
if action_type == "event":
result = {
"type": "event",
"event": action_dict["event"],
}
if "payload" in action_dict:
result["payload"] = action_dict["payload"]
return result
if action_type == "tool":
result = {
"type": "tool",
"toolId": action_dict["toolId"],
}
if "params" in action_dict:
result["params"] = action_dict["params"]
return result
if action_type == "copy":
result = {
"type": "copy",
"content": action_dict["content"],
}
if "successMessage" in action_dict:
result["successMessage"] = action_dict["successMessage"]
return result
if action_type == "payload":
result = {
"type": "payload",
"payload": action_dict["payload"],
}
if "submitTo" in action_dict:
result["submitTo"] = action_dict["submitTo"]
return result
raise ValueError(f"Unknown action type: {action_type}")
def compile_button(action: UiHintAction) -> dict[str, Any]:
return _compact(
{
"type": "button",
"label": action.label,
"style": action.style.value if action.style else "primary",
"disabled": action.disabled,
"action": compile_action(action.action),
}
)
# ============================================================
# Small Node Compilation
# ============================================================
def compile_icon_spec(icon: UiHintIcon) -> dict[str, Any]:
return _compact(
{
"source": icon.source.value,
"value": icon.value,
"color": icon.color,
"size": icon.size,
}
)
def compile_icon_node(icon: UiHintIcon) -> dict[str, Any]:
return _compact(
{
"type": "icon",
"source": icon.source.value,
"value": icon.value,
"color": icon.color,
"size": icon.size,
}
)
def compile_text(
content: str,
*,
role: str = "body",
format: UiHintTextFormat | str = UiHintTextFormat.PLAIN,
status: str | None = None,
) -> dict[str, Any]:
fmt = format.value if isinstance(format, UiHintTextFormat) else format
return _compact(
{
"type": "text",
"content": content,
"format": fmt,
"role": role,
"status": status,
}
)
def compile_badge(label: str, status: str) -> dict[str, Any]:
return {
"type": "badge",
"label": label,
"status": status,
}
def compile_kv_item(item: UiHintKvItem) -> dict[str, Any]:
return _compact(
{
"key": item.key,
"label": item.label,
"value": item.value,
"copyable": item.copyable,
}
)
def compile_kv(items: list[UiHintKvItem], columns: int = 1) -> dict[str, Any]:
return {
"type": "kv",
"items": [compile_kv_item(i) for i in items],
"columns": columns,
}
def compile_divider() -> dict[str, Any]:
return {"type": "divider", "inset": 0}
# ============================================================
# Layout Compilation
# ============================================================
def compile_stack(
children: list[dict[str, Any]],
*,
direction: str = "vertical",
gap: int = 12,
appearance: str = "plain",
align: str | None = None,
justify: str | None = None,
wrap: bool | None = None,
status: str | None = None,
node_id: str | None = None,
) -> dict[str, Any]:
return _compact(
{
"type": "stack",
"id": node_id,
"direction": direction,
"gap": gap,
"appearance": appearance,
"align": align,
"justify": justify,
"wrap": wrap,
"status": status,
"children": children,
}
)
def compile_grid(
children: list[dict[str, Any]],
*,
columns: int,
gap: int = 12,
appearance: str = "plain",
status: str | None = None,
node_id: str | None = None,
) -> dict[str, Any]:
return _compact(
{
"type": "grid",
"id": node_id,
"columns": columns,
"gap": gap,
"appearance": appearance,
"status": status,
"children": children,
}
)
def compile_card(
children: list[dict[str, Any]], *, status: str | None = None
) -> dict[str, Any]:
return compile_stack(
children,
direction="vertical",
gap=12,
appearance="card",
status=status,
)
def compile_section(
*,
title: str | None = None,
description: str | None = None,
icon: UiHintIcon | None = None,
children: list[dict[str, Any]],
status: str | None = None,
) -> dict[str, Any]:
section_children: list[dict[str, Any]] = []
header_row_children: list[dict[str, Any]] = []
if icon:
header_row_children.append(compile_icon_node(icon))
if title:
header_row_children.append(compile_text(title, role="title"))
if header_row_children:
section_children.append(
compile_stack(
header_row_children,
direction="horizontal",
gap=8,
align="center",
)
)
if description:
section_children.append(compile_text(description, role="caption"))
section_children.extend(children)
return compile_stack(
section_children,
direction="vertical",
gap=12,
appearance="section",
status=status,
)
# ============================================================
# Block Compilation
# ============================================================
def compile_action_row(actions: list[UiHintAction]) -> dict[str, Any] | None:
if not actions:
return None
buttons = [compile_button(a) for a in actions]
return compile_stack(
buttons,
direction="horizontal",
gap=8,
align="center",
wrap=True,
)
def compile_list_item(item: UiHintListItem) -> dict[str, Any]:
lead_children: list[dict[str, Any]] = [compile_text(item.title, role="body")]
if item.subtitle:
lead_children.append(compile_text(item.subtitle, role="caption"))
if item.description:
lead_children.append(compile_text(item.description, role="caption"))
lead_block = compile_stack(
lead_children,
direction="vertical",
gap=4,
)
main_row_children: list[dict[str, Any]] = []
if item.icon:
main_row_children.append(compile_icon_node(item.icon))
main_row_children.append(lead_block)
row = compile_stack(
main_row_children,
node_id=item.id,
direction="horizontal",
gap=8,
align="center",
)
trailing_children: list[dict[str, Any]] = []
if item.status:
trailing_children.append(
compile_badge(
label=_status_label(item.status.value),
status=item.status.value,
)
)
if trailing_children:
header = compile_stack(
[row, compile_stack(trailing_children, direction="horizontal", gap=8)],
direction="horizontal",
gap=8,
justify="space-between",
align="center",
)
else:
header = row
children: list[dict[str, Any]] = [header]
action_row = compile_action_row(item.actions)
if action_row:
children.append(action_row)
return compile_stack(
children,
direction="vertical",
gap=8,
appearance="card" if item.actions or item.description else "plain",
status=item.status.value if item.status else None,
node_id=item.id,
)
def compile_list_block(items: list[UiHintListItem]) -> dict[str, Any]:
return compile_stack(
[compile_list_item(item) for item in items],
direction="vertical",
gap=8,
)
def compile_section_block(
section: UiHintSection, default_status: str
) -> dict[str, Any]:
"""
修复点:
- 不再先把 title/description 塞进 children 再切片
- 避免 description 重复输出
"""
body_children: list[dict[str, Any]] = []
if section.content:
body_children.append(
compile_text(
section.content,
role="body",
format=section.content_format,
)
)
if section.items:
body_children.append(compile_kv(section.items))
if section.list_items:
body_children.append(compile_list_block(section.list_items))
action_row = compile_action_row(section.actions)
if action_row:
body_children.append(action_row)
if section.title or section.description or section.icon:
return compile_section(
title=section.title,
description=section.description,
icon=section.icon,
children=body_children,
status=default_status,
)
return compile_stack(
body_children,
direction="vertical",
gap=12,
)
# ============================================================
# Top-level Compilation
# ============================================================
def compile_header(hints: UiHintsPayload) -> dict[str, Any] | None:
status = compile_status(hints.status)
title_row_children: list[dict[str, Any]] = []
if hints.icon:
title_row_children.append(compile_icon_node(hints.icon))
if hints.title:
title_row_children.append(compile_text(hints.title, role="title"))
right_children: list[dict[str, Any]] = []
if _status_badge_needed(hints.intent, hints.status):
right_children.append(compile_badge(_status_label(status), status))
header_children: list[dict[str, Any]] = []
if title_row_children and right_children:
header_children.append(
compile_stack(
[
compile_stack(
title_row_children,
direction="horizontal",
gap=8,
align="center",
),
compile_stack(
right_children,
direction="horizontal",
gap=8,
align="center",
),
],
direction="horizontal",
gap=8,
justify="space-between",
align="center",
)
)
elif title_row_children:
header_children.append(
compile_stack(
title_row_children,
direction="horizontal",
gap=8,
align="center",
)
)
elif right_children:
header_children.append(
compile_stack(
right_children,
direction="horizontal",
gap=8,
align="center",
)
)
if hints.description:
header_children.append(compile_text(hints.description, role="caption"))
if not header_children:
return None
return compile_stack(
header_children,
direction="vertical",
gap=8,
)
def compile_body_blocks(hints: UiHintsPayload) -> list[dict[str, Any]]:
blocks: list[dict[str, Any]] = []
if hints.body:
blocks.append(
compile_text(
hints.body,
role="body",
format=hints.body_format,
status=compile_status(hints.status)
if hints.intent == UiHintIntent.STATUS
else None,
)
)
if hints.items:
blocks.append(compile_kv(hints.items))
if hints.list_items:
blocks.append(compile_list_block(hints.list_items))
if hints.sections:
blocks.extend(
[
compile_section_block(section, compile_status(hints.status))
for section in hints.sections
]
)
return blocks
def compile_footer(hints: UiHintsPayload) -> dict[str, Any] | None:
return compile_action_row(hints.actions)
def _root_appearance(intent: UiHintIntent) -> str:
if intent in {UiHintIntent.DATA, UiHintIntent.STATUS, UiHintIntent.MIXED}:
return "card"
if intent == UiHintIntent.FORM:
return "section"
return "plain"
def _root_gap(intent: UiHintIntent) -> int:
if intent == UiHintIntent.FORM:
return 16
return 12
def compile_root(hints: UiHintsPayload) -> dict[str, Any]:
"""
intent 只影响默认布局风格,不决定字段是否保留。
"""
children = _non_empty(
[
compile_header(hints),
*compile_body_blocks(hints),
compile_footer(hints),
]
)
if not children:
children = [compile_text("No content", role="body")]
return compile_stack(
children,
direction="vertical",
gap=_root_gap(hints.intent),
appearance=_root_appearance(hints.intent),
status=compile_status(hints.status),
)
# ============================================================
# Public API
# ============================================================
def compile(hints: UiHintsPayload) -> dict[str, Any]:
"""
将描述性 UiHints 编译为渲染性 UiSchemaRenderer。
保证:
- title / description / body / items / listItems / sections / actions / icon 尽量保留
- intent 只影响默认包装风格
- meta 中常用 requestId/toolId/traceId/userId 会透传
"""
root = compile_root(hints)
meta_keys = ("requestId", "toolId", "traceId", "userId")
meta = {k: hints.meta.get(k) for k in meta_keys if hints.meta.get(k) is not None}
result: dict[str, Any] = {
"version": "2.0",
"locale": "zh-CN",
"status": compile_status(hints.status),
"theme": "default",
"root": root,
}
if meta:
result["meta"] = meta
return _compact(result)
-14
View File
@@ -11,18 +11,10 @@ from schemas.agent.runtime_models import (
ToolAgentOutput,
ToolStatus,
WorkerAgentOutputLite,
WorkerAgentOutputRich,
resolve_worker_output_model,
)
from schemas.agent.system_agent import AgentType, SystemAgentLLMConfig
from schemas.agent.visibility import SystemVisibilityBit, VisibilityMask, bit_mask
from schemas.agent.ui_hints import (
UiHintAction,
UiHintIntent,
UiHintSection,
UiHintStatus,
UiHintsPayload,
)
__all__ = [
"AgentType",
@@ -35,14 +27,8 @@ __all__ = [
"SystemVisibilityBit",
"ToolAgentOutput",
"ToolStatus",
"UiHintAction",
"UiHintIntent",
"UiHintSection",
"UiHintStatus",
"UiHintsPayload",
"VisibilityMask",
"WorkerAgentOutputLite",
"WorkerAgentOutputRich",
"bit_mask",
"parse_forwarded_props_client_time",
"parse_forwarded_props_runtime_mode",
@@ -64,6 +64,7 @@ class ClientTimeContext(BaseModel):
class RuntimeMode(str, Enum):
CHAT = "chat"
FOLLOW_UP = "follow_up"
class ForwardedPropsPayload(BaseModel):
+15 -6
View File
@@ -5,7 +5,6 @@ from typing import Any, Literal
from pydantic import BaseModel, ConfigDict, Field
from schemas.agent.ui_hints import UiHintsPayload
from schemas.domain.divination import DerivedDivinationData
@@ -53,18 +52,28 @@ class WorkerAgentOutputLite(BaseModel):
error: ErrorInfo | None = None
class WorkerAgentOutputRich(WorkerAgentOutputLite):
ui_hints: UiHintsPayload | None = None
class FollowUpOutput(BaseModel):
model_config = ConfigDict(extra="forbid")
status: RunStatus = RunStatus.SUCCESS
answer: str = Field(min_length=1, max_length=4000)
error: ErrorInfo | None = None
class AgentOutput(WorkerAgentOutputRich):
class AgentOutput(WorkerAgentOutputLite):
model_config = ConfigDict(extra="forbid")
divination_derived: DerivedDivinationData | None = None
WorkerAgentOutput = WorkerAgentOutputLite | WorkerAgentOutputRich
WorkerAgentOutput = WorkerAgentOutputLite
RuntimeAgentOutput = AgentOutput | FollowUpOutput
def resolve_worker_output_model() -> type[WorkerAgentOutputLite]:
def resolve_worker_output_model(
*, runtime_mode: str
) -> type[WorkerAgentOutputLite | FollowUpOutput]:
normalized = runtime_mode.strip().lower()
if normalized == "follow_up":
return FollowUpOutput
return WorkerAgentOutputLite
-349
View File
@@ -1,349 +0,0 @@
"""
UiHints - 描述性 UI 提示
设计原则:
- 描述性而非渲染性: 告诉编译器“要展示什么”,而不是“如何渲染”
- 最小化 token: 保持字段简洁
- 可编译: 可机械转换为 UiSchemaRenderer
- 尽量无损: hints 中的主要内容字段应尽量被保留到 renderer 中
Version: 2.1
"""
from __future__ import annotations
from enum import Enum
import re
from typing import Any, ClassVar, Literal
from pydantic import BaseModel, ConfigDict, Field
from pydantic import field_validator
_NAVIGATION_PATH_PATTERN = re.compile(r"^/[A-Za-z0-9/_-]*$")
_NAVIGATION_PARAM_KEY_PATTERN = re.compile(r"^[A-Za-z][A-Za-z0-9_]{0,31}$")
_MAX_NAVIGATION_PARAMS = 8
# ============================================================
# Enums
# ============================================================
class UiHintStatus(str, Enum):
INFO = "info"
SUCCESS = "success"
WARNING = "warning"
ERROR = "error"
PENDING = "pending"
class UiHintIntent(str, Enum):
"""主要展示意图(弱提示,不应决定字段生死)"""
MESSAGE = "message" # 普通消息/说明
DATA = "data" # 数据/结果摘要
LIST = "list" # 列表为主
STATUS = "status" # 状态结果为主
FORM = "form" # 结构化内容(当前不表示真实输入表单)
MIXED = "mixed" # 混合内容
class UiHintActionStyle(str, Enum):
PRIMARY = "primary"
SECONDARY = "secondary"
GHOST = "ghost"
DANGER = "danger"
class UiHintTextFormat(str, Enum):
PLAIN = "plain"
MARKDOWN = "markdown"
class UiHintActionType(str, Enum):
NAVIGATION = "navigation"
URL = "url"
EVENT = "event"
TOOL = "tool"
COPY = "copy"
PAYLOAD = "payload"
class UiHintIconSource(str, Enum):
ICON = "icon"
EMOJI = "emoji"
URL = "url"
# ============================================================
# Base Config
# ============================================================
class UiHintBaseModel(BaseModel):
model_config: ClassVar[ConfigDict] = ConfigDict(
extra="forbid",
populate_by_name=True,
)
# ============================================================
# Action Targets
# ============================================================
class UiHintActionNavigation(UiHintBaseModel):
type: Literal["navigation"]
path: str = Field(..., description="Internal route path.")
params: dict[str, Any] | None = Field(default=None, description="Route params.")
@field_validator("path")
@classmethod
def validate_navigation_path(cls, value: str) -> str:
path = value.strip()
if not path:
raise ValueError("navigation path must not be empty")
if len(path) > 256:
raise ValueError("navigation path is too long")
if path.startswith("//") or "://" in path:
raise ValueError("navigation path must be internal")
if "?" in path or "#" in path:
raise ValueError("navigation path must not contain query or fragment")
if ":" in path:
raise ValueError("navigation path must be concrete without placeholders")
if _NAVIGATION_PATH_PATTERN.fullmatch(path) is None:
raise ValueError("navigation path contains unsupported characters")
return path
@field_validator("params")
@classmethod
def validate_navigation_params(
cls, value: dict[str, Any] | None
) -> dict[str, Any] | None:
if value is None:
return None
if len(value) > _MAX_NAVIGATION_PARAMS:
raise ValueError("navigation params exceed limit")
normalized: dict[str, Any] = {}
for key, param_value in value.items():
if _NAVIGATION_PARAM_KEY_PATTERN.fullmatch(key) is None:
raise ValueError("navigation param key is invalid")
if isinstance(param_value, (str, int, float, bool)):
normalized[key] = param_value
continue
raise ValueError("navigation params must be scalar")
return normalized
class UiHintActionUrl(UiHintBaseModel):
type: Literal["url"]
url: str = Field(..., description="External URL.")
target: Literal["_self", "_blank"] | None = Field(default=None)
class UiHintActionEvent(UiHintBaseModel):
type: Literal["event"]
event: str = Field(..., description="Frontend event name.")
payload: dict[str, Any] | None = Field(default=None)
class UiHintActionTool(UiHintBaseModel):
type: Literal["tool"]
tool_id: str = Field(alias="toolId", description="Tool identifier.")
params: dict[str, Any] | None = Field(default=None)
class UiHintActionCopy(UiHintBaseModel):
type: Literal["copy"]
content: str = Field(..., description="Content to copy.")
success_message: str | None = Field(alias="successMessage", default=None)
class UiHintActionPayload(UiHintBaseModel):
type: Literal["payload"]
payload: dict[str, Any] = Field(..., description="Structured payload.")
submit_to: str | None = Field(alias="submitTo", default=None)
UiHintActionTarget = (
UiHintActionNavigation
| UiHintActionUrl
| UiHintActionEvent
| UiHintActionTool
| UiHintActionCopy
| UiHintActionPayload
)
class UiHintAction(UiHintBaseModel):
label: str = Field(..., description="Button label.")
style: UiHintActionStyle | None = Field(default=None, description="Button style.")
disabled: bool = Field(default=False, description="Disabled state.")
action: UiHintActionTarget = Field(..., description="Action to execute.")
# ============================================================
# Small Descriptive Models
# ============================================================
class UiHintIcon(UiHintBaseModel):
source: UiHintIconSource = Field(default=UiHintIconSource.ICON)
value: str = Field(..., description="Icon identifier / emoji / url.")
color: str | None = Field(default=None)
size: int | None = Field(default=None)
class UiHintKvItem(UiHintBaseModel):
key: str = Field(..., description="Key identifier.")
label: str | None = Field(default=None, description="Display label.")
value: Any = Field(default=None, description="Value.")
copyable: bool = Field(default=False, description="Allow copy.")
class UiHintListItem(UiHintBaseModel):
id: str | None = Field(default=None)
title: str = Field(..., description="Item title.")
subtitle: str | None = Field(default=None)
description: str | None = Field(default=None)
icon: UiHintIcon | None = Field(default=None)
status: UiHintStatus | None = Field(default=None)
actions: list[UiHintAction] = Field(default_factory=list)
@field_validator("status", mode="before")
@classmethod
def normalize_status(cls, value: object) -> object:
if value is None:
return None
if isinstance(value, dict):
status_type = value.get("type")
if isinstance(status_type, str):
return status_type
status_value = value.get("status")
if isinstance(status_value, str):
return status_value
return value
class UiHintSection(UiHintBaseModel):
title: str | None = Field(default=None, description="Section title.")
description: str | None = Field(default=None, description="Section description.")
icon: UiHintIcon | None = Field(default=None, description="Section icon.")
content: str | None = Field(default=None, description="Main text content.")
content_format: UiHintTextFormat = Field(
default=UiHintTextFormat.PLAIN,
alias="contentFormat",
description="Section content text format.",
)
items: list[UiHintKvItem] = Field(default_factory=list, description="KV items.")
list_items: list[UiHintListItem] = Field(
default_factory=list,
alias="listItems",
description="List items.",
)
actions: list[UiHintAction] = Field(default_factory=list, description="Actions.")
# ============================================================
# Root Payload
# ============================================================
class UiHintsPayload(UiHintBaseModel):
"""
描述性 UI 提示
设计目标:
- agent 输出尽可能短
- 不表达布局细节
- 编译器负责转换为完整 UiSchemaRenderer
"""
model_config: ClassVar[ConfigDict] = ConfigDict(
extra="forbid",
populate_by_name=True,
json_schema_extra={
"examples": [
{
"intent": "status",
"status": "success",
"title": "日程已创建",
"body": "本次创建已成功完成。",
"items": [
{"key": "title", "label": "主题", "value": "Q1 规划会议"},
{"key": "time", "label": "时间", "value": "2026-03-15 14:00"},
],
"actions": [
{
"label": "查看详情",
"style": "primary",
"action": {
"type": "navigation",
"path": "/calendar/evt_123",
},
},
{
"label": "删除",
"style": "danger",
"action": {
"type": "tool",
"toolId": "calendar.delete",
"params": {"eventId": "evt_123"},
},
},
],
}
]
},
)
version: str = Field(default="2.1")
intent: UiHintIntent = Field(
default=UiHintIntent.MESSAGE,
description="Primary display intent.",
)
status: UiHintStatus = Field(
default=UiHintStatus.INFO,
description="Overall status.",
)
title: str | None = Field(default=None, description="Top-level title.")
description: str | None = Field(default=None, description="Top-level description.")
body: str | None = Field(default=None, description="Top-level main body text.")
body_format: UiHintTextFormat = Field(
default=UiHintTextFormat.PLAIN,
alias="bodyFormat",
description="Body text format.",
)
items: list[UiHintKvItem] = Field(
default_factory=list,
description="Top-level key-value items.",
)
list_items: list[UiHintListItem] = Field(
default_factory=list,
alias="listItems",
description="Top-level list items.",
)
sections: list[UiHintSection] = Field(
default_factory=list,
description="Grouped sections.",
)
actions: list[UiHintAction] = Field(
default_factory=list,
description="Top-level actions.",
)
icon: UiHintIcon | None = Field(
default=None,
description="Top-level icon.",
)
meta: dict[str, Any] = Field(
default_factory=dict,
description="Extra meta, e.g. requestId/toolId/traceId/userId.",
)
+3 -2
View File
@@ -6,7 +6,7 @@ from typing import Any, ClassVar
from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field
from schemas.agent.runtime_models import AgentOutput
from schemas.agent.runtime_models import AgentOutput, FollowUpOutput
from ..agent import AgentType, ToolAgentOutput
@@ -25,7 +25,7 @@ class AgentChatMessageMetadata(BaseModel):
agent_type: AgentType | None = None
user_message_attachments: list[UserMessageAttachment] | None = None
tool_agent_output: ToolAgentOutput | None = None
agent_output: AgentOutput | None = None
agent_output: AgentOutput | FollowUpOutput | None = None
class AgentChatMessage(BaseModel):
@@ -34,6 +34,7 @@ class AgentChatMessage(BaseModel):
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
id: UUID
session_id: UUID
seq: int
role: str
content: str
+71 -13
View File
@@ -2,7 +2,7 @@ from __future__ import annotations
from datetime import date, datetime, time, timedelta, timezone
from decimal import Decimal
from typing import Protocol
from typing import Any, Protocol
from uuid import UUID, uuid4
from sqlalchemy import Select, func, select
@@ -45,8 +45,10 @@ class AgentRepository:
detail="Invalid session_id",
) from exc
stmt = select(AgentChatSession.user_id).where(
AgentChatSession.id == session_uuid
stmt = (
select(AgentChatSession.user_id)
.where(AgentChatSession.id == session_uuid)
.where(AgentChatSession.deleted_at.is_(None))
)
owner_id = (await self._session.execute(stmt)).scalar_one_or_none()
if owner_id is None:
@@ -103,10 +105,18 @@ class AgentRepository:
code="AGENT_SESSION_ID_INVALID",
detail="Invalid session_id",
) from exc
session = await self._session.get(AgentChatSession, session_uuid)
if session is not None:
await self._session.delete(session)
await self._session.flush()
stmt = (
select(AgentChatSession)
.where(AgentChatSession.id == session_uuid)
.with_for_update()
)
session = (await self._session.execute(stmt)).scalar_one_or_none()
if session is None:
return
if session.deleted_at is not None:
return
session.deleted_at = datetime.now(timezone.utc)
await self._session.flush()
async def persist_user_message(
self,
@@ -263,6 +273,37 @@ class AgentRepository:
"messages": snapshot_messages,
}
async def get_session_messages(
self,
*,
session_id: str,
visibility_mask: int | None = None,
) -> list[dict[str, object]]:
try:
session_uuid = UUID(session_id)
except ValueError as exc:
raise ApiProblemError(
status_code=422,
code="AGENT_SESSION_ID_INVALID",
detail="Invalid session_id",
) from exc
message_stmt = (
select(AgentChatMessage)
.where(AgentChatMessage.session_id == session_uuid)
.where(AgentChatMessage.deleted_at.is_(None))
.order_by(AgentChatMessage.seq.asc())
)
message_stmt = self._apply_visibility_filter(
stmt=message_stmt,
visibility_mask=visibility_mask,
)
messages = (await self._session.execute(message_stmt)).scalars().all()
snapshot_messages: list[dict[str, object]] = []
for message in messages:
snapshot_messages.append(await self._to_snapshot_message(message))
return snapshot_messages
async def get_recent_messages_by_user_window(
self,
*,
@@ -371,16 +412,32 @@ class AgentRepository:
.where(AgentChatMessage.deleted_at.is_(None))
.where(AgentChatMessage.role == AgentChatMessageRole.ASSISTANT)
.order_by(AgentChatMessage.created_at.desc())
.limit(1)
.limit(20)
)
message_stmt = self._apply_visibility_filter(
stmt=message_stmt,
visibility_mask=visibility_mask,
)
message = (await self._session.execute(message_stmt)).scalar_one_or_none()
if message is None:
candidate_messages = (
(await self._session.execute(message_stmt)).scalars().all()
)
if not candidate_messages:
continue
snapshots.append(await self._to_snapshot_message(message))
selected_snapshot: dict[str, object] | None = None
for message in candidate_messages:
snapshot = await self._to_snapshot_message(message)
metadata = snapshot.get("metadata")
if not isinstance(metadata, dict):
continue
agent_output = metadata.get("agent_output")
if not isinstance(agent_output, dict):
continue
derived = agent_output.get("divination_derived")
if isinstance(derived, dict) and derived:
selected_snapshot = snapshot
break
if selected_snapshot is not None:
snapshots.append(selected_snapshot)
snapshots.sort(
key=lambda item: str(item.get("timestamp") or ""),
@@ -416,6 +473,7 @@ class AgentRepository:
payload_model = AgentChatMessageSchema.model_validate(
{
"id": str(message.id),
"session_id": str(message.session_id),
"seq": int(message.seq),
"role": role,
"content": message.content,
@@ -434,9 +492,9 @@ class AgentRepository:
def _apply_visibility_filter(
self,
*,
stmt: Select,
stmt: Select[Any],
visibility_mask: int | None,
) -> Select:
) -> Select[Any]:
if visibility_mask is None:
return stmt
required_mask = max(int(visibility_mask), 0)
+11 -3
View File
@@ -5,7 +5,6 @@ import os
import re
import tempfile
from collections.abc import AsyncIterator
from datetime import date
from typing import Annotated
from ag_ui.core import RunAgentInput
@@ -25,6 +24,7 @@ from fastapi import (
Form,
Header,
Query,
Response,
Request,
UploadFile,
status,
@@ -297,15 +297,23 @@ async def get_user_history_snapshot(
service: Annotated[AgentService, Depends(get_agent_service)],
current_user: Annotated[CurrentUser, Depends(get_current_user)],
thread_id: str | None = Query(default=None, alias="threadId"),
before: date | None = Query(default=None),
) -> HistorySnapshotResponse:
return await service.get_user_history_snapshot(
current_user=current_user,
thread_id=thread_id,
before=before,
)
@router.delete("/sessions/{thread_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_session(
thread_id: str,
service: Annotated[AgentService, Depends(get_agent_service)],
current_user: Annotated[CurrentUser, Depends(get_current_user)],
) -> Response:
await service.delete_session(thread_id=thread_id, current_user=current_user)
return Response(status_code=status.HTTP_204_NO_CONTENT)
@router.post(
"/attachments",
response_model=AttachmentUploadResponse,
+16 -1
View File
@@ -7,6 +7,7 @@ from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field
from schemas.agent.runtime_models import ErrorInfo
from schemas.domain.divination import DerivedDivinationData
@@ -21,6 +22,8 @@ class AgentRepositoryLike(Protocol):
async def rollback(self) -> None: ...
async def delete_session(self, *, session_id: str) -> None: ...
async def get_history_day(
self,
*,
@@ -29,6 +32,13 @@ class AgentRepositoryLike(Protocol):
visibility_mask: int | None = None,
) -> dict[str, object] | None: ...
async def get_session_messages(
self,
*,
session_id: str,
visibility_mask: int | None = None,
) -> list[dict[str, object]]: ...
async def get_latest_session_id_for_user(self, *, user_id: str) -> str | None: ...
async def get_latest_assistant_messages_by_user_sessions(
@@ -186,6 +196,7 @@ class HistoryMessage(BaseModel):
model_config = ConfigDict(populate_by_name=True, serialize_by_alias=True)
id: str = Field(description="Message UUID")
thread_id: str = Field(alias="threadId", description="Owning session UUID")
seq: int = Field(description="Message sequence number")
role: Literal["user", "assistant"] = Field(
description="Message role: user | assistant"
@@ -213,6 +224,7 @@ class HistoryAgentOutput(BaseModel):
advice: list[str] = Field(default_factory=list)
keywords: list[str] = Field(default_factory=list)
answer: str | None = None
error: ErrorInfo | None = None
divination_derived: DerivedDivinationData | None = None
@@ -221,7 +233,10 @@ class HistorySnapshotResponse(BaseModel):
model_config = ConfigDict(populate_by_name=True, serialize_by_alias=True)
scope: str = Field(default="history_day")
scope: str = Field(
default="history_session_full",
description="history_session_full | history_sessions_latest_assistant",
)
thread_id: str | None = Field(default=None, alias="threadId")
day: str | None = None
has_more: bool = Field(default=False, alias="hasMore")
+81 -67
View File
@@ -1,6 +1,6 @@
from __future__ import annotations
from datetime import date, datetime, timezone
from datetime import datetime, timezone
import hashlib
from urllib.parse import urlparse
@@ -46,7 +46,7 @@ from v1.agent.utils import (
)
logger = get_logger(__name__)
MAX_RUNS_PER_SESSION = 4
MAX_RUNS_PER_SESSION = 2
def ensure_session_owner(*, owner_id: str, current_user: CurrentUser) -> None:
@@ -94,6 +94,15 @@ class AgentService:
forwarded_props = getattr(run_input, "forwarded_props", None)
try:
runtime_mode = parse_forwarded_props_runtime_mode(forwarded_props)
except ValueError as exc:
raise ApiProblemError(
status_code=422,
detail=problem_payload(
code="AGENT_RUNTIME_MODE_INVALID",
detail="Invalid forwardedProps.runtime_mode",
),
) from exc
try:
divination_payload = parse_forwarded_props_divination_payload(
forwarded_props
)
@@ -123,6 +132,14 @@ class AgentService:
except ApiProblemError as exc:
if exc.status_code != 404:
raise
if runtime_mode == RuntimeMode.FOLLOW_UP:
raise ApiProblemError(
status_code=404,
detail=problem_payload(
code="AGENT_SESSION_NOT_FOUND",
detail="Session not found",
),
) from exc
created = await self._create_session_if_missing(
thread_id=thread_id,
current_user=current_user,
@@ -204,6 +221,22 @@ class AgentService:
accepted=True,
)
async def delete_session(
self,
*,
thread_id: str,
current_user: CurrentUser,
) -> None:
try:
owner = await self._repository.get_session_owner(session_id=thread_id)
except ApiProblemError as exc:
if exc.status_code == 404:
return
raise
ensure_session_owner(owner_id=owner, current_user=current_user)
await self._repository.delete_session(session_id=thread_id)
await self._repository.commit()
async def _append_context_cache_user_message(
self,
*,
@@ -226,30 +259,21 @@ class AgentService:
if isinstance(metadata_payload, dict):
message_payload["metadata"] = metadata_payload
try:
context_cache = create_context_messages_cache()
await context_cache.append_message(
thread_id=thread_id,
runtime_mode=runtime_mode.value,
visibility_mask=visibility_mask,
message=message_payload,
)
except Exception as exc:
logger.warning(
"Failed to append user message to context cache",
thread_id=thread_id,
runtime_mode=runtime_mode.value,
error=str(exc),
)
context_cache = create_context_messages_cache()
await context_cache.append_message(
thread_id=thread_id,
runtime_mode=runtime_mode.value,
visibility_mask=visibility_mask,
message=message_payload,
)
async def _resolve_user_message_visibility_mask(
self, *, runtime_mode: RuntimeMode
) -> int:
if runtime_mode == RuntimeMode.CHAT:
return bit_mask(bit=int(SystemVisibilityBit.UI_HISTORY)) | bit_mask(
bit=int(SystemVisibilityBit.CONTEXT_ASSEMBLY)
)
return 0
_ = runtime_mode
return bit_mask(bit=int(SystemVisibilityBit.UI_HISTORY)) | bit_mask(
bit=int(SystemVisibilityBit.CONTEXT_ASSEMBLY)
)
async def _prepare_user_message(
self,
@@ -571,7 +595,6 @@ class AgentService:
self,
*,
thread_id: str,
before: date | None,
current_user: CurrentUser,
) -> HistorySnapshotResponse:
from schemas.domain.chat_message import AgentChatMessage
@@ -580,57 +603,47 @@ class AgentService:
owner = await self._repository.get_session_owner(session_id=thread_id)
ensure_session_owner(owner_id=owner, current_user=current_user)
day_payload = await self._repository.get_history_day(
raw_messages = await self._repository.get_session_messages(
session_id=thread_id,
before=before,
visibility_mask=bit_mask(bit=int(SystemVisibilityBit.UI_HISTORY)),
)
messages: list[HistoryMessage] = []
if day_payload:
raw_messages_obj = day_payload.get("messages")
raw_messages = (
raw_messages_obj if isinstance(raw_messages_obj, list) else []
)
for msg_dict in raw_messages:
msg = AgentChatMessage.model_validate(msg_dict)
if msg.role == "tool":
continue
for msg_dict in raw_messages:
msg = AgentChatMessage.model_validate(msg_dict)
if msg.role == "tool":
continue
signed_urls: dict[str, str] = {}
attachments = extract_user_message_attachments(msg.metadata)
if self._attachment_storage and attachments:
expected_prefix = (
f"agent-inputs/{current_user.id}/{thread_id}/uploads/"
signed_urls: dict[str, str] = {}
attachments = extract_user_message_attachments(msg.metadata)
if self._attachment_storage and attachments:
expected_prefix = f"agent-inputs/{current_user.id}/{thread_id}/uploads/"
for attachment in attachments:
if not is_safe_attachment_path(
attachment.path,
expected_prefix=expected_prefix,
):
continue
signed_url = await self._attachment_storage.create_signed_url(
bucket=attachment.bucket,
path=attachment.path,
expires_in_seconds=self._SIGNED_URL_EXPIRES_IN_SECONDS,
)
for attachment in attachments:
if not is_safe_attachment_path(
attachment.path,
expected_prefix=expected_prefix,
):
continue
signed_url = await self._attachment_storage.create_signed_url(
bucket=attachment.bucket,
path=attachment.path,
expires_in_seconds=self._SIGNED_URL_EXPIRES_IN_SECONDS,
)
key = f"{attachment.bucket}/{attachment.path}"
signed_urls[key] = signed_url
key = f"{attachment.bucket}/{attachment.path}"
signed_urls[key] = signed_url
def _get_signed_url(payload: dict[str, str]) -> str:
key = f"{payload['bucket']}/{payload['path']}"
return signed_urls[key]
def _get_signed_url(payload: dict[str, str]) -> str:
key = f"{payload['bucket']}/{payload['path']}"
return signed_urls[key]
converted = convert_message_to_history(msg, _get_signed_url)
messages.append(HistoryMessage.model_validate(converted))
converted = convert_message_to_history(msg, _get_signed_url)
messages.append(HistoryMessage.model_validate(converted))
return HistorySnapshotResponse(
scope="history_day",
scope="history_session_full",
threadId=thread_id,
day=str(day_payload.get("day"))
if day_payload and day_payload.get("day")
else None,
hasMore=bool(day_payload.get("hasMore")) if day_payload else False,
day=None,
hasMore=False,
messages=messages,
)
@@ -639,7 +652,6 @@ class AgentService:
*,
current_user: CurrentUser,
thread_id: str | None,
before: date | None,
) -> HistorySnapshotResponse:
from schemas.domain.chat_message import AgentChatMessage
from v1.agent.utils import convert_message_to_history
@@ -648,20 +660,22 @@ class AgentService:
if thread_id is not None:
return await self.get_history_snapshot(
thread_id=thread_id,
before=before,
current_user=current_user,
)
summary_limit = 50
raw_messages = (
await self._repository.get_latest_assistant_messages_by_user_sessions(
user_id=str(current_user.id),
visibility_mask=bit_mask(bit=int(SystemVisibilityBit.UI_HISTORY)),
session_limit=50,
session_limit=summary_limit + 1,
)
)
has_more = len(raw_messages) > summary_limit
visible_messages = raw_messages[:summary_limit]
messages: list[HistoryMessage] = []
for msg_dict in raw_messages:
for msg_dict in visible_messages:
msg = AgentChatMessage.model_validate(msg_dict)
converted = convert_message_to_history(msg)
messages.append(HistoryMessage.model_validate(converted))
@@ -670,7 +684,7 @@ class AgentService:
scope="history_sessions_latest_assistant",
threadId=None,
day=None,
hasMore=False,
hasMore=has_more,
messages=messages,
)
+8 -12
View File
@@ -7,7 +7,8 @@
from collections.abc import Callable
from typing import Any
from schemas.agent.runtime_models import AgentOutput
from pydantic import TypeAdapter
from schemas.agent.runtime_models import RuntimeAgentOutput
from schemas.domain.chat_message import (
AgentChatMessage,
AgentChatMessageMetadata,
@@ -18,6 +19,7 @@ ALLOWED_ATTACHMENT_MIME_TYPES = {"image/png", "image/jpeg", "image/webp"}
MAX_ATTACHMENT_BYTES = 5 * 1024 * 1024
MAX_TOTAL_ATTACHMENT_BYTES = 12 * 1024 * 1024
MAX_ATTACHMENTS_PER_MESSAGE = 3
_RUNTIME_AGENT_OUTPUT_ADAPTER = TypeAdapter(RuntimeAgentOutput)
def convert_message_to_history(
@@ -46,6 +48,7 @@ def convert_message_to_history(
result: dict[str, Any] = {
"id": str(message.id),
"threadId": str(message.session_id),
"seq": message.seq,
"role": role,
"content": content,
@@ -78,12 +81,9 @@ def _convert_user_attachments(
signed_attachments: list[dict[str, str]] = []
for attachment in resolved:
try:
signed_url = get_signed_url_fn(
{"bucket": attachment.bucket, "path": attachment.path}
)
except Exception:
continue
signed_url = get_signed_url_fn(
{"bucket": attachment.bucket, "path": attachment.path}
)
signed_attachments.append(
{
"url": signed_url,
@@ -106,16 +106,12 @@ def _extract_worker_agent_output(
agent_output_data = metadata.get("agent_output")
if not agent_output_data:
return None
try:
agent_output = AgentOutput.model_validate(agent_output_data)
except Exception:
return None
agent_output = _RUNTIME_AGENT_OUTPUT_ADAPTER.validate_python(agent_output_data)
if not agent_output:
return None
payload = agent_output.model_dump(mode="json", by_alias=True, exclude_none=True)
payload.pop("ui_hints", None)
return payload or None
@@ -0,0 +1,29 @@
from __future__ import annotations
from v1.agent.schemas import HistoryMessage
def test_history_message_accepts_follow_up_error_payload() -> None:
payload = {
"id": "msg-1",
"threadId": "thread-1",
"seq": 2,
"role": "assistant",
"content": "补充回答",
"timestamp": "2026-04-08T07:31:24+00:00",
"agent_output": {
"status": "failed",
"answer": "需要补充信息",
"error": {
"code": "INVALID_INPUT",
"message": "输入内容不完整",
"retryable": True,
"details": {},
},
},
}
parsed = HistoryMessage.model_validate(payload)
assert parsed.agent_output is not None
assert parsed.agent_output.error is not None
assert parsed.agent_output.error.code == "INVALID_INPUT"
@@ -3,7 +3,11 @@ from __future__ import annotations
import pytest
from pydantic import ValidationError
from schemas.agent.runtime_models import WorkerAgentOutputLite
from schemas.agent.runtime_models import (
FollowUpOutput,
WorkerAgentOutputLite,
resolve_worker_output_model,
)
def test_worker_output_lite_rejects_divination_derived_from_llm() -> None:
@@ -20,3 +24,29 @@ def test_worker_output_lite_rejects_divination_derived_from_llm() -> None:
with pytest.raises(ValidationError):
WorkerAgentOutputLite.model_validate(payload)
def test_follow_up_output_accepts_minimal_schema() -> None:
payload = {
"status": "success",
"answer": "追问回答",
}
parsed = FollowUpOutput.model_validate(payload)
assert parsed.answer == "追问回答"
def test_follow_up_output_rejects_chat_only_fields() -> None:
payload = {
"status": "success",
"answer": "追问回答",
"sign_level": "中上签",
}
with pytest.raises(ValidationError):
FollowUpOutput.model_validate(payload)
def test_resolve_worker_output_model_uses_runtime_mode() -> None:
assert resolve_worker_output_model(runtime_mode="chat") is WorkerAgentOutputLite
assert resolve_worker_output_model(runtime_mode="follow_up") is FollowUpOutput