fix(deploy): reduce backend worker footprint

This commit is contained in:
ZL-Q
2026-04-29 21:28:21 +08:00
parent b17862bff7
commit 203cdd9330
15 changed files with 136 additions and 74 deletions
+3 -3
View File
@@ -1,4 +1,4 @@
"""手动触发 worker-general 定时任务:生成反馈报告
"""手动触发 worker-agent 定时任务:生成反馈报告
用法:
cd /home/qzl/Code/eryao/.worktrees/feat-user-feedback
@@ -12,13 +12,13 @@ from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "src"))
from core.taskiq.app import worker_general_broker
from core.taskiq.app import worker_agent_broker
from v1.feedback.tasks import generate_daily_feedback_report
def main():
task = generate_daily_feedback_report.kiq()
result = worker_general_broker.wait_result(task, timeout=120)
result = worker_agent_broker.wait_result(task, timeout=120)
print(f"Task result: {result.return_value}")
+33 -6
View File
@@ -1,9 +1,3 @@
from core.agentscope.events.agui_codec import AgentScopeAgUiCodec, to_agui_wire_event
from core.agentscope.events.pipeline import AgentScopeEventPipeline
from core.agentscope.events.redis_bus import RedisStreamBus
from core.agentscope.events.sse import to_sse_event
from core.agentscope.events.store import NullEventStore, SqlAlchemyEventStore
__all__ = [
"AgentScopeAgUiCodec",
"AgentScopeEventPipeline",
@@ -13,3 +7,36 @@ __all__ = [
"to_agui_wire_event",
"to_sse_event",
]
def __getattr__(name: str):
if name in {"AgentScopeAgUiCodec", "to_agui_wire_event"}:
from core.agentscope.events.agui_codec import (
AgentScopeAgUiCodec,
to_agui_wire_event,
)
return {
"AgentScopeAgUiCodec": AgentScopeAgUiCodec,
"to_agui_wire_event": to_agui_wire_event,
}[name]
if name == "AgentScopeEventPipeline":
from core.agentscope.events.pipeline import AgentScopeEventPipeline
return AgentScopeEventPipeline
if name == "RedisStreamBus":
from core.agentscope.events.redis_bus import RedisStreamBus
return RedisStreamBus
if name == "to_sse_event":
from core.agentscope.events.sse import to_sse_event
return to_sse_event
if name in {"NullEventStore", "SqlAlchemyEventStore"}:
from core.agentscope.events.store import NullEventStore, SqlAlchemyEventStore
return {
"NullEventStore": NullEventStore,
"SqlAlchemyEventStore": SqlAlchemyEventStore,
}[name]
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
@@ -6,7 +6,7 @@ from agentscope.agent import ReActAgent
from agentscope.message import Msg
from pydantic import BaseModel
from core.agentscope.utils import finalize_json_response
from core.agentscope.utils.json_finalize import finalize_json_response
class JsonReActAgent(ReActAgent):
@@ -1,17 +1,18 @@
from __future__ import annotations
import asyncio
from typing import Any, Awaitable, Callable, Protocol
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Protocol
from ag_ui.core.types import RunAgentInput
from agentscope.message import Msg
from openai import APIConnectionError
from core.agentscope.runtime.runner import AgentScopeRunner
from core.agentscope.runtime.protocols import PipelineLike
from core.logging import get_logger
from schemas.agent.runtime_config import RuntimeConfig
from schemas.shared.user import UserContext
if TYPE_CHECKING:
from agentscope.message import Msg
logger = get_logger("core.agentscope.runtime.orchestrator")
@@ -38,8 +39,12 @@ class AgentScopeRuntimeOrchestrator:
pipeline: PipelineLike,
runner: RunnerLike | None = None,
) -> None:
if runner is None:
from core.agentscope.runtime.runner import AgentScopeRunner
runner = AgentScopeRunner()
self._pipeline = pipeline
self._runner = runner or AgentScopeRunner()
self._runner = runner
async def run(
self,
@@ -22,7 +22,7 @@ from core.divination import derive_divination
from core.agentscope.runtime.json_react_agent import JsonReActAgent
from core.agentscope.runtime.model_tracking import TrackingChatModel
from core.agentscope.runtime.stage_emitter import PipelineStageEmitter
from core.agentscope.utils import patch_agentscope_json_repair_compat
from core.agentscope.utils.compat import patch_agentscope_json_repair_compat
from core.agentscope.utils.json_finalize import finalize_json_response
from core.config.settings import config
from core.db.session import AsyncSessionLocal
@@ -5,7 +5,7 @@ from uuid import uuid4
from agentscope.message import Msg
from core.agentscope.utils import parse_tool_agent_output
from core.agentscope.utils.parsing import parse_tool_agent_output
class PipelineLike(Protocol):
@@ -0,0 +1,15 @@
from __future__ import annotations
from core.taskiq.app import worker_agent_broker
@worker_agent_broker.task(task_name="tasks.agentscope.run_command.agent")
async def run_command_task_agent(command: dict[str, object]) -> dict[str, object]:
del command
raise RuntimeError("task handle is only for enqueueing")
@worker_agent_broker.task(task_name="tasks.agentscope.run_command.general")
async def run_command_task_general(command: dict[str, object]) -> dict[str, object]:
del command
raise RuntimeError("task handle is only for enqueueing")
+33 -26
View File
@@ -3,31 +3,13 @@ from __future__ import annotations
import asyncio
import base64
import json
from typing import Any, cast
from typing import TYPE_CHECKING, Any, cast
from uuid import UUID
from agentscope.message import Msg
from pydantic import TypeAdapter
from core.agentscope.caches import create_user_context_cache
from core.agentscope.caches.attachment_content_cache import (
create_attachment_content_cache,
)
from core.agentscope.caches.context_messages_cache import (
create_context_messages_cache,
)
from core.agentscope.events import (
AgentScopeAgUiCodec,
AgentScopeEventPipeline,
RedisStreamBus,
SqlAlchemyEventStore,
)
from core.agentscope.runtime.orchestrator import AgentScopeRuntimeOrchestrator
from core.agentscope.schemas.agui_input import parse_run_input
from core.agentscope.services.context_service import AgentContextService
from core.config.settings import config
from core.db.session import AsyncSessionLocal
from core.logging import get_logger
from core.taskiq.app import worker_agent_broker, worker_general_broker
from core.taskiq.app import worker_agent_broker
from schemas.agent.forwarded_props import (
RuntimeMode,
parse_forwarded_props_runtime_mode,
@@ -40,12 +22,10 @@ from schemas.domain.chat_message import (
)
from schemas.shared.user import UserContext
from schemas.shared.user import parse_profile_settings
from services.base.redis import get_or_init_redis_client
from services.base.supabase import supabase_service
from v1.agent.repository import AgentRepository
from v1.points.repository import PointsRepository
from v1.users.repository import SQLAlchemyUserRepository
from v1.points.service import PointsService
if TYPE_CHECKING:
from agentscope.message import Msg
logger = get_logger("core.agentscope.runtime.tasks")
_MAX_CONTEXT_ATTACHMENTS = 3
@@ -176,6 +156,8 @@ def _serialize_assistant_context_from_metadata(
def _load_runtime() -> type[Any]:
from core.agentscope.runtime.orchestrator import AgentScopeRuntimeOrchestrator
return AgentScopeRuntimeOrchestrator
@@ -186,6 +168,8 @@ async def _build_user_context(
session: Any,
session_id: str,
) -> UserContext:
from core.agentscope.caches import create_user_context_cache
cache = create_user_context_cache()
cached = await cache.get(session_id=UUID(session_id))
if cached:
@@ -218,6 +202,17 @@ async def _build_recent_context_messages(
runtime_mode: RuntimeMode = RuntimeMode.CHAT,
context_config: "MessageContextConfig",
) -> list[Msg]:
from agentscope.message import Msg
from core.agentscope.caches.attachment_content_cache import (
create_attachment_content_cache,
)
from core.agentscope.caches.context_messages_cache import (
create_context_messages_cache,
)
from core.agentscope.services.context_service import AgentContextService
from services.base.supabase import supabase_service
from v1.agent.repository import AgentRepository
context_cache = create_context_messages_cache()
attachment_cache = create_attachment_content_cache()
raw_messages = await context_cache.get(
@@ -353,6 +348,18 @@ async def _build_recent_context_messages(
async def run_agentscope_task(command: dict[str, Any]) -> dict[str, object]:
from core.agentscope.events import (
AgentScopeAgUiCodec,
AgentScopeEventPipeline,
RedisStreamBus,
SqlAlchemyEventStore,
)
from core.config.settings import config
from core.db.session import AsyncSessionLocal
from services.base.redis import get_or_init_redis_client
from v1.points.repository import PointsRepository
from v1.points.service import PointsService
command_type = str(command.get("command", "run")).strip().lower()
raw_owner_id = command.get("owner_id")
raw_owner_email = command.get("owner_email")
@@ -485,6 +492,6 @@ async def run_command_task_agent(command: dict[str, object]) -> dict[str, object
return await run_agentscope_task(command)
@worker_general_broker.task(task_name="tasks.agentscope.run_command.general")
@worker_agent_broker.task(task_name="tasks.agentscope.run_command.general")
async def run_command_task_general(command: dict[str, object]) -> dict[str, object]:
return await run_agentscope_task(command)
+1 -1
View File
@@ -23,7 +23,7 @@ def _build_broker(queue_name: str) -> ListQueueBroker:
worker_agent_broker = _build_broker("agent")
worker_general_broker = _build_broker("general")
worker_general_broker = worker_agent_broker
broker = worker_agent_broker
+3 -3
View File
@@ -3,9 +3,6 @@ from __future__ import annotations
import asyncio
from typing import Any
import dashscope
from dashscope.audio.asr import Recognition, RecognitionCallback
from core.config.settings import config
from core.logging import get_logger
@@ -28,6 +25,9 @@ class AsrService:
async def transcribe_file(self, file_path: str, filename: str) -> str:
try:
import dashscope
from dashscope.audio.asr import Recognition, RecognitionCallback
dashscope.api_key = self._get_api_key()
loop = asyncio.get_event_loop()
+1 -1
View File
@@ -48,7 +48,7 @@ class TaskiqQueueClient:
@staticmethod
def _select_queue_task(command: dict[str, object]) -> Any:
from core.agentscope.runtime.tasks import (
from core.agentscope.runtime.task_handles import (
run_command_task_agent,
run_command_task_general,
)
+15 -7
View File
@@ -5,15 +5,11 @@ from pathlib import Path
from sqlalchemy import select
from structlog import get_logger
from taskiq_redis import RedisScheduleSource
from core.config.settings import config
from core.db.session import AsyncSessionLocal
from core.email.sender import EmailAttachment, EmailMessage, EmailSender
from core.email.template_loader import load_template
from core.taskiq.app import worker_general_broker
from core.taskiq.app import worker_agent_broker
from models.user_feedback import UserFeedback
from v1.feedback.report import generate_feedback_report
logger = get_logger("v1.feedback.tasks")
@@ -52,6 +48,8 @@ def _build_report_email_html(
end_time: datetime,
push_hour: int,
) -> str:
from core.email.template_loader import load_template
template = load_template("feedback", "daily_report.html")
return template.substitute(
start_date=start_time.strftime("%Y-%m-%d"),
@@ -70,6 +68,8 @@ def _build_no_feedback_email_html(
end_time: datetime,
push_hour: int,
) -> str:
from core.email.template_loader import load_template
template = load_template("feedback", "no_feedback.html")
return template.substitute(
start_date=start_time.strftime("%Y-%m-%d"),
@@ -87,6 +87,8 @@ async def _send_feedback_email(
push_hour: int,
report_path: Path | None = None,
) -> bool:
from core.email.sender import EmailAttachment, EmailMessage, EmailSender
sender = EmailSender()
if feedbacks:
@@ -123,12 +125,14 @@ async def _send_feedback_email(
# type: ignore reportArgumentType for taskiq decorator
@worker_general_broker.on_event("startup") # pyright: ignore[reportArgumentType]
@worker_agent_broker.on_event("startup") # pyright: ignore[reportArgumentType]
async def _register_feedback_report_schedule() -> None:
if not config.feedback_report.enabled:
logger.info("Feedback report scheduling disabled")
return
from taskiq_redis import RedisScheduleSource
schedule_source = RedisScheduleSource(
url=config.taskiq_broker_url,
prefix="schedule:feedback",
@@ -146,12 +150,14 @@ async def _register_feedback_report_schedule() -> None:
)
@worker_general_broker.task(task_name="tasks.feedback.generate_daily_report")
@worker_agent_broker.task(task_name="tasks.feedback.generate_daily_report")
async def generate_daily_feedback_report() -> str | None:
if not config.feedback_report.enabled:
logger.info("Feedback report is disabled, skipping")
return None
from v1.feedback.report import generate_feedback_report
now = datetime.now(timezone.utc)
push_hour = 10
end_time = now.replace(hour=push_hour, minute=0, second=0, microsecond=0)
@@ -192,6 +198,8 @@ async def generate_daily_feedback_report() -> str | None:
async def generate_all_feedback_report() -> Path:
from v1.feedback.report import generate_feedback_report
async with AsyncSessionLocal() as session:
stmt = select(UserFeedback).order_by(UserFeedback.created_at.desc())
result = await session.execute(stmt)
+18 -1
View File
@@ -71,6 +71,24 @@ ERYAO_DEPLOY_BIND_HOST=127.0.0.1
ERYAO_DEPLOY_BIND_HOST=0.0.0.0
```
### 进程配置建议
生产 Compose 只启动一个 `worker-agent` 容器。Agent 任务、低频通用任务和反馈日报任务共用 `agent` 队列,不再单独常驻 `worker-general` 进程。
2 核 2G 机器建议使用:
```text
ERYAO_WEB__WORKERS=1
ERYAO_WORKER__GROUPS__AGENT__CONCURRENCY=2
```
4G 以上机器可按流量提高 Web 或 Agent worker 数量:
```text
ERYAO_WEB__WORKERS=2
ERYAO_WORKER__GROUPS__AGENT__CONCURRENCY=2
```
## 登录 ECR
进入部署目录,并把 `.env` 加载到当前 shell
@@ -128,7 +146,6 @@ cd deploy
docker compose --env-file ./.env -f docker-compose.prod.yml --profile workers ps
docker logs -f eryao-prod-backend
docker logs -f eryao-prod-worker-agent
docker logs -f eryao-prod-worker-general
docker logs -f eryao-prod-redis
```
+1 -15
View File
@@ -34,21 +34,7 @@ services:
command:
- sh
- -c
- exec taskiq worker core.taskiq.app:worker_agent_broker core.agentscope.runtime.tasks --workers ${ERYAO_WORKER__GROUPS__AGENT__CONCURRENCY:-2}
worker-general:
<<: *backend-common
container_name: eryao-prod-worker-general
profiles: ["workers"]
environment:
ERYAO_RUNTIME__ENVIRONMENT: prod
ERYAO_RUNTIME__SERVICE_NAME: worker-general
ERYAO_REDIS__HOST: redis
ERYAO_REDIS__PORT: 6379
command:
- sh
- -c
- exec taskiq worker core.taskiq.app:worker_general_broker core.agentscope.runtime.tasks v1.feedback.tasks --workers ${ERYAO_WORKER__GROUPS__GENERAL__CONCURRENCY:-1}
- exec taskiq worker core.taskiq.app:worker_agent_broker core.agentscope.runtime.tasks v1.feedback.tasks --workers ${ERYAO_WORKER__GROUPS__AGENT__CONCURRENCY:-2}
redis:
image: redis:7.4.2-alpine
+1 -4
View File
@@ -153,14 +153,12 @@ start() {
WEB_CMD="cd '$ROOT_DIR' && PYTHONPATH=backend/src ERYAO_RUNTIME__SERVICE_NAME=web uv run uvicorn app:app --host ${ERYAO_WEB__HOST:-0.0.0.0} --port ${WEB_PORT} --workers ${ERYAO_WEB__WORKERS:-2} --log-level ${UVICORN_LOG_LEVEL}"
WORKER_AGENT_CMD="cd '$ROOT_DIR' && PYTHONPATH=backend/src ERYAO_RUNTIME__SERVICE_NAME=worker-agent uv run taskiq worker core.taskiq.app:worker_agent_broker core.agentscope.runtime.tasks --workers ${ERYAO_WORKER__GROUPS__AGENT__CONCURRENCY:-2}"
WORKER_GENERAL_CMD="cd '$ROOT_DIR' && PYTHONPATH=backend/src ERYAO_RUNTIME__SERVICE_NAME=worker-general uv run taskiq worker core.taskiq.app:worker_general_broker core.agentscope.runtime.tasks v1.feedback.tasks --workers ${ERYAO_WORKER__GROUPS__GENERAL__CONCURRENCY:-1}"
WORKER_AGENT_CMD="cd '$ROOT_DIR' && PYTHONPATH=backend/src ERYAO_RUNTIME__SERVICE_NAME=worker-agent uv run taskiq worker core.taskiq.app:worker_agent_broker core.agentscope.runtime.tasks v1.feedback.tasks --workers ${ERYAO_WORKER__GROUPS__AGENT__CONCURRENCY:-2}"
echo "Starting tmux web process in session '$SESSION_NAME'..."
tmux new-session -d -s "$SESSION_NAME" -n web "bash -lc \"$WEB_CMD; echo '[web] exited'; exec bash\""
tmux new-window -t "$SESSION_NAME" -n worker-agent "bash -lc \"$WORKER_AGENT_CMD; echo '[worker-agent] exited'; exec bash\""
tmux new-window -t "$SESSION_NAME" -n worker-general "bash -lc \"$WORKER_GENERAL_CMD; echo '[worker-general] exited'; exec bash\""
echo ""
echo "=== App Started ==="
@@ -170,7 +168,6 @@ start() {
echo "Log files will be created in logs/ directory:"
echo " - web.log, web.error.log"
echo " - worker-agent.log, worker-agent.error.log"
echo " - worker-general.log, worker-general.error.log"
echo ""
echo "tmux attach -t $SESSION_NAME"
echo "tmux list-windows -t $SESSION_NAME"