diff --git a/backend/src/core/agentscope/events/store.py b/backend/src/core/agentscope/events/store.py index 0445ca0..72de0bd 100644 --- a/backend/src/core/agentscope/events/store.py +++ b/backend/src/core/agentscope/events/store.py @@ -8,12 +8,10 @@ from core.agentscope.events.persistence import MessageRepository, SessionReposit from core.logging import get_logger from models.agent_chat_message import AgentChatMessageRole from models.agent_chat_session import AgentChatSessionStatus -from models.system_agents import SystemAgents -from schemas.agent.system_agent import AgentType, SystemAgentLLMConfig +from schemas.agent.system_agent import AgentType from schemas.agent.runtime_models import AgentOutput, ToolAgentOutput from schemas.agent.visibility import SystemVisibilityBit, bit_mask from schemas.messages.chat_message import AgentChatMessageMetadata -from sqlalchemy import select class EventStore(Protocol): @@ -48,9 +46,6 @@ class SqlAlchemyEventStore: async with self._session_factory() as session: session_repo = SessionRepository(session) message_repo = MessageRepository(session) - stage_visibility_bit_map = await self._load_stage_visibility_bit_map( - session=session - ) chat_session = await session_repo.get_session(session_id=session_id) if chat_session is None: return @@ -83,7 +78,6 @@ class SqlAlchemyEventStore: chat_session=chat_session, session_repo=session_repo, message_repo=message_repo, - stage_visibility_bit_map=stage_visibility_bit_map, ) elif event_type == "TOOL_CALL_RESULT": await self._persist_tool_call_result( @@ -92,7 +86,6 @@ class SqlAlchemyEventStore: chat_session=chat_session, session_repo=session_repo, message_repo=message_repo, - stage_visibility_bit_map=stage_visibility_bit_map, ) await session.commit() @@ -105,7 +98,6 @@ class SqlAlchemyEventStore: chat_session: Any, session_repo: SessionRepository, message_repo: MessageRepository, - stage_visibility_bit_map: dict[str, int], ) -> None: message_id_raw = self._event_value(event, "messageId") message_id = message_id_raw if isinstance(message_id_raw, str) else "" @@ -146,17 +138,7 @@ class SqlAlchemyEventStore: try: worker_output = AgentOutput.model_validate(worker_output_payload) - raw_agent_type = self._event_value(event, "stage") - normalized_agent_type = ( - str(raw_agent_type).strip().lower() - if isinstance(raw_agent_type, str) - else AgentType.WORKER.value - ) - agent_type = ( - AgentType.MEMORY - if normalized_agent_type == AgentType.MEMORY.value - else AgentType.WORKER - ) + agent_type = AgentType.WORKER metadata_model = AgentChatMessageMetadata( run_id=run_id_value, agent_type=agent_type, @@ -199,7 +181,6 @@ class SqlAlchemyEventStore: latency_ms=latency_ms, visibility_mask=self._resolve_stage_visibility_mask( event=event, - stage_visibility_bit_map=stage_visibility_bit_map, ), ) @@ -226,7 +207,6 @@ class SqlAlchemyEventStore: chat_session: Any, session_repo: SessionRepository, message_repo: MessageRepository, - stage_visibility_bit_map: dict[str, int], ) -> None: run_id = self._event_value(event, "runId") run_id_value = run_id if isinstance(run_id, str) and run_id else None @@ -272,7 +252,6 @@ class SqlAlchemyEventStore: metadata=metadata_model.model_dump(mode="json", exclude_none=True), visibility_mask=self._resolve_stage_visibility_mask( event=event, - stage_visibility_bit_map=stage_visibility_bit_map, ), ) @@ -301,39 +280,16 @@ class SqlAlchemyEventStore: self, *, event: dict[str, Any], - stage_visibility_bit_map: dict[str, int], ) -> int: - base = bit_mask(bit=int(SystemVisibilityBit.UI_HISTORY)) raw_stage = self._event_value(event, "stage") if not isinstance(raw_stage, str): - return base + return bit_mask(bit=int(SystemVisibilityBit.UI_HISTORY)) normalized_stage = raw_stage.strip().lower() - bit = stage_visibility_bit_map.get(normalized_stage) - if bit is None and normalized_stage == AgentType.MEMORY.value: - bit = 18 - if bit is None: - return base - return base | bit_mask(bit=bit) - - async def _load_stage_visibility_bit_map( - self, - *, - session: Any, - ) -> dict[str, int]: - stmt = select(SystemAgents.agent_type, SystemAgents.config).where( - SystemAgents.agent_type.in_( - [AgentType.ROUTER.value, AgentType.WORKER.value, AgentType.MEMORY.value] - ) + if normalized_stage == "memory": + return bit_mask(bit=int(SystemVisibilityBit.UI_HISTORY)) + return bit_mask(bit=int(SystemVisibilityBit.UI_HISTORY)) | bit_mask( + bit=int(SystemVisibilityBit.CONTEXT_ASSEMBLY) ) - rows = (await session.execute(stmt)).all() - bit_map: dict[str, int] = {} - for agent_type, raw_config in rows: - if not isinstance(agent_type, str): - continue - config_payload = raw_config if isinstance(raw_config, dict) else {} - llm_config = SystemAgentLLMConfig.model_validate(config_payload) - bit_map[agent_type.strip().lower()] = llm_config.visibility_consumer_bit - return bit_map async def _update_session_state( self, diff --git a/backend/src/core/agentscope/runtime/pipeline_registry.py b/backend/src/core/agentscope/runtime/pipeline_registry.py index db37059..057c753 100644 --- a/backend/src/core/agentscope/runtime/pipeline_registry.py +++ b/backend/src/core/agentscope/runtime/pipeline_registry.py @@ -1,12 +1,7 @@ from __future__ import annotations -from schemas.agent.pipeline_spec import ( - ContextPolicy, - ContextWindowMode, - ExecutorKind, - PipelineSpec, - StageSpec, -) +from core.agentscope.schemas.pipeline_spec import ExecutorKind, PipelineSpec, StageSpec +from schemas.agent.system_agent import AgentType def build_default_pipeline_spec(*, mode: str) -> PipelineSpec: @@ -17,23 +12,13 @@ def build_default_pipeline_spec(*, mode: str) -> PipelineSpec: stages=[ StageSpec( stage_name="router", + agent_type=AgentType.ROUTER, executor_kind=ExecutorKind.SINGLE_SHOT, - default_visibility_mask=0, - context_policy=ContextPolicy( - consumer_agent_type="router", - window_mode=ContextWindowMode.DAY, - count=20, - ), ), StageSpec( stage_name="worker", + agent_type=AgentType.WORKER, executor_kind=ExecutorKind.REACT, - default_visibility_mask=0, - context_policy=ContextPolicy( - consumer_agent_type="worker", - window_mode=ContextWindowMode.NUMBER, - count=20, - ), ), ], ) @@ -44,13 +29,8 @@ def build_default_pipeline_spec(*, mode: str) -> PipelineSpec: stages=[ StageSpec( stage_name="memory", + agent_type=AgentType.MEMORY, executor_kind=ExecutorKind.REACT, - default_visibility_mask=0, - context_policy=ContextPolicy( - consumer_agent_type="memory", - window_mode=ContextWindowMode.DAY, - count=20, - ), ) ], ) diff --git a/backend/src/core/agentscope/runtime/consumer_registry.py b/backend/src/core/agentscope/runtime/registry_builder.py similarity index 88% rename from backend/src/core/agentscope/runtime/consumer_registry.py rename to backend/src/core/agentscope/runtime/registry_builder.py index d022b58..1283422 100644 --- a/backend/src/core/agentscope/runtime/consumer_registry.py +++ b/backend/src/core/agentscope/runtime/registry_builder.py @@ -1,6 +1,9 @@ from __future__ import annotations -from schemas.agent.consumer_registry import AgentConsumerBinding, ConsumerRegistry +from core.agentscope.schemas.consumer_registry import ( + AgentConsumerBinding, + ConsumerRegistry, +) def build_consumer_registry( diff --git a/backend/src/core/agentscope/runtime/runner.py b/backend/src/core/agentscope/runtime/runner.py index 0468b12..90efde2 100644 --- a/backend/src/core/agentscope/runtime/runner.py +++ b/backend/src/core/agentscope/runtime/runner.py @@ -88,10 +88,7 @@ class AgentScopeRunner: owner_id = UUID(user_context.id) runtime_client_time = self._resolve_runtime_client_time(run_input=run_input) pipeline_spec = build_default_pipeline_spec(mode=system_agent_mode) - stage_agent_types = [ - self._parse_agent_type(stage_name=stage.stage_name) - for stage in pipeline_spec.stages - ] + stage_agent_types = [stage.agent_type for stage in pipeline_spec.stages] async with AsyncSessionLocal() as session: if stage_agent_types == [AgentType.ROUTER, AgentType.WORKER]: @@ -177,17 +174,6 @@ class AgentScopeRunner: enabled_tool_names=enabled_tool_names, ) - @staticmethod - def _parse_agent_type(*, stage_name: str) -> AgentType: - normalized = stage_name.strip().lower() - if normalized == AgentType.ROUTER.value: - return AgentType.ROUTER - if normalized == AgentType.WORKER.value: - return AgentType.WORKER - if normalized == AgentType.MEMORY.value: - return AgentType.MEMORY - raise ValueError(f"unsupported stage name: {stage_name}") - async def _load_stage_config( self, *, @@ -355,7 +341,6 @@ class AgentScopeRunner: temperature=0.7, max_tokens=None, timeout_seconds=30, - visibility_consumer_bit=18, context_messages=ContextMessagesConfig( mode=( ContextBuildStrategy.DAY diff --git a/backend/src/core/agentscope/runtime/tasks.py b/backend/src/core/agentscope/runtime/tasks.py index 5a6a80b..ba35e12 100644 --- a/backend/src/core/agentscope/runtime/tasks.py +++ b/backend/src/core/agentscope/runtime/tasks.py @@ -12,7 +12,7 @@ from core.agentscope.events import ( RedisStreamBus, SqlAlchemyEventStore, ) -from core.agentscope.runtime.context_service import AgentContextService +from core.agentscope.services.context_service import AgentContextService from core.agentscope.runtime.orchestrator import AgentScopeRuntimeOrchestrator from core.agentscope.runtime.pipeline_registry import build_default_pipeline_spec from core.agentscope.schemas.agui_input import parse_run_input @@ -20,8 +20,7 @@ from core.auth.models import CurrentUser from core.config.settings import config from core.db.session import AsyncSessionLocal from core.logging import get_logger -from core.taskiq.app import bulk_broker, critical_broker, default_broker -from models.automation_jobs import AutomationJob +from core.taskiq.app import worker_agent_broker, worker_automation_broker from schemas.agent.visibility import SystemVisibilityBit, bit_mask from schemas.automation.config import AutomationJobConfig from schemas.messages.chat_message import ( @@ -33,8 +32,10 @@ from schemas.user import UserContext from services.base.redis import get_or_init_redis_client from services.base.supabase import supabase_service from v1.agent.repository import AgentRepository +from v1.memory.repository import MemoryRepository +from v1.memory.service import MemoryService from v1.users.dependencies import get_user_service -from sqlalchemy import select + logger = get_logger("core.agentscope.runtime.tasks") _MAX_CONTEXT_ATTACHMENTS = 3 @@ -188,29 +189,6 @@ async def _build_recent_context_messages( return converted -async def _load_memory_job_config( - *, - session: Any, - owner_id: UUID, - automation_job_id: str, -) -> AutomationJobConfig: - try: - job_uuid = UUID(automation_job_id) - except ValueError as exc: - raise ValueError("automation_job_id is invalid") from exc - - stmt = ( - select(AutomationJob) - .where(AutomationJob.id == job_uuid) - .where(AutomationJob.owner_id == owner_id) - .where(AutomationJob.deleted_at.is_(None)) - ) - row = (await session.execute(stmt)).scalar_one_or_none() - if row is None: - raise ValueError("automation job not found") - return AutomationJobConfig.model_validate(row.config or {}) - - async def run_agentscope_task(command: dict[str, Any]) -> dict[str, object]: command_type = str(command.get("command", "run")).strip().lower() raw_owner_id = command.get("owner_id") @@ -245,10 +223,11 @@ async def run_agentscope_task(command: dict[str, Any]) -> dict[str, object]: memory_job_config: AutomationJobConfig | None = None if system_agent_mode == "memory": assert isinstance(raw_automation_job_id, str) - memory_job_config = await _load_memory_job_config( - session=session, + job_uuid = UUID(raw_automation_job_id) + memory_service = MemoryService(MemoryRepository(session)) + memory_job_config = await memory_service.get_memory_job_config( + job_id=job_uuid, owner_id=owner_id, - automation_job_id=raw_automation_job_id, ) redis_client = await get_or_init_redis_client() @@ -272,7 +251,7 @@ async def run_agentscope_task(command: dict[str, Any]) -> dict[str, object]: context_messages = await _build_recent_context_messages( session=session, thread_id=thread_id, - context_mode=pipeline_spec.stages[0].context_policy.consumer_agent_type, + context_mode=pipeline_spec.stages[0].agent_type.value, memory_job_config=memory_job_config, ) @@ -296,16 +275,11 @@ async def run_agentscope_task(command: dict[str, Any]) -> dict[str, object]: } -@default_broker.task(task_name="tasks.agentscope.run_command") -async def run_command_task(command: dict[str, Any]) -> dict[str, object]: +@worker_agent_broker.task(task_name="tasks.agentscope.run_command.agent") +async def run_command_task_agent(command: dict[str, object]) -> dict[str, object]: return await run_agentscope_task(command) -@critical_broker.task(task_name="tasks.agentscope.run_command.critical") -async def run_command_task_critical(command: dict[str, Any]) -> dict[str, object]: - return await run_agentscope_task(command) - - -@bulk_broker.task(task_name="tasks.agentscope.run_command.bulk") -async def run_command_task_bulk(command: dict[str, Any]) -> dict[str, object]: +@worker_automation_broker.task(task_name="tasks.agentscope.run_command.automation") +async def run_command_task_automation(command: dict[str, object]) -> dict[str, object]: return await run_agentscope_task(command) diff --git a/backend/src/schemas/agent/consumer_registry.py b/backend/src/core/agentscope/schemas/consumer_registry.py similarity index 100% rename from backend/src/schemas/agent/consumer_registry.py rename to backend/src/core/agentscope/schemas/consumer_registry.py diff --git a/backend/src/schemas/agent/pipeline_spec.py b/backend/src/core/agentscope/schemas/pipeline_spec.py similarity index 59% rename from backend/src/schemas/agent/pipeline_spec.py rename to backend/src/core/agentscope/schemas/pipeline_spec.py index ac04106..9465d3e 100644 --- a/backend/src/schemas/agent/pipeline_spec.py +++ b/backend/src/core/agentscope/schemas/pipeline_spec.py @@ -4,40 +4,20 @@ from enum import Enum from pydantic import BaseModel, ConfigDict, Field, field_validator +from schemas.agent.system_agent import AgentType + class ExecutorKind(str, Enum): SINGLE_SHOT = "single_shot" REACT = "react" -class ContextWindowMode(str, Enum): - DAY = "day" - NUMBER = "number" - - -class ContextPolicy(BaseModel): - model_config = ConfigDict(extra="forbid") - - consumer_agent_type: str = Field(..., min_length=1, max_length=64) - window_mode: ContextWindowMode = ContextWindowMode.NUMBER - count: int = Field(default=20, ge=1, le=200) - - @field_validator("consumer_agent_type") - @classmethod - def _normalize_consumer_agent_type(cls, value: str) -> str: - normalized = value.strip().lower() - if not normalized: - raise ValueError("consumer_agent_type must not be empty") - return normalized - - class StageSpec(BaseModel): model_config = ConfigDict(extra="forbid") stage_name: str = Field(..., min_length=1, max_length=64) + agent_type: AgentType executor_kind: ExecutorKind - default_visibility_mask: int = Field(..., ge=0, le=(1 << 63) - 1) - context_policy: ContextPolicy @field_validator("stage_name") @classmethod diff --git a/backend/src/core/agentscope/runtime/context_service.py b/backend/src/core/agentscope/services/context_service.py similarity index 96% rename from backend/src/core/agentscope/runtime/context_service.py rename to backend/src/core/agentscope/services/context_service.py index ca24ef9..5257c07 100644 --- a/backend/src/core/agentscope/runtime/context_service.py +++ b/backend/src/core/agentscope/services/context_service.py @@ -5,7 +5,7 @@ from typing import Protocol from core.agentscope.runtime.context_loader_registry import CONTEXT_LOADER_REGISTRY from schemas.agent.system_agent import SystemAgentLLMConfig -from schemas.agent.visibility import bit_mask +from schemas.agent.visibility import SystemVisibilityBit, bit_mask _DEFAULT_CONTEXT_WINDOW_USER_MESSAGES = 20 _DEFAULT_ROUTER_CONTEXT_DAY_COUNT = 20 @@ -61,7 +61,7 @@ class AgentContextService: normalized_config = self._normalize_system_agent_config(raw_llm_config) context_config = normalized_config.context_messages - visibility_mask = bit_mask(bit=normalized_config.visibility_consumer_bit) + visibility_mask = bit_mask(bit=int(SystemVisibilityBit.CONTEXT_ASSEMBLY)) context_loader = CONTEXT_LOADER_REGISTRY.resolve(mode=context_config.mode) return await context_loader( self, diff --git a/backend/src/core/automation/tasks.py b/backend/src/core/automation/tasks.py index feabf0f..166288a 100644 --- a/backend/src/core/automation/tasks.py +++ b/backend/src/core/automation/tasks.py @@ -2,12 +2,12 @@ from __future__ import annotations from core.logging import get_logger -from core.taskiq.app import bulk_broker +from core.taskiq.app import worker_automation_broker logger = get_logger("core.automation.tasks") -@bulk_broker.task(task_name="tasks.automation.scan_due_jobs") +@worker_automation_broker.task(task_name="tasks.automation.scan_due_jobs") async def scan_due_automation_jobs_task(limit: int | None = None) -> dict[str, int]: from core.automation.scheduler import run_automation_scheduler_scan diff --git a/backend/src/core/config/static/database/system_agents.yaml b/backend/src/core/config/static/database/system_agents.yaml index 28ceabf..3b1d51d 100644 --- a/backend/src/core/config/static/database/system_agents.yaml +++ b/backend/src/core/config/static/database/system_agents.yaml @@ -6,7 +6,6 @@ agents: temperature: 0.7 max_tokens: null timeout_seconds: 30 - visibility_consumer_bit: 16 context_messages: mode: day count: 2 @@ -19,7 +18,6 @@ agents: temperature: 0.7 max_tokens: null timeout_seconds: 30 - visibility_consumer_bit: 17 context_messages: mode: number count: 20 diff --git a/backend/src/core/taskiq/__init__.py b/backend/src/core/taskiq/__init__.py index 7a93800..59b081b 100644 --- a/backend/src/core/taskiq/__init__.py +++ b/backend/src/core/taskiq/__init__.py @@ -1,3 +1,3 @@ -from core.taskiq.app import broker, bulk_broker, critical_broker, default_broker +from core.taskiq.app import broker, worker_agent_broker, worker_automation_broker -__all__ = ["broker", "default_broker", "critical_broker", "bulk_broker"] +__all__ = ["broker", "worker_agent_broker", "worker_automation_broker"] diff --git a/backend/src/core/taskiq/app.py b/backend/src/core/taskiq/app.py index 12ae6c6..d20cf0b 100644 --- a/backend/src/core/taskiq/app.py +++ b/backend/src/core/taskiq/app.py @@ -21,11 +21,9 @@ def _build_broker(queue_name: str) -> ListQueueBroker: ) -default_broker = _build_broker("default") -critical_broker = _build_broker("critical") -bulk_broker = _build_broker("bulk") +worker_agent_broker = _build_broker("agent") +worker_automation_broker = _build_broker("automation") -# Backward-compatible export name for existing imports/tests. -broker = default_broker +broker = worker_agent_broker -__all__ = ["broker", "default_broker", "critical_broker", "bulk_broker"] +__all__ = ["broker", "worker_agent_broker", "worker_automation_broker"] diff --git a/backend/src/schemas/agent/__init__.py b/backend/src/schemas/agent/__init__.py index 1647112..8ac7a85 100644 --- a/backend/src/schemas/agent/__init__.py +++ b/backend/src/schemas/agent/__init__.py @@ -1,17 +1,10 @@ -from schemas.agent.consumer_registry import AgentConsumerBinding, ConsumerRegistry from schemas.agent.forwarded_props import ( ClientTimeContext, ForwardedPropsPayload, - parse_forwarded_props_agent_type, parse_forwarded_props_client_time, + parse_forwarded_props_runtime_mode, ) -from schemas.agent.pipeline_spec import ( - ContextPolicy, - ContextWindowMode, - ExecutorKind, - PipelineSpec, - StageSpec, -) +from schemas.agent.forwarded_props import RuntimeMode from schemas.agent.runtime_models import ( AgentOutput, ConstraintItem, @@ -45,28 +38,22 @@ from schemas.agent.ui_hints import ( __all__ = [ "AgentType", "AgentOutput", - "AgentConsumerBinding", "ConstraintItem", - "ConsumerRegistry", - "ContextPolicy", - "ContextWindowMode", "ExecutionMode", - "ExecutorKind", "ForwardedPropsPayload", "KeyEntity", "NormalizedTaskInput", - "PipelineSpec", "ResultTyping", "ClientTimeContext", "ResultType", "RouterAgentOutput", "RouterUiDecision", "RunStatus", + "RuntimeMode", "TaskType", "TaskTyping", "SystemAgentLLMConfig", "SystemVisibilityBit", - "StageSpec", "ToolAgentOutput", "ToolStatus", "UiMode", @@ -79,7 +66,7 @@ __all__ = [ "WorkerAgentOutputLite", "WorkerAgentOutputRich", "bit_mask", - "parse_forwarded_props_agent_type", "parse_forwarded_props_client_time", + "parse_forwarded_props_runtime_mode", "resolve_worker_output_model", ] diff --git a/backend/src/schemas/agent/forwarded_props.py b/backend/src/schemas/agent/forwarded_props.py index dbda22a..9e8250b 100644 --- a/backend/src/schemas/agent/forwarded_props.py +++ b/backend/src/schemas/agent/forwarded_props.py @@ -1,6 +1,7 @@ from __future__ import annotations from datetime import datetime +from enum import Enum import re from zoneinfo import ZoneInfo, ZoneInfoNotFoundError @@ -59,20 +60,17 @@ class ClientTimeContext(BaseModel): return value +class RuntimeMode(str, Enum): + CHAT = "chat" + AUTOMATION = "automation" + + class ForwardedPropsPayload(BaseModel): model_config = ConfigDict(extra="forbid") - agent_type: str = Field(..., min_length=1, max_length=64) + runtime_mode: RuntimeMode client_time: ClientTimeContext | None = None - @field_validator("agent_type") - @classmethod - def validate_agent_type(cls, value: str) -> str: - normalized = value.strip().lower() - if not normalized: - raise ValueError("invalid forwarded_props.agent_type") - return normalized - def parse_forwarded_props(forwarded_props: object) -> ForwardedPropsPayload: if not isinstance(forwarded_props, dict): @@ -90,6 +88,6 @@ def parse_forwarded_props_client_time( return payload.client_time -def parse_forwarded_props_agent_type(forwarded_props: object) -> str: +def parse_forwarded_props_runtime_mode(forwarded_props: object) -> RuntimeMode: payload = parse_forwarded_props(forwarded_props) - return payload.agent_type + return payload.runtime_mode diff --git a/backend/src/schemas/agent/system_agent.py b/backend/src/schemas/agent/system_agent.py index fd20712..92e41bb 100644 --- a/backend/src/schemas/agent/system_agent.py +++ b/backend/src/schemas/agent/system_agent.py @@ -2,15 +2,13 @@ from __future__ import annotations from enum import Enum -from pydantic import BaseModel, Field, field_validator - from core.agentscope.tools.tool_config import AgentTool, parse_agent_tool +from pydantic import BaseModel, Field, field_validator class AgentType(str, Enum): ROUTER = "router" WORKER = "worker" - MEMORY = "memory" class ContextBuildStrategy(str, Enum): @@ -30,7 +28,6 @@ class SystemAgentLLMConfig(BaseModel): context_messages: ContextMessagesConfig = Field( default_factory=ContextMessagesConfig ) - visibility_consumer_bit: int = Field(default=16, ge=16, le=63) enabled_tools: list[AgentTool] = Field(default_factory=list, max_length=32) @field_validator("enabled_tools", mode="before") @@ -42,10 +39,13 @@ class SystemAgentLLMConfig(BaseModel): raise ValueError("enabled_tools must be a list") normalized: list[AgentTool] = [] for item in value: - raw_item = str(item or "").strip() - if not raw_item: - continue - tool = parse_agent_tool(raw_item) + if isinstance(item, AgentTool): + tool = item + else: + raw_item = str(item or "").strip() + if not raw_item: + continue + tool = parse_agent_tool(raw_item) if tool not in normalized: normalized.append(tool) return normalized diff --git a/backend/src/schemas/agent/visibility.py b/backend/src/schemas/agent/visibility.py index 9314286..7af41d3 100644 --- a/backend/src/schemas/agent/visibility.py +++ b/backend/src/schemas/agent/visibility.py @@ -7,7 +7,7 @@ from pydantic import BaseModel, ConfigDict, Field, field_validator class SystemVisibilityBit(IntEnum): UI_HISTORY = 0 - UI_REALTIME = 1 + CONTEXT_ASSEMBLY = 1 class VisibilityMask(BaseModel): diff --git a/backend/src/v1/agent/asr.py b/backend/src/v1/agent/asr.py new file mode 100644 index 0000000..36ee940 --- /dev/null +++ b/backend/src/v1/agent/asr.py @@ -0,0 +1,120 @@ +from __future__ import annotations + +import asyncio +from typing import Any + +import dashscope +from dashscope.audio.asr import Recognition, RecognitionCallback + +from core.config.settings import config +from core.logging import get_logger + +logger = get_logger(__name__) + + +class AsrService: + def __init__(self) -> None: + self._api_key: str | None = None + + def _get_api_key(self) -> str: + if self._api_key is None: + dashscope_key = config.llm.provider_keys.get("dashscope") + if not dashscope_key: + raise ValueError( + "DASHSCOPE_API_KEY not configured. Set SOCIAL_LLM__PROVIDER_KEYS__DASHSCOPE in environment." + ) + self._api_key = dashscope_key + return self._api_key + + async def transcribe_file(self, file_path: str, filename: str) -> str: + try: + dashscope.api_key = self._get_api_key() + + loop = asyncio.get_event_loop() + + class SyncCallback(RecognitionCallback): + error: str | None = None + + def on_error(self, result: Any) -> None: + self.error = str(result) + + callback = SyncCallback() + recognizer = Recognition( + model="fun-asr-realtime-2026-02-28", + callback=callback, + format="wav", + sample_rate=16000, + ) + + result: Any = await loop.run_in_executor( + None, + lambda: recognizer.call(file=file_path), + ) + + if callback.error: + raise RuntimeError(f"ASR error: {callback.error}") + status_code = self._extract_field(result, "status_code") + if status_code != 200: + message = self._extract_field(result, "message") + raise RuntimeError(f"ASR transcription failed: {message}") + + sentence = self._extract_sentence_payload(result) + if sentence is None: + request_id = self._extract_field(result, "request_id") + logger.warning( + "ASR returned empty result", extra={"request_id": request_id} + ) + return "" + + if isinstance(sentence, dict): + transcription = sentence.get("text", "") + elif isinstance(sentence, list): + transcription = " ".join( + item.get("text", "") for item in sentence if isinstance(item, dict) + ) + else: + transcription = str(sentence) if sentence else "" + + logger.info( + "ASR transcription completed", + extra={"filename": filename, "transcript_length": len(transcription)}, + ) + return transcription + + except asyncio.CancelledError: + raise + except RuntimeError: + raise + except Exception as exc: + logger.exception("ASR transcription error") + raise RuntimeError(f"ASR transcription failed: {exc}") from exc + + def _extract_sentence_payload(self, result: Any) -> Any | None: + if isinstance(result, dict): + output = result.get("output") + if isinstance(output, dict): + return output.get("sentence") + if output is not None: + return getattr(output, "sentence", None) + return result.get("sentence") + + get_sentence = getattr(result, "get_sentence", None) + if callable(get_sentence): + sentence = get_sentence() + if sentence is not None: + return sentence + + output = getattr(result, "output", None) + if output is None: + return None + if isinstance(output, dict): + return output.get("sentence") + return getattr(output, "sentence", None) + + def _extract_field(self, result: Any, field: str) -> Any | None: + if isinstance(result, dict): + return result.get(field) + return getattr(result, field, None) + + +asr_service = AsrService() diff --git a/backend/src/v1/agent/dependencies.py b/backend/src/v1/agent/dependencies.py index ba79151..adf8350 100644 --- a/backend/src/v1/agent/dependencies.py +++ b/backend/src/v1/agent/dependencies.py @@ -44,17 +44,14 @@ class TaskiqQueueClient: @staticmethod def _select_queue_task(command: dict[str, object]) -> Any: from core.agentscope.runtime.tasks import ( - run_command_task, - run_command_task_bulk, - run_command_task_critical, + run_command_task_agent, + run_command_task_automation, ) - queue = str(command.get("queue", "default")).strip().lower() - if queue == "critical": - return run_command_task_critical - if queue == "bulk": - return run_command_task_bulk - return run_command_task + queue = str(command.get("queue", "agent")).strip().lower() + if queue == "automation": + return run_command_task_automation + return run_command_task_agent async def enqueue( self, *, command: dict[str, object], dedup_key: str | None diff --git a/backend/src/v1/agent/router.py b/backend/src/v1/agent/router.py index 796d4a2..a2f57e2 100644 --- a/backend/src/v1/agent/router.py +++ b/backend/src/v1/agent/router.py @@ -6,7 +6,7 @@ import re import tempfile from collections.abc import AsyncIterator from datetime import date -from typing import Annotated, Union +from typing import Annotated from ag_ui.core import RunAgentInput from core.agentscope.events import to_sse_event @@ -28,7 +28,7 @@ from fastapi import ( UploadFile, status, ) -from fastapi.responses import JSONResponse, StreamingResponse +from fastapi.responses import StreamingResponse from services.base.redis import get_or_init_redis_client from v1.agent.dependencies import get_agent_service from v1.agent.schemas import ( @@ -39,7 +39,8 @@ from v1.agent.schemas import ( HistorySnapshotResponse, TaskAcceptedResponse, ) -from v1.agent.service import AgentService, asr_service +from v1.agent.asr import asr_service +from v1.agent.service import AgentService from v1.users.dependencies import get_current_user router = APIRouter(prefix="/agent", tags=["agent"]) @@ -73,15 +74,13 @@ async def _acquire_sse_slot(*, user_id: str) -> bool: count = await redis.incr(key) if count == 1: await redis.expire(key, _SSE_SLOT_TTL_SECONDS) + elif count > _MAX_SSE_CONNECTIONS_PER_USER: + await redis.decr(key) + return False else: ttl = await redis.ttl(key) - if int(ttl) < 0: + if ttl < 0: await redis.expire(key, _SSE_SLOT_TTL_SECONDS) - if int(count) > _MAX_SSE_CONNECTIONS_PER_USER: - after_decr = await redis.decr(key) - if int(after_decr) <= 0: - await redis.delete(key) - return False return True except Exception as exc: # noqa: BLE001 logger.warning( @@ -97,13 +96,18 @@ async def _release_sse_slot(*, user_id: str) -> None: redis = await get_or_init_redis_client() key = f"agent:sse-active:{user_id}" count = await redis.decr(key) - if int(count) <= 0: + if count <= 0: await redis.delete(key) - return None - ttl = await redis.ttl(key) - if int(ttl) < 0: - await redis.expire(key, _SSE_SLOT_TTL_SECONDS) - except Exception: # noqa: BLE001 + else: + ttl = await redis.ttl(key) + if ttl < 0: + await redis.expire(key, _SSE_SLOT_TTL_SECONDS) + except Exception as exc: # noqa: BLE001 + logger.warning( + "SSE slot release failed", + user_id=user_id, + reason=str(exc), + ) return None @@ -176,6 +180,11 @@ async def stream_events( last_event_id=cursor, current_user=current_user, ) + except TimeoutError: + idle_polls += 1 + yield ": keep-alive\n\n" + await asyncio.sleep(0.2) + continue except Exception as exc: # noqa: BLE001 logger.warning( "SSE stream read failed", @@ -183,11 +192,6 @@ async def stream_events( user_id=str(current_user.id), reason=str(exc), ) - if "Timeout reading from" in str(exc): - idle_polls += 1 - yield ": keep-alive\n\n" - await asyncio.sleep(0.2) - continue break if not rows: @@ -291,12 +295,12 @@ async def create_attachment_signed_url( async def transcribe( audio: UploadFile, request: Request, - _current_user: Annotated[CurrentUser, Depends(get_current_user)], -) -> Union[AsrTranscribeResponse, JSONResponse]: + current_user: Annotated[CurrentUser, Depends(get_current_user)], +) -> AsrTranscribeResponse: temp_path: str | None = None try: if audio.content_type not in _ALLOWED_AUDIO_CONTENT_TYPES: - raise ValueError("Unsupported audio format") + raise HTTPException(status_code=400, detail="Unsupported audio format") content_length = request.headers.get("content-length") if content_length is not None: @@ -309,7 +313,7 @@ async def transcribe( and declared_length > _MAX_TRANSCRIBE_AUDIO_BYTES + _MULTIPART_OVERHEAD_BYTES ): - raise ValueError("Audio file too large") + raise HTTPException(status_code=400, detail="Audio file too large") with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file: temp_path = tmp_file.name @@ -322,16 +326,16 @@ async def transcribe( break total_bytes += len(chunk) if total_bytes > _MAX_TRANSCRIBE_AUDIO_BYTES: - raise ValueError("Audio file too large") + raise HTTPException(status_code=400, detail="Audio file too large") if len(header) < _WAV_HEADER_MIN_BYTES: required = _WAV_HEADER_MIN_BYTES - len(header) header.extend(chunk[:required]) tmp_file.write(chunk) if total_bytes == 0: - raise ValueError("Empty audio file") + raise HTTPException(status_code=400, detail="Empty audio file") if not _looks_like_wav_header(bytes(header)): - raise ValueError("Unsupported audio format") + raise HTTPException(status_code=400, detail="Unsupported audio format") transcript = await asr_service.transcribe_file( temp_path, audio.filename or "unknown" @@ -339,17 +343,14 @@ async def transcribe( return AsrTranscribeResponse(transcript=transcript) - except ValueError as exc: - return JSONResponse( - status_code=status.HTTP_400_BAD_REQUEST, - content={"detail": str(exc)}, - ) + except HTTPException: + raise except RuntimeError: - return JSONResponse( - status_code=status.HTTP_502_BAD_GATEWAY, - content={"detail": "ASR service unavailable"}, - ) + raise HTTPException(status_code=502, detail="ASR service unavailable") finally: await audio.close() - if temp_path and os.path.exists(temp_path): - os.unlink(temp_path) + if temp_path: + try: + os.unlink(temp_path) + except OSError: + pass diff --git a/backend/src/v1/agent/schemas.py b/backend/src/v1/agent/schemas.py index a5aaf5f..88cd718 100644 --- a/backend/src/v1/agent/schemas.py +++ b/backend/src/v1/agent/schemas.py @@ -1,12 +1,95 @@ from __future__ import annotations -from typing import Literal +from dataclasses import dataclass +from datetime import date +from typing import Any, Literal, Protocol from pydantic import BaseModel, ConfigDict, Field from schemas.agent.ui_schema import UiSchemaRenderer +class AgentRepositoryLike(Protocol): + async def get_session_owner(self, *, session_id: str) -> str: ... + + async def create_session_for_user( + self, *, user_id: str, session_id: str | None = None + ) -> str: ... + + async def commit(self) -> None: ... + + async def rollback(self) -> None: ... + + async def get_history_day( + self, + *, + session_id: str, + before: date | None, + visibility_mask: int | None = None, + ) -> dict[str, object] | None: ... + + async def get_latest_session_id_for_user(self, *, user_id: str) -> str | None: ... + + async def persist_user_message( + self, + *, + session_id: str, + content: str, + metadata: Any, + visibility_mask: int, + ) -> None: ... + + async def get_system_agent_config( + self, *, agent_type: str + ) -> dict[str, object] | None: ... + + +class QueueClientLike(Protocol): + async def enqueue( + self, *, command: dict[str, object], dedup_key: str | None + ) -> str: ... + + +class EventStreamLike(Protocol): + async def read( + self, + *, + session_id: str, + last_event_id: str | None, + ) -> list[dict[str, object]]: ... + + +class AttachmentStorageLike(Protocol): + async def upload_bytes( + self, + *, + bucket: str, + path: str, + content: bytes, + content_type: str, + ) -> str: ... + + async def download_bytes(self, *, bucket: str, path: str) -> bytes: ... + + async def create_signed_url( + self, + *, + bucket: str, + path: str, + expires_in_seconds: int, + ) -> str: ... + + def parse_signed_url(self, url: str) -> tuple[str, str]: ... + + +@dataclass(frozen=True) +class TaskAccepted: + task_id: str + thread_id: str + run_id: str + created: bool + + class TaskAcceptedResponse(BaseModel): model_config = ConfigDict(populate_by_name=True, serialize_by_alias=True) diff --git a/backend/src/v1/agent/service.py b/backend/src/v1/agent/service.py index 01a471c..30d87fa 100644 --- a/backend/src/v1/agent/service.py +++ b/backend/src/v1/agent/service.py @@ -1,15 +1,11 @@ from __future__ import annotations -import asyncio -from dataclasses import dataclass from datetime import date import hashlib -from typing import Any, Protocol + from urllib.parse import urlparse -import dashscope from ag_ui.core import RunAgentInput -from dashscope.audio.asr import Recognition, RecognitionCallback from fastapi import HTTPException from sqlalchemy.exc import IntegrityError @@ -17,102 +13,32 @@ from core.auth.models import CurrentUser from core.agentscope.schemas.agui_input import extract_latest_user_payload from core.config.settings import config from core.logging import get_logger -from schemas.agent.forwarded_props import parse_forwarded_props_agent_type -from schemas.agent.system_agent import SystemAgentLLMConfig +from schemas.agent.forwarded_props import ( + parse_forwarded_props_runtime_mode, + RuntimeMode, +) from schemas.agent.visibility import SystemVisibilityBit, bit_mask from schemas.messages.chat_message import ( AgentChatMessageMetadata, UserMessageAttachment, extract_user_message_attachments, ) -from v1.agent.schemas import HistorySnapshotResponse +from v1.agent.schemas import ( + AgentRepositoryLike, + AttachmentStorageLike, + EventStreamLike, + HistorySnapshotResponse, + QueueClientLike, + TaskAccepted, +) +from v1.agent.utils import ( + MAX_ATTACHMENT_BYTES, + MAX_ATTACHMENTS_PER_MESSAGE, + is_safe_attachment_path, + mime_to_suffix, +) logger = get_logger(__name__) -_ALLOWED_ATTACHMENT_MIME_TYPES = {"image/png", "image/jpeg", "image/webp"} -_MAX_ATTACHMENT_BYTES = 5 * 1024 * 1024 -_MAX_TOTAL_ATTACHMENT_BYTES = 12 * 1024 * 1024 -_MAX_ATTACHMENTS_PER_MESSAGE = 3 - - -@dataclass(frozen=True) -class TaskAccepted: - task_id: str - thread_id: str - run_id: str - created: bool - - -class AgentRepositoryLike(Protocol): - async def get_session_owner(self, *, session_id: str) -> str: ... - - async def create_session_for_user( - self, *, user_id: str, session_id: str | None = None - ) -> str: ... - - async def commit(self) -> None: ... - - async def rollback(self) -> None: ... - - async def get_history_day( - self, - *, - session_id: str, - before: date | None, - visibility_mask: int | None = None, - ) -> dict[str, object] | None: ... - - async def get_latest_session_id_for_user(self, *, user_id: str) -> str | None: ... - - async def persist_user_message( - self, - *, - session_id: str, - content: str, - metadata: AgentChatMessageMetadata | None, - visibility_mask: int, - ) -> None: ... - - async def get_system_agent_config( - self, *, agent_type: str - ) -> dict[str, object] | None: ... - - -class QueueClientLike(Protocol): - async def enqueue( - self, *, command: dict[str, object], dedup_key: str | None - ) -> str: ... - - -class EventStreamLike(Protocol): - async def read( - self, - *, - session_id: str, - last_event_id: str | None, - ) -> list[dict[str, object]]: ... - - -class AttachmentStorageLike(Protocol): - async def upload_bytes( - self, - *, - bucket: str, - path: str, - content: bytes, - content_type: str, - ) -> str: ... - - async def download_bytes(self, *, bucket: str, path: str) -> bytes: ... - - async def create_signed_url( - self, - *, - bucket: str, - path: str, - expires_in_seconds: int, - ) -> str: ... - - def parse_signed_url(self, url: str) -> tuple[str, str]: ... def ensure_session_owner(*, owner_id: str, current_user: CurrentUser) -> None: @@ -152,14 +78,9 @@ class AgentService: run_id = run_input.run_id forwarded_props = getattr(run_input, "forwarded_props", None) try: - agent_type = parse_forwarded_props_agent_type(forwarded_props) + runtime_mode = parse_forwarded_props_runtime_mode(forwarded_props) except ValueError as exc: raise HTTPException(status_code=422, detail=str(exc)) from exc - if agent_type == "memory": - raise HTTPException( - status_code=422, - detail="memory mode is automation-only", - ) try: owner = await self._repository.get_session_owner(session_id=thread_id) @@ -185,7 +106,7 @@ class AgentService: current_user=current_user, ) visibility_mask = await self._resolve_user_message_visibility_mask( - agent_type=agent_type + runtime_mode=runtime_mode ) await self._repository.persist_user_message( session_id=thread_id, @@ -195,6 +116,7 @@ class AgentService: ) await self._repository.commit() + queue = "automation" if runtime_mode == RuntimeMode.AUTOMATION else "agent" task_id = await self._queue.enqueue( command={ "command": "run", @@ -202,6 +124,7 @@ class AgentService: "run_input": run_input.model_dump( mode="json", by_alias=True, exclude_none=True ), + "queue": queue, }, dedup_key=None, ) @@ -212,60 +135,14 @@ class AgentService: created=created, ) - async def _resolve_user_message_visibility_mask(self, *, agent_type: str) -> int: - normalized_agent_type = agent_type.strip().lower() - history_bit_mask = bit_mask(bit=int(SystemVisibilityBit.UI_HISTORY)) - - if normalized_agent_type == "memory": - return bit_mask(bit=18) - - agent_config = await self._repository.get_system_agent_config( - agent_type=normalized_agent_type - ) - if agent_config is None: - raise HTTPException( - status_code=422, detail="invalid forwarded_props.agent_type" + async def _resolve_user_message_visibility_mask( + self, *, runtime_mode: RuntimeMode + ) -> int: + if runtime_mode == RuntimeMode.CHAT: + return bit_mask(bit=int(SystemVisibilityBit.UI_HISTORY)) | bit_mask( + bit=int(SystemVisibilityBit.CONTEXT_ASSEMBLY) ) - llm_config = SystemAgentLLMConfig.model_validate( - (agent_config.get("config") if isinstance(agent_config, dict) else {}) or {} - ) - agent_mask = bit_mask(bit=llm_config.visibility_consumer_bit) - - if normalized_agent_type == "worker": - router_config = await self._repository.get_system_agent_config( - agent_type="router" - ) - worker_config = await self._repository.get_system_agent_config( - agent_type="worker" - ) - if router_config is None or worker_config is None: - raise HTTPException( - status_code=500, - detail="system agent visibility config missing", - ) - router_mask = bit_mask( - bit=SystemAgentLLMConfig.model_validate( - ( - router_config.get("config") - if isinstance(router_config, dict) - else {} - ) - or {} - ).visibility_consumer_bit - ) - worker_mask = bit_mask( - bit=SystemAgentLLMConfig.model_validate( - ( - worker_config.get("config") - if isinstance(worker_config, dict) - else {} - ) - or {} - ).visibility_consumer_bit - ) - return history_bit_mask | router_mask | worker_mask - - return history_bit_mask | agent_mask + return 0 async def _prepare_user_message( self, @@ -309,7 +186,7 @@ class AgentService: mime_type=mime_type, ) ) - if len(user_attachments) > _MAX_ATTACHMENTS_PER_MESSAGE: + if len(user_attachments) > MAX_ATTACHMENTS_PER_MESSAGE: raise HTTPException(status_code=422, detail="Too many attachments") except HTTPException: raise @@ -360,14 +237,14 @@ class AgentService: if not isinstance(content_type, str): raise HTTPException(status_code=422, detail="Unsupported attachment type") mime_type = content_type.lower() - if mime_type not in _ALLOWED_ATTACHMENT_MIME_TYPES: + if mime_type not in {"image/png", "image/jpeg", "image/webp"}: raise HTTPException(status_code=422, detail="Unsupported attachment type") if not payload: raise HTTPException(status_code=422, detail="Empty attachment") - if len(payload) > _MAX_ATTACHMENT_BYTES: + if len(payload) > MAX_ATTACHMENT_BYTES: raise HTTPException(status_code=413, detail="Attachment too large") - suffix = _mime_to_suffix(mime_type) + suffix = mime_to_suffix(mime_type) checksum = hashlib.sha1(payload).hexdigest()[:16] filename_seed = filename if isinstance(filename, str) and filename else "upload" filename_hash = hashlib.sha1(filename_seed.encode("utf-8")).hexdigest()[:8] @@ -424,7 +301,7 @@ class AgentService: normalized_path = path.strip() expected_prefix = f"agent-inputs/{current_user.id}/" - if not _is_safe_attachment_path( + if not is_safe_attachment_path( normalized_path, expected_prefix=expected_prefix ): raise HTTPException(status_code=422, detail="Invalid attachment path scope") @@ -503,7 +380,7 @@ class AgentService: f"agent-inputs/{current_user.id}/{thread_id}/uploads/" ) for attachment in attachments: - if not _is_safe_attachment_path( + if not is_safe_attachment_path( attachment.path, expected_prefix=expected_prefix, ): @@ -586,134 +463,6 @@ class AgentService: raise HTTPException(status_code=422, detail="INVALID_BINARY_URL_BUCKET") expected_prefix = f"agent-inputs/{current_user.id}/{thread_id}/uploads/" - if not _is_safe_attachment_path(path, expected_prefix=expected_prefix): + if not is_safe_attachment_path(path, expected_prefix=expected_prefix): raise HTTPException(status_code=422, detail="INVALID_BINARY_URL_PATH_SCOPE") return bucket, path - - -class AsrService: - def __init__(self) -> None: - self._api_key: str | None = None - - def _get_api_key(self) -> str: - if self._api_key is None: - dashscope_key = config.llm.provider_keys.get("dashscope") - if not dashscope_key: - raise ValueError( - "DASHSCOPE_API_KEY not configured. Set SOCIAL_LLM__PROVIDER_KEYS__DASHSCOPE in environment." - ) - self._api_key = dashscope_key - return self._api_key - - async def transcribe_file(self, file_path: str, filename: str) -> str: - try: - dashscope.api_key = self._get_api_key() - - loop = asyncio.get_event_loop() - - class SyncCallback(RecognitionCallback): - error: str | None = None - - def on_error(self, result: Any) -> None: - self.error = str(result) - - callback = SyncCallback() - recognizer = Recognition( - model="fun-asr-realtime-2026-02-28", - callback=callback, - format="wav", - sample_rate=16000, - ) - - result: Any = await loop.run_in_executor( - None, - lambda: recognizer.call(file=file_path), - ) - - if callback.error: - raise RuntimeError(f"ASR error: {callback.error}") - status_code = self._extract_field(result, "status_code") - if status_code != 200: - message = self._extract_field(result, "message") - raise RuntimeError(f"ASR transcription failed: {message}") - - sentence = self._extract_sentence_payload(result) - if sentence is None: - request_id = self._extract_field(result, "request_id") - logger.warning( - "ASR returned empty result", extra={"request_id": request_id} - ) - return "" - - if isinstance(sentence, dict): - transcription = sentence.get("text", "") - elif isinstance(sentence, list): - transcription = " ".join( - item.get("text", "") for item in sentence if isinstance(item, dict) - ) - else: - transcription = str(sentence) if sentence else "" - - logger.info( - "ASR transcription completed", - extra={"filename": filename, "transcript_length": len(transcription)}, - ) - return transcription - - except asyncio.CancelledError: - raise - except RuntimeError: - raise - except Exception as exc: - logger.exception("ASR transcription error") - raise RuntimeError(f"ASR transcription failed: {exc}") from exc - - def _extract_sentence_payload(self, result: Any) -> Any | None: - if isinstance(result, dict): - output = result.get("output") - if isinstance(output, dict): - return output.get("sentence") - if output is not None: - return getattr(output, "sentence", None) - return result.get("sentence") - - get_sentence = getattr(result, "get_sentence", None) - if callable(get_sentence): - sentence = get_sentence() - if sentence is not None: - return sentence - - output = getattr(result, "output", None) - if output is None: - return None - if isinstance(output, dict): - return output.get("sentence") - return getattr(output, "sentence", None) - - def _extract_field(self, result: Any, field: str) -> Any | None: - if isinstance(result, dict): - return result.get(field) - return getattr(result, field, None) - - -asr_service = AsrService() - - -def _mime_to_suffix(mime_type: str) -> str: - mapping = { - "image/png": "png", - "image/jpeg": "jpg", - "image/webp": "webp", - } - return mapping.get(mime_type.lower(), "bin") - - -def _is_safe_attachment_path(path: str, *, expected_prefix: str) -> bool: - normalized = path.strip() - if not normalized: - return False - if normalized.startswith("/"): - return False - if ".." in normalized: - return False - return normalized.startswith(expected_prefix) diff --git a/backend/src/v1/agent/utils.py b/backend/src/v1/agent/utils.py index fd14f2d..d67522d 100644 --- a/backend/src/v1/agent/utils.py +++ b/backend/src/v1/agent/utils.py @@ -14,6 +14,11 @@ from schemas.messages.chat_message import ( extract_user_message_attachments, ) +ALLOWED_ATTACHMENT_MIME_TYPES = {"image/png", "image/jpeg", "image/webp"} +MAX_ATTACHMENT_BYTES = 5 * 1024 * 1024 +MAX_TOTAL_ATTACHMENT_BYTES = 12 * 1024 * 1024 +MAX_ATTACHMENTS_PER_MESSAGE = 3 + def convert_message_to_history( message: AgentChatMessage, @@ -124,3 +129,23 @@ def _compile_worker_ui_hints( return compiled except Exception: return None + + +def mime_to_suffix(mime_type: str) -> str: + mapping = { + "image/png": "png", + "image/jpeg": "jpg", + "image/webp": "webp", + } + return mapping.get(mime_type.lower(), "bin") + + +def is_safe_attachment_path(path: str, *, expected_prefix: str) -> bool: + normalized = path.strip() + if not normalized: + return False + if normalized.startswith("/"): + return False + if ".." in normalized: + return False + return normalized.startswith(expected_prefix) diff --git a/backend/src/v1/memory/__init__.py b/backend/src/v1/memory/__init__.py new file mode 100644 index 0000000..48e7de0 --- /dev/null +++ b/backend/src/v1/memory/__init__.py @@ -0,0 +1,3 @@ +from v1.memory.service import MemoryService + +__all__ = ["MemoryService"] diff --git a/backend/src/v1/memory/repository.py b/backend/src/v1/memory/repository.py new file mode 100644 index 0000000..38d6866 --- /dev/null +++ b/backend/src/v1/memory/repository.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Protocol +from uuid import UUID + +from sqlalchemy import select + +from core.db.base_repository import BaseRepository +from models.automation_jobs import AutomationJob + +if TYPE_CHECKING: + from sqlalchemy.ext.asyncio import AsyncSession + + +class MemoryRepositoryLike(Protocol): + async def get_job_by_id_and_owner( + self, *, job_id: UUID, owner_id: UUID + ) -> AutomationJob | None: ... + + +class MemoryRepository(BaseRepository[AutomationJob]): + def __init__(self, session: AsyncSession) -> None: + super().__init__(session=session, model=AutomationJob) + + async def get_job_by_id_and_owner( + self, *, job_id: UUID, owner_id: UUID + ) -> AutomationJob | None: + stmt = ( + select(AutomationJob) + .where(AutomationJob.id == job_id) + .where(AutomationJob.owner_id == owner_id) + .where(AutomationJob.deleted_at.is_(None)) + ) + result = await self._session.execute(stmt) + return result.scalar_one_or_none() diff --git a/backend/src/v1/memory/service.py b/backend/src/v1/memory/service.py new file mode 100644 index 0000000..0068c2c --- /dev/null +++ b/backend/src/v1/memory/service.py @@ -0,0 +1,25 @@ +from __future__ import annotations + +from uuid import UUID + +from fastapi import HTTPException + +from schemas.automation.config import AutomationJobConfig +from v1.memory.repository import MemoryRepositoryLike + + +class MemoryService: + _repository: MemoryRepositoryLike + + def __init__(self, repository: MemoryRepositoryLike) -> None: + self._repository = repository + + async def get_memory_job_config( + self, *, job_id: UUID, owner_id: UUID + ) -> AutomationJobConfig: + job = await self._repository.get_job_by_id_and_owner( + job_id=job_id, owner_id=owner_id + ) + if job is None: + raise HTTPException(status_code=404, detail="Automation job not found") + return AutomationJobConfig.model_validate(job.config or {}) diff --git a/backend/tests/unit/core/agentscope/runtime/test_consumer_registry.py b/backend/tests/unit/core/agentscope/runtime/test_consumer_registry.py index a8653ee..17905fd 100644 --- a/backend/tests/unit/core/agentscope/runtime/test_consumer_registry.py +++ b/backend/tests/unit/core/agentscope/runtime/test_consumer_registry.py @@ -2,7 +2,7 @@ from __future__ import annotations import pytest -from core.agentscope.runtime.consumer_registry import build_consumer_registry +from core.agentscope.runtime.registry_builder import build_consumer_registry def test_build_consumer_registry_from_system_agent_configs() -> None: diff --git a/backend/tests/unit/core/agentscope/runtime/test_runner.py b/backend/tests/unit/core/agentscope/runtime/test_runner.py index 5f66057..49d2624 100644 --- a/backend/tests/unit/core/agentscope/runtime/test_runner.py +++ b/backend/tests/unit/core/agentscope/runtime/test_runner.py @@ -45,17 +45,6 @@ def _user_context() -> UserContext: ) -def test_parse_agent_type_supports_known_stages() -> None: - assert AgentScopeRunner._parse_agent_type(stage_name="router") == AgentType.ROUTER - assert AgentScopeRunner._parse_agent_type(stage_name="worker") == AgentType.WORKER - assert AgentScopeRunner._parse_agent_type(stage_name="memory") == AgentType.MEMORY - - -def test_parse_agent_type_rejects_unknown_stage() -> None: - with pytest.raises(ValueError, match="unsupported stage name"): - AgentScopeRunner._parse_agent_type(stage_name="planner") - - def test_build_worker_input_messages_only_contains_router_contract() -> None: runner = AgentScopeRunner() router_output = RouterAgentOutput( diff --git a/backend/tests/unit/core/taskiq/test_app.py b/backend/tests/unit/core/taskiq/test_app.py index 34387d5..159f4bc 100644 --- a/backend/tests/unit/core/taskiq/test_app.py +++ b/backend/tests/unit/core/taskiq/test_app.py @@ -5,14 +5,13 @@ import sys import pytest -from core.taskiq.app import broker, bulk_broker, critical_broker, default_broker +from core.taskiq.app import broker, worker_agent_broker, worker_automation_broker def test_taskiq_broker_is_configured() -> None: assert broker is not None - assert default_broker is broker - assert critical_broker is not None - assert bulk_broker is not None + assert worker_agent_broker is broker + assert worker_automation_broker is not None def test_taskiq_app_configures_logging_on_import( diff --git a/backend/tests/unit/schemas/agent/test_pipeline_spec.py b/backend/tests/unit/schemas/agent/test_pipeline_spec.py index 99b40c2..a682149 100644 --- a/backend/tests/unit/schemas/agent/test_pipeline_spec.py +++ b/backend/tests/unit/schemas/agent/test_pipeline_spec.py @@ -2,13 +2,12 @@ from __future__ import annotations import pytest -from schemas.agent.consumer_registry import AgentConsumerBinding, ConsumerRegistry -from schemas.agent.pipeline_spec import ( - ContextPolicy, - ExecutorKind, - PipelineSpec, - StageSpec, +from core.agentscope.schemas.consumer_registry import ( + AgentConsumerBinding, + ConsumerRegistry, ) +from core.agentscope.schemas.pipeline_spec import ExecutorKind, PipelineSpec, StageSpec +from schemas.agent.system_agent import AgentType def test_consumer_registry_rejects_duplicate_bits() -> None: @@ -29,9 +28,9 @@ def test_pipeline_spec_requires_non_empty_stages() -> None: def test_stage_spec_normalizes_stage_name() -> None: spec = StageSpec( stage_name=" Worker ", + agent_type=AgentType.WORKER, executor_kind=ExecutorKind.REACT, - default_visibility_mask=1, - context_policy=ContextPolicy(consumer_agent_type="worker", count=20), ) assert spec.stage_name == "worker" + assert spec.agent_type == AgentType.WORKER diff --git a/backend/tests/unit/v1/agent/test_service.py b/backend/tests/unit/v1/agent/test_service.py index 9565322..0c5a536 100644 --- a/backend/tests/unit/v1/agent/test_service.py +++ b/backend/tests/unit/v1/agent/test_service.py @@ -159,7 +159,7 @@ def _user() -> CurrentUser: ) -def _build_run_input(*, urls: list[str], agent_type: str = "worker") -> RunAgentInput: +def _build_run_input(*, urls: list[str], runtime_mode: str = "chat") -> RunAgentInput: content: list[dict[str, str]] = [{"type": "text", "text": "hello"}] for url in urls: content.append({"type": "binary", "mimeType": "image/png", "url": url}) @@ -177,7 +177,7 @@ def _build_run_input(*, urls: list[str], agent_type: str = "worker") -> RunAgent ], "tools": [], "context": [], - "forwardedProps": {"agent_type": agent_type}, + "forwardedProps": {"runtime_mode": runtime_mode}, } ) @@ -275,7 +275,7 @@ async def test_enqueue_run_rejects_unknown_agent_type(monkeypatch) -> None: urls=[ f"{base_url}/storage/v1/object/sign/agent-test-bucket/{safe_path}?token=1" ], - agent_type="planner", + runtime_mode="planner", ) with pytest.raises(HTTPException) as exc_info: @@ -285,7 +285,7 @@ async def test_enqueue_run_rejects_unknown_agent_type(monkeypatch) -> None: @pytest.mark.asyncio -async def test_enqueue_run_rejects_memory_mode_for_api(monkeypatch) -> None: +async def test_enqueue_run_rejects_invalid_runtime_mode(monkeypatch) -> None: monkeypatch.setattr( agent_service_module.config.storage, "bucket", "agent-test-bucket" ) @@ -296,24 +296,12 @@ async def test_enqueue_run_rejects_memory_mode_for_api(monkeypatch) -> None: stream=_FakeStream(), attachment_storage=_FakeAttachmentStorage(), ) - base_url = str(config.supabase.url).rstrip("/") - safe_path = quote( - "agent-inputs/00000000-0000-0000-0000-000000000001/" - "00000000-0000-0000-0000-000000000001/uploads/a.png" - ) - run_input = _build_run_input( - urls=[ - f"{base_url}/storage/v1/object/sign/agent-test-bucket/{safe_path}?token=1" - ], - agent_type="memory", - ) + run_input = _build_run_input(urls=[], runtime_mode="planner") with pytest.raises(HTTPException) as exc_info: await service.enqueue_run(run_input=run_input, current_user=_user()) assert exc_info.value.status_code == 422 - assert exc_info.value.detail == "memory mode is automation-only" - assert repository.created_session_calls == 0 assert repository.persisted_user_messages == [] diff --git a/docs/plans/2026-03-20-navigation-cache-decoupling-design.md b/docs/plans/2026-03-20-navigation-cache-decoupling-design.md deleted file mode 100644 index 40de36c..0000000 --- a/docs/plans/2026-03-20-navigation-cache-decoupling-design.md +++ /dev/null @@ -1,310 +0,0 @@ -# 前端导航解耦与统一缓存重构设计 - -## 1. 背景与问题定义 - -当前 `apps` 端在日历(日/月)与待办页面中存在以下系统性问题: - -1. 页面切换语义错误:将业务 tab 切换实现为 `push/go` 混用,导致页面重建与路由栈膨胀。 -2. 数据刷新触发错误:页面通过路由监听触发 `load`,频繁重复请求后端。 -3. 状态职责耦合:导航状态、页面状态、数据状态边界不清,导致“切换逻辑改动会牵出数据 bug”。 -4. 回主页语义不一致:Dock 首页按钮被 `canPop -> pop` 策略污染,行为变成“返回上页”。 -5. 缓存能力分散:仅存在局部的个人信息缓存(`SettingsUserCache`),缺少统一可复用缓存模块。 - -目标是完成一次结构化重构,建立「解耦的导航切换 + 统一缓存 + 可控一致性」体系。 - -## 2. 目标与非目标 - -### 2.1 目标 - -1. Home/Calendar/Todo 切换不重建主页面(保持页面实例与滚动状态)。 -2. 日/月视图切换不触发整页重建和无必要网络请求。 -3. 建立统一缓存模块,合并个人信息缓存并覆盖 Calendar/Todo 数据读取。 -4. 启动体验采用「本地优先 + 后台静默刷新」策略,减少进入 App 的重复请求。 -5. 数据只在必要时刷新:手动下拉、写操作失效、生命周期关键点、缓存策略命中。 -6. 主页按钮语义固定为“回主页”,不再变成“返回上一页”。 -7. 一级页面唯一为 Home,日历日/月视图、待办、设置均为二级页面;二级页面侧滑只允许返回一级页面,不允许直接退出 App。 -8. App 退出入口仅存在于一级页面(Home)。 - -### 2.2 非目标 - -1. 本次不改后端协议与接口契约。 -2. 本次不引入复杂离线同步冲突解决(如多端 CRDT)。 -3. 本次不引入全量本地数据库迁移(先基于 SharedPreferences 持久化层)。 - -## 3. 复杂度与风险分级 - -- Complexity: `S3` - - 跨 router、calendar、todo、settings、DI 的架构级调整。 -- Risk Tier: `L1` - - 不触及鉴权协议和支付等高风险域,但涉及导航返回栈与数据一致性高回归区。 - -## 4. 架构总览 - -### 4.1 导航分层 - -采用分级导航: - -1. 一级页面(唯一):Home - - 仅 Home 允许触发系统退出路径。 -2. 二级页面(主业务入口) - - Calendar Day/Month - - Todo List(Quadrants) - - Settings - - 规则:二级页面的系统返回/侧滑返回统一回 Home,不允许直接退出 App。 -3. 三级页面(细节页) - - Calendar event detail/edit/share - - Todo detail/edit - - Settings 子页面(account/profile edit 等) - - 规则:三级页面返回到上一级(二级或三级上层)。 - -### 4.2 状态与数据边界 - -1. 导航状态:Shell 当前分支 index、Calendar 内部视图类型。 -2. UI 状态:选中日期、滚动位置、拖拽态、loading/error。 -3. 数据状态:统一缓存模块管理(内存 + 持久化 + 网络回写)。 - -结论:页面只发“意图”,不直接承担缓存与路由策略。 - -## 5. 统一缓存模块设计 - -## 5.1 模块结构 - -新增 `apps/lib/core/cache/`: - -1. `cache_key.dart` - - 统一 key 命名规范。 -2. `cache_policy.dart` - - TTL、软/硬过期、最小刷新间隔、刷新原因枚举。 -3. `cache_entry.dart` - - 标准缓存实体(data/fetchedAt/expiresAt/version/dirty)。 -4. `cache_store.dart` - - 抽象接口(get/set/remove/invalidateNamespace)。 -5. `memory_cache_store.dart` - - 会话级热缓存。 -6. `persistent_cache_store.dart` - - 本地冷缓存(SharedPreferences JSON)。 -7. `hybrid_cache_store.dart` - - 两级缓存协调与 singleflight 去重。 -8. `cache_invalidator.dart` - - 统一精准失效入口。 - -### 5.2 key 设计(首版) - -1. 用户信息 - - `user:profile:me` -2. 日历 - - `calendar:day:YYYY-MM-DD` - - `calendar:month:YYYY-MM` -3. 待办 - - `todo:list:pending` - - `todo:list:priority:`(按需) - - `todo:detail:`(按需) - -### 5.3 策略设计(平衡型) - -读取顺序:`memory -> persistent -> network`。 - -刷新策略: - -1. 软过期(stale-while-revalidate) - - 先展示缓存,后台静默刷新。 -2. 硬过期 - - 超过硬过期后必须请求网络或提示数据过旧。 -3. 最小刷新间隔 - - 避免频繁切换/回前台引发抖动请求。 - -建议默认值: - -1. `user:profile`:软过期 30min,硬过期 24h。 -2. `calendar:day`:软过期 2min,硬过期 30min。 -3. `calendar:month`:软过期 5min,硬过期 60min。 -4. `todo:list:pending`:软过期 2min,硬过期 30min。 - -### 5.4 个人信息缓存合并方案 - -现有 `SettingsUserCache` 并入统一缓存模块: - -1. 新建 `UserProfileRepository`(或在现有 settings service 中引入统一缓存)。 -2. `getProfile()` 通过 hybrid cache 获取 `user:profile:me`。 -3. 更新 profile 成功后立即写回缓存并同步持久化。 -4. 登出/会话失效时统一调用 `invalidateNamespace('user')`。 - -## 6. 一致性风险与解决方案 - -平衡型缓存会存在“短暂陈旧窗口”。本设计通过以下机制将体验风险降到可接受范围。 - -### 6.1 触发刷新矩阵 - -1. 手动下拉刷新:强制网络刷新。 -2. 写操作成功:精准失效受影响 key 并触发回填。 -3. App 回前台:若超过最小刷新间隔,触发静默刷新。 -4. 网络离线 -> 在线:触发静默刷新。 -5. 进入关键详情页:按策略进行 freshness check。 - -### 6.2 写后一致性 - -1. 乐观更新:本地先更新 UI 与缓存,避免“我刚改完却没变”。 -2. 失败回滚:API 失败时恢复旧值并 Toast 提示。 -3. 精准失效:不做全局清空,只失效关联 key,兼顾一致性与性能。 - -### 6.3 并发安全 - -1. singleflight:同 key 同时只允许一个网络请求。 -2. 版本保护:缓存写入比较 `updatedAt/version`,拒绝旧响应覆盖新状态。 -3. 失败兜底:请求失败不清空旧缓存,保持可读并允许重试。 - -### 6.4 可见性保障 - -1. 页面可显示“上次同步时间”(轻提示)。 -2. 硬过期数据需可见提醒(弱提示,不阻断基础浏览)。 -3. 提供稳定手动刷新入口。 - -### 6.5 日历提醒取消动作的一致性兜底 - -1. 用户在提醒弹层点击“取消/归档”时,前端必须立即发送归档请求,要求后端立刻将事件归档/过期。 -2. “延迟归档(outbox/pending)”仅在 App 进程不可用(被杀/未启动)时生效,作为离线或冷启动兜底。 -3. App 冷启动或恢复前台后,必须优先冲刷 pending 归档请求,确保最终一致性。 -4. 对用户可见行为要求:点击取消后 UI 立即反映归档状态,网络失败时展示重试提示,并保留 pending 记录。 - -## 7. 导航与页面职责重构 - -### 7.1 路由重构 - -1. `app_router` 引入 shell 分支,不再平铺所有主页面。 -2. Dock 切换改为 branch index 切换,不再 `push` 主页面。 -3. Calendar 内部 month/day 切换改为视图切换,不新增栈层。 -4. 事件详情/编辑等保留 `push`(细节页合理叠栈)。 - -### 7.2 回主页逻辑修正 - -1. Dock Home 统一执行“切到 Home 分支/`go('/home')`”。 -2. `returnToHomePreserveState` 仅用于非 Dock 的返回策略场景。 -3. 消除 `canPop -> pop` 对主页按钮语义的影响。 -4. 二级页面(Calendar Day/Month、Todo、Settings)统一拦截系统返回和侧滑返回,目标固定为 Home。 -5. App 退出只允许在 Home 页面生效(可采用双击退出或系统默认行为)。 - -### 7.3 页面职责收敛 - -1. Calendar/Todo 页面移除路由监听触发 `load`。 -2. 页面只调用 repository: - - `get(policy)` - - `refresh(force: true)` - - `mutate(...) + invalidate(...)` -3. 页面不直接感知“缓存在哪一层”。 - -## 8. 分阶段实施计划(里程碑) - -### M1 导航壳层与切换语义 - -1. 引入 shell + 分支保活。 -2. Dock 接口改造与主 tab 切换实现。 -3. Home 按钮语义修正。 -4. 建立分级返回约束:二级 -> Home,三级 -> 上一级,退出仅 Home。 - -### M2 统一缓存骨架 - -1. 新增 core cache 模块。 -2. 接入 user profile(替换 `SettingsUserCache`)。 -3. DI 注入 cache store 与 invalidator。 - -### M3 Calendar 接入 - -1. 引入 `CalendarRepository` 与 day/month key。 -2. 移除 route listener 自动刷新。 -3. 切换 month/day 时默认走缓存,不触发无必要请求。 - -### M4 Todo 接入 - -1. 引入 `TodoRepository` 与 list/detail key。 -2. 拖拽、完成、编辑后的精准失效。 -3. 下拉刷新走强制网络。 - -### M5 清理与验证 - -1. 清理旧缓存与重复加载逻辑。 -2. 补齐测试与性能观测。 -3. 评估参数并收敛默认策略。 -4. 验证提醒“点击取消即实时归档”与“App 关闭时延迟归档兜底”双路径。 - -## 9. 验收标准 - -### 9.1 体验验收 - -1. Home/Calendar/Todo 切换无明显重建卡顿。 -2. 日/月切换响应明显变快。 -3. 首次冷启动可先看到本地缓存内容。 -4. Dock Home 始终回主页。 -5. 二级页面侧滑返回永远回 Home,不直接退出 App。 - -### 9.2 网络验收 - -1. 切换页面时网络请求显著减少。 -2. 写操作后关联数据可及时更新。 -3. 手动刷新可强制拉取并回写缓存。 -4. 提醒取消动作触发实时归档请求,成功率可观测。 - -### 9.3 一致性验收 - -1. 不出现旧响应覆盖新数据。 -2. 离线后恢复在线可自动静默同步。 -3. 软过期/硬过期行为符合策略定义。 -4. 提醒归档在在线/离线/冷启动场景下保持最终一致。 - -## 10. 测试与验证计划 - -### 10.1 单元测试 - -1. `hybrid_cache_store`:命中链路、singleflight、软硬过期判定。 -2. `cache_invalidator`:写操作触发的 key 精准失效。 -3. repository:读缓存、后台刷新、失败兜底、版本保护。 - -### 10.2 组件/页面测试(高回归) - -1. Dock 切换不重建分支主页面。 -2. 日/月切换不重复触发全量加载。 -3. Home 按钮行为稳定。 -4. 二级页面系统返回不会触发 App 退出。 - -### 10.3 集成回归 - -1. Calendar -> Todo -> Calendar 多轮切换请求计数。 -2. Todo 完成后列表更新与缓存一致性。 -3. profile 更新后设置页/其他依赖页可见一致。 -4. 提醒取消 -> 立即归档 -> 日历列表刷新链路。 -5. App 杀进程后触发提醒,重启后 pending 归档自动冲刷。 - -## 11. 风险与回滚 - -### 11.1 主要风险 - -1. 导航壳层改造可能引发深链与返回栈回归。 -2. 缓存策略参数不当可能造成陈旧感。 -3. 早期失效 key 设计不完整可能出现局部不刷新。 - -### 11.2 控制策略 - -1. 按里程碑逐步落地,每个里程碑可单独回滚。 -2. 默认保留手动刷新兜底。 -3. 增加请求计数与缓存命中日志(开发态)。 - -### 11.3 回滚策略 - -1. 若 M1 不稳定,可先回退 shell 改造并保留缓存模块。 -2. 若缓存接入问题集中,可按域回退(user/calendar/todo 分域开关)。 - -## 12. 最终落地参数(2026-03-20) - -1. 导航分级 - - 一级页面唯一为 `Home`。 - - 二级页面(日/月、待办、设置)侧滑返回统一回 `Home`,不允许直接退出 App。 - - App 退出入口仅保留在 `Home`。 -2. 缓存默认策略 - - `user:profile`:软过期 30min,硬过期 24h。 - - `calendar:day`:软过期 2min,硬过期 30min。 - - `calendar:month`:软过期 5min,硬过期 60min。 - - `todo:list:pending`:软过期 2min,硬过期 30min。 -3. 生命周期刷新 - - App 回前台时启用最小间隔 5min 的静默刷新协调器。 -4. 提醒归档策略 - - App 活跃态点击取消:立即请求后端归档。 - - 延迟归档(pending/outbox)仅用于 App 不可用场景兜底。 diff --git a/docs/plans/2026-03-20-navigation-cache-decoupling-implementation-plan.md b/docs/plans/2026-03-20-navigation-cache-decoupling-implementation-plan.md deleted file mode 100644 index 7249f1b..0000000 --- a/docs/plans/2026-03-20-navigation-cache-decoupling-implementation-plan.md +++ /dev/null @@ -1,537 +0,0 @@ -# 前端导航解耦与统一缓存重构 Implementation Plan - -> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. - -**Goal:** 完成导航分级回退(一级唯一 Home)与统一缓存改造,实现本地优先显示、后台静默刷新、写后精准失效,并落地“提醒取消即实时归档 + App 关闭时延迟归档兜底”。 - -**Architecture:** 路由层采用“一级唯一 Home + 二级业务页 + 三级细节页”的分级返回模型,二级页面返回统一回 Home,退出入口仅 Home;数据层新增 `core/cache` 统一缓存模块(memory + persistent + hybrid);业务层通过 repository 接入缓存策略与失效器。提醒动作采用实时归档优先,pending outbox 仅用于 App 不可用场景兜底。 - -**Tech Stack:** Flutter, go_router, get_it, shared_preferences, flutter_test, mocktail - ---- - -### Task 0: 锁定导航分级与退出语义(一级/二级/三级) - -**Files:** -- Modify: `apps/lib/core/router/app_router.dart` -- Modify: `apps/lib/features/home/ui/navigation/home_return_policy.dart` -- Modify: `apps/lib/features/calendar/ui/screens/calendar_dayweek_screen.dart` -- Modify: `apps/lib/features/calendar/ui/screens/calendar_month_screen.dart` -- Modify: `apps/lib/features/todo/ui/screens/todo_quadrants_screen.dart` -- Modify: `apps/lib/features/settings/ui/screens/settings_screen.dart` -- Test: `apps/test/features/home/ui/navigation/home_return_policy_test.dart` - -**Step 1: Write the failing test** - -```dart -test('second-level pages should return to home instead of exiting app', () { - final action = resolveHomeReturnAction( - canPop: false, - isAuthEntry: false, - forceGoHome: true, - ); - expect(action, HomeReturnAction.goHome); -}); -``` - -**Step 2: Run test to verify it fails** - -Run: `cd apps && flutter test test/features/home/ui/navigation/home_return_policy_test.dart` -Expected: FAIL with old return behavior. - -**Step 3: Write minimal implementation** - -```dart -if (forceGoHome) return HomeReturnAction.goHome; -``` - -**Step 4: Run tests to verify they pass** - -Run: `cd apps && flutter test test/features/home/ui/navigation/home_return_policy_test.dart` -Expected: PASS. - -**Step 5: Commit** - -```bash -git add apps/lib/core/router/app_router.dart apps/lib/features/home/ui/navigation/home_return_policy.dart apps/lib/features/calendar/ui/screens/calendar_dayweek_screen.dart apps/lib/features/calendar/ui/screens/calendar_month_screen.dart apps/lib/features/todo/ui/screens/todo_quadrants_screen.dart apps/lib/features/settings/ui/screens/settings_screen.dart apps/test/features/home/ui/navigation/home_return_policy_test.dart -git commit -m "feat: enforce hierarchical back navigation and home-only exit" -``` - -### Task 1: 建立统一缓存核心模型与策略 - -**Files:** -- Create: `apps/lib/core/cache/cache_entry.dart` -- Create: `apps/lib/core/cache/cache_key.dart` -- Create: `apps/lib/core/cache/cache_policy.dart` -- Test: `apps/test/core/cache/cache_policy_test.dart` - -**Step 1: Write the failing test** - -```dart -import 'package:flutter_test/flutter_test.dart'; -import 'package:social_app/core/cache/cache_policy.dart'; - -void main() { - test('soft expired should allow stale read with background refresh', () { - final now = DateTime(2026, 3, 20, 12); - final policy = CachePolicy( - softTtl: const Duration(minutes: 2), - hardTtl: const Duration(minutes: 30), - minRefreshInterval: const Duration(minutes: 1), - ); - - final fetchedAt = now.subtract(const Duration(minutes: 3)); - final decision = policy.evaluate(now: now, fetchedAt: fetchedAt); - expect(decision.canUseCached, true); - expect(decision.shouldRefreshInBackground, true); - expect(decision.mustBlockForNetwork, false); - }); -} -``` - -**Step 2: Run test to verify it fails** - -Run: `cd apps && flutter test test/core/cache/cache_policy_test.dart` -Expected: FAIL with missing cache policy symbols. - -**Step 3: Write minimal implementation** - -```dart -class CacheDecision { - final bool canUseCached; - final bool shouldRefreshInBackground; - final bool mustBlockForNetwork; - const CacheDecision({ - required this.canUseCached, - required this.shouldRefreshInBackground, - required this.mustBlockForNetwork, - }); -} -``` - -**Step 4: Run test to verify it passes** - -Run: `cd apps && flutter test test/core/cache/cache_policy_test.dart` -Expected: PASS. - -**Step 5: Commit** - -```bash -git add apps/lib/core/cache/cache_entry.dart apps/lib/core/cache/cache_key.dart apps/lib/core/cache/cache_policy.dart apps/test/core/cache/cache_policy_test.dart -git commit -m "feat: add unified cache policy primitives" -``` - -### Task 2: 实现 memory/persistent/hybrid cache store(含 singleflight) - -**Files:** -- Create: `apps/lib/core/cache/cache_store.dart` -- Create: `apps/lib/core/cache/memory_cache_store.dart` -- Create: `apps/lib/core/cache/persistent_cache_store.dart` -- Create: `apps/lib/core/cache/hybrid_cache_store.dart` -- Test: `apps/test/core/cache/hybrid_cache_store_test.dart` - -**Step 1: Write the failing test** - -```dart -test('same key concurrent load should execute loader once', () async { - var calls = 0; - final store = HybridCacheStore(...); - Future loader() async { - calls += 1; - return 'ok'; - } - await Future.wait([ - store.getOrLoad('k', loader: loader), - store.getOrLoad('k', loader: loader), - ]); - expect(calls, 1); -}); -``` - -**Step 2: Run test to verify it fails** - -Run: `cd apps && flutter test test/core/cache/hybrid_cache_store_test.dart` -Expected: FAIL with missing HybridCacheStore. - -**Step 3: Write minimal implementation** - -```dart -final Map> _inflight = {}; -``` - -**Step 4: Run test to verify it passes** - -Run: `cd apps && flutter test test/core/cache/hybrid_cache_store_test.dart` -Expected: PASS. - -**Step 5: Commit** - -```bash -git add apps/lib/core/cache/cache_store.dart apps/lib/core/cache/memory_cache_store.dart apps/lib/core/cache/persistent_cache_store.dart apps/lib/core/cache/hybrid_cache_store.dart apps/test/core/cache/hybrid_cache_store_test.dart -git commit -m "feat: implement hybrid cache store with singleflight" -``` - -### Task 3: 接入 DI 与统一失效器 - -**Files:** -- Create: `apps/lib/core/cache/cache_invalidator.dart` -- Modify: `apps/lib/core/di/injection.dart` -- Test: `apps/test/core/cache/cache_invalidator_test.dart` - -**Step 1: Write the failing test** - -```dart -test('invalidate calendar day should also invalidate month key', () { - final inv = CacheInvalidator(...); - inv.invalidateCalendarDay(DateTime(2026, 3, 20)); - expect(inv.wasInvalidated('calendar:day:2026-03-20'), true); - expect(inv.wasInvalidated('calendar:month:2026-03'), true); -}); -``` - -**Step 2: Run test to verify it fails** - -Run: `cd apps && flutter test test/core/cache/cache_invalidator_test.dart` -Expected: FAIL. - -**Step 3: Write minimal implementation** - -```dart -class CacheInvalidator { - void invalidateCalendarDay(DateTime date) { /* invalidate day + month */ } -} -``` - -**Step 4: Run test to verify it passes** - -Run: `cd apps && flutter test test/core/cache/cache_invalidator_test.dart` -Expected: PASS. - -**Step 5: Commit** - -```bash -git add apps/lib/core/cache/cache_invalidator.dart apps/lib/core/di/injection.dart apps/test/core/cache/cache_invalidator_test.dart -git commit -m "refactor: wire unified cache and invalidator in di" -``` - -### Task 4: 合并个人信息缓存(替换 SettingsUserCache) - -**Files:** -- Modify: `apps/lib/features/settings/data/services/settings_user_cache.dart` -- Create: `apps/lib/features/settings/data/services/user_profile_cache_repository.dart` -- Modify: `apps/lib/features/settings/ui/screens/settings_screen.dart` -- Test: `apps/test/features/settings/data/services/settings_user_cache_test.dart` -- Create: `apps/test/features/settings/data/services/user_profile_cache_repository_test.dart` - -**Step 1: Write the failing test** - -```dart -test('repository should return persistent cache first then refresh in background', () async { - // Arrange cached profile in persistent store - // Assert immediate cached result + refresh called once -}); -``` - -**Step 2: Run test to verify it fails** - -Run: `cd apps && flutter test test/features/settings/data/services/user_profile_cache_repository_test.dart` -Expected: FAIL. - -**Step 3: Write minimal implementation** - -```dart -class UserProfileCacheRepository { - Future getProfile({bool forceRefresh = false}) async { ... } -} -``` - -**Step 4: Run tests to verify they pass** - -Run: `cd apps && flutter test test/features/settings/data/services/settings_user_cache_test.dart test/features/settings/data/services/user_profile_cache_repository_test.dart` -Expected: PASS. - -**Step 5: Commit** - -```bash -git add apps/lib/features/settings/data/services/settings_user_cache.dart apps/lib/features/settings/data/services/user_profile_cache_repository.dart apps/lib/features/settings/ui/screens/settings_screen.dart apps/test/features/settings/data/services/settings_user_cache_test.dart apps/test/features/settings/data/services/user_profile_cache_repository_test.dart -git commit -m "refactor: merge profile cache into unified cache repository" -``` - -### Task 5: 路由改造为 StatefulShellRoute + Dock 切换分支 - -**Files:** -- Modify: `apps/lib/core/router/app_router.dart` -- Modify: `apps/lib/core/router/app_routes.dart` -- Modify: `apps/lib/features/calendar/ui/widgets/bottom_dock.dart` -- Modify: `apps/lib/features/home/ui/navigation/home_return_policy.dart` -- Test: `apps/test/core/router/app_routes_test.dart` -- Modify: `apps/test/features/home/ui/navigation/home_return_policy_test.dart` - -**Step 1: Write the failing test** - -```dart -test('dock home action should always resolve to goHome', () { - final action = resolveHomeReturnAction(canPop: true, isAuthEntry: false); - expect(action, HomeReturnAction.goHomeForDock); -}); -``` - -**Step 2: Run test to verify it fails** - -Run: `cd apps && flutter test test/features/home/ui/navigation/home_return_policy_test.dart` -Expected: FAIL with old behavior. - -**Step 3: Write minimal implementation** - -```dart -enum HomeReturnAction { pop, goHome, goHomeForDock } -``` - -**Step 4: Run tests to verify they pass** - -Run: `cd apps && flutter test test/core/router/app_routes_test.dart test/features/home/ui/navigation/home_return_policy_test.dart` -Expected: PASS. - -**Step 5: Commit** - -```bash -git add apps/lib/core/router/app_router.dart apps/lib/core/router/app_routes.dart apps/lib/features/calendar/ui/widgets/bottom_dock.dart apps/lib/features/home/ui/navigation/home_return_policy.dart apps/test/core/router/app_routes_test.dart apps/test/features/home/ui/navigation/home_return_policy_test.dart -git commit -m "feat: switch main navigation to stateful shell tabs" -``` - -### Task 6: Calendar repository 化并移除路由监听刷新 - -**Files:** -- Create: `apps/lib/features/calendar/data/services/calendar_repository.dart` -- Modify: `apps/lib/features/calendar/ui/screens/calendar_dayweek_screen.dart` -- Modify: `apps/lib/features/calendar/ui/screens/calendar_month_screen.dart` -- Modify: `apps/lib/features/calendar/ui/calendar_state_manager.dart` -- Create: `apps/test/features/calendar/data/services/calendar_repository_test.dart` - -**Step 1: Write the failing test** - -```dart -test('getDayEvents returns cache immediately and refreshes in background', () async { - // Arrange cache day key - // Assert cached list emitted before network completion -}); -``` - -**Step 2: Run test to verify it fails** - -Run: `cd apps && flutter test test/features/calendar/data/services/calendar_repository_test.dart` -Expected: FAIL. - -**Step 3: Write minimal implementation** - -```dart -class CalendarRepository { - Future> getDayEvents(DateTime date, {bool forceRefresh = false}) async { ... } -} -``` - -**Step 4: Run tests to verify they pass** - -Run: `cd apps && flutter test test/features/calendar/data/services/calendar_repository_test.dart` -Expected: PASS. - -**Step 5: Commit** - -```bash -git add apps/lib/features/calendar/data/services/calendar_repository.dart apps/lib/features/calendar/ui/screens/calendar_dayweek_screen.dart apps/lib/features/calendar/ui/screens/calendar_month_screen.dart apps/lib/features/calendar/ui/calendar_state_manager.dart apps/test/features/calendar/data/services/calendar_repository_test.dart -git commit -m "refactor: decouple calendar screens from route-driven reload" -``` - -### Task 7: Todo repository 化与写后精准失效 - -**Files:** -- Create: `apps/lib/features/todo/data/todo_repository.dart` -- Modify: `apps/lib/features/todo/ui/screens/todo_quadrants_screen.dart` -- Modify: `apps/lib/features/todo/data/todo_api.dart` -- Create: `apps/test/features/todo/todo_repository_test.dart` -- Modify: `apps/test/features/todo/quadrant_drag_test.dart` - -**Step 1: Write the failing test** - -```dart -test('complete todo should optimistically update and invalidate pending list key', () async { - // assert local list updated first, invalidator called -}); -``` - -**Step 2: Run test to verify it fails** - -Run: `cd apps && flutter test test/features/todo/todo_repository_test.dart` -Expected: FAIL. - -**Step 3: Write minimal implementation** - -```dart -class TodoRepository { - Future completeTodo(String id) async { ... } -} -``` - -**Step 4: Run tests to verify they pass** - -Run: `cd apps && flutter test test/features/todo/todo_repository_test.dart test/features/todo/quadrant_drag_test.dart` -Expected: PASS. - -**Step 5: Commit** - -```bash -git add apps/lib/features/todo/data/todo_repository.dart apps/lib/features/todo/ui/screens/todo_quadrants_screen.dart apps/lib/features/todo/data/todo_api.dart apps/test/features/todo/todo_repository_test.dart apps/test/features/todo/quadrant_drag_test.dart -git commit -m "feat: add todo cache repository and precise invalidation" -``` - -### Task 8: App 生命周期与网络恢复刷新策略 - -**Files:** -- Create: `apps/lib/core/cache/cache_refresh_coordinator.dart` -- Modify: `apps/lib/main.dart` -- Create: `apps/test/core/cache/cache_refresh_coordinator_test.dart` - -**Step 1: Write the failing test** - -```dart -test('resume should trigger refresh only when min interval elapsed', () { - // Arrange last refreshed timestamp - // Assert callback invocation count -}); -``` - -**Step 2: Run test to verify it fails** - -Run: `cd apps && flutter test test/core/cache/cache_refresh_coordinator_test.dart` -Expected: FAIL. - -**Step 3: Write minimal implementation** - -```dart -class CacheRefreshCoordinator with WidgetsBindingObserver { ... } -``` - -**Step 4: Run test to verify it passes** - -Run: `cd apps && flutter test test/core/cache/cache_refresh_coordinator_test.dart` -Expected: PASS. - -**Step 5: Commit** - -```bash -git add apps/lib/core/cache/cache_refresh_coordinator.dart apps/lib/main.dart apps/test/core/cache/cache_refresh_coordinator_test.dart -git commit -m "feat: add app lifecycle refresh coordinator" -``` - -### Task 9: 提醒取消实时归档与延迟归档兜底收敛 - -**Files:** -- Modify: `apps/lib/features/calendar/reminders/reminder_action_executor.dart` -- Modify: `apps/lib/features/calendar/reminders/ui/reminder_foreground_presenter.dart` -- Modify: `apps/lib/core/notifications/local_notification_service.dart` -- Modify: `apps/lib/main.dart` -- Modify: `apps/test/features/calendar/reminders/reminder_action_executor_test.dart` -- Modify: `apps/test/features/calendar/reminders/reminder_notification_bridge_test.dart` - -**Step 1: Write the failing test** - -```dart -test('archive action should send remote archive immediately when app active', () async { - // Arrange active app + online gateway - // Act archive action - // Assert remote archive called once and local pending outbox not created -}); -``` - -**Step 2: Run test to verify it fails** - -Run: `cd apps && flutter test test/features/calendar/reminders/reminder_action_executor_test.dart` -Expected: FAIL with delayed-only behavior. - -**Step 3: Write minimal implementation** - -```dart -if (isAppActive) { - await repository.archiveNow(eventId); -} else { - await outbox.enqueueArchive(eventId); -} -``` - -**Step 4: Run tests to verify they pass** - -Run: `cd apps && flutter test test/features/calendar/reminders/reminder_action_executor_test.dart test/features/calendar/reminders/reminder_notification_bridge_test.dart` -Expected: PASS. - -**Step 5: Commit** - -```bash -git add apps/lib/features/calendar/reminders/reminder_action_executor.dart apps/lib/features/calendar/reminders/ui/reminder_foreground_presenter.dart apps/lib/core/notifications/local_notification_service.dart apps/lib/main.dart apps/test/features/calendar/reminders/reminder_action_executor_test.dart apps/test/features/calendar/reminders/reminder_notification_bridge_test.dart -git commit -m "fix: prioritize realtime reminder archive with cold-start fallback" -``` - -### Task 10: 全量验证与文档同步 - -**Files:** -- Modify: `docs/protocols/*`(仅当路由/数据契约文档需更新时) -- Modify: `docs/plans/2026-03-20-navigation-cache-decoupling-design.md`(回填最终参数) - -**Step 1: Run focused tests** - -Run: - -```bash -cd apps && flutter test test/core/cache test/features/settings/data/services/settings_user_cache_test.dart test/features/calendar test/features/todo test/features/home/ui/navigation/home_return_policy_test.dart test/core/router/app_routes_test.dart -``` - -Expected: PASS. - -**Step 2: Run app-level verification** - -Run: `cd apps && flutter test` -Expected: PASS. - -**Step 3: Static checks** - -Run: `cd apps && flutter analyze` -Expected: No errors. - -**Step 4: Manual verification checklist** - -1. 冷启动先显示缓存,随后静默更新。 -2. Home/Calendar/Todo 来回切换不重建主页面。 -3. 日/月切换不触发无必要请求。 -4. Dock Home 始终回主页。 -5. 写后数据可见一致,失败可回滚提示。 -6. 二级页面侧滑返回只回 Home,不直接退出。 -7. 提醒点击取消时立刻归档;仅在 App 不可用时走 pending 兜底。 - -**Step 5: Commit** - -```bash -git add docs/plans/2026-03-20-navigation-cache-decoupling-design.md docs/protocols -git commit -m "docs: finalize navigation decoupling and unified cache rollout" -``` - -## 实施顺序约束 - -1. 必须先完成 Task 0-3 再改业务页面(否则会出现重复实现)。 -2. Task 0(分级返回)与 Task 5(路由壳层)要分开提交,便于单独回滚。 -3. 每个 Task 的测试必须在本 Task 完成后立即执行,避免堆积回归。 -4. 不允许在未通过 focused tests 的情况下进入全量验证。 - -## 回滚策略 - -1. 若返回语义回归:先回滚 Task 0 提交,再评估 Task 5。 -2. 若缓存一致性异常:优先回滚 Task 6/7 的 repository 接入提交。 -3. 若生命周期刷新过于频繁:关闭 Task 8 coordinator 挂载,保留手动刷新兜底。 -4. 若提醒实时归档异常:回滚 Task 9,仅保留 outbox 兜底路径。 - -## Done 定义 - -1. 所有测试与 analyze 通过。 -2. 主页按钮行为稳定,无“返回上一页”误行为。 -3. 切换页面请求数明显下降,写后一致性符合设计预期。 -4. 统一缓存已接管用户信息、日历、待办三域。 -5. 二级页面不再可直接侧滑退出 App。 -6. 提醒归档满足“实时优先、关闭兜底”策略。 diff --git a/docs/plans/2026-03-20-reminder-overlay-design.md b/docs/plans/2026-03-20-reminder-overlay-design.md deleted file mode 100644 index f5e4a18..0000000 --- a/docs/plans/2026-03-20-reminder-overlay-design.md +++ /dev/null @@ -1,221 +0,0 @@ -# Reminder Overlay 设计文档 - -## 概述 - -重构日历提醒机制,简化前台/后台判断逻辑,将所有提醒交互统一到独立的 ReminderOverlay 组件处理。 - -## 背景 - -当前实现复杂,涉及: -- App 启动状态判断(前台/后台) -- 离线归档请求队列 + 指数退避重试 -- 通知权限降级(Android Timer 模拟) -- 聚合通知批量操作 - -新方案利用 iOS/Android 原生通知分组能力,实现: -- 每条通知独立 payload,点击哪条处理哪条 -- 统一的 ReminderOverlay 处理所有用户交互 -- 操作完成后 app 退到后台 - -## 设计决策 - -| 决策项 | 选择 | -|--------|------| -| 关闭 overlay 后的行为 | 回到首页,保持缓存状态 | -| 同分钟多条通知处理 | 按点击顺序处理当前,剩余按时间排序 | -| iOS 冷启动 payload 传递 | UserDefaults(App Groups 方案) | -| Android 通知展示 | Full-screen intent(锁屏也弹窗) | -| 稍后提醒时间选项 | 5 分钟、15 分钟(下拉选项) | -| "完成"按钮行为 | 归档 + 关闭 + 退后台 | -| "稍后提醒"按钮行为 | 弹出选项 + 延后通知 + 关闭 + 退后台 | -| UI 组件 | 新建 ReminderOverlay(不复用现有) | - -## 核心流程 - -``` -通知到达 → 用户点击通知 → - ├─ App 已运行 → 恢复前台 → 直接收到 payload → 打开 ReminderOverlay - └─ App 未运行 → - ├─ iOS: 原生层写入 UserDefaults → Flutter 启动时读取 - └─ Android: full-screen intent 启动 → Flutter 收到 payload - -ReminderOverlay 显示: - - 日程标题 - - 当前时间 - - [稍后提醒 ▼] | [完成] - -用户操作: - ├─ 完成 → 归档请求 → 关闭 overlay → 退后台 - └─ 稍后提醒 → 选择时间 → 取消当前通知 + 注册新通知 → 关闭 overlay → 退后台 - -处理完当前 → 检查同分钟是否有多条 → - ├─ 有 → 打开下一条的 ReminderOverlay - └─ 无 → 保持退后台状态 -``` - -## 移除的组件 - -| 组件 | 文件路径 | 移除原因 | -|------|----------|----------| -| ReminderColdStartQueue | `lib/features/calendar/reminders/reminder_cold_start_queue.dart` | 不需要后台重放机制 | -| ReminderOutboxStore | `lib/features/calendar/reminders/reminder_outbox_store.dart` | 不需要离线归档队列 | -| ReminderForegroundPresenter | `lib/features/calendar/reminders/ui/reminder_foreground_presenter.dart` | 不需要前台判断 | -| ReminderPresentationCoordinator | `lib/features/calendar/reminders/ui/reminder_presentation_coordinator.dart` | 不需要防重复展示 | -| ReminderActionDedupeStore | `lib/features/calendar/reminders/reminder_action_dedupe_store.dart` | 通知原生幂等 | -| ReminderOverlapPolicy | `lib/features/calendar/reminders/reminder_overlap_policy.dart` | 改为原生分组 | -| Android Timer 模拟逻辑 | `LocalNotificationService` 内 | 不需要权限降级 | - -## 新增组件 - -### ReminderOverlay - -独立的状态管理页面,处理提醒交互。 - -**职责**: -- 显示日程标题和当前时间 -- 提供"稍后提醒"下拉选项(5分钟/15分钟) -- 提供"完成"按钮(归档) -- 处理完成后关闭 overlay - -**文件位置**:`lib/features/calendar/reminders/ui/reminder_overlay.dart` - -### ReminderQueueManager - -管理同分钟多条通知的处理队列。 - -**职责**: -- 存储同分钟的通知列表 -- 按点击顺序跟踪当前处理项 -- 处理完当前后调度下一项 - -**文件位置**:`lib/features/calendar/reminders/reminder_queue_manager.dart` - -### IOSNotificationPayloadBridge - -iOS 冷启动时从 UserDefaults 读取 notification payload。 - -**职责**: -- App 启动时检查是否有待处理的通知 launch -- 读取 payload 并打开对应的 ReminderOverlay -- 处理完成后清理 UserDefaults - -**文件位置**:`lib/core/notifications/ios_notification_payload_bridge.dart` - -## 平台差异处理 - -### iOS - -1. **通知点击启动 App**: - - 配置 `setPluginRegistrantCallback`(已有) - - iOS 原生层将 payload 写入 UserDefaults - - Flutter 启动时 `IOSNotificationPayloadBridge` 读取数据 - -2. **通知分组**: - - 使用 `threadIdentifier` 分组 - - 同一分钟的通知使用相同的 `threadIdentifier` - -### Android - -1. **Full-screen intent**: - - `AndroidNotificationDetails` 设置 `fullScreenIntent: true` - - 锁屏时直接弹出全屏 overlay - -2. **通知分组**: - - 使用 `groupKey` 分组 - - 同一分钟的通知使用相同的 `groupKey` - -## API 变化 - -### 归档请求 - -仍然使用现有的 `CalendarService.archiveEvent()`,但不再需要失败重试逻辑。 - -``` -POST /api/v1/calendar/events/{eventId}/archive -``` - -### 通知 Payload - -```json -{ - "eventId": "evt_xxx", - "title": "日程标题", - "startAt": "2026-03-20T10:00:00Z", - "endAt": "2026-03-20T11:00:00Z", - "timezone": "Asia/Shanghai", - "mode": "single", - "fireTimeBucket": 1774060800000 -} -``` - -## 数据流 - -### 通知发送流程(不变) - -``` -CalendarService.upsertEventReminder() - → LocalNotificationService.upsertEventReminder() - → flutter_local_notifications.zonedSchedule() -``` - -### 通知点击处理流程 - -``` -用户点击通知 - ├─ App 运行中 → onDidReceiveNotificationResponse(payload) - └─ App 未运行 - ├─ iOS → 原生写入 UserDefaults → Flutter 启动 → 读取 → 打开 overlay - └─ Android → full-screen intent → Flutter 收到 payload → 打开 overlay - -ReminderOverlay 打开 - ├─ 用户点击"完成" → archiveEvent() → 关闭 → 检查队列 → 有下一条则打开下一条 - └─ 用户点击"稍后提醒" → cancelNotification() + scheduleReminderAt() → 关闭 → 检查队列 → 有下一条则打开下一条 -``` - -## 错误处理 - -| 场景 | 处理方式 | -|------|----------| -| 归档请求失败 | 显示 toast 提示用户,操作已完成(下次打开 app 时同步) | -| 延后通知注册失败 | 显示 toast 提示用户,当前提醒已取消 | -| 同分钟多条处理时其中一条失败 | 跳过该条,处理下一条 | - -## 文件变更清单 - -### 删除 - -- `lib/features/calendar/reminders/reminder_cold_start_queue.dart` -- `lib/features/calendar/reminders/reminder_outbox_store.dart` -- `lib/features/calendar/reminders/reminder_action_dedupe_store.dart` -- `lib/features/calendar/reminders/reminder_overlap_policy.dart` -- `lib/features/calendar/reminders/ui/reminder_foreground_presenter.dart` -- `lib/features/calendar/reminders/ui/reminder_presentation_coordinator.dart` -- `lib/features/calendar/reminders/ui/widgets/reminder_action_sheet.dart` -- 相关测试文件 - -### 新增 - -- `lib/features/calendar/reminders/ui/reminder_overlay.dart` -- `lib/features/calendar/reminders/reminder_queue_manager.dart` -- `lib/core/notifications/ios_notification_payload_bridge.dart` - -### 修改 - -- `lib/core/notifications/local_notification_service.dart`(移除权限降级逻辑) -- `lib/main.dart`(集成 IOSNotificationPayloadBridge) -- 相关测试文件 - -## 测试策略 - -### 单元测试 -- ReminderQueueManager: 队列排序、下一条调度 -- IOSNotificationPayloadBridge: payload 读写 - -### 集成测试 -- 通知点击 → overlay 打开 → 操作 → 关闭 -- 同分钟多条通知顺序处理 - -### 手动测试 -- iOS 冷启动点击通知 -- Android 锁屏点击 full-screen intent 通知 -- 稍后提醒 5 分钟/15 分钟验证 diff --git a/docs/plans/2026-03-20-reminder-overlay-implementation-plan.md b/docs/plans/2026-03-20-reminder-overlay-implementation-plan.md deleted file mode 100644 index 10a3d21..0000000 --- a/docs/plans/2026-03-20-reminder-overlay-implementation-plan.md +++ /dev/null @@ -1,760 +0,0 @@ -# Reminder Overlay Implementation Plan - -> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. - -**Goal:** 简化日历提醒机制,用 ReminderOverlay 统一处理所有用户交互,移除前台/后台判断逻辑。 - -**Architecture:** 每条通知独立 payload,点击通知后打开 ReminderOverlay 处理用户操作(完成/稍后提醒)。同分钟多条通知按时间排序依次处理。操作完成后 app 退到后台。iOS 冷启动通过 UserDefaults 传递 payload,Android 使用 full-screen intent。 - -**Tech Stack:** Flutter, flutter_local_notifications, Provider/Bloc (状态管理) - ---- - -## Task 1: 创建 ReminderQueueManager - -**Files:** -- Create: `apps/lib/features/calendar/reminders/reminder_queue_manager.dart` -- Test: `apps/test/features/calendar/reminders/reminder_queue_manager_test.dart` - -**Step 1: 编写测试** - -```dart -import 'package:flutter_test/flutter_test.dart'; -import 'package:social_app/features/calendar/reminders/reminder_queue_manager.dart'; -import 'package:social_app/features/calendar/reminders/models/reminder_payload.dart'; - -void main() { - group('ReminderQueueManager', () { - test('按点击顺序处理,第一条处理完后处理剩余的按时间排序', () { - final manager = ReminderQueueManager(); - - final event1 = ReminderPayload(eventId: '1', title: 'Event 1', startAt: DateTime(2026, 3, 20, 10, 1), mode: ReminderPayloadMode.single); - final event2 = ReminderPayload(eventId: '2', title: 'Event 2', startAt: DateTime(2026, 3, 20, 10, 2), mode: ReminderPayloadMode.single); - final event3 = ReminderPayload(eventId: '3', title: 'Event 3', startAt: DateTime(2026, 3, 20, 10, 3), mode: ReminderPayloadMode.single); - - // 用户点击 event2 - manager.enqueueFromClick(event2); - // 剩余 event1 和 event3 按时间排序: event1 -> event3 - manager.enqueuePending([event1, event3]); - - expect(manager.currentPayload?.eventId, '2'); - manager.dequeueCurrent(); - expect(manager.currentPayload?.eventId, '1'); - manager.dequeueCurrent(); - expect(manager.currentPayload?.eventId, '3'); - manager.dequeueCurrent(); - expect(manager.isEmpty, true); - }); - - test('单条通知处理完直接清空', () { - final manager = ReminderQueueManager(); - final event = ReminderPayload(eventId: '1', title: 'Event 1', startAt: DateTime.now(), mode: ReminderPayloadMode.single); - - manager.enqueueFromClick(event); - expect(manager.isEmpty, false); - manager.dequeueCurrent(); - expect(manager.isEmpty, true); - }); - }); -} -``` - -**Step 2: 运行测试验证失败** - -Run: `cd apps && flutter test test/features/calendar/reminders/reminder_queue_manager_test.dart` -Expected: FAIL - ReminderQueueManager not defined - -**Step 3: 编写最小实现** - -```dart -import 'reminder_payload.dart'; - -class ReminderQueueManager { - ReminderPayload? _currentPayload; - final List _pending = []; - - void enqueueFromClick(ReminderPayload payload) { - _currentPayload = payload; - } - - void enqueuePending(List payloads) { - payloads.sort((a, b) => a.startAt.compareTo(b.startAt)); - _pending.addAll(payloads); - } - - ReminderPayload? get currentPayload => _currentPayload; - - bool get isEmpty => _currentPayload == null && _pending.isEmpty; - - void dequeueCurrent() { - _currentPayload = null; - if (_pending.isNotEmpty) { - _currentPayload = _pending.removeAt(0); - } - } - - void clear() { - _currentPayload = null; - _pending.clear(); - } -} -``` - -**Step 4: 运行测试验证通过** - -Run: `cd apps && flutter test test/features/calendar/reminders/reminder_queue_manager_test.dart` -Expected: PASS - -**Step 5: 提交** - -```bash -git add apps/lib/features/calendar/reminders/reminder_queue_manager.dart apps/test/features/calendar/reminders/reminder_queue_manager_test.dart -git commit -m "feat(calendar): add ReminderQueueManager for handling multiple notifications" -``` - ---- - -## Task 2: 创建 IOSNotificationPayloadBridge - -**Files:** -- Create: `apps/lib/core/notifications/ios_notification_payload_bridge.dart` -- Test: `apps/test/core/notifications/ios_notification_payload_bridge_test.dart` - -**Step 1: 编写测试** - -```dart -import 'package:flutter_test/flutter_test.dart'; -import 'package:shared_preferences/shared_preferences.dart'; -import 'package:social_app/core/notifications/ios_notification_payload_bridge.dart'; -import 'dart:convert'; - -void main() { - group('IOSNotificationPayloadBridge', () { - test('启动时读取待处理的 notification payload', () async { - SharedPreferences.setMockInitialValues({ - 'pending_notification_payload': jsonEncode({ - 'eventId': 'evt_123', - 'title': 'Test Event', - 'startAt': '2026-03-20T10:00:00Z', - 'mode': 'single', - }), - }); - - final prefs = await SharedPreferences.getInstance(); - final bridge = IOSNotificationPayloadBridge(prefs); - final payload = await bridge.getPendingPayload(); - - expect(payload?.eventId, 'evt_123'); - expect(payload?.title, 'Test Event'); - }); - - test('处理完成后清理 UserDefaults', () async { - SharedPreferences.setMockInitialValues({ - 'pending_notification_payload': jsonEncode({ - 'eventId': 'evt_123', - 'title': 'Test Event', - 'startAt': '2026-03-20T10:00:00Z', - 'mode': 'single', - }), - }); - - final prefs = await SharedPreferences.getInstance(); - final bridge = IOSNotificationPayloadBridge(prefs); - await bridge.clearPendingPayload(); - - final remaining = prefs.getString('pending_notification_payload'); - expect(remaining, isNull); - }); - }); -} -``` - -**Step 2: 运行测试验证失败** - -Run: `cd apps && flutter test test/core/notifications/ios_notification_payload_bridge_test.dart` -Expected: FAIL - IOSNotificationPayloadBridge not defined - -**Step 3: 编写最小实现** - -```dart -import 'dart:convert'; -import 'package:shared_preferences/shared_preferences.dart'; -import '../../features/calendar/reminders/models/reminder_payload.dart'; - -class IOSNotificationPayloadBridge { - static const String _key = 'pending_notification_payload'; - final SharedPreferences _prefs; - - IOSNotificationPayloadBridge(this._prefs); - - Future getPendingPayload() async { - final raw = _prefs.getString(_key); - if (raw == null || raw.isEmpty) { - return null; - } - try { - final json = Map.from(jsonDecode(raw) as Map); - return ReminderPayload.fromJson(json); - } catch (_) { - return null; - } - } - - Future clearPendingPayload() async { - await _prefs.remove(_key); - } -} -``` - -**Step 4: 运行测试验证通过** - -Run: `cd apps && flutter test test/core/notifications/ios_notification_payload_bridge_test.dart` -Expected: PASS - -**Step 5: 提交** - -```bash -git add apps/lib/core/notifications/ios_notification_payload_bridge.dart apps/test/core/notifications/ios_notification_payload_bridge_test.dart -git commit -m "feat(notifications): add IOSNotificationPayloadBridge for cold start handling" -``` - ---- - -## Task 3: 创建 ReminderOverlay UI 组件 - -**Files:** -- Create: `apps/lib/features/calendar/reminders/ui/reminder_overlay.dart` -- Test: `apps/test/features/calendar/reminders/reminder_overlay_test.dart` - -**Step 1: 编写测试** - -```dart -import 'package:flutter/material.dart'; -import 'package:flutter_test/flutter_test.dart'; -import 'package:shared_preferences/shared_preferences.dart'; -import 'package:social_app/features/calendar/reminders/ui/reminder_overlay.dart'; -import 'package:social_app/features/calendar/reminders/reminder_queue_manager.dart'; -import 'package:social_app/core/notifications/local_notification_service.dart'; -import 'package:social_app/core/notifications/ios_notification_payload_bridge.dart'; -import 'package:social_app/features/calendar/data/services/calendar_service.dart'; -import 'package:social_app/features/calendar/reminders/reminder_queue_manager.dart'; -import 'package:social_app/features/calendar/reminders/models/reminder_payload.dart'; - -void main() { - group('ReminderOverlay', () { - late ReminderQueueManager queueManager; - - setUp(() { - SharedPreferences.setMockInitialValues({}); - queueManager = ReminderQueueManager(); - }); - - testWidgets('显示日程标题和当前时间', (tester) async { - final payload = ReminderPayload( - eventId: '1', - title: 'Test Meeting', - startAt: DateTime(2026, 3, 20, 10, 0), - mode: ReminderPayloadMode.single, - ); - queueManager.enqueueFromClick(payload); - - await tester.pumpWidget( - MaterialApp( - home: ReminderOverlay( - queueManager: queueManager, - onComplete: () {}, - onSnooze: (minutes) {}, - onArchive: () {}, - ), - ), - ); - - expect(find.text('Test Meeting'), findsOneWidget); - // 当前时间显示在界面上 - expect(find.textContaining(':'), findsWidgets); - }); - - testWidgets('点击完成按钮触发归档', (tester) async { - bool archiveCalled = false; - final payload = ReminderPayload( - eventId: '1', - title: 'Test Meeting', - startAt: DateTime(2026, 3, 20, 10, 0), - mode: ReminderPayloadMode.single, - ); - queueManager.enqueueFromClick(payload); - - await tester.pumpWidget( - MaterialApp( - home: ReminderOverlay( - queueManager: queueManager, - onComplete: () {}, - onSnooze: (minutes) {}, - onArchive: () => archiveCalled = true, - ), - ), - ); - - await tester.tap(find.text('完成')); - await tester.pump(); - - expect(archiveCalled, true); - }); - - testWidgets('点击稍后提醒显示下拉选项', (tester) async { - final payload = ReminderPayload( - eventId: '1', - title: 'Test Meeting', - startAt: DateTime(2026, 3, 20, 10, 0), - mode: ReminderPayloadMode.single, - ); - queueManager.enqueueFromClick(payload); - - await tester.pumpWidget( - MaterialApp( - home: ReminderOverlay( - queueManager: queueManager, - onComplete: () {}, - onSnooze: (minutes) {}, - onArchive: () {}, - ), - ), - ); - - await tester.tap(find.text('稍后提醒')); - await tester.pumpAndSettle(); - - expect(find.text('5 分钟'), findsOneWidget); - expect(find.text('15 分钟'), findsOneWidget); - }); - }); -} -``` - -**Step 2: 运行测试验证失败** - -Run: `cd apps && flutter test test/features/calendar/reminders/reminder_overlay_test.dart` -Expected: FAIL - ReminderOverlay not defined - -**Step 3: 编写最小实现** - -```dart -import 'package:flutter/material.dart'; -import 'package:flutter/services.dart'; -import 'package:intl/intl.dart'; -import '../../../../core/theme/design_tokens.dart'; -import '../../../../shared/widgets/app_button.dart'; -import '../../reminders/reminder_queue_manager.dart'; -import '../../reminders/models/reminder_payload.dart'; - -class ReminderOverlay extends StatefulWidget { - const ReminderOverlay({ - super.key, - required this.queueManager, - required this.onComplete, - required this.onSnooze, - required this.onArchive, - }); - - final ReminderQueueManager queueManager; - final VoidCallback onComplete; - final void Function(int minutes) onSnooze; - final VoidCallback onArchive; - - @override - State createState() => _ReminderOverlayState(); -} - -class _ReminderOverlayState extends State { - bool _showSnoozeOptions = false; - final LayerLink _layerLink = LayerLink(); - OverlayEntry? _overlayEntry; - - ReminderPayload? get _currentPayload => widget.queueManager.currentPayload; - - @override - void dispose() { - _hideSnoozeOptions(); - super.dispose(); - } - - void _hideSnoozeOptions() { - _overlayEntry?.remove(); - _overlayEntry = null; - setState(() { - _showSnoozeOptions = false; - }); - } - - void _showSnoozeDropdown() { - _hideSnoozeOptions(); - - _overlayEntry = OverlayEntry( - builder: (context) => Positioned( - width: 120, - child: Material( - elevation: 4, - borderRadius: BorderRadius.circular(AppRadius.md), - child: Container( - decoration: BoxDecoration( - color: AppColors.white, - borderRadius: BorderRadius.circular(AppRadius.md), - border: Border.all(color: AppColors.borderSecondary), - ), - child: Column( - mainAxisSize: MainAxisSize.min, - children: [ - _SnoozeOption( - label: '5 分钟', - onTap: () { - _hideSnoozeOptions(); - widget.onSnooze(5); - }, - ), - Divider(height: 1, color: AppColors.borderSecondary), - _SnoozeOption( - label: '15 分钟', - onTap: () { - _hideSnoozeOptions(); - widget.onSnooze(15); - }, - ), - ], - ), - ), - ), - ), - ); - - Overlay.of(context).insert(_overlayEntry!); - setState(() { - _showSnoozeOptions = true; - }); - } - - void _handleComplete() { - widget.onArchive(); - widget.queueManager.dequeueCurrent(); - if (!widget.queueManager.isEmpty) { - // 下一条会通过外部状态管理打开新的 overlay - } - widget.onComplete(); - } - - void _handleSnooze(int minutes) { - widget.onSnooze(minutes); - widget.queueManager.dequeueCurrent(); - if (!widget.queueManager.isEmpty) { - // 下一条会通过外部状态管理打开新的 overlay - } - widget.onComplete(); - } - - @override - Widget build(BuildContext context) { - final payload = _currentPayload; - if (payload == null) { - return const SizedBox.shrink(); - } - - return Container( - color: AppColors.white, - child: SafeArea( - child: Padding( - padding: const EdgeInsets.all(AppSpacing.lg), - child: Column( - mainAxisAlignment: MainAxisAlignment.center, - crossAxisAlignment: CrossAxisAlignment.stretch, - children: [ - Text( - payload.title, - style: Theme.of(context).textTheme.headlineSmall?.copyWith( - color: AppColors.slate900, - fontWeight: FontWeight.w600, - ), - textAlign: TextAlign.center, - ), - const SizedBox(height: AppSpacing.sm), - Text( - DateFormat('HH:mm').format(DateTime.now()), - style: Theme.of(context).textTheme.titleLarge?.copyWith( - color: AppColors.slate500, - ), - textAlign: TextAlign.center, - ), - const SizedBox(height: AppSpacing.xl), - Row( - children: [ - Expanded( - child: CompositedTransformTarget( - link: _layerLink, - child: AppButton( - text: '稍后提醒', - isOutlined: true, - onPressed: _showSnoozeDropdown, - ), - ), - ), - const SizedBox(width: AppSpacing.md), - Expanded( - child: AppButton( - text: '完成', - onPressed: _handleComplete, - ), - ), - ], - ), - ], - ), - ), - ), - ); - } -} - -class _SnoozeOption extends StatelessWidget { - const _SnoozeOption({ - required this.label, - required this.onTap, - }); - - final String label; - final VoidCallback onTap; - - @override - Widget build(BuildContext context) { - return InkWell( - onTap: onTap, - child: Padding( - padding: const EdgeInsets.symmetric( - horizontal: AppSpacing.md, - vertical: AppSpacing.sm, - ), - child: Text( - label, - style: Theme.of(context).textTheme.bodyMedium?.copyWith( - color: AppColors.slate900, - ), - ), - ), - ); - } -} -``` - -**Step 4: 运行测试验证通过** - -Run: `cd apps && flutter test test/features/calendar/reminders/reminder_overlay_test.dart` -Expected: PASS - -**Step 5: 提交** - -```bash -git add apps/lib/features/calendar/reminders/ui/reminder_overlay.dart apps/test/features/calendar/reminders/reminder_overlay_test.dart -git commit -m "feat(calendar): add ReminderOverlay UI component" -``` - ---- - -## Task 4: 修改 LocalNotificationService - 移除权限降级逻辑 - -**Files:** -- Modify: `apps/lib/core/notifications/local_notification_service.dart` - -**Step 1: 阅读现有代码确认移除范围** - -Read: `apps/lib/core/notifications/local_notification_service.dart` - -**Step 2: 移除以下逻辑** - -1. 移除 `_canDeliverSystemNotification` 相关判断 -2. 移除 `_scheduleInAppFallbackRemindersFrom` 方法 -3. 移除 `_scheduleInAppFallbackPayload` 方法 -4. 移除 `_inAppFallbackTimersByEventId` 及相关方法 -5. 移除 `_trackFallback` 方法 -6. 移除 `rebuildUpcomingReminders` 中的降级分支 - -**Step 3: 验证 flutter analyze 通过** - -Run: `cd apps && flutter analyze lib/core/notifications/local_notification_service.dart` -Expected: 无错误 - -**Step 4: 提交** - -```bash -git add apps/lib/core/notifications/local_notification_service.dart -git commit -m "refactor(notifications): remove permission fallback logic" -``` - ---- - -## Task 5: 修改通知发送逻辑 - 使用原生分组 - -**Files:** -- Modify: `apps/lib/core/notifications/local_notification_service.dart` - -**Step 1: 添加 threadIdentifier/groupKey 到通知详情** - -在 `_buildNotificationDetails` 方法中: -- iOS: 添加 `threadIdentifier: _getThreadIdentifier(fireAt)` -- Android: 添加 `groupKey: _getGroupKey(fireAt)` - -**Step 2: 实现分组 key 方法** - -```dart -String _getThreadIdentifier(DateTime fireAt) { - final bucket = fireAt.millisecondsSinceEpoch ~/ const Duration(minutes: 1).inMilliseconds; - return 'calendar_reminder_$bucket'; -} - -String _getGroupKey(DateTime fireAt) { - final bucket = fireAt.millisecondsSinceEpoch ~/ const Duration(minutes: 1).inMilliseconds; - return 'com.socialapp.calendar.$bucket'; -} -``` - -**Step 3: 验证 flutter analyze 通过** - -Run: `cd apps && flutter analyze lib/core/notifications/local_notification_service.dart` -Expected: 无错误 - -**Step 4: 提交** - -```bash -git add apps/lib/core/notifications/local_notification_service.dart -git commit -m "feat(notifications): add native notification grouping by time bucket" -``` - ---- - -## Task 6: 修改 main.dart - 集成 iOS payload bridge - -**Files:** -- Modify: `apps/lib/main.dart` - -**Step 1: 在 main() 中添加启动时检查** - -在 `runApp` 之前或 app 初始化时: -1. 创建 IOSNotificationPayloadBridge 实例 -2. 调用 `getPendingPayload()` -3. 如果有 payload,通过 ReminderQueueManager 处理 - -**Step 2: 验证 flutter analyze 通过** - -Run: `cd apps && flutter analyze lib/main.dart` -Expected: 无错误 - -**Step 3: 提交** - -```bash -git add apps/lib/main.dart -git commit -m "feat(ios): integrate IOSNotificationPayloadBridge for cold start handling" -``` - ---- - -## Task 7: 删除废弃组件 - -**Files:** -- Delete: `apps/lib/features/calendar/reminders/reminder_cold_start_queue.dart` -- Delete: `apps/lib/features/calendar/reminders/reminder_outbox_store.dart` -- Delete: `apps/lib/features/calendar/reminders/reminder_action_dedupe_store.dart` -- Delete: `apps/lib/features/calendar/reminders/reminder_overlap_policy.dart` -- Delete: `apps/lib/features/calendar/reminders/ui/reminder_foreground_presenter.dart` -- Delete: `apps/lib/features/calendar/reminders/ui/reminder_presentation_coordinator.dart` -- Delete: `apps/lib/features/calendar/reminders/ui/widgets/reminder_action_sheet.dart` - -**Step 1: 删除文件** - -```bash -git rm apps/lib/features/calendar/reminders/reminder_cold_start_queue.dart -git rm apps/lib/features/calendar/reminders/reminder_outbox_store.dart -git rm apps/lib/features/calendar/reminders/reminder_action_dedupe_store.dart -git rm apps/lib/features/calendar/reminders/reminder_overlap_policy.dart -git rm apps/lib/features/calendar/reminders/ui/reminder_foreground_presenter.dart -git rm apps/lib/features/calendar/reminders/ui/reminder_presentation_coordinator.dart -git rm apps/lib/features/calendar/reminders/ui/widgets/reminder_action_sheet.dart -``` - -**Step 2: 删除测试文件** - -```bash -git rm apps/test/features/calendar/reminders/reminder_cold_start_queue_test.dart -git rm apps/test/features/calendar/reminders/reminder_outbox_store_test.dart -git rm apps/test/features/calendar/reminders/reminder_action_dedupe_store_test.dart -git rm apps/test/features/calendar/reminders/reminder_overlap_policy_test.dart -git rm apps/test/features/calendar/reminders/reminder_foreground_presenter_test.dart -git rm apps/test/features/calendar/reminders/reminder_presentation_coordinator_test.dart -git rm apps/test/features/calendar/reminders/reminder_action_sheet_test.dart -``` - -**Step 3: 运行 flutter analyze 验证无引用错误** - -Run: `cd apps && flutter analyze` -Expected: 无错误(可能有 deprecated warnings 可以忽略) - -**Step 4: 提交** - -```bash -git commit -m "refactor(calendar): remove deprecated reminder components" -``` - ---- - -## Task 8: iOS 原生层配置 (AppDelegate.swift) - -**Files:** -- Modify: `apps/ios/Runner/AppDelegate.swift` - -**Step 1: 添加 UserDefaults 写入逻辑** - -在 `userNotificationCenter(_, didReceive, withCompletionHandler)` 中: -1. 获取 notification 的 `userInfo` -2. 将 payload 写入 `UserDefaults.standard` -3. Key: `pending_notification_payload` - -**Step 2: 验证 Xcode build** - -Run: `cd apps && flutter build ios --simulator --no-codesign 2>&1 | head -50` -Expected: Build 成功 - -**Step 3: 提交** - -```bash -git add apps/ios/Runner/AppDelegate.swift -git commit -m "feat(ios): write notification payload to UserDefaults on cold start" -``` - ---- - -## 验证清单 - -完成所有任务后,运行以下验证: - -```bash -# 1. Flutter analyze -cd apps && flutter analyze - -# 2. 测试 -cd apps && flutter test test/features/calendar/reminders/ - -# 3. iOS build -cd apps && flutter build ios --simulator --no-codesign - -# 4. Android build -cd apps && flutter build apk --debug -``` - ---- - -## Plan Complete - -**Plan saved to:** `docs/plans/2026-03-20-reminder-overlay-implementation-plan.md` - -**Two execution options:** - -**1. Subagent-Driven (this session)** - I dispatch fresh subagent per task, review between tasks, fast iteration - -**2. Parallel Session (separate)** - Open new session with executing-plans, batch execution with checkpoints - -**Which approach?** diff --git a/docs/plans/visibility-mask-restructure.md b/docs/plans/visibility-mask-restructure.md new file mode 100644 index 0000000..b6d31d8 --- /dev/null +++ b/docs/plans/visibility-mask-restructure.md @@ -0,0 +1,151 @@ +# 可见性掩码重构方案 + +> 日期:2026-03-22 +> 状态:待执行 + +## 背景 + +现有可见性系统存在以下问题: +- `UI_REALTIME` 定义但从未使用 +- `visibility_consumer_bit` 语义模糊,用于 context 过滤但无法正确区分 chat/automation +- stage bits (16/17/18) 在 chat/automation 永不共享 thread 的设计下无意义 +- 无法正确实现:automation user_message 不进 /history、不进 context,automation agent_reply 进 /history 但不进 context + +## 设计目标 + +| runtime_mode | 消息 | /history 可见 | context_messages 组装 | +|-------------|------|:-------------:|:-------------------:| +| chat | user_message | ✅ | ✅ | +| chat | agent_reply | ✅ | ✅ | +| automation | user_message | ❌ | ❌ | +| automation | agent_reply | ✅ | ❌ | + +## 前提条件 + +- chat 和 automation **永不共享 thread_id**(已确认的设计约束) +- memory == automation,无需单独处理 + +--- + +## Bit 定义 + +``` +BIT 0 → UI_HISTORY → /history API 可见 +BIT 1 → CONTEXT_ASSEMBLY → 组装进 context_messages +``` + +> `UI_REALTIME` 废弃,删除。 +> `visibility_consumer_bit` 废弃,删除。 +> Stage bits (16/17/18) 废弃,删除。 + +--- + +## 消息 Mask 矩阵 + +| 消息 | runtime_mode | UI_HISTORY | CONTEXT_ASSEMBLY | Mask | +|------|-------------|:----------:|:---------------:|:----:| +| user_message | chat | 1 | 1 | **3** | +| user_message | automation | 0 | 0 | **0** | +| agent_reply | chat | 1 | 1 | **3** | +| agent_reply | automation | 1 | 0 | **1** | + +--- + +## 查询设计 + +| 查询 | Mask | 匹配规则 | +|------|------|---------| +| `/history` | `UI_HISTORY = 1` | `(message_mask & 1) != 0` | +| `context_messages` | `CONTEXT_ASSEMBLY = 2` | `(message_mask & 2) != 0` | + +--- + +## 查询结果验证 + +| 消息 | Mask | `/history & 1` | `/history` | `context & 2` | `context` | +|------|------|:-------------:|:----------:|:-------------:|:---------:| +| chat user_message | 3 | 1 ✅ | ✅ | 1 ✅ | ✅ | +| chat agent_reply | 3 | 1 ✅ | ✅ | 1 ✅ | ✅ | +| automation user_message | 0 | 0 ❌ | ❌ | 0 ❌ | ❌ | +| automation agent_reply | 1 | 1 ✅ | ✅ | 0 ❌ | ❌ | + +--- + +## 变更清单 + +### 1. `schemas/agent/visibility.py` + +- [ ] 删除 `UI_REALTIME = 1` 从 `SystemVisibilityBit` +- [ ] 删除 `VisibilityBitRef` 类 +- [ ] 保留 `bit_mask()` 函数 +- [ ] 保留 `VisibilityMask` 类(其他模块可能使用) + +### 2. `schemas/agent/system_agent.py` + +- [ ] 删除 `SystemAgentLLMConfig.visibility_consumer_bit` 字段 + +### 3. `core/config/static/database/system_agents.yaml` + +- [ ] 删除 `router.visibility_consumer_bit` +- [ ] 删除 `worker.visibility_consumer_bit` + +### 4. `v1/agent/service.py` + +- [ ] 重写 `_resolve_user_message_visibility_mask`: + ```python + async def _resolve_user_message_visibility_mask( + self, *, runtime_mode: RuntimeMode + ) -> int: + if runtime_mode == RuntimeMode.CHAT: + return UI_HISTORY | CONTEXT_ASSEMBLY # = 3 + return 0 # automation user_message + ``` + +### 5. `core/agentscope/events/store.py` + +- [ ] 重写 `_resolve_stage_visibility_mask`: + - chat stage (router/worker) → `UI_HISTORY | CONTEXT_ASSEMBLY` = 3 + - automation stage (memory) → `UI_HISTORY` = 1 +- [ ] 删除 `_load_stage_visibility_bit_map` 中对 `visibility_consumer_bit` 的依赖 +- [ ] 删除 `system_agents.yaml` 配置加载逻辑 + +### 6. `core/agentscope/runtime/context_service.py` + +- [ ] `load_context_messages` 查询 mask 改为 `CONTEXT_ASSEMBLY = 2` + ```python + visibility_mask = bit_mask(bit=int(SystemVisibilityBit.CONTEXT_ASSEMBLY)) + ``` + +### 7. `core/agentscope/runtime/tasks.py` + +- [ ] 删除 `_build_recent_context_messages` 中 memory job 的特殊处理 +- [ ] memory mode 改用 `runtime_mode=automation` 语义 + +### 8. `core/agentscope/runtime/runner.py` + +- [ ] 删除硬编码 `visibility_consumer_bit=18` 的 `SystemAgentLLMConfig` +- [ ] memory agent 配置改用 automation 语义 + +### 9. 清理迁移 + +- [ ] 更新 `schemas/agent/__init__.py` 导出(删除 `visibility_consumer_bit` 相关) +- [ ] 更新所有引用 `visibility_consumer_bit` 的文件 +- [ ] 运行测试验证 /history 和 context 组装行为 + +--- + +## 实施顺序 + +1. 新增 `CONTEXT_ASSEMBLY = 1` bit,更新 `service.py` +2. 更新 `events/store.py` 可见性逻辑 +3. 更新 `context_service.py` 查询 mask +4. 清理废弃配置和字段 +5. 运行测试验证 + +--- + +## 风险 + +- `VisibilityBitRef` 可能在其他未知位置使用(需全面搜索) +- `visibility_consumer_bit` 被 `runner.py` 硬编码,修改可能影响 memory pipeline +- 测试覆盖不足可能导致 regression diff --git a/docs/protocols/agent/api-endpoints.md b/docs/protocols/agent/api-endpoints.md index d8fb7bb..65b1783 100644 --- a/docs/protocols/agent/api-endpoints.md +++ b/docs/protocols/agent/api-endpoints.md @@ -27,8 +27,7 @@ Base URL: `/api/v1/agent` - Body: `RunAgentInput` - 详细结构见 `docs/protocols/agent/run-agent-input.md` -- `forwardedProps.agent_type` 必填,由调用方透传,task 不做默认赋值 -- `agent_type=memory` 仅用于自动化调度内部触发,API 入口返回 422 +- `forwardedProps.runtime_mode` 必填,值为 `"chat"` 或 `"automation"` ### Response @@ -83,9 +82,9 @@ Base URL: `/api/v1/agent` 当前阶段执行说明: -- `worker` 模式采用两阶段:`router` -> `worker`。 -- `memory` 模式保持单阶段:`memory`。 -- 因此阶段事件可能出现 `router` / `worker` / `memory`。 +- `chat` 模式采用两阶段:`router` -> `worker`。 +- `automation` 模式由后端业务逻辑决定具体 Agent 类型。 +- 因此阶段事件可能出现 `router` / `worker`。 ### 错误码 diff --git a/docs/protocols/agent/run-agent-input.md b/docs/protocols/agent/run-agent-input.md index 45af3b0..c006ed0 100644 --- a/docs/protocols/agent/run-agent-input.md +++ b/docs/protocols/agent/run-agent-input.md @@ -185,13 +185,13 @@ interface Context { --- -## forwardedProps Schema(支持 agent_type + client_time) +## forwardedProps Schema(支持 runtime_mode + client_time) `RunAgentInput.forwardedProps` 支持透传运行模式与客户端时间上下文。日历相关能力必须使用以下结构: ```typescript interface ForwardedProps { - agent_type: string; // 必填,运行模式(如 "worker" / "memory") + runtime_mode: "chat" | "automation"; // 必填,运行模式 client_time?: { device_timezone: string; // IANA 时区,例如 "America/Los_Angeles" client_now_iso: string; // RFC3339 带偏移时间,例如 "2026-03-16T09:12:33-07:00" @@ -200,6 +200,13 @@ interface ForwardedProps { } ``` +### 运行模式说明 + +| runtime_mode | 说明 | 后端 Pipeline | +|--------------|------|---------------| +| `chat` | 标准对话模式 | `router` -> `worker` | +| `automation` | 自动化任务模式 | 由后端业务逻辑决定具体 Agent 类型 | + ### 时间来源优先级(固定) 后端在运行时按以下顺序解析事件时区: @@ -214,9 +221,8 @@ interface ForwardedProps { - `device_timezone` 必须是有效 IANA 时区。 - `client_now_iso` 必须是 RFC3339 且包含时区偏移。 - `client_epoch_ms` 必须是整数毫秒时间戳。 -- `forwardedProps.agent_type` 必填,且必须匹配后端已注册的 agent type。 -- `agent_type=memory` 为自动化任务内部模式,HTTP `/agent/runs` 入口不接受该值。 -- `forwardedProps` 仅允许 `agent_type` 与 `client_time`,额外字段会触发 `422 invalid RunAgentInput.forwardedProps`。 +- `forwardedProps.runtime_mode` 必填,值必须为 `"chat"` 或 `"automation"`。 +- `forwardedProps` 仅允许 `runtime_mode` 与 `client_time`,额外字段会触发 `422 invalid RunAgentInput.forwardedProps`。 - 业务代码不得使用服务器本地时区作为事件语义时区。 ### 说明 @@ -238,7 +244,7 @@ Backend 实现了以下验证规则: | runId 最大 128 字符 | `runId exceeds length limit` | | messages ≤ 200 条 | `RunAgentInput.messages exceeds limit` | | user text ≤ 10,000 字符 | `RunAgentInput user message text exceeds limit` | -| forwardedProps.agent_type 必填 | `invalid RunAgentInput.forwardedProps` | +| forwardedProps.runtime_mode 必填 | `invalid RunAgentInput.forwardedProps` | | **恰好 1 条 user message** | `RunAgentInput.messages must contain exactly one user message` | | user message 必须在第一条 | `RunAgentInput.messages[0].role must be user` | | binary 必须是 image/* | `binary content requires image mimeType` | @@ -277,7 +283,7 @@ Backend 实现了以下验证规则: "tools": [], "context": [], "forwardedProps": { - "agent_type": "worker" + "runtime_mode": "chat" } } ``` @@ -309,7 +315,7 @@ Backend 实现了以下验证规则: "tools": [], "context": [], "forwardedProps": { - "agent_type": "worker" + "runtime_mode": "chat" } } ``` @@ -346,7 +352,7 @@ Backend 实现了以下验证规则: ], "context": [], "forwardedProps": { - "agent_type": "worker" + "runtime_mode": "chat" } } ``` @@ -368,7 +374,7 @@ Backend 实现了以下验证规则: "tools": [], "context": [], "forwardedProps": { - "agent_type": "worker", + "runtime_mode": "chat", "client_time": { "device_timezone": "America/Los_Angeles", "client_now_iso": "2026-03-16T09:12:33-07:00", @@ -531,5 +537,5 @@ interface UiSchemaRenderer { - `tools` 是前端工具通道字段;当前后端运行时不基于该字段构造后端工具 prompt - `RunAgentInput` 同时接受 camelCase 与 snake_case 别名输入(推荐统一使用 camelCase) - 日历能力依赖 `forwardedProps.client_time` 透传设备时间上下文;缺失时回退用户 profile 时区 -- `forwardedProps.agent_type` 是受控路由字段,必须由调用方显式传入;后端 task 不做默认赋值 +- `forwardedProps.runtime_mode` 是受控路由字段,必须由调用方显式传入;后端 task 不做默认赋值 - tool 消息在存储层用于运行时上下文续接,不在 `/history` 对外返回