From 19a2cd451d41ec01a0494dd16812ed0baf9e10c2 Mon Sep 17 00:00:00 2001 From: qzl Date: Fri, 20 Mar 2026 14:06:37 +0800 Subject: [PATCH] refactor(runtime): decouple scheduler tasks and align worker group names --- .env.example | 13 ++--- backend/src/core/agentscope/runtime/tasks.py | 52 -------------------- backend/src/core/automation/scheduler.py | 46 +++++++++++++++++ backend/src/core/automation/tasks.py | 14 ++++++ backend/src/core/runtime/cli.py | 30 ++++++++--- deploy/.env.prod.example | 13 ++--- deploy/docker-compose.prod.yml | 4 +- infra/scripts/app.sh | 4 +- pyproject.toml | 1 + 9 files changed, 97 insertions(+), 80 deletions(-) create mode 100644 backend/src/core/automation/tasks.py diff --git a/.env.example b/.env.example index 5b97a2d..c4bc8c3 100644 --- a/.env.example +++ b/.env.example @@ -28,14 +28,11 @@ SOCIAL_REDIS__DB=0 ############ # Worker 队列分组配置(显式参数控制) ############ -# critical: 用户同步感知任务(验证码发送、鉴权后置关键动作) -# default: 常规异步任务 -# bulk: 批处理/重计算/可延迟任务 -SOCIAL_WORKER__GROUPS__CRITICAL__CONCURRENCY=2 - -SOCIAL_WORKER__GROUPS__DEFAULT__CONCURRENCY=2 - -SOCIAL_WORKER__GROUPS__BULK__CONCURRENCY=1 +# Worker 队列分组配置 +# agent: 常规异步任务 +# automation: 批处理/重计算/可延迟任务 +SOCIAL_WORKER__GROUPS__AGENT__CONCURRENCY=2 +SOCIAL_WORKER__GROUPS__AUTOMATION__CONCURRENCY=2 ############ # Automation 调度器配置 diff --git a/backend/src/core/agentscope/runtime/tasks.py b/backend/src/core/agentscope/runtime/tasks.py index cf66163..5a6a80b 100644 --- a/backend/src/core/agentscope/runtime/tasks.py +++ b/backend/src/core/agentscope/runtime/tasks.py @@ -2,7 +2,6 @@ from __future__ import annotations import base64 import json -from datetime import timezone from typing import Any, cast from uuid import UUID @@ -17,11 +16,6 @@ from core.agentscope.runtime.context_service import AgentContextService from core.agentscope.runtime.orchestrator import AgentScopeRuntimeOrchestrator from core.agentscope.runtime.pipeline_registry import build_default_pipeline_spec from core.agentscope.schemas.agui_input import parse_run_input -from core.automation.scheduler import ( - AutomationSchedulerService, - SqlAlchemyAutomationSchedulerRepository, - utc_now, -) from core.auth.models import CurrentUser from core.config.settings import config from core.db.session import AsyncSessionLocal @@ -46,18 +40,6 @@ logger = get_logger("core.agentscope.runtime.tasks") _MAX_CONTEXT_ATTACHMENTS = 3 -class _BulkQueueAdapter: - async def enqueue( - self, - *, - command: dict[str, object], - dedup_key: str | None, - ) -> str: - del dedup_key - result = await run_command_task_bulk.kiq(command) - return str(result.task_id) - - def _serialize_tool_agent_output( *, metadata: AgentChatMessageMetadata | dict[str, object] | None, @@ -314,35 +296,6 @@ async def run_agentscope_task(command: dict[str, Any]) -> dict[str, object]: } -async def run_automation_scheduler_scan( - *, - limit: int | None = None, -) -> dict[str, int]: - now = utc_now() - safe_limit = ( - max(int(limit), 1) - if isinstance(limit, int) - else int(config.automation_scheduler.batch_limit) - ) - async with AsyncSessionLocal() as session: - repository = SqlAlchemyAutomationSchedulerRepository(session=session) - service = AutomationSchedulerService( - repository=repository, - queue=_BulkQueueAdapter(), - ) - result = await service.scan_and_dispatch(now_utc=now, limit=safe_limit) - logger.info( - "automation scheduler scan completed", - scanned=result.scanned, - dispatched=result.dispatched, - now_utc=now.astimezone(timezone.utc).isoformat(), - ) - return { - "scanned": int(result.scanned), - "dispatched": int(result.dispatched), - } - - @default_broker.task(task_name="tasks.agentscope.run_command") async def run_command_task(command: dict[str, Any]) -> dict[str, object]: return await run_agentscope_task(command) @@ -356,8 +309,3 @@ async def run_command_task_critical(command: dict[str, Any]) -> dict[str, object @bulk_broker.task(task_name="tasks.agentscope.run_command.bulk") async def run_command_task_bulk(command: dict[str, Any]) -> dict[str, object]: return await run_agentscope_task(command) - - -@default_broker.task(task_name="tasks.automation.scan_due_jobs") -async def scan_due_automation_jobs_task(limit: int | None = None) -> dict[str, int]: - return await run_automation_scheduler_scan(limit=limit) diff --git a/backend/src/core/automation/scheduler.py b/backend/src/core/automation/scheduler.py index a62019a..2df400d 100644 --- a/backend/src/core/automation/scheduler.py +++ b/backend/src/core/automation/scheduler.py @@ -8,6 +8,7 @@ from uuid import UUID, uuid4 from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession +from core.config.settings import config from core.logging import get_logger from models.agent_chat_session import AgentChatSession, SessionType from models.automation_jobs import AutomationJob, ScheduleType @@ -17,6 +18,20 @@ from schemas.automation.scheduler import DueAutomationJob, SchedulerDispatchComm logger = get_logger("core.automation.scheduler") +class _BulkQueueAdapter: + async def enqueue( + self, + *, + command: dict[str, object], + dedup_key: str | None, + ) -> str: + del dedup_key + from core.agentscope.runtime.tasks import run_command_task_bulk + + result = await run_command_task_bulk.kiq(command) + return str(result.task_id) + + class QueueLike(Protocol): async def enqueue( self, @@ -245,3 +260,34 @@ def _compute_next_run_at( def utc_now() -> datetime: return datetime.now(timezone.utc) + + +async def run_automation_scheduler_scan( + *, + limit: int | None = None, +) -> dict[str, int]: + now = utc_now() + safe_limit = ( + max(int(limit), 1) + if isinstance(limit, int) + else int(config.automation_scheduler.batch_limit) + ) + from core.db.session import AsyncSessionLocal + + async with AsyncSessionLocal() as session: + repository = SqlAlchemyAutomationSchedulerRepository(session=session) + service = AutomationSchedulerService( + repository=repository, + queue=_BulkQueueAdapter(), + ) + result = await service.scan_and_dispatch(now_utc=now, limit=safe_limit) + logger.info( + "automation scheduler scan completed", + scanned=result.scanned, + dispatched=result.dispatched, + now_utc=now.astimezone(timezone.utc).isoformat(), + ) + return { + "scanned": int(result.scanned), + "dispatched": int(result.dispatched), + } diff --git a/backend/src/core/automation/tasks.py b/backend/src/core/automation/tasks.py new file mode 100644 index 0000000..feabf0f --- /dev/null +++ b/backend/src/core/automation/tasks.py @@ -0,0 +1,14 @@ +from __future__ import annotations + + +from core.logging import get_logger +from core.taskiq.app import bulk_broker + +logger = get_logger("core.automation.tasks") + + +@bulk_broker.task(task_name="tasks.automation.scan_due_jobs") +async def scan_due_automation_jobs_task(limit: int | None = None) -> dict[str, int]: + from core.automation.scheduler import run_automation_scheduler_scan + + return await run_automation_scheduler_scan(limit=limit) diff --git a/backend/src/core/runtime/cli.py b/backend/src/core/runtime/cli.py index e1b266b..d82ad55 100644 --- a/backend/src/core/runtime/cli.py +++ b/backend/src/core/runtime/cli.py @@ -5,7 +5,10 @@ import subprocess import sys from pathlib import Path -from core.agentscope.runtime.tasks import run_automation_scheduler_scan +from apscheduler.schedulers.blocking import BlockingScheduler +from apscheduler.triggers.interval import IntervalTrigger + +from core.automation.scheduler import run_automation_scheduler_scan from core.config.settings import config from core.config.initial.init_data import initialize_data from core.logging import get_logger @@ -103,24 +106,34 @@ async def bootstrap() -> bool: return True -async def run_automation_scheduler_forever() -> bool: +async def run_automation_scheduler_forever() -> None: if not config.automation_scheduler.enabled: logger.info("Automation scheduler disabled by config") - return True + return interval_seconds = int(config.automation_scheduler.interval_seconds) batch_limit = int(config.automation_scheduler.batch_limit) logger.info( - "Starting automation scheduler loop", + "Starting automation scheduler", interval_seconds=interval_seconds, batch_limit=batch_limit, ) - while True: + + def scan_job() -> None: try: - await run_automation_scheduler_scan(limit=batch_limit) + asyncio.run(run_automation_scheduler_scan(limit=batch_limit)) except Exception as exc: logger.exception("Automation scheduler scan failed", error=str(exc)) - await asyncio.sleep(interval_seconds) + + scheduler = BlockingScheduler() + scheduler.add_job( + scan_job, + trigger=IntervalTrigger(seconds=interval_seconds), + id="automation_scheduler_scan", + name="Automation scheduler scan", + replace_existing=True, + ) + scheduler.start() def main() -> int: @@ -142,7 +155,8 @@ def main() -> int: elif command == "bootstrap": success = asyncio.run(bootstrap()) elif command == "automation-scheduler": - success = asyncio.run(run_automation_scheduler_forever()) + asyncio.run(run_automation_scheduler_forever()) + return 0 else: logger.error("Unknown command", command=command) logger.info( diff --git a/deploy/.env.prod.example b/deploy/.env.prod.example index 24a6bd0..1846912 100644 --- a/deploy/.env.prod.example +++ b/deploy/.env.prod.example @@ -25,16 +25,13 @@ SOCIAL_REDIS__PORT=6379 SOCIAL_REDIS__DB=0 ############ -# Worker 队列分组配置(显式参数控制) +# Worker 队列分组配置 ############ -# critical: 用户同步感知任务(验证码发送、鉴权后置关键动作) -# default: 常规异步任务 -# bulk: 批处理/重计算/可延迟任务 -SOCIAL_WORKER__GROUPS__CRITICAL__CONCURRENCY=2 +# agent: 常规异步任务 +# automation: 批处理/重计算/可延迟任务 +SOCIAL_WORKER__GROUPS__AGENT__CONCURRENCY=2 -SOCIAL_WORKER__GROUPS__DEFAULT__CONCURRENCY=2 - -SOCIAL_WORKER__GROUPS__BULK__CONCURRENCY=1 +SOCIAL_WORKER__GROUPS__AUTOMATION__CONCURRENCY=1 ############ # Taskiq(可选,默认回落到 Redis URL) diff --git a/deploy/docker-compose.prod.yml b/deploy/docker-compose.prod.yml index ee5cb16..129dc26 100644 --- a/deploy/docker-compose.prod.yml +++ b/deploy/docker-compose.prod.yml @@ -72,7 +72,7 @@ services: - SOCIAL_REDIS__HOST=redis - SOCIAL_REDIS__PORT=6379 command: > - sh -c '.venv/bin/taskiq worker core.taskiq.app:default_broker core.agentscope.runtime.tasks --workers ${SOCIAL_WORKER__GROUPS__DEFAULT__CONCURRENCY:-2}' + sh -c '.venv/bin/taskiq worker core.taskiq.app:default_broker core.agentscope.runtime.tasks --workers ${SOCIAL_WORKER__GROUPS__AGENT__CONCURRENCY:-2}' depends_on: redis: condition: service_healthy @@ -94,7 +94,7 @@ services: - SOCIAL_REDIS__HOST=redis - SOCIAL_REDIS__PORT=6379 command: > - sh -c '.venv/bin/taskiq worker core.taskiq.app:bulk_broker core.agentscope.runtime.tasks --workers ${SOCIAL_WORKER__GROUPS__BULK__CONCURRENCY:-1}' + sh -c '.venv/bin/taskiq worker core.taskiq.app:bulk_broker core.agentscope.runtime.tasks --workers ${SOCIAL_WORKER__GROUPS__AUTOMATION__CONCURRENCY:-1}' depends_on: redis: condition: service_healthy diff --git a/infra/scripts/app.sh b/infra/scripts/app.sh index 3f23b90..bedce08 100755 --- a/infra/scripts/app.sh +++ b/infra/scripts/app.sh @@ -159,8 +159,8 @@ start() { ${SOCIAL_WEB__HOST:-0.0.0.0} --port ${WEB_PORT} --workers \ ${SOCIAL_WEB__WORKERS:-2} --log-level ${UVICORN_LOG_LEVEL}" - WORKER_AGENT_CMD="cd '$ROOT_DIR' && PYTHONPATH=backend/src SOCIAL_RUNTIME__SERVICE_NAME=worker-agent uv run taskiq worker core.taskiq.app:default_broker core.agentscope.runtime.tasks --workers ${SOCIAL_WORKER__GROUPS__DEFAULT__CONCURRENCY:-2}" - WORKER_AUTOMATION_CMD="cd '$ROOT_DIR' && PYTHONPATH=backend/src SOCIAL_RUNTIME__SERVICE_NAME=worker-automation uv run taskiq worker core.taskiq.app:bulk_broker core.agentscope.runtime.tasks --workers ${SOCIAL_WORKER__GROUPS__BULK__CONCURRENCY:-1}" + WORKER_AGENT_CMD="cd '$ROOT_DIR' && PYTHONPATH=backend/src SOCIAL_RUNTIME__SERVICE_NAME=worker-agent uv run taskiq worker core.taskiq.app:default_broker core.agentscope.runtime.tasks --workers ${SOCIAL_WORKER__GROUPS__AGENT__CONCURRENCY:-2}" + WORKER_AUTOMATION_CMD="cd '$ROOT_DIR' && PYTHONPATH=backend/src SOCIAL_RUNTIME__SERVICE_NAME=worker-automation uv run taskiq worker core.taskiq.app:bulk_broker core.agentscope.runtime.tasks --workers ${SOCIAL_WORKER__GROUPS__AUTOMATION__CONCURRENCY:-1}" SCHEDULER_CMD="cd '$ROOT_DIR' && PYTHONPATH=backend/src SOCIAL_RUNTIME__SERVICE_NAME=scheduler uv run python -m core.runtime.cli automation-scheduler" echo "Starting tmux workers in session '$SESSION_NAME'..." diff --git a/pyproject.toml b/pyproject.toml index 4c8800c..a8a260b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,7 @@ dependencies = [ "uvicorn[standard]>=0.40.0", "dashscope>=1.25.13", "agentscope>=1.0.16", + "apscheduler>=3.11.0", ] [project.optional-dependencies]