120df903d2
- 前端: 添加 SSE 流式支持、stateSnapshot 事件、路由导航工具 - 前端: 实现工具调用审批流程,支持 pending 状态展示 - 后端: Agent 状态管理与会话持久化相关重构 - 文档: 新增 agent-agui-full-alignance 设计文档 - 测试: 补充相关单元测试和集成测试
34 lines
917 B
Python
34 lines
917 B
Python
from __future__ import annotations
|
|
|
|
import pytest
|
|
|
|
from v1.agent import router as agent_router
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_allow_run_request_fails_closed_when_redis_unavailable(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Check that ``_allow_run_request`` fails closed without Redis.

    The router's ``get_or_init_redis_client`` attribute is swapped for a
    coroutine function that always raises ``RuntimeError``. With that in
    place, ``_allow_run_request(user_id="user-1")`` is expected to return
    exactly ``False`` rather than propagate the error or allow the request.
    """

    async def _broken_redis_factory():
        # Stand-in for the Redis accessor: awaiting it always fails.
        raise RuntimeError("redis unavailable")

    monkeypatch.setattr(
        agent_router,
        "get_or_init_redis_client",
        _broken_redis_factory,
    )

    verdict = await agent_router._allow_run_request(user_id="user-1")

    # Identity check: the result must be the literal False, not merely falsy.
    assert verdict is False
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_acquire_sse_slot_fails_closed_when_redis_unavailable(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Check that ``_acquire_sse_slot`` fails closed without Redis.

    The router's ``get_or_init_redis_client`` attribute is swapped for a
    coroutine function that always raises ``RuntimeError``. With that in
    place, ``_acquire_sse_slot(user_id="user-1")`` is expected to return
    exactly ``False`` rather than propagate the error or grant a slot.
    """

    async def _broken_redis_factory():
        # Stand-in for the Redis accessor: awaiting it always fails.
        raise RuntimeError("redis unavailable")

    monkeypatch.setattr(
        agent_router,
        "get_or_init_redis_client",
        _broken_redis_factory,
    )

    slot_granted = await agent_router._acquire_sse_slot(user_id="user-1")

    # Identity check: the result must be the literal False, not merely falsy.
    assert slot_granted is False
|