Files
social-app/backend/tests/unit/schemas/agent/test_pipeline_spec.py
T
qzl 0abf51e837 feat(agentscope): add memory system and automation job support
- Add consumer_registry and pipeline_registry for runtime orchestration
- Add Visibility schema for message filtering
- Add PipelineSpec for agent pipeline configuration
- Add automation job models and configuration
- Remove memory_prompt.py (consolidated into memory system)
- Update runtime components: context_loader, context_service, orchestrator, runner, tasks
- Update toolkit: tool_config, tool_middleware, custom tools (calendar, user_lookup)
- Add auth_helpers and calendar_domain utilities
- Add system_agents.yaml configuration
2026-03-19 18:42:35 +08:00

38 lines
1.0 KiB
Python

from __future__ import annotations
import pytest
from schemas.agent.consumer_registry import AgentConsumerBinding, ConsumerRegistry
from schemas.agent.pipeline_spec import (
ContextPolicy,
ExecutorKind,
PipelineSpec,
StageSpec,
)
def test_consumer_registry_rejects_duplicate_bits() -> None:
with pytest.raises(ValueError, match="duplicate visibility bit"):
ConsumerRegistry(
bindings=[
AgentConsumerBinding(agent_type="router", bit=16),
AgentConsumerBinding(agent_type="worker", bit=16),
]
)
def test_pipeline_spec_requires_non_empty_stages() -> None:
with pytest.raises(ValueError, match="at least 1 item"):
PipelineSpec(mode="worker", stages=[])
def test_stage_spec_normalizes_stage_name() -> None:
spec = StageSpec(
stage_name=" Worker ",
executor_kind=ExecutorKind.REACT,
default_visibility_mask=1,
context_policy=ContextPolicy(consumer_agent_type="worker", count=20),
)
assert spec.stage_name == "worker"