from __future__ import annotations from datetime import datetime from enum import Enum from uuid import UUID from core.agentscope.tools.tool_config import AgentTool from pydantic import BaseModel, ConfigDict, Field from models.automation_jobs import AutomationJob as OrmAutomationJob from models.automation_jobs import AutomationJobStatus, ScheduleType class ContextSource(str, Enum): LATEST_CHAT = "latest_chat" class ContextWindowMode(str, Enum): DAY = "day" NUMBER = "number" class MessageContextConfig(BaseModel): model_config = ConfigDict(extra="forbid") source: ContextSource = ContextSource.LATEST_CHAT window_mode: ContextWindowMode = ContextWindowMode.DAY window_count: int = Field(default=2, ge=1, le=200) class ScheduleRunAt(BaseModel): model_config = ConfigDict(extra="forbid") hour: int = Field(default=8, ge=0, le=23) minute: int = Field(default=0, ge=0, le=59) class ScheduleConfig(BaseModel): model_config = ConfigDict(extra="forbid") type: ScheduleType run_at: ScheduleRunAt class RuntimeConfig(BaseModel): model_config = ConfigDict(extra="forbid") enabled_tools: list[AgentTool] = Field(default_factory=list, max_length=32) context: MessageContextConfig = Field(default_factory=MessageContextConfig) class AutomationJobConfig(BaseModel): model_config = ConfigDict(extra="forbid") enabled_tools: list[AgentTool] | None = Field(default=None, max_length=32) context: MessageContextConfig | None = None input_template: str | None = Field(default=None, min_length=1, max_length=4000) schedule: ScheduleConfig | None = None class AutomationJob(BaseModel): model_config = ConfigDict(extra="forbid") id: UUID owner_id: UUID bootstrap_key: str | None = Field(default=None, min_length=1, max_length=64) title: str = Field(..., min_length=1, max_length=255) config: AutomationJobConfig schedule_type: ScheduleType run_at: datetime next_run_at: datetime timezone: str = Field(default="UTC", min_length=1, max_length=50) last_run_at: datetime | None = None status: AutomationJobStatus created_by: UUID | None = None created_at: datetime updated_at: datetime @classmethod def from_orm(cls, obj: OrmAutomationJob) -> "AutomationJob": return cls( id=obj.id, owner_id=obj.owner_id, bootstrap_key=obj.bootstrap_key, title=obj.title, config=AutomationJobConfig.model_validate(obj.config or {}), schedule_type=obj.schedule_type, run_at=obj.run_at, next_run_at=obj.next_run_at, timezone=obj.timezone, last_run_at=obj.last_run_at, status=obj.status, created_by=obj.created_by, created_at=obj.created_at, updated_at=obj.updated_at, ) @property def is_system(self) -> bool: return self.bootstrap_key is not None