""" 历史消息转换工具函数 将数据库中的原始消息转换为 API 响应的数据结构 """ from collections.abc import Callable from typing import Any from core.agentscope.runtime.ui_compiler import compile as compile_ui_hints from schemas.messages.chat_message import ( AgentChatMessage, AgentChatMessageMetadata, extract_user_message_attachments, ) def convert_message_to_history( message: AgentChatMessage, get_signed_url_fn: Callable[[dict[str, str]], str] | None = None, ) -> dict[str, Any]: """ 将 AgentChatMessage 转换为 HistoryMessage 格式 转换规则: - role=user: 读取 metadata.user_message_attachments,转换为 attachments[] - role=assistant: 读取 metadata.worker_agent_output.ui_hints,编译成 ui_schema """ role = message.role content = message.content metadata = message.metadata attachments: list[dict[str, str]] = [] ui_schema: dict[str, Any] | None = None if role == "user": attachments = _convert_user_attachments(metadata, get_signed_url_fn) elif role == "assistant": ui_schema = _compile_worker_ui_hints(metadata) result: dict[str, Any] = { "id": str(message.id), "seq": message.seq, "role": role, "content": content, "timestamp": message.timestamp.isoformat(), } if attachments: result["attachments"] = attachments if ui_schema: result["ui_schema"] = ui_schema return result def _convert_user_attachments( metadata: AgentChatMessageMetadata | dict[str, Any] | None, get_signed_url_fn: Callable[[dict[str, str]], str] | None, ) -> list[dict[str, str]]: """转换用户附件为临时访问 URL 列表""" if not metadata or not get_signed_url_fn: return [] if isinstance(metadata, AgentChatMessageMetadata): resolved = extract_user_message_attachments(metadata) elif isinstance(metadata, dict): resolved = extract_user_message_attachments(metadata) else: return [] signed_attachments: list[dict[str, str]] = [] for attachment in resolved: try: signed_url = get_signed_url_fn( {"bucket": attachment.bucket, "path": attachment.path} ) except Exception: continue signed_attachments.append( { "url": signed_url, "mimeType": attachment.mime_type, } ) return signed_attachments def _compile_worker_ui_hints( metadata: AgentChatMessageMetadata | dict[str, Any] | None, ) -> dict[str, Any] | None: """编译 assistant 消息的 worker ui_hints""" if not metadata: return None if isinstance(metadata, AgentChatMessageMetadata): worker_output = metadata.worker_agent_output else: worker_output_data = metadata.get("worker_agent_output") if not worker_output_data: return None if isinstance(worker_output_data, dict): raw_ui_schema = worker_output_data.get("ui_schema") if isinstance(raw_ui_schema, dict): return raw_ui_schema legacy_ui_schema = worker_output_data.get("uiSchema") if isinstance(legacy_ui_schema, dict): return legacy_ui_schema from schemas.agent.runtime_models import WorkerAgentOutputRich try: worker_output = WorkerAgentOutputRich.model_validate(worker_output_data) except Exception: return None if not worker_output: return None ui_hints = worker_output.ui_hints if not ui_hints: return None try: compiled = compile_ui_hints(ui_hints) return compiled except Exception: return None