diff --git a/backend/scripts/trigger_feedback_report.py b/backend/scripts/trigger_feedback_report.py index 355f068..8c2b276 100644 --- a/backend/scripts/trigger_feedback_report.py +++ b/backend/scripts/trigger_feedback_report.py @@ -1,4 +1,4 @@ -"""手动触发 worker-general 定时任务:生成反馈报告 +"""手动触发 worker-agent 定时任务:生成反馈报告 用法: cd /home/qzl/Code/eryao/.worktrees/feat-user-feedback @@ -12,13 +12,13 @@ from pathlib import Path sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "src")) -from core.taskiq.app import worker_general_broker +from core.taskiq.app import worker_agent_broker from v1.feedback.tasks import generate_daily_feedback_report def main(): task = generate_daily_feedback_report.kiq() - result = worker_general_broker.wait_result(task, timeout=120) + result = worker_agent_broker.wait_result(task, timeout=120) print(f"Task result: {result.return_value}") diff --git a/backend/src/core/agentscope/events/__init__.py b/backend/src/core/agentscope/events/__init__.py index 5c52772..b69ad2e 100644 --- a/backend/src/core/agentscope/events/__init__.py +++ b/backend/src/core/agentscope/events/__init__.py @@ -1,9 +1,3 @@ -from core.agentscope.events.agui_codec import AgentScopeAgUiCodec, to_agui_wire_event -from core.agentscope.events.pipeline import AgentScopeEventPipeline -from core.agentscope.events.redis_bus import RedisStreamBus -from core.agentscope.events.sse import to_sse_event -from core.agentscope.events.store import NullEventStore, SqlAlchemyEventStore - __all__ = [ "AgentScopeAgUiCodec", "AgentScopeEventPipeline", @@ -13,3 +7,36 @@ __all__ = [ "to_agui_wire_event", "to_sse_event", ] + + +def __getattr__(name: str): + if name in {"AgentScopeAgUiCodec", "to_agui_wire_event"}: + from core.agentscope.events.agui_codec import ( + AgentScopeAgUiCodec, + to_agui_wire_event, + ) + + return { + "AgentScopeAgUiCodec": AgentScopeAgUiCodec, + "to_agui_wire_event": to_agui_wire_event, + }[name] + if name == "AgentScopeEventPipeline": + from core.agentscope.events.pipeline import AgentScopeEventPipeline + + return AgentScopeEventPipeline + if name == "RedisStreamBus": + from core.agentscope.events.redis_bus import RedisStreamBus + + return RedisStreamBus + if name == "to_sse_event": + from core.agentscope.events.sse import to_sse_event + + return to_sse_event + if name in {"NullEventStore", "SqlAlchemyEventStore"}: + from core.agentscope.events.store import NullEventStore, SqlAlchemyEventStore + + return { + "NullEventStore": NullEventStore, + "SqlAlchemyEventStore": SqlAlchemyEventStore, + }[name] + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/backend/src/core/agentscope/runtime/json_react_agent.py b/backend/src/core/agentscope/runtime/json_react_agent.py index 45f4d6a..b54ece2 100644 --- a/backend/src/core/agentscope/runtime/json_react_agent.py +++ b/backend/src/core/agentscope/runtime/json_react_agent.py @@ -6,7 +6,7 @@ from agentscope.agent import ReActAgent from agentscope.message import Msg from pydantic import BaseModel -from core.agentscope.utils import finalize_json_response +from core.agentscope.utils.json_finalize import finalize_json_response class JsonReActAgent(ReActAgent): diff --git a/backend/src/core/agentscope/runtime/orchestrator.py b/backend/src/core/agentscope/runtime/orchestrator.py index 2e429db..c674b0c 100644 --- a/backend/src/core/agentscope/runtime/orchestrator.py +++ b/backend/src/core/agentscope/runtime/orchestrator.py @@ -1,17 +1,18 @@ from __future__ import annotations import asyncio -from typing import Any, Awaitable, Callable, Protocol +from typing import TYPE_CHECKING, Any, Awaitable, Callable, Protocol from ag_ui.core.types import RunAgentInput -from agentscope.message import Msg from openai import APIConnectionError -from core.agentscope.runtime.runner import AgentScopeRunner from core.agentscope.runtime.protocols import PipelineLike from core.logging import get_logger from schemas.agent.runtime_config import RuntimeConfig from schemas.shared.user import UserContext +if TYPE_CHECKING: + from agentscope.message import Msg + logger = get_logger("core.agentscope.runtime.orchestrator") @@ -38,8 +39,12 @@ class AgentScopeRuntimeOrchestrator: pipeline: PipelineLike, runner: RunnerLike | None = None, ) -> None: + if runner is None: + from core.agentscope.runtime.runner import AgentScopeRunner + + runner = AgentScopeRunner() self._pipeline = pipeline - self._runner = runner or AgentScopeRunner() + self._runner = runner async def run( self, diff --git a/backend/src/core/agentscope/runtime/runner.py b/backend/src/core/agentscope/runtime/runner.py index 23dd5d5..edf007c 100644 --- a/backend/src/core/agentscope/runtime/runner.py +++ b/backend/src/core/agentscope/runtime/runner.py @@ -22,7 +22,7 @@ from core.divination import derive_divination from core.agentscope.runtime.json_react_agent import JsonReActAgent from core.agentscope.runtime.model_tracking import TrackingChatModel from core.agentscope.runtime.stage_emitter import PipelineStageEmitter -from core.agentscope.utils import patch_agentscope_json_repair_compat +from core.agentscope.utils.compat import patch_agentscope_json_repair_compat from core.agentscope.utils.json_finalize import finalize_json_response from core.config.settings import config from core.db.session import AsyncSessionLocal diff --git a/backend/src/core/agentscope/runtime/stage_emitter.py b/backend/src/core/agentscope/runtime/stage_emitter.py index 6231c78..fb6b57c 100644 --- a/backend/src/core/agentscope/runtime/stage_emitter.py +++ b/backend/src/core/agentscope/runtime/stage_emitter.py @@ -5,7 +5,7 @@ from uuid import uuid4 from agentscope.message import Msg -from core.agentscope.utils import parse_tool_agent_output +from core.agentscope.utils.parsing import parse_tool_agent_output class PipelineLike(Protocol): diff --git a/backend/src/core/agentscope/runtime/task_handles.py b/backend/src/core/agentscope/runtime/task_handles.py new file mode 100644 index 0000000..5500c55 --- /dev/null +++ b/backend/src/core/agentscope/runtime/task_handles.py @@ -0,0 +1,15 @@ +from __future__ import annotations + +from core.taskiq.app import worker_agent_broker + + +@worker_agent_broker.task(task_name="tasks.agentscope.run_command.agent") +async def run_command_task_agent(command: dict[str, object]) -> dict[str, object]: + del command + raise RuntimeError("task handle is only for enqueueing") + + +@worker_agent_broker.task(task_name="tasks.agentscope.run_command.general") +async def run_command_task_general(command: dict[str, object]) -> dict[str, object]: + del command + raise RuntimeError("task handle is only for enqueueing") diff --git a/backend/src/core/agentscope/runtime/tasks.py b/backend/src/core/agentscope/runtime/tasks.py index f2caecf..e398b61 100644 --- a/backend/src/core/agentscope/runtime/tasks.py +++ b/backend/src/core/agentscope/runtime/tasks.py @@ -3,31 +3,13 @@ from __future__ import annotations import asyncio import base64 import json -from typing import Any, cast +from typing import TYPE_CHECKING, Any, cast from uuid import UUID -from agentscope.message import Msg from pydantic import TypeAdapter -from core.agentscope.caches import create_user_context_cache -from core.agentscope.caches.attachment_content_cache import ( - create_attachment_content_cache, -) -from core.agentscope.caches.context_messages_cache import ( - create_context_messages_cache, -) -from core.agentscope.events import ( - AgentScopeAgUiCodec, - AgentScopeEventPipeline, - RedisStreamBus, - SqlAlchemyEventStore, -) -from core.agentscope.runtime.orchestrator import AgentScopeRuntimeOrchestrator from core.agentscope.schemas.agui_input import parse_run_input -from core.agentscope.services.context_service import AgentContextService -from core.config.settings import config -from core.db.session import AsyncSessionLocal from core.logging import get_logger -from core.taskiq.app import worker_agent_broker, worker_general_broker +from core.taskiq.app import worker_agent_broker from schemas.agent.forwarded_props import ( RuntimeMode, parse_forwarded_props_runtime_mode, @@ -40,12 +22,10 @@ from schemas.domain.chat_message import ( ) from schemas.shared.user import UserContext from schemas.shared.user import parse_profile_settings -from services.base.redis import get_or_init_redis_client -from services.base.supabase import supabase_service -from v1.agent.repository import AgentRepository -from v1.points.repository import PointsRepository from v1.users.repository import SQLAlchemyUserRepository -from v1.points.service import PointsService + +if TYPE_CHECKING: + from agentscope.message import Msg logger = get_logger("core.agentscope.runtime.tasks") _MAX_CONTEXT_ATTACHMENTS = 3 @@ -176,6 +156,8 @@ def _serialize_assistant_context_from_metadata( def _load_runtime() -> type[Any]: + from core.agentscope.runtime.orchestrator import AgentScopeRuntimeOrchestrator + return AgentScopeRuntimeOrchestrator @@ -186,6 +168,8 @@ async def _build_user_context( session: Any, session_id: str, ) -> UserContext: + from core.agentscope.caches import create_user_context_cache + cache = create_user_context_cache() cached = await cache.get(session_id=UUID(session_id)) if cached: @@ -218,6 +202,17 @@ async def _build_recent_context_messages( runtime_mode: RuntimeMode = RuntimeMode.CHAT, context_config: "MessageContextConfig", ) -> list[Msg]: + from agentscope.message import Msg + from core.agentscope.caches.attachment_content_cache import ( + create_attachment_content_cache, + ) + from core.agentscope.caches.context_messages_cache import ( + create_context_messages_cache, + ) + from core.agentscope.services.context_service import AgentContextService + from services.base.supabase import supabase_service + from v1.agent.repository import AgentRepository + context_cache = create_context_messages_cache() attachment_cache = create_attachment_content_cache() raw_messages = await context_cache.get( @@ -353,6 +348,18 @@ async def _build_recent_context_messages( async def run_agentscope_task(command: dict[str, Any]) -> dict[str, object]: + from core.agentscope.events import ( + AgentScopeAgUiCodec, + AgentScopeEventPipeline, + RedisStreamBus, + SqlAlchemyEventStore, + ) + from core.config.settings import config + from core.db.session import AsyncSessionLocal + from services.base.redis import get_or_init_redis_client + from v1.points.repository import PointsRepository + from v1.points.service import PointsService + command_type = str(command.get("command", "run")).strip().lower() raw_owner_id = command.get("owner_id") raw_owner_email = command.get("owner_email") @@ -485,6 +492,6 @@ async def run_command_task_agent(command: dict[str, object]) -> dict[str, object return await run_agentscope_task(command) -@worker_general_broker.task(task_name="tasks.agentscope.run_command.general") +@worker_agent_broker.task(task_name="tasks.agentscope.run_command.general") async def run_command_task_general(command: dict[str, object]) -> dict[str, object]: return await run_agentscope_task(command) diff --git a/backend/src/core/taskiq/app.py b/backend/src/core/taskiq/app.py index 22ab470..3d6138f 100644 --- a/backend/src/core/taskiq/app.py +++ b/backend/src/core/taskiq/app.py @@ -23,7 +23,7 @@ def _build_broker(queue_name: str) -> ListQueueBroker: worker_agent_broker = _build_broker("agent") -worker_general_broker = _build_broker("general") +worker_general_broker = worker_agent_broker broker = worker_agent_broker diff --git a/backend/src/v1/agent/asr.py b/backend/src/v1/agent/asr.py index 36ee940..75d0c60 100644 --- a/backend/src/v1/agent/asr.py +++ b/backend/src/v1/agent/asr.py @@ -3,9 +3,6 @@ from __future__ import annotations import asyncio from typing import Any -import dashscope -from dashscope.audio.asr import Recognition, RecognitionCallback - from core.config.settings import config from core.logging import get_logger @@ -28,6 +25,9 @@ class AsrService: async def transcribe_file(self, file_path: str, filename: str) -> str: try: + import dashscope + from dashscope.audio.asr import Recognition, RecognitionCallback + dashscope.api_key = self._get_api_key() loop = asyncio.get_event_loop() diff --git a/backend/src/v1/agent/dependencies.py b/backend/src/v1/agent/dependencies.py index 1750607..693074c 100644 --- a/backend/src/v1/agent/dependencies.py +++ b/backend/src/v1/agent/dependencies.py @@ -48,7 +48,7 @@ class TaskiqQueueClient: @staticmethod def _select_queue_task(command: dict[str, object]) -> Any: - from core.agentscope.runtime.tasks import ( + from core.agentscope.runtime.task_handles import ( run_command_task_agent, run_command_task_general, ) diff --git a/backend/src/v1/feedback/tasks.py b/backend/src/v1/feedback/tasks.py index fe3db1e..a623adf 100644 --- a/backend/src/v1/feedback/tasks.py +++ b/backend/src/v1/feedback/tasks.py @@ -5,15 +5,11 @@ from pathlib import Path from sqlalchemy import select from structlog import get_logger -from taskiq_redis import RedisScheduleSource from core.config.settings import config from core.db.session import AsyncSessionLocal -from core.email.sender import EmailAttachment, EmailMessage, EmailSender -from core.email.template_loader import load_template -from core.taskiq.app import worker_general_broker +from core.taskiq.app import worker_agent_broker from models.user_feedback import UserFeedback -from v1.feedback.report import generate_feedback_report logger = get_logger("v1.feedback.tasks") @@ -52,6 +48,8 @@ def _build_report_email_html( end_time: datetime, push_hour: int, ) -> str: + from core.email.template_loader import load_template + template = load_template("feedback", "daily_report.html") return template.substitute( start_date=start_time.strftime("%Y-%m-%d"), @@ -70,6 +68,8 @@ def _build_no_feedback_email_html( end_time: datetime, push_hour: int, ) -> str: + from core.email.template_loader import load_template + template = load_template("feedback", "no_feedback.html") return template.substitute( start_date=start_time.strftime("%Y-%m-%d"), @@ -87,6 +87,8 @@ async def _send_feedback_email( push_hour: int, report_path: Path | None = None, ) -> bool: + from core.email.sender import EmailAttachment, EmailMessage, EmailSender + sender = EmailSender() if feedbacks: @@ -123,12 +125,14 @@ async def _send_feedback_email( # type: ignore reportArgumentType for taskiq decorator -@worker_general_broker.on_event("startup") # pyright: ignore[reportArgumentType] +@worker_agent_broker.on_event("startup") # pyright: ignore[reportArgumentType] async def _register_feedback_report_schedule() -> None: if not config.feedback_report.enabled: logger.info("Feedback report scheduling disabled") return + from taskiq_redis import RedisScheduleSource + schedule_source = RedisScheduleSource( url=config.taskiq_broker_url, prefix="schedule:feedback", @@ -146,12 +150,14 @@ async def _register_feedback_report_schedule() -> None: ) -@worker_general_broker.task(task_name="tasks.feedback.generate_daily_report") +@worker_agent_broker.task(task_name="tasks.feedback.generate_daily_report") async def generate_daily_feedback_report() -> str | None: if not config.feedback_report.enabled: logger.info("Feedback report is disabled, skipping") return None + from v1.feedback.report import generate_feedback_report + now = datetime.now(timezone.utc) push_hour = 10 end_time = now.replace(hour=push_hour, minute=0, second=0, microsecond=0) @@ -192,6 +198,8 @@ async def generate_daily_feedback_report() -> str | None: async def generate_all_feedback_report() -> Path: + from v1.feedback.report import generate_feedback_report + async with AsyncSessionLocal() as session: stmt = select(UserFeedback).order_by(UserFeedback.created_at.desc()) result = await session.execute(stmt) diff --git a/deploy/README.md b/deploy/README.md index 1116d8f..5811027 100644 --- a/deploy/README.md +++ b/deploy/README.md @@ -71,6 +71,24 @@ ERYAO_DEPLOY_BIND_HOST=127.0.0.1 ERYAO_DEPLOY_BIND_HOST=0.0.0.0 ``` +### 进程配置建议 + +生产 Compose 只启动一个 `worker-agent` 容器。Agent 任务、低频通用任务和反馈日报任务共用 `agent` 队列,不再单独常驻 `worker-general` 进程。 + +2 核 2G 机器建议使用: + +```text +ERYAO_WEB__WORKERS=1 +ERYAO_WORKER__GROUPS__AGENT__CONCURRENCY=2 +``` + +4G 以上机器可按流量提高 Web 或 Agent worker 数量: + +```text +ERYAO_WEB__WORKERS=2 +ERYAO_WORKER__GROUPS__AGENT__CONCURRENCY=2 +``` + ## 登录 ECR 进入部署目录,并把 `.env` 加载到当前 shell: @@ -128,7 +146,6 @@ cd deploy docker compose --env-file ./.env -f docker-compose.prod.yml --profile workers ps docker logs -f eryao-prod-backend docker logs -f eryao-prod-worker-agent -docker logs -f eryao-prod-worker-general docker logs -f eryao-prod-redis ``` diff --git a/deploy/docker-compose.prod.yml b/deploy/docker-compose.prod.yml index 1284570..e87d152 100644 --- a/deploy/docker-compose.prod.yml +++ b/deploy/docker-compose.prod.yml @@ -34,21 +34,7 @@ services: command: - sh - -c - - exec taskiq worker core.taskiq.app:worker_agent_broker core.agentscope.runtime.tasks --workers ${ERYAO_WORKER__GROUPS__AGENT__CONCURRENCY:-2} - - worker-general: - <<: *backend-common - container_name: eryao-prod-worker-general - profiles: ["workers"] - environment: - ERYAO_RUNTIME__ENVIRONMENT: prod - ERYAO_RUNTIME__SERVICE_NAME: worker-general - ERYAO_REDIS__HOST: redis - ERYAO_REDIS__PORT: 6379 - command: - - sh - - -c - - exec taskiq worker core.taskiq.app:worker_general_broker core.agentscope.runtime.tasks v1.feedback.tasks --workers ${ERYAO_WORKER__GROUPS__GENERAL__CONCURRENCY:-1} + - exec taskiq worker core.taskiq.app:worker_agent_broker core.agentscope.runtime.tasks v1.feedback.tasks --workers ${ERYAO_WORKER__GROUPS__AGENT__CONCURRENCY:-2} redis: image: redis:7.4.2-alpine diff --git a/infra/scripts/app.sh b/infra/scripts/app.sh index 6233b95..a41ef98 100755 --- a/infra/scripts/app.sh +++ b/infra/scripts/app.sh @@ -153,14 +153,12 @@ start() { WEB_CMD="cd '$ROOT_DIR' && PYTHONPATH=backend/src ERYAO_RUNTIME__SERVICE_NAME=web uv run uvicorn app:app --host ${ERYAO_WEB__HOST:-0.0.0.0} --port ${WEB_PORT} --workers ${ERYAO_WEB__WORKERS:-2} --log-level ${UVICORN_LOG_LEVEL}" - WORKER_AGENT_CMD="cd '$ROOT_DIR' && PYTHONPATH=backend/src ERYAO_RUNTIME__SERVICE_NAME=worker-agent uv run taskiq worker core.taskiq.app:worker_agent_broker core.agentscope.runtime.tasks --workers ${ERYAO_WORKER__GROUPS__AGENT__CONCURRENCY:-2}" - WORKER_GENERAL_CMD="cd '$ROOT_DIR' && PYTHONPATH=backend/src ERYAO_RUNTIME__SERVICE_NAME=worker-general uv run taskiq worker core.taskiq.app:worker_general_broker core.agentscope.runtime.tasks v1.feedback.tasks --workers ${ERYAO_WORKER__GROUPS__GENERAL__CONCURRENCY:-1}" + WORKER_AGENT_CMD="cd '$ROOT_DIR' && PYTHONPATH=backend/src ERYAO_RUNTIME__SERVICE_NAME=worker-agent uv run taskiq worker core.taskiq.app:worker_agent_broker core.agentscope.runtime.tasks v1.feedback.tasks --workers ${ERYAO_WORKER__GROUPS__AGENT__CONCURRENCY:-2}" echo "Starting tmux web process in session '$SESSION_NAME'..." tmux new-session -d -s "$SESSION_NAME" -n web "bash -lc \"$WEB_CMD; echo '[web] exited'; exec bash\"" tmux new-window -t "$SESSION_NAME" -n worker-agent "bash -lc \"$WORKER_AGENT_CMD; echo '[worker-agent] exited'; exec bash\"" - tmux new-window -t "$SESSION_NAME" -n worker-general "bash -lc \"$WORKER_GENERAL_CMD; echo '[worker-general] exited'; exec bash\"" echo "" echo "=== App Started ===" @@ -170,7 +168,6 @@ start() { echo "Log files will be created in logs/ directory:" echo " - web.log, web.error.log" echo " - worker-agent.log, worker-agent.error.log" - echo " - worker-general.log, worker-general.error.log" echo "" echo "tmux attach -t $SESSION_NAME" echo "tmux list-windows -t $SESSION_NAME"