feat: 接入起卦后端流程并完善积分扣减链路

This commit is contained in:
qzl
2026-04-03 19:04:46 +08:00
parent a136e42290
commit d87b2e1e3a
56 changed files with 3310 additions and 809 deletions
@@ -17,36 +17,38 @@ from schemas.agent.ui_hints import UiHintsPayload
if TYPE_CHECKING:
pass
_INTERNAL_TO_AGUI: dict[str, EventType] = {
"run.started": EventType.RUN_STARTED,
"run.finished": EventType.RUN_FINISHED,
"run.error": EventType.RUN_ERROR,
"step.start": EventType.STEP_STARTED,
"step.finish": EventType.STEP_FINISHED,
"text.end": EventType.TEXT_MESSAGE_END,
"tool.start": EventType.TOOL_CALL_START,
"tool.args": EventType.TOOL_CALL_ARGS,
"tool.end": EventType.TOOL_CALL_END,
"tool.result": EventType.TOOL_CALL_RESULT,
"state.snapshot": EventType.STATE_SNAPSHOT,
"messages.snapshot": EventType.MESSAGES_SNAPSHOT,
_INTERNAL_TO_AGUI: dict[str, str] = {
"run.started": EventType.RUN_STARTED.value,
"run.finished": EventType.RUN_FINISHED.value,
"run.error": EventType.RUN_ERROR.value,
"step.start": EventType.STEP_STARTED.value,
"step.finish": EventType.STEP_FINISHED.value,
"text.end": EventType.TEXT_MESSAGE_END.value,
"tool.start": EventType.TOOL_CALL_START.value,
"tool.args": EventType.TOOL_CALL_ARGS.value,
"tool.end": EventType.TOOL_CALL_END.value,
"tool.result": EventType.TOOL_CALL_RESULT.value,
"state.snapshot": EventType.STATE_SNAPSHOT.value,
"messages.snapshot": EventType.MESSAGES_SNAPSHOT.value,
}
_ALLOWED_WIRE_TYPES: set[str] = {event_type.value for event_type in EventType}
_ALLOWED_WIRE_TYPES.add("DIVINATION_DERIVED")
def _convert_to_agui_type(internal_type: str) -> EventType:
def _convert_to_agui_type(internal_type: str) -> str:
mapped = _INTERNAL_TO_AGUI.get(internal_type)
if mapped is not None:
return mapped
return EventType(internal_type.upper().replace(".", "_"))
candidate = internal_type.upper().replace(".", "_")
if candidate in _ALLOWED_WIRE_TYPES:
return candidate
raise ValueError(f"unsupported ag-ui event type: {internal_type}")
def _is_agui_event(event: dict[str, Any]) -> bool:
event_type = event.get("type", "")
try:
EventType(event_type)
return True
except ValueError:
return False
event_type = str(event.get("type", "")).strip().upper()
return event_type in _ALLOWED_WIRE_TYPES
def _sanitize_agui_event(event: dict[str, Any]) -> dict[str, Any]:
@@ -153,7 +155,7 @@ def to_agui_wire_event(event: dict[str, Any] | BaseEvent) -> dict[str, Any]:
if internal_type == "text.end" and isinstance(data, dict):
text_end_payload: dict[str, Any] = {
"type": _convert_to_agui_type(internal_type).value,
"type": _convert_to_agui_type(internal_type),
}
if isinstance(thread_id, str) and thread_id:
text_end_payload["threadId"] = thread_id
@@ -174,7 +176,7 @@ def to_agui_wire_event(event: dict[str, Any] | BaseEvent) -> dict[str, Any]:
if internal_type == "tool.result" and isinstance(data, dict):
tool_result_payload: dict[str, Any] = {
"type": _convert_to_agui_type(internal_type).value,
"type": _convert_to_agui_type(internal_type),
}
if isinstance(thread_id, str) and thread_id:
tool_result_payload["threadId"] = thread_id
@@ -203,7 +205,7 @@ def to_agui_wire_event(event: dict[str, Any] | BaseEvent) -> dict[str, Any]:
wire_type = _convert_to_agui_type(internal_type)
payload: dict[str, Any] = {
"type": wire_type.value,
"type": wire_type,
}
if isinstance(thread_id, str) and thread_id:
payload["threadId"] = thread_id
@@ -0,0 +1,18 @@
from __future__ import annotations
from schemas.domain.divination import DerivedDivinationData
def build_divination_user_prompt(*, derived: DerivedDivinationData) -> str:
structured_json = derived.model_dump_json(
by_alias=True,
exclude_none=True,
ensure_ascii=False,
)
return (
f"用户问题:{derived.question}\n"
f"问题类型:{derived.question_type}\n"
"以下是后端推导后的六爻结构化数据(JSON):\n"
f"{structured_json}\n"
"请仅基于以上六爻数据做专业解读。"
)
+62 -27
View File
@@ -13,11 +13,14 @@ from agentscope.message import Msg
from agentscope.tool import Toolkit
from agentscope.model import OpenAIChatModel
from core.agentscope.prompts.system_prompt import build_system_prompt
from core.agentscope.prompts.user_prompt import build_divination_user_prompt
from core.agentscope.schemas.agui_input import extract_latest_user_payload
from core.divination import derive_divination
from core.agentscope.runtime.json_react_agent import JsonReActAgent
from core.agentscope.runtime.model_tracking import TrackingChatModel
from core.agentscope.runtime.stage_emitter import PipelineStageEmitter
from core.agentscope.utils import patch_agentscope_json_repair_compat
from core.agentscope.utils.json_finalize import finalize_json_response
from core.config.settings import config
from core.db.session import AsyncSessionLocal
from models.llm import Llm
@@ -26,9 +29,11 @@ from models.system_agents import SystemAgents
from schemas.agent.forwarded_props import (
ClientTimeContext,
RuntimeMode,
parse_forwarded_props_divination_payload,
parse_forwarded_props_client_time,
parse_forwarded_props_runtime_mode,
)
from schemas.domain.divination import DerivedDivinationData
from schemas.agent.runtime_models import (
WorkerAgentOutputLite,
resolve_worker_output_model,
@@ -97,6 +102,21 @@ class AgentScopeRunner:
worker_toolkit = self._build_toolkit()
if cancel_checker is not None and await cancel_checker():
raise asyncio.CancelledError("run canceled by user")
derived_divination = self._resolve_derived_divination(
run_input=run_input
)
await self._emit_step_event(
pipeline=pipeline,
run_input=run_input,
step_name="divination",
event_type="DIVINATION_DERIVED",
runtime_mode=runtime_mode,
extra_event={
"divination": derived_divination.model_dump(
mode="json", by_alias=True, exclude_none=True
)
},
)
worker_output = await self._execute_worker_step(
pipeline=pipeline,
run_input=run_input,
@@ -106,6 +126,7 @@ class AgentScopeRunner:
stage_config=worker_config,
runtime_client_time=runtime_client_time,
runtime_mode=runtime_mode,
derived_divination=derived_divination,
)
return {
"worker": worker_output.model_dump(mode="json", exclude_none=True),
@@ -187,6 +208,7 @@ class AgentScopeRunner:
stage_config: SystemAgentRuntimeConfig,
runtime_client_time: ClientTimeContext | None,
runtime_mode: RuntimeMode,
derived_divination: DerivedDivinationData,
) -> WorkerAgentOutputLite:
worker_output_model = resolve_worker_output_model()
await self._emit_step_event(
@@ -201,6 +223,7 @@ class AgentScopeRunner:
input_messages=self._build_worker_input_messages(
context_messages=context_messages,
run_input=run_input,
derived_divination=derived_divination,
),
toolkit=toolkit,
run_input=run_input,
@@ -234,6 +257,7 @@ class AgentScopeRunner:
runtime_mode: RuntimeMode,
) -> StageExecutionResult:
tracking_model = self._build_model(stage_config=stage_config)
formatter = OpenAIChatFormatter()
emitter = PipelineStageEmitter(
pipeline=pipeline,
session_id=run_input.thread_id,
@@ -243,32 +267,24 @@ class AgentScopeRunner:
emit_text_events=True,
emit_tool_events=False,
)
agent = self._build_agent(
agent_name=stage_config.agent_type.value,
system_prompt=build_system_prompt(
agent_type=stage_config.agent_type,
llm_config=stage_config.llm_config,
user_context=user_context,
now_utc=datetime.now(timezone.utc),
runtime_client_time=runtime_client_time,
extra_context=stage_config.extra_context,
tools=None,
),
toolkit=toolkit,
model=tracking_model,
emitter=emitter,
system_prompt = build_system_prompt(
agent_type=stage_config.agent_type,
llm_config=stage_config.llm_config,
user_context=user_context,
now_utc=datetime.now(timezone.utc),
runtime_client_time=runtime_client_time,
extra_context=stage_config.extra_context,
tools=None,
)
async with self._active_agent_lock:
self._active_agent = agent
try:
response_msg = await agent.reply_json(
input_messages, output_model=worker_output_model
)
finally:
async with self._active_agent_lock:
if self._active_agent is agent:
self._active_agent = None
worker_payload = worker_output_model.model_validate(response_msg.metadata or {})
_, worker_payload_raw = await finalize_json_response(
model=tracking_model,
formatter=formatter,
base_messages=[Msg("system", system_prompt, "system"), *input_messages],
output_model=worker_output_model,
retries=2,
)
worker_payload = worker_output_model.model_validate(worker_payload_raw)
response_metadata = self._llm_pricing_service.build_usage_metadata(
model=stage_config.model_code,
usage_summary=tracking_model.usage_summary(),
@@ -278,7 +294,12 @@ class AgentScopeRunner:
response_metadata=response_metadata,
)
return StageExecutionResult(
message=response_msg,
message=Msg(
name=stage_config.agent_type.value,
role="assistant",
content=worker_payload.answer,
metadata=worker_payload.model_dump(mode="json", exclude_none=True),
),
payload=worker_payload.model_dump(mode="json", exclude_none=True),
response_metadata=response_metadata,
)
@@ -288,13 +309,16 @@ class AgentScopeRunner:
*,
context_messages: list[Msg],
run_input: RunAgentInput,
derived_divination: DerivedDivinationData,
) -> list[Msg]:
if context_messages:
last = context_messages[-1]
if last.role == "user":
return context_messages
user_text, user_blocks = extract_latest_user_payload(run_input)
_, _ = extract_latest_user_payload(run_input)
user_text = build_divination_user_prompt(derived=derived_divination)
user_blocks = [{"type": "text", "text": user_text}]
if (
user_blocks
and isinstance(user_blocks[0], dict)
@@ -307,6 +331,17 @@ class AgentScopeRunner:
user_msg = Msg(name="user", role="user", content=content)
return [*context_messages, user_msg]
@staticmethod
def _resolve_derived_divination(
*, run_input: RunAgentInput
) -> DerivedDivinationData:
payload = parse_forwarded_props_divination_payload(
getattr(run_input, "forwarded_props", None)
)
if payload is None:
raise ValueError("forwardedProps.divinationPayload is required")
return derive_divination(payload)
def _build_model(
self, *, stage_config: SystemAgentRuntimeConfig
) -> TrackingChatModel: