""" 历史消息转换工具函数 将数据库中的原始消息转换为 API 响应的数据结构 """ from collections.abc import Callable from typing import Any from core.agentscope.runtime.ui_compiler import compile as compile_ui_hints from schemas.messages.chat_message import ( AgentChatMessage, AgentChatMessageMetadata, UserMessageAttachments, ) def convert_message_to_history( message: AgentChatMessage, get_signed_url_fn: Callable[[str, str], str] | None = None, ) -> dict[str, Any]: """ 将 AgentChatMessage 转换为 HistoryMessage 格式 转换规则: - role=user: 读取 metadata.user_message_attachments,将 bucket 转临时访问 url - role=tool: 读取 content 和 metadata.tool_agent_output.ui_hints,编译成 ui_schema - role=assistant: 读取 metadata.worker_agent_output.ui_hints,编译成 ui_schema """ role = message.role content = message.content metadata = message.metadata url: str | None = None ui_schema: dict[str, Any] | None = None if role == "user": url = _convert_user_attachments(metadata, get_signed_url_fn) elif role == "tool": ui_schema = _compile_tool_ui_hints(metadata) elif role == "assistant": ui_schema = _compile_worker_ui_hints(metadata) result: dict[str, Any] = { "id": str(message.id), "seq": message.seq, "role": role, "content": content, "timestamp": message.timestamp.isoformat(), } if url: result["url"] = url if ui_schema: result["uiSchema"] = ui_schema return result def _convert_user_attachments( metadata: AgentChatMessageMetadata | dict[str, Any] | None, get_signed_url_fn: Callable[[str, str], str] | None, ) -> str | None: """转换用户附件为临时访问 URL""" if not metadata: return None if isinstance(metadata, AgentChatMessageMetadata): attachments = metadata.user_message_attachments else: attachments_data = metadata.get("user_message_attachments") if not attachments_data: return None attachments = UserMessageAttachments.model_validate(attachments_data) if not attachments or not get_signed_url_fn: return None try: return get_signed_url_fn( {"bucket": attachments.bucket, "path": attachments.path} ) except Exception: return None def _compile_tool_ui_hints( metadata: AgentChatMessageMetadata | dict[str, Any] | None, ) -> dict[str, Any] | None: """编译 tool 消息的 ui_hints""" if not metadata: return None if isinstance(metadata, AgentChatMessageMetadata): tool_output = metadata.tool_agent_output else: tool_output_data = metadata.get("tool_agent_output") if not tool_output_data: return None from schemas.agent.runtime_models import ToolAgentOutput tool_output = ToolAgentOutput.model_validate(tool_output_data) if not tool_output: return None ui_hints = tool_output.ui_hints if not ui_hints: return None try: compiled = compile_ui_hints(ui_hints) return compiled except Exception: return None def _compile_worker_ui_hints( metadata: AgentChatMessageMetadata | dict[str, Any] | None, ) -> dict[str, Any] | None: """编译 assistant 消息的 worker ui_hints""" if not metadata: return None if isinstance(metadata, AgentChatMessageMetadata): worker_output = metadata.worker_agent_output else: worker_output_data = metadata.get("worker_agent_output") if not worker_output_data: return None from schemas.agent.runtime_models import WorkerAgentOutputRich worker_output = WorkerAgentOutputRich.model_validate(worker_output_data) if not worker_output: return None ui_hints = worker_output.ui_hints if not ui_hints: return None try: compiled = compile_ui_hints(ui_hints) return compiled except Exception: return None