Files
social-app/backend/src/schemas/agent/consumer_registry.py
T
qzl 0abf51e837 feat(agentscope): add memory system and automation job support
- Add consumer_registry and pipeline_registry for runtime orchestration
- Add Visibility schema for message filtering
- Add PipelineSpec for agent pipeline configuration
- Add automation job models and configuration
- Remove memory_prompt.py (consolidated into memory system)
- Update runtime components: context_loader, context_service, orchestrator, runner, tasks
- Update toolkit: tool_config, tool_middleware, custom tools (calendar, user_lookup)
- Add auth_helpers and calendar_domain utilities
- Add system_agents.yaml configuration
2026-03-19 18:42:35 +08:00

45 lines
1.5 KiB
Python

from __future__ import annotations
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
# One consumer-registry entry: pairs an agent type with a visibility bit.
#
# Fields:
#   agent_type -- required string; the supplied value must be 1-64 characters
#                 long and is stored stripped of surrounding whitespace and
#                 lower-cased.
#   bit        -- required integer in the inclusive range 16..63.
#
# Keys other than the two fields above are rejected (extra="forbid").
#
# NOTE: documented with comments rather than a class docstring because
# pydantic copies a model's docstring into its JSON schema "description",
# which would alter the generated schema.
class AgentConsumerBinding(BaseModel):
    model_config = ConfigDict(extra="forbid")

    agent_type: str = Field(..., min_length=1, max_length=64)
    bit: int = Field(..., ge=16, le=63)

    @field_validator("agent_type")
    @classmethod
    def _normalize_agent_type(cls, value: str) -> str:
        """Return *value* trimmed and lower-cased.

        Raises:
            ValueError: if nothing is left after trimming (the input was
                whitespace only).
        """
        cleaned = value.strip().lower()
        if cleaned:
            return cleaned
        # A whitespace-only string satisfies min_length=1 but is still empty
        # once trimmed, so it has to be rejected here.
        raise ValueError("agent_type must not be empty")
# Collection of agent-type -> visibility-bit bindings.
#
# Fields:
#   bindings -- list of AgentConsumerBinding entries; defaults to an empty
#               list. Within one registry every agent_type and every bit may
#               appear at most once.
#
# Keys other than `bindings` are rejected (extra="forbid").
#
# NOTE: documented with comments rather than a class docstring because
# pydantic copies a model's docstring into its JSON schema "description",
# which would alter the generated schema.
class ConsumerRegistry(BaseModel):
    model_config = ConfigDict(extra="forbid")

    bindings: list[AgentConsumerBinding] = Field(default_factory=list)

    @model_validator(mode="after")
    def _validate_unique_bindings(self) -> "ConsumerRegistry":
        """Reject registries that reuse an agent type or a bit.

        Entries are examined in list order; for each entry the agent type is
        checked before the bit, and the first clash found is reported.

        Raises:
            ValueError: on the first duplicated agent_type or bit.
        """
        seen_agents: set[str] = set()
        seen_bits: set[int] = set()
        for binding in self.bindings:
            name, bit = binding.agent_type, binding.bit
            if name in seen_agents:
                raise ValueError(f"duplicate agent_type binding: {name}")
            if bit in seen_bits:
                raise ValueError(f"duplicate visibility bit binding: {bit}")
            seen_agents.add(name)
            seen_bits.add(bit)
        return self

    def resolve_agent_bit(self, *, agent_type: str) -> int:
        """Return the bit bound to *agent_type*.

        The lookup key is trimmed and lower-cased before comparison, the same
        normalization applied to stored agent types.

        Args:
            agent_type: agent type to look up (keyword-only).

        Raises:
            ValueError: if no binding matches the normalized agent type.
        """
        wanted = agent_type.strip().lower()
        candidates = (entry for entry in self.bindings if entry.agent_type == wanted)
        found = next(candidates, None)
        if found is None:
            raise ValueError(f"agent visibility bit not configured: {wanted}")
        return found.bit