Files
social-app/docs/plans/2026-03-05-user-agent-context-settings-design.md
T

19 KiB
Raw Blame History

UserAgentContext & ProfileSettings v1 设计

Date: 2026-03-05 Status: Approved


目标

为 Agent Runtime 提供完整的用户画像上下文,通过 Pydantic 约束 profiles.settings 结构,确保:

  1. 运行时入口读取 profileusername/bio/settings
  2. settings 结构类型安全、版本可演进
  3. 关键配置(语言/时区/国家)符合标准格式

架构

Profile (DB JSONB)
    ↓
ProfileSettings (Pydantic)
    ↓
UserAgentContext (DataClass)
    ↓
build_global_system_prompt(ctx)

设计原则:

  • 唯一入口:get_user_agent_context(user_id) 读取并构造上下文
  • 不可变:UserAgentContext 使用 frozen dataclass
  • 向后兼容:version 字段预留未来演进

ProfileSettings v1 结构

{
  "version": 1,
  "preferences": {
    "interface_language": "zh-CN",
    "ai_language": "zh-CN",
    "timezone": "Asia/Shanghai",
    "country": "CN"
  },
  "privacy": {},
  "notification": {}
}

字段说明

字段 类型 默认值 约束
version int 1 必须为 1v1 锁定)
preferences.interface_language str "zh-CN" BCP-47 格式
preferences.ai_language str "zh-CN" BCP-47 格式
preferences.timezone str "Asia/Shanghai" IANA 时区
preferences.country str "CN" ISO 3166-1 alpha-2
privacy dict {} 空对象(预留)
notification dict {} 空对象(预留)

约束规则

1. BCP-47 语言格式

正则:^[a-z]{2,3}(-[A-Z][a-z]{3})?(-[A-Z]{2})?$

示例:

  • zh-CN, en-US, zh-TW, ja-JP
  • zh_CN, EN, chn

2. IANA 时区

使用 zoneinfo.ZoneInfo 校验。

示例:

  • Asia/Shanghai, America/New_York, UTC
  • CST, GMT+8

3. ISO 3166-1 alpha-2 国家代码

使用 pycountry.countries.get(alpha_2=...) 校验。

示例:

  • CN, US, JP, GB
  • CHN, USA, zz

UserAgentContext 结构

@dataclass(frozen=True)
class UserAgentContext:
    user_id: UUID
    username: str
    bio: str | None
    settings: ProfileSettings

设计要点:

  • 不可变(frozen=True):防止运行时修改
  • 完整画像:包含身份(username/bio)和配置(settings
  • 唯一构造入口:get_user_agent_context(user_id)

Pydantic 模型实现

from pydantic import BaseModel, Field, field_validator
from dataclasses import dataclass
from uuid import UUID
import re

class PreferenceSettings(BaseModel):
    interface_language: str = "zh-CN"
    ai_language: str = "zh-CN"
    timezone: str = "Asia/Shanghai"
    country: str = "CN"
    
    @field_validator("interface_language", "ai_language")
    @classmethod
    def validate_bcp47(cls, v: str) -> str:
        pattern = r"^[a-z]{2,3}(-[A-Z][a-z]{3})?(-[A-Z]{2})?$"
        if not re.match(pattern, v):
            raise ValueError(f"Invalid BCP-47 language tag: {v}")
        return v
    
    @field_validator("timezone")
    @classmethod
    def validate_iana_timezone(cls, v: str) -> str:
        import zoneinfo
        try:
            zoneinfo.ZoneInfo(v)
        except Exception:
            raise ValueError(f"Invalid IANA timezone: {v}")
        return v
    
    @field_validator("country")
    @classmethod
    def validate_iso_country(cls, v: str) -> str:
        import pycountry
        if not pycountry.countries.get(alpha_2=v.upper()):
            raise ValueError(f"Invalid ISO 3166-1 alpha-2 country code: {v}")
        return v.upper()

class ProfileSettings(BaseModel):
    version: int = Field(default=1, ge=1, le=1)
    preferences: PreferenceSettings = Field(default_factory=PreferenceSettings)
    privacy: dict = Field(default_factory=dict)
    notification: dict = Field(default_factory=dict)

@dataclass(frozen=True)
class UserAgentContext:
    user_id: UUID
    username: str
    bio: str | None
    settings: ProfileSettings

依赖项

需要添加到 backend/pyproject.toml

[project.dependencies]
pycountry = ">=23.0.0"

迁移策略

数据库层:

  • profiles.settings 保持 JSONB,不做 schema 变更
  • 现有数据默认值:{"version": 1, "preferences": {"country": "CN"}}

应用层:

  • 读取时:ProfileSettings.model_validate(profile.settings or {})
  • 写入时:profile.settings = settings.model_dump()

未来演进

版本迁移:

  • Pydantic 支持多版本共存
  • 数据库不做破坏性变更


AG-UI 事件转发与落库策略

核心原则

1. 事件转发时机:

  • 只有 organization 阶段完成后转发 AG-UI 事件
  • AG-UI bridge 已实现底层机制,编排层控制转发时机

2. 落库时机:

  • 意图识别和任务执行阶段:落库但 seq 取负数(用于审计)
  • 结果反馈阶段:seq 取最新 seq 的绝对值 +1(用于展示)

Seq 设计细节

意图识别和任务执行阶段(审计用):

  • seq 取负数(如 -1, -2
  • role: "assistant"(标记为 agent 输出)
  • content: 阶段的完整输出(用于审计/调试)
  • 重建会话时通过 WHERE seq > 0 过滤,不展示给用户

结果反馈阶段(展示用):

  • seq 取正数(取最新负数的绝对值 +1)
  • role: "assistant"
  • content: OrganizationResult.assistant_text
  • 重建会话时通过 WHERE seq > 0 展示给用户

示例:

| seq  | role     | content                    | 展示 |
|------|----------|----------------------------|------|
| -2   | assistant| ExecutionResult (完整)     | 否   |
| -1   | assistant| IntentResult (完整)        | 否   |
| 1    | user     | 用户输入                   | 是   |
| 2    | assistant| OrganizationResult         | 是   |

编排层职责

@listen(intent_stage)
async def persist_intent(self, state: FlowState) -> FlowState:
    # seq 取负数
    seq = await message_repo.get_next_negative_seq(state.session_id)
    await message_repo.create(
        session_id=state.session_id,
        seq=seq,  # 负数
        role="assistant",
        content=state.intent_result.model_dump_json(),
        ...
    )
    return state

@listen(execution_stage)
async def persist_execution(self, state: FlowState) -> FlowState:
    # seq 取负数
    seq = await message_repo.get_next_negative_seq(state.session_id)
    await message_repo.create(
        session_id=state.session_id,
        seq=seq,  # 负数
        role="assistant",
        content=state.execution_result.model_dump_json(),
        ...
    )
    return state

@listen(organization_stage)
async def finalize_flow(self, state: FlowState) -> FlowState:
    result = state.organization_result
    
    # seq 取正数(最新负数绝对值+1)
    seq = await message_repo.get_next_positive_seq(state.session_id)
    await message_repo.create(
        session_id=state.session_id,
        seq=seq,  # 正数
        role="assistant",
        content=result.assistant_text,
        ...
    )
    
    # 触发 AG-UI 事件(由 bridge 处理)
    return state

Token 和 Cost 累加

策略:在内存中累加所有阶段的 token 和 costorganization 完成后统一落库。

@dataclass
class FlowState:
    # ...
    tokens: dict[str, dict] = field(default_factory=dict)
    cost: Decimal = Decimal("0")
    currency: str = "CNY"

CrewAI Flow 三阶段设计

架构概览

User Input + UserAgentContext
    ↓
@start() begin()
    ↓
@listen() intent_stage() → 判断 can_answer_directly
    ↓ (router)
    ├─ DIRECT_RESPONSE → 直接返回
    └─ NEEDS_EXECUTION
           ↓
       @listen() execution_stage() → 任务执行/工具调用
           ↓
       @listen() organization_stage() → 结果组织与表达
           ↓
       返回给用户

三阶段职责

1. Intent Recognition(意图识别)

  • Agent Type: INTENT_RECOGNITION
  • 输出结构(最小化设计):
    class IntentResult(BaseModel):
        direct_answer: bool  # 是否可以直接回答
        intent_analysis: str  # 意图分析文本(用于调试/审计)
        execution_prompt: str  # 给 execution 阶段的提示词(direct_answer=false时使用)
        direct_response: str  # 直接回复文本(direct_answer=true时使用)
    
  • 短路逻辑:
    • direct_answer=true → 完全跳过 execution 和 organization,直接返回 direct_response
    • direct_answer=false → 进入 execution 阶段
  • 输出约束:使用 output_pydantic=IntentResult
  • 落库策略:落库到 messages 表,但重建会话时不展示

2. Task Execution(任务执行)

  • Agent Type: TASK_EXECUTION
  • 输入:IntentResult.execution_prompt + IntentResult.intent_analysis
  • 职责:
    • 执行复杂任务(查询数据库、调用工具、多步骤推理)
    • 返回结构化执行结果
  • 输出结构(最小化设计):
    class ExecutionResult(BaseModel):
        execution_summary: str  # 任务执行摘要(用于调试/审计)
        organization_prompt: str  # 给 organization 阶段的提示词
        execution_data: dict = {}  # 执行结果的结构化数据
    
  • 输出约束:使用 output_pydantic=ExecutionResult
  • 落库策略:落库到 messages 表,但重建会话时不展示

3. Result Reporting(结果报告)

  • Agent Type: RESULT_REPORTING
  • 输入:
    • IntentResult(意图识别结果)
    • ExecutionResult(任务执行情况)
  • 职责:
    • 结合意图分析和执行结果,格式化为用户友好的响应
    • 应用个性化模板(基于 UserAgentContext
  • 输出结构(最小化设计):
    class OrganizationResult(BaseModel):
        assistant_text: str  # 最终回复文本
        response_metadata: dict = {}  # 响应元数据(可选)
    
  • 输出约束:使用 output_pydantic=OrganizationResult
  • 唯一展示阶段:重建会话时只展示此阶段的 message
  • 唯一转发阶段:只有此阶段的输出需要通过 AG-UI 事件转发

Flow 状态管理

@dataclass
class FlowState:
    user_input: str
    context: UserAgentContext
    stage_trace: list[str] = field(default_factory=list)
    intent_result: IntentResult | None = None
    execution_result: ExecutionResult | None = None
    organization_result: OrganizationResult | None = None
    assistant_text: str = ""
    tokens: dict = field(default_factory=dict)
    cost: Decimal = Decimal("0")

数据流向

User Input + UserAgentContext
    ↓
@start() begin()
    ↓
@listen() intent_stage()
    ├─ IntentResult.direct_answer=true
    │    ↓
    │  跳过 execution,直接 organization
    │    ↓
    │  organization_stage(IntentResult.next_stage_prompt, IntentResult.metadata)
    │    ↓
    │  OrganizationResult → AG-UI 事件 + 落库
    │
    └─ IntentResult.direct_answer=false
         ↓
       execution_stage(IntentResult.next_stage_prompt, IntentResult.metadata)
         ↓
       ExecutionResult
         ↓
       organization_stage(ExecutionResult.next_stage_prompt, ExecutionResult.metadata)
         ↓
       OrganizationResult → AG-UI 事件 + 落库

三阶段输出约束

所有阶段使用 output_pydantic 约束输出:

from pydantic import BaseModel

class IntentResult(BaseModel):
    direct_answer: bool
    next_stage_prompt: str
    metadata: dict = {}

class ExecutionResult(BaseModel):
    next_stage_prompt: str
    metadata: dict = {}

class OrganizationResult(BaseModel):
    assistant_text: str
    metadata: dict = {}

# Task 定义
intent_task = Task(
    description="Analyze user intent",
    expected_output="Intent analysis",
    agent=intent_agent,
    output_pydantic=IntentResult,
)

execution_task = Task(
    description="Execute tasks",
    expected_output="Execution result",
    agent=execution_agent,
    output_pydantic=ExecutionResult,
)

organization_task = Task(
    description="Format response",
    expected_output="User-friendly response",
    agent=organization_agent,
    output_pydantic=OrganizationResult,
)

系统选模逻辑设计

问题背景

旧逻辑:order_by(...).limit(1) 随机选择一个系统 agent,不区分阶段。

新逻辑:按 agent_type 显式映射到三阶段。

选模规则

必需的 Agent Types

  • INTENT_RECOGNITION → 用于 intent_stage
  • TASK_EXECUTION → 用于 execution_stage
  • RESULT_REPORTING → 用于 organization_stage

查询逻辑:

REQUIRED_TYPES = {"INTENT_RECOGNITION", "TASK_EXECUTION", "RESULT_REPORTING"}

@dataclass(frozen=True)
class StageModels:
    intent: SystemAgentCatalog
    execution: SystemAgentCatalog
    organization: SystemAgentCatalog

def resolve_stage_models(rows: list[SystemAgentCatalog]) -> StageModels:
    by_type = {row.agent_type: row for row in rows}
    missing = REQUIRED_TYPES - set(by_type.keys())
    if missing:
        raise ValueError(f"Missing required agent types: {missing}")
    
    return StageModels(
        intent=by_type["INTENT_RECOGNITION"],
        execution=by_type["TASK_EXECUTION"],
        organization=by_type["RESULT_REPORTING"],
    )

初始化数据约束:

  • system_agents 表必须包含三种类型的记录
  • 运行时启动时验证完整性

人民币结算策略设计

设计原则

  1. 保留 LiteLLM 语义completion_cost() 始终返回 USD
  2. 业务层映射:根据用户国家(profiles.settings.preferences.country)决定落库货币
  3. 默认人民币:中国用户或无国家信息默认 CNY
  4. 汇率配置USD/CNY 汇率通过环境变量配置

货币来源

UserAgentContext.settings.preferences.country
    ↓
resolve_billing_currency(country)
    ↓
CN → CNY
US → USD
其他 → USD

结算流程

LiteLLM completion_cost()
    ↓ (USD)
resolve_billing_cost(usd_cost, country)
    ↓
    ├─ country="CN" or None → CNY (乘以汇率)
    └─ country="US" → USD (保持原值)
    ↓
messages.cost + messages.currency
sessions.total_cost (同一货币)

汇率配置

# 环境变量
BILLING_USD_CNY_RATE=7.2

# 默认值
DEFAULT_USD_CNY_RATE = Decimal("7.2")

结算模型

@dataclass(frozen=True)
class BillingCost:
    currency: str  # "CNY" or "USD"
    cost: Decimal  # 6位小数精度

def resolve_billing_cost(
    usd_cost: Decimal,
    country: str | None,
    usd_cny_rate: Decimal = DEFAULT_USD_CNY_RATE,
) -> BillingCost:
    currency = "CNY" if (country or "CN").upper() == "CN" else "USD"
    if currency == "CNY":
        cost = usd_cost * usd_cny_rate
    else:
        cost = usd_cost
    return BillingCost(
        currency=currency,
        cost=cost.quantize(Decimal("0.000001"))
    )

数据库落库

messages 表:

  • cost: NUMERIC(12,6) - 业务货币金额
  • currency: VARCHAR(3) - "CNY" or "USD"

sessions 表:

  • total_cost: NUMERIC(12,6) - 同一货币累计

约束:

  • 同一 session 内所有 messages 的 currency 必须一致
  • sessions.total_cost 累加时保持货币一致

Session 状态一致性设计

问题背景

旧逻辑:

  • sessions.statusstate_snapshot.status 不同步
  • 失败时状态不一致
  • title 未自动赋值

状态机

pending (创建)
    ↓
running (开始执行)
    ↓
    ├─ completed (成功)
    └─ failed (异常)

状态同步规则

创建时:

session = AgentChatSession(
    user_id=user_uuid,
    status=AgentChatSessionStatus.PENDING,
    state_snapshot={
        "status": "pending",
        "pending_tool_call_id": None,
    },
)

运行时:

# 开始执行
session.status = AgentChatSessionStatus.RUNNING
session.state_snapshot["status"] = "running"

# 成功完成
session.status = AgentChatSessionStatus.COMPLETED
session.state_snapshot["status"] = "completed"

# 失败
session.status = AgentChatSessionStatus.FAILED
session.state_snapshot["status"] = "failed"
session.state_snapshot["error_id"] = error_id

自动 Title 赋值

规则:

  • 首次运行时,如果 session.title 为空,使用 user_input[:255] 赋值
  • 只在第一次运行时赋值,后续不覆盖

实现:

async def _set_title_if_empty(self, session_id: UUID, title: str) -> None:
    stmt = (
        update(AgentChatSession)
        .where(AgentChatSession.id == session_id)
        .where(AgentChatSession.title.is_(None))
        .values(title=title[:255])
    )
    await self.db.execute(stmt)

Repository 方法

class SessionRepository:
    async def mark_running(self, session_id: UUID) -> None: ...
    async def mark_completed(self, session_id: UUID) -> None: ...
    async def mark_failed(self, session_id: UUID, error_id: str) -> None: ...

全局 Prompt 构建设计

分层结构

全局系统 Prompt
├─ 身份段(username/bio
├─ 偏好段(language/timezone/country
└─ 阶段段(动态注入)
    ├─ intent stage prompt
    ├─ execution stage prompt
    └─ organization stage prompt

构建函数

def build_global_system_prompt(ctx: UserAgentContext) -> str:
    lines = [
        "# User Identity",
        f"username: {ctx.username}",
        f"bio: {ctx.bio or 'N/A'}",
        "",
        "# User Preferences",
        f"interface_language: {ctx.settings.preferences.interface_language}",
        f"ai_language: {ctx.settings.preferences.ai_language}",
        f"timezone: {ctx.settings.preferences.timezone}",
        f"country: {ctx.settings.preferences.country}",
        "",
        "# Instructions",
        "Use the user's preferences to personalize responses.",
        "Respond in the user's preferred AI language.",
        "Consider the user's timezone for time-related queries.",
    ]
    return "\n".join(lines)

阶段注入

每个阶段运行时,在全局 prompt 基础上追加阶段特定的指令:

def build_stage_prompt(
    base_prompt: str,
    stage: str,  # "intent" | "execution" | "organization"
    ctx: UserAgentContext,
) -> str:
    stage_prompts = {
        "intent": "Analyze the user's intent and decide if direct response is possible.",
        "execution": "Execute the required tasks and tools to fulfill the user's request.",
        "organization": "Format the execution results into a user-friendly response.",
    }
    return f"{base_prompt}\n\n# Stage: {stage}\n{stage_prompts[stage]}"

依赖关系图

UserAgentContext (核心上下文)
    ↓
    ├─ ProfileSettings (用户配置)
    │   └─ preferences.country → 人民币结算
    │
    ├─ build_global_system_prompt() (全局 Prompt)
    │   └─ 三阶段 Flow 使用
    │
    └─ resolve_stage_models() (选模逻辑)
        └─ 三阶段 Agent 配置

相关文档