From 24eda6ff5138918450f4eac7d7e3fa0e776e19ba Mon Sep 17 00:00:00 2001 From: qzl Date: Wed, 1 Apr 2026 17:24:52 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E9=87=8D=E5=91=BD=E5=90=8D=20autom?= =?UTF-8?q?ation=20worker=20=E4=B8=BA=20general=20worker=20=E5=B9=B6?= =?UTF-8?q?=E5=AE=8C=E5=96=84=E9=94=99=E8=AF=AF=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .env.example | 4 +- apps/lib/core/logging/log_entry.dart | 9 +- apps/lib/core/logging/log_service.dart | 1 + apps/lib/core/logging/logger.dart | 12 +- .../services/reminder_scheduler_service.dart | 1 + apps/lib/data/network/error_code_mapper.dart | 4 + .../core/agentscope/runtime/orchestrator.py | 18 + backend/src/core/agentscope/runtime/tasks.py | 6 +- backend/src/core/taskiq/__init__.py | 4 +- backend/src/core/taskiq/app.py | 5 +- backend/src/v1/agent/service.py | 2 +- backend/src/v1/analytics/__init__.py | 0 backend/tests/unit/core/taskiq/test_app.py | 4 +- deploy/.env.prod.example | 4 +- deploy/README.md | 4 +- deploy/docker-compose.prod.yml | 8 +- docs/plans/2026-04-01-analytics-design.md | 691 ++++++++++++++++++ docs/protocols/common/http-error-codes.md | 2 + infra/scripts/app.sh | 6 +- 19 files changed, 760 insertions(+), 25 deletions(-) create mode 100644 backend/src/v1/analytics/__init__.py create mode 100644 docs/plans/2026-04-01-analytics-design.md diff --git a/.env.example b/.env.example index 677d62e..2897a8e 100644 --- a/.env.example +++ b/.env.example @@ -30,9 +30,9 @@ SOCIAL_REDIS__DB=0 ############ # Worker 队列分组配置 # agent: 常规异步任务 -# automation: 批处理/重计算/可延迟任务 +# general: 通用任务(analytics 写入 + automation 批处理) SOCIAL_WORKER__GROUPS__AGENT__CONCURRENCY=2 -SOCIAL_WORKER__GROUPS__AUTOMATION__CONCURRENCY=2 +SOCIAL_WORKER__GROUPS__GENERAL__CONCURRENCY=2 ############ # Automation 调度器配置 diff --git a/apps/lib/core/logging/log_entry.dart b/apps/lib/core/logging/log_entry.dart index 2f3478f..1f9e734 100644 --- 
a/apps/lib/core/logging/log_entry.dart +++ b/apps/lib/core/logging/log_entry.dart @@ -8,6 +8,7 @@ class LogEntry { final String? funcName; final int? lineNo; final String? errorType; + final String? errorMessage; final String? stackTrace; final Map? extra; @@ -19,6 +20,7 @@ class LogEntry { this.funcName, this.lineNo, this.errorType, + this.errorMessage, this.stackTrace, this.extra, }); @@ -31,6 +33,7 @@ class LogEntry { if (funcName != null) 'func_name': funcName, if (lineNo != null) 'line_no': lineNo, if (errorType != null) 'error_type': errorType, + if (errorMessage != null) 'error_message': errorMessage, if (stackTrace != null) 'stack_trace': stackTrace, if (extra != null && extra!.isNotEmpty) 'extra': extra, }; @@ -43,8 +46,9 @@ class LogEntry { ].join(''); final locationStr = location.isNotEmpty ? ' [$location]' : ''; final errorStr = errorType != null ? ' [$errorType]' : ''; + final errorMsgStr = errorMessage != null ? ' $errorMessage' : ''; final extraStr = extra != null && extra!.isNotEmpty ? 
' $extra' : ''; - return '$ts ${level.name.toUpperCase().padRight(7)} [$module$locationStr]$errorStr $message$extraStr'; + return '$ts ${level.name.toUpperCase().padRight(7)} [$module$locationStr]$errorStr $message$errorMsgStr$extraStr'; } String toFileString() { @@ -59,6 +63,9 @@ class LogEntry { if (errorType != null) { sb.writeln(' Error: $errorType'); } + if (errorMessage != null) { + sb.writeln(' ErrorMessage: $errorMessage'); + } if (stackTrace != null) { sb.writeln(' StackTrace:'); sb.writeln(stackTrace); diff --git a/apps/lib/core/logging/log_service.dart b/apps/lib/core/logging/log_service.dart index 3e137cd..4566c50 100644 --- a/apps/lib/core/logging/log_service.dart +++ b/apps/lib/core/logging/log_service.dart @@ -154,6 +154,7 @@ class LogService { funcName: funcName, lineNo: lineNo, errorType: error.runtimeType.toString(), + errorMessage: error.toString(), stackTrace: stackTrace.toString(), extra: extra, ), diff --git a/apps/lib/core/logging/logger.dart b/apps/lib/core/logging/logger.dart index d85fd1b..4808e99 100644 --- a/apps/lib/core/logging/logger.dart +++ b/apps/lib/core/logging/logger.dart @@ -72,8 +72,18 @@ class Logger { required StackTrace stackTrace, Map? 
extra, }) { + final entry = LogEntry( + timestamp: DateTime.now(), + level: LogLevel.error, + message: message, + module: module, + errorType: error.runtimeType.toString(), + errorMessage: error.toString(), + stackTrace: stackTrace.toString(), + extra: extra, + ); if (_isNoOp) { - debugPrint('[$module] ERROR: $message, error: $error'); + debugPrint(entry.toConsoleString()); return; } _service!.error( diff --git a/apps/lib/core/notification/services/reminder_scheduler_service.dart b/apps/lib/core/notification/services/reminder_scheduler_service.dart index be8a774..b1400d8 100644 --- a/apps/lib/core/notification/services/reminder_scheduler_service.dart +++ b/apps/lib/core/notification/services/reminder_scheduler_service.dart @@ -110,6 +110,7 @@ class ReminderSchedulerService { >(); final androidGranted = await android?.requestNotificationsPermission(); + await android?.requestExactAlarmsPermission(); final iosGranted = await ios?.requestPermissions(alert: true, sound: true); final macosGranted = await macos?.requestPermissions( alert: true, diff --git a/apps/lib/data/network/error_code_mapper.dart b/apps/lib/data/network/error_code_mapper.dart index bbaaa37..2910fe2 100644 --- a/apps/lib/data/network/error_code_mapper.dart +++ b/apps/lib/data/network/error_code_mapper.dart @@ -57,6 +57,8 @@ String? mapErrorCodeToL10nKey( return 'errorNotFound'; case 'AGENT_USER_ID_INVALID': return 'errorGenericSafe'; + case 'AGENT_UPSTREAM_CONNECTION_ERROR': + return 'errorNetwork'; case 'INVALID_BINARY_URL_HOST': return 'errorAgentInvalidBinaryUrl'; case 'INVALID_BINARY_URL_BUCKET': @@ -242,6 +244,8 @@ String? 
resolveErrorCodeMessage( return L10n.current.errorGenericSafe; case 'errorReLogin': return L10n.current.errorReLogin; + case 'errorNetwork': + return L10n.current.errorNetwork; default: return null; } diff --git a/backend/src/core/agentscope/runtime/orchestrator.py b/backend/src/core/agentscope/runtime/orchestrator.py index fce6f0b..e4a1a8e 100644 --- a/backend/src/core/agentscope/runtime/orchestrator.py +++ b/backend/src/core/agentscope/runtime/orchestrator.py @@ -5,6 +5,7 @@ from typing import Any, Awaitable, Callable, Protocol from ag_ui.core.types import RunAgentInput from agentscope.message import Msg +from openai import APIConnectionError from core.agentscope.runtime.runner import AgentScopeRunner from core.logging import get_logger from schemas.domain.automation import RuntimeConfig @@ -106,6 +107,23 @@ class AgentScopeRuntimeOrchestrator: }, ) raise + except APIConnectionError: + logger.warning( + "agentscope upstream connection failed", + thread_id=thread_id, + run_id=run_id, + ) + await self._pipeline.emit( + session_id=thread_id, + event={ + "type": "RUN_ERROR", + "threadId": thread_id, + "runId": run_id, + "message": "network error", + "code": "AGENT_UPSTREAM_CONNECTION_ERROR", + }, + ) + raise except Exception: logger.exception( "agentscope runtime execution failed", diff --git a/backend/src/core/agentscope/runtime/tasks.py b/backend/src/core/agentscope/runtime/tasks.py index 4774990..bb4337f 100644 --- a/backend/src/core/agentscope/runtime/tasks.py +++ b/backend/src/core/agentscope/runtime/tasks.py @@ -26,7 +26,7 @@ from core.auth.models import CurrentUser from core.config.settings import config from core.db.session import AsyncSessionLocal from core.logging import get_logger -from core.taskiq.app import worker_agent_broker, worker_automation_broker +from core.taskiq.app import worker_agent_broker, worker_general_broker from schemas.agent.forwarded_props import ( RuntimeMode, parse_forwarded_props_runtime_mode, @@ -345,6 +345,6 @@ async def 
run_command_task_agent(command: dict[str, object]) -> dict[str, object return await run_agentscope_task(command) -@worker_automation_broker.task(task_name="tasks.agentscope.run_command.automation") -async def run_command_task_automation(command: dict[str, object]) -> dict[str, object]: +@worker_general_broker.task(task_name="tasks.agentscope.run_command.general") +async def run_command_task_general(command: dict[str, object]) -> dict[str, object]: return await run_agentscope_task(command) diff --git a/backend/src/core/taskiq/__init__.py b/backend/src/core/taskiq/__init__.py index 59b081b..f767be6 100644 --- a/backend/src/core/taskiq/__init__.py +++ b/backend/src/core/taskiq/__init__.py @@ -1,3 +1,3 @@ -from core.taskiq.app import broker, worker_agent_broker, worker_automation_broker +from core.taskiq.app import broker, worker_agent_broker, worker_general_broker -__all__ = ["broker", "worker_agent_broker", "worker_automation_broker"] +__all__ = ["broker", "worker_agent_broker", "worker_general_broker"] diff --git a/backend/src/core/taskiq/app.py b/backend/src/core/taskiq/app.py index d20cf0b..22ab470 100644 --- a/backend/src/core/taskiq/app.py +++ b/backend/src/core/taskiq/app.py @@ -12,6 +12,7 @@ log_service_banner( environment=config.runtime.environment, ) + def _build_broker(queue_name: str) -> ListQueueBroker: return ListQueueBroker( url=config.taskiq_broker_url, @@ -22,8 +23,8 @@ def _build_broker(queue_name: str) -> ListQueueBroker: worker_agent_broker = _build_broker("agent") -worker_automation_broker = _build_broker("automation") +worker_general_broker = _build_broker("general") broker = worker_agent_broker -__all__ = ["broker", "worker_agent_broker", "worker_automation_broker"] +__all__ = ["broker", "worker_agent_broker", "worker_general_broker"] diff --git a/backend/src/v1/agent/service.py b/backend/src/v1/agent/service.py index 21d46ea..f2259c7 100644 --- a/backend/src/v1/agent/service.py +++ b/backend/src/v1/agent/service.py @@ -142,7 +142,7 @@ class 
AgentService: metadata=user_message_metadata, ) - queue = "automation" if runtime_mode == RuntimeMode.AUTOMATION else "agent" + queue = "general" if runtime_mode == RuntimeMode.AUTOMATION else "agent" task_id = await self._queue.enqueue( command={ "command": "run", diff --git a/backend/src/v1/analytics/__init__.py b/backend/src/v1/analytics/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/tests/unit/core/taskiq/test_app.py b/backend/tests/unit/core/taskiq/test_app.py index 159f4bc..153af93 100644 --- a/backend/tests/unit/core/taskiq/test_app.py +++ b/backend/tests/unit/core/taskiq/test_app.py @@ -5,13 +5,13 @@ import sys import pytest -from core.taskiq.app import broker, worker_agent_broker, worker_automation_broker +from core.taskiq.app import broker, worker_agent_broker, worker_general_broker def test_taskiq_broker_is_configured() -> None: assert broker is not None assert worker_agent_broker is broker - assert worker_automation_broker is not None + assert worker_general_broker is not None def test_taskiq_app_configures_logging_on_import( diff --git a/deploy/.env.prod.example b/deploy/.env.prod.example index b050b68..4be1b08 100644 --- a/deploy/.env.prod.example +++ b/deploy/.env.prod.example @@ -29,9 +29,9 @@ SOCIAL_REDIS__DB=0 # Worker 队列分组配置 ############ # agent: 常规异步任务 -# automation: 批处理/重计算/可延迟任务 +# general: 通用任务(analytics 写入 + automation 批处理) SOCIAL_WORKER__GROUPS__AGENT__CONCURRENCY=3 -SOCIAL_WORKER__GROUPS__AUTOMATION__CONCURRENCY=1 +SOCIAL_WORKER__GROUPS__GENERAL__CONCURRENCY=1 ############ # Automation 调度器配置 diff --git a/deploy/README.md b/deploy/README.md index 39af5f8..a390ceb 100644 --- a/deploy/README.md +++ b/deploy/README.md @@ -2,7 +2,7 @@ 本目录是单机 `docker compose` 的生产交付包,架构为: -- 应用层:`web + worker-agent + worker-automation + scheduler + init-job` +- 应用层:`web + worker-agent + worker-general + scheduler + init-job` - 中间件:`redis` - 数据与认证:云 Supabase(通过环境变量访问) - 反向代理:由服务器侧 nginx 托管(不在本目录编排) @@ -81,7 +81,7 @@ cp 
deploy/.env.prod.example deploy/.env.prod ### 2) 启动常驻服务 ```bash -docker compose --env-file deploy/.env.prod -f deploy/docker-compose.prod.yml up -d redis web worker-agent worker-automation scheduler +docker compose --env-file deploy/.env.prod -f deploy/docker-compose.prod.yml up -d redis web worker-agent worker-general scheduler ``` ### 3) 执行一次性 bootstrap diff --git a/deploy/docker-compose.prod.yml b/deploy/docker-compose.prod.yml index 1a0db9b..94e0c07 100644 --- a/deploy/docker-compose.prod.yml +++ b/deploy/docker-compose.prod.yml @@ -80,21 +80,21 @@ services: - ../logs:/app/logs - ./static/releases:/app/deploy/static/releases:ro - worker-automation: + worker-general: image: ${SOCIAL_BACKEND_IMAGE:-social-app-backend:prod} - container_name: social-prod-worker-automation + container_name: social-prod-worker-general restart: unless-stopped env_file: - ./.env.prod environment: - PYTHONPATH=/app/backend/src - PYTHONDONTWRITEBYTECODE=1 - - SOCIAL_RUNTIME__SERVICE_NAME=worker-automation + - SOCIAL_RUNTIME__SERVICE_NAME=worker-general - SOCIAL_RUNTIME__ENVIRONMENT=${SOCIAL_RUNTIME__ENVIRONMENT:-prod} - SOCIAL_REDIS__HOST=redis - SOCIAL_REDIS__PORT=6379 command: > - sh -c '.venv/bin/taskiq worker core.taskiq.app:worker_automation_broker core.agentscope.runtime.tasks --workers ${SOCIAL_WORKER__GROUPS__AUTOMATION__CONCURRENCY:-1}' + sh -c '.venv/bin/taskiq worker core.taskiq.app:worker_general_broker core.agentscope.runtime.tasks --workers ${SOCIAL_WORKER__GROUPS__GENERAL__CONCURRENCY:-1}' depends_on: redis: condition: service_healthy diff --git a/docs/plans/2026-04-01-analytics-design.md b/docs/plans/2026-04-01-analytics-design.md new file mode 100644 index 0000000..195394d --- /dev/null +++ b/docs/plans/2026-04-01-analytics-design.md @@ -0,0 +1,691 @@ +# Analytics 数据埋点与可视化方案 + +## 1. 
概述 + +为 Flutter 移动应用实现数据埋点系统,采集用户行为数据,通过异步方式发送至后端存储,并提供可视化网站展示。 + +### 目标 + +- 监测每日用户登录时间 +- 记录 Agent 对话次数 +- 统计每个页面的点击次数和停留时长 +- 统一数据格式,参照 OpenTelemetry 简化版规范 +- 数据持久化到本地文件,供可视化网站读取 + +--- + +## 2. 系统架构 + +``` +┌─────────────────┐ 异步 POST ┌─────────────────┐ taskiq ┌─────────────────────────┐ +│ Flutter App │ ───────────────► │ FastAPI │ ──────────► │ backend/data/analytics/ │ +│ (埋点 SDK) │ /v1/analytics │ /v1/analytics │ async │ {YYYY-MM-DD}.jsonl │ +└─────────────────┘ └─────────────────┘ └─────────────────────────┘ + │ │ + │ │ taskiq worker (general queue) + ▼ ▼ + ┌─────────────────┐ + │ Redis Queue │ + └─────────────────┘ +``` + +--- + +## 3. 数据格式规范 + +### 3.1 统一事件信封结构 + +所有埋点事件使用统一顶层结构: + +```json +{ + "event_id": "01JQ2G5Q3N6Y5N8R7M4K2P1T9A", + "event_type": "session.login", + "timestamp": "2026-04-01T10:30:00.123Z", + + "user_id": "user_123", + "device_id": "install_a93f5d7c21", + "session_id": "sess_7c2d5e8f91", + + "platform": "android", + "app_version": "1.0.0", + "app_build": "100", + "env": "prod", + + "page_name": "login", + "trace_id": "trace_8d2f6c1a", + "request_id": null, + + "attributes": {}, + "metrics": {}, + + "context": { + "network_type": "wifi", + "os_version": "Android 14", + "device_model": "Xiaomi 13", + "locale": "zh-CN", + "timezone": "Asia/Taipei" + } +} +``` + +### 3.2 顶层字段定义 + +| 字段 | 类型 | 必填 | 示例 | 说明 | +|------|------|------|------|------| +| `event_id` | string | 是 | `01JQ2G...` | 事件唯一 ID,ULID/UUID,用于幂等去重 | +| `event_type` | string | 是 | `page.view` | 事件类型,dot 命名法 | +| `timestamp` | string | 是 | `2026-04-01T10:30:00.123Z` | 事件发生时间,UTC ISO8601 | +| `user_id` | string | 是 | `user_123` | 用户 ID(必填) | +| `device_id` | string | 是 | `install_xxx` | 设备标识 | +| `session_id` | string | 是 | `sess_xxx` | App 一次启动会话 ID | +| `platform` | string | 是 | `android` | android / ios / web | +| `app_version` | string | 是 | `1.0.0` | App 版本号 | +| `app_build` | string \| null | 否 | `100` | 构建号 | +| `env` | string | 是 | `prod` | dev / staging / prod | +| 
`page_name` | string \| null | 否 | `home` | 当前页面名 | +| `trace_id` | string \| null | 否 | `trace_xxx` | 用于串联日志、接口、错误 | +| `request_id` | string \| null | 否 | `req_xxx` | 与某次请求关联时可传 | +| `attributes` | object | 是 | `{}` | 离散属性、分类信息 | +| `metrics` | object | 是 | `{}` | 可聚合数值指标 | +| `context` | object | 否 | `{...}` | 客户端环境上下文 | + +### 3.3 attributes 规范 + +只放离散属性、分类信息、上下文标签。 + +**允许内容:** string, boolean, number, null, 一层简单 object 或 string list + +**适合放这里:** +- `method`: "password" +- `page_from`: "home" +- `conversation_id`: "conv_123" +- `element_id`: "send_button" +- `logout_reason`: "manual" + +**不适合放这里(应进 metrics):** +- 停留时长、点击次数、响应时间、消息数、会话时长 + +### 3.4 metrics 规范 + +只放数值型、可聚合、可统计指标。 + +**允许内容:** int, float + +**适合放这里:** +- `stay_duration_ms`: 停留时长(毫秒) +- `click_count`: 点击次数 +- `response_time_ms`: 响应耗时 +- `message_count`: 消息数量 +- `session_duration_s`: 会话时长(秒) + +### 3.5 context 规范 + +通用环境上下文: + +```json +{ + "network_type": "wifi", + "os_version": "Android 14", + "device_model": "Xiaomi 13", + "locale": "zh-CN", + "timezone": "Asia/Taipei" +} +``` + +| 字段 | 类型 | 说明 | +|------|------|------| +| `network_type` | string \| null | wifi / cellular / offline / unknown | +| `os_version` | string \| null | 操作系统版本 | +| `device_model` | string \| null | 设备型号 | +| `locale` | string \| null | 当前语言地区 | +| `timezone` | string \| null | 时区标识 | + +### 3.6 事件类型定义 + +| event_type | 触发时机 | attributes | metrics | +|------------|---------|------------|---------| +| `session.login` | 用户登录成功 | `method` (password/phone_code/oauth) | - | +| `session.logout` | 用户登出 | `reason` (manual/expired/kickout) | `session_duration_s` | +| `agent.chat_completed` | Agent 对话完成 | `conversation_id`, `scenario` | `message_count`, `response_time_ms` | +| `page.view` | 页面退出时 | `page_from` | `stay_duration_ms`, `click_count` | +| `ui.click` | 元素点击时 | `element_id`, `element_type` | - | + +### 3.7 事件数据结构 + +#### session.login + +```json +{ + "event_id": "01JQ2G5Q3N6Y5N8R7M4K2P1T9A", + "event_type": 
"session.login", + "timestamp": "2026-04-01T10:30:00.123Z", + "user_id": "user_123", + "device_id": "install_a93f5d7c21", + "session_id": "sess_7c2d5e8f91", + "platform": "android", + "app_version": "1.0.0", + "app_build": "100", + "env": "prod", + "page_name": "login", + "trace_id": "trace_login_001", + "request_id": "req_login_001", + "attributes": { "method": "password" }, + "metrics": {}, + "context": { "network_type": "wifi", "os_version": "Android 14", "device_model": "Xiaomi 13", "locale": "zh-CN", "timezone": "Asia/Taipei" } +} +``` + +#### session.logout + +```json +{ + "event_id": "01JQ2G6A7X9N2T4R8K1M5P3Q7B", + "event_type": "session.logout", + "timestamp": "2026-04-01T12:00:00.000Z", + "user_id": "user_123", + "device_id": "install_a93f5d7c21", + "session_id": "sess_7c2d5e8f91", + "platform": "android", + "app_version": "1.0.0", + "app_build": "100", + "env": "prod", + "page_name": "settings", + "trace_id": "trace_logout_001", + "request_id": null, + "attributes": { "reason": "manual" }, + "metrics": { "session_duration_s": 5400 }, + "context": { "network_type": "wifi", "os_version": "Android 14", "device_model": "Xiaomi 13", "locale": "zh-CN", "timezone": "Asia/Taipei" } +} +``` + +#### agent.chat_completed + +```json +{ + "event_id": "01JQ2G7B9V4K8N3R1T6M2P5Q8C", + "event_type": "agent.chat_completed", + "timestamp": "2026-04-01T13:20:15.456Z", + "user_id": "user_123", + "device_id": "install_a93f5d7c21", + "session_id": "sess_7c2d5e8f91", + "platform": "android", + "app_version": "1.0.0", + "app_build": "100", + "env": "prod", + "page_name": "chat", + "trace_id": "trace_agent_001", + "request_id": "req_agent_001", + "attributes": { "conversation_id": "conv_987", "scenario": "assistant" }, + "metrics": { "message_count": 4, "response_time_ms": 1320 }, + "context": { "network_type": "wifi", "os_version": "Android 14", "device_model": "Xiaomi 13", "locale": "zh-CN", "timezone": "Asia/Taipei" } +} +``` + +#### page.view + +```json +{ + "event_id": 
"01JQ2G8C1M7R4T9K2P6N5Q3D8E", + "event_type": "page.view", + "timestamp": "2026-04-01T14:05:30.000Z", + "user_id": "user_123", + "device_id": "install_a93f5d7c21", + "session_id": "sess_7c2d5e8f91", + "platform": "android", + "app_version": "1.0.0", + "app_build": "100", + "env": "prod", + "page_name": "home", + "trace_id": "trace_page_home_001", + "request_id": null, + "attributes": { "page_from": "login" }, + "metrics": { "stay_duration_ms": 18234, "click_count": 7 }, + "context": { "network_type": "wifi", "os_version": "Android 14", "device_model": "Xiaomi 13", "locale": "zh-CN", "timezone": "Asia/Taipei" } +} +``` + +#### ui.click + +```json +{ + "event_id": "01JQ2G9D4P8N1R6T3K5M7Q2E9F", + "event_type": "ui.click", + "timestamp": "2026-04-01T14:00:12.000Z", + "user_id": "user_123", + "device_id": "install_a93f5d7c21", + "session_id": "sess_7c2d5e8f91", + "platform": "android", + "app_version": "1.0.0", + "app_build": "100", + "env": "prod", + "page_name": "home", + "trace_id": "trace_click_001", + "request_id": null, + "attributes": { "element_id": "create_task_button", "element_type": "button" }, + "metrics": {}, + "context": { "network_type": "wifi", "os_version": "Android 14", "device_model": "Xiaomi 13", "locale": "zh-CN", "timezone": "Asia/Taipei" } +} +``` + +--- + +## 4. 批量上报请求结构 + +### 4.1 请求体 + +```json +{ + "client_time": "2026-04-01T14:10:00.000Z", + "sdk_version": "1.0.0", + "events": [ + { /* 事件对象 */ } + ] +} +``` + +### 4.2 字段定义 + +| 字段 | 类型 | 必填 | 说明 | +|------|------|------|------| +| `client_time` | string | 否 | 本次上报请求发起时间 | +| `sdk_version` | string | 否 | 埋点 SDK 版本 | +| `events` | array | 是 | 事件数组,建议 1~100 条 | + +--- + +## 5. 
后端落盘结构 + +### 5.1 路径结构 + +按日期分桶,便于后续聚合查询: + +``` +backend/data/analytics/ +├── 2026-04-01.jsonl +├── 2026-04-02.jsonl +└── 2026-04-03.jsonl +``` + +### 5.2 JSONL 单行格式 + +每行一个完整事件对象: + +```jsonl +{"event_id":"01JQ2G5Q3N6Y5N8R7M4K2P1T9A","event_type":"session.login","timestamp":"2026-04-01T10:30:00.123Z",...} +{"event_id":"01JQ2G6A7X9N2T4R8K1M5P3Q7B","event_type":"session.logout","timestamp":"2026-04-01T12:00:00.000Z",...} +``` + +--- + +## 6. Python 类型定义 + +```python +from typing import Any, Literal +from pydantic import BaseModel, Field +from datetime import datetime + + +class AnalyticsContext(BaseModel): + network_type: str | None = None + os_version: str | None = None + device_model: str | None = None + locale: str | None = None + timezone: str | None = None + + +class AnalyticsEvent(BaseModel): + event_id: str + event_type: str + timestamp: datetime + + user_id: str + device_id: str + session_id: str + + platform: Literal["android", "ios", "web"] + app_version: str + app_build: str | None = None + env: Literal["dev", "staging", "prod"] + + page_name: str | None = None + trace_id: str | None = None + request_id: str | None = None + + attributes: dict[str, Any] = Field(default_factory=dict) + metrics: dict[str, int | float] = Field(default_factory=dict) + context: AnalyticsContext | None = None + + +class AnalyticsBatchRequest(BaseModel): + client_time: datetime | None = None + sdk_version: str | None = None + events: list[AnalyticsEvent] +``` + +--- + +## 7. Flutter 埋点 SDK 设计 + +### 4.1 目录结构 + +``` +apps/lib/core/analytics/ +├── tracker.dart # 埋点入口,单例 +├── config.dart # 配置(endpoint、flush 策略) +├── events/ # 事件定义 +│ ├── base_event.dart # 基础事件类 +│ ├── login_event.dart +│ ├── logout_event.dart +│ ├── conversation_event.dart +│ └── page_view_event.dart +├── queue/ +│ └── event_queue.dart # 内存队列 +└── sender.dart # 异步 HTTP 发送器 +``` + +### 4.2 核心流程 + +1. **事件采集**:调用 `AnalyticsTracker.track(event)` 记录事件 +2. **队列缓冲**:事件先入内存队列 +3. **批量发送**:队列满(50条)或定时(30秒)触发批量发送 +4. 
**失败重试**:发送失败则落盘本地,下次启动时重试 + +### 4.3 初始化 + +在 `apps/lib/main.dart` 的 App 初始化阶段: + +```dart +await AnalyticsTracker.init( + endpoint: '${Env.apiBaseUrl}/v1/analytics/events', + deviceId: deviceInfo.id, +); +``` + +### 4.4 页面停留时长计算 + +- 页面 `initState` 时仅在本地记录进入时间(不单独上报事件) +- 页面 `dispose` 时计算 `stay_duration_ms` 并发送 `page.view` 事件 +- 点击次数由各页面手动调用 `trackClick(pageName, elementId)` + +### 4.5 Agent 对话统计 + +- 在 Agent 对话完成回调中触发 `agent.chat_completed` 事件 +- `message_count` 为用户发送的消息数 +- `response_time_ms` 为首次响应耗时 + +--- + +## 5. 后端接收路由 + +### 5.1 路由定义 + +**端点:** `POST /v1/analytics/events` + +### 5.2 请求格式 + +```json +{ + "events": [ + { /* 事件对象 */ }, + { /* 事件对象 */ } + ] +} +``` + +### 5.3 响应格式 + +```json +{ + "received": 10, + "queued": true +} +``` + +### 5.4 异步写入流程 + +``` +请求 → 内存队列缓冲 → [队列满100条 或 超时5秒] → taskiq 任务 → 批量写入文件 +``` + +- API 接收事件后立即放入内存队列,返回 202 Accepted +- 后台 taskiq worker 消费队列,批量写入 JSONL 文件 +- 使用文件锁保证多 worker 并发写入安全 + +### 5.5 Taskiq 任务定义 + +```python +@taskify +async def write_analytics_events(batch: list[AnalyticsEvent], date: str): + """批量写入事件到 JSONL 文件""" + # 按 event_type 分组写入对应文件 + # 每条事件追加到当日文件末尾 +``` + +### 5.6 文件写入 + +路径:`backend/data/analytics/{event_type}/{YYYY-MM-DD}.jsonl` + +写入策略: +- 每条事件追加到当日文件末尾 +- 使用文件锁保证并发写入安全 +- 目录不存在时自动创建 + +### 5.7 目录结构 + +``` +backend/data/analytics/ +├── session.login/ +│ ├── 2026-04-01.jsonl +│ └── 2026-04-02.jsonl +├── session.logout/ +│ └── 2026-04-01.jsonl +├── agent.chat_completed/ +│ └── 2026-04-01.jsonl +└── page.view/ + └── 2026-04-01.jsonl +``` + +--- + +## 6. 
可视化网站 + +### 6.1 技术栈 + +- **框架**:React 18 + Vite +- **图表**:ECharts +- **样式**:TailwindCSS +- **HTTP**:Axios + +### 6.2 项目结构 + +``` +web/ +├── src/ +│ ├── main.jsx +│ ├── App.jsx +│ ├── pages/ +│ │ ├── Login.jsx +│ │ └── Dashboard.jsx +│ ├── components/ +│ │ ├── StatCard.jsx +│ │ ├── LoginTrendChart.jsx +│ │ ├── PageClickChart.jsx +│ │ └── StayDurationChart.jsx +│ ├── services/ +│ │ └── api.js +│ └── styles/ +├── public/ +├── package.json +├── vite.config.js +└── .env +``` + +### 6.3 访问方式 + +与后端打包在一起,通过子路径访问: + +``` +http://域名/analytics/ # 静态网站 +http://域名/api/v1/analytics/ # API 路由 +``` + +实现方式:FastAPI mount 静态文件目录 + +```python +from fastapi.staticfiles import StaticFiles + +app.mount("/analytics", StaticFiles(directory="web/dist", html=True), name="analytics") +``` + +### 6.4 认证方式 + +**推荐:后端验证密码,返回 JWT Token** + +1. 前端登录页输入密码 +2. 调用 `POST /api/v1/analytics/login` 验证 +3. 后端读取 `.env` 中 `ANALYTICS_PASSWORD` 验证 +4. 验证成功返回 JWT Token,前端存 sessionStorage +5. 后续请求带 Token,后端验证 + +### 6.5 页面设计 + +**登录页 (`/login`)** +- 简单密码输入框 +- 调用后端 API 验证 +- 登录成功后写入 sessionStorage + +**仪表盘 (`/dashboard`)** +- 顶部:日期范围选择器(默认最近7天) +- 统计卡片: + - DAU(当日独立用户数) + - 累计对话次数 + - 累计页面点击量 + - 平均停留时长 +- 图表: + - 折线图:每日登录趋势 + - 柱状图:各页面点击量 Top 10 + - 热力图:页面停留时长分布 + +### 6.6 数据读取 + +- 前端通过 `GET /api/v1/analytics/summary` 获取聚合数据 +- 后端解析 `backend/data/analytics/*.jsonl` 文件并聚合 +- 提供 `GET /api/v1/analytics/daily` 等查询接口 + +--- + +## 7. 
环境变量 + +### 7.1 backend/.env 新增 + +```env +# Analytics 数据存储路径 +ANALYTICS_DATA_PATH=backend/data/analytics + +# 可视化网站密码(用于 /api/v1/analytics/login 验证) +ANALYTICS_PASSWORD=your-secure-password +``` + +### 7.2 后端登录验证 API + +**端点:** `POST /api/v1/analytics/login` + +**请求:** +```json +{ + "password": "your-password" +} +``` + +**响应(成功):** +```json +{ + "token": "jwt-token-here" +} +``` + +**响应(失败):** +```json +{ + "error": "invalid_password" +} +``` + +### 7.3 Web 环境变量 + +web 构建时可通过环境变量注入 API 地址: + +```env +VITE_API_BASE_URL=http://localhost:5775 +``` + +注:密码不在前端存储,改为后端验证方式。 + +--- + +## 8. Worker Queue 重构 + +### 8.1 背景 + +现有 `automation` 队列仅用于将任务转发到 `agent` 队列,职责单一。为支持 analytics 异步写入,将 `automation` 重构为 `general`,用于承载所有非实时任务。 + +### 8.2 重构范围 + +| 文件 | 改动内容 | +|------|---------| +| `backend/src/core/taskiq/app.py` | `worker_automation_broker` → `worker_general_broker`,queue name `"automation"` → `"general"` | +| `backend/src/core/taskiq/__init__.py` | 更新 export 名称 | +| `backend/src/core/agentscope/runtime/tasks.py` | import 和 `@worker_automation_broker.task` → `@worker_general_broker.task` | +| `backend/src/v1/agent/service.py:145` | `queue = "automation"` → `queue = "general"` | +| `backend/tests/unit/core/taskiq/test_app.py` | 更新引用 | +| `infra/scripts/app.sh` | `WORKER_AUTOMATION_CMD` → `WORKER_GENERAL_CMD`,tmux window 改名 | +| `deploy/docker-compose.prod.yml` | worker-automation service → worker-general | + +### 8.3 环境变量改动 + +```env +# 当前 +SOCIAL_WORKER__GROUPS__AUTOMATION__CONCURRENCY=1 + +# 改为 +SOCIAL_WORKER__GROUPS__GENERAL__CONCURRENCY=1 +``` + +涉及文件: +- `.env` +- `.env.example` +- `deploy/.env.prod` +- `deploy/.env.prod.example` + +--- + +## 9. 
实施计划 + +| 阶段 | 任务 | 产出 | +|------|------|------| +| 0 | Worker Queue 重构 (`automation` → `general`) | 所有 queue 相关文件 | +| 1 | 创建 `docs/plans/2026-04-01-analytics-design.md` | 设计文档 | +| 2 | 实现后端 `POST /v1/analytics/events` 路由 | `backend/src/v1/analytics/` | +| 3 | 实现 Flutter 埋点 SDK | `apps/lib/core/analytics/` | +| 4 | 集成埋点到现有 App 页面 | 修改 `home_screen`、`auth` 等 | +| 5 | 搭建 React 项目框架 | `web/` | +| 6 | 实现 Dashboard 页面和图表 | ECharts 组件 | +| 7 | 端到端测试验证 | 测试报告 | + +--- + +## 10. 风险与限制 + +- **数据丢失**:批量发送失败时落盘本地,最多保留次日重试 +- **并发写入**:多实例部署时需使用分布式锁,当前设计仅支持单实例 +- **密码安全**:简单密码校验,生产环境建议使用 JWT + +--- + +## 11. 后续扩展方向 + +- 支持更多事件类型(分享、搜索、错误) +- 接入 OpenTelemetry Collector +- 接入 ClickHouse 进行 OLAP 查询 +- 支持数据导出功能 diff --git a/docs/protocols/common/http-error-codes.md b/docs/protocols/common/http-error-codes.md index fc08cba..af8d2e6 100644 --- a/docs/protocols/common/http-error-codes.md +++ b/docs/protocols/common/http-error-codes.md @@ -73,6 +73,7 @@ When creating/modifying/deprecating any code, this table must be updated in the | `AGENT_SESSION_ID_INVALID` | agent | 422 | Session ID is not a valid UUID | | `AGENT_SESSION_NOT_FOUND` | agent | 404 | Agent chat session does not exist | | `AGENT_USER_ID_INVALID` | agent | 422 | User ID is not a valid UUID | +| `AGENT_UPSTREAM_CONNECTION_ERROR` | agent | 503 | Upstream AI service connection failed (network/proxy issue) | | `INVALID_BINARY_URL_HOST` | agent | 422 | Signed URL host is invalid | | `INVALID_BINARY_URL_BUCKET` | agent | 422 | Signed URL bucket is invalid | | `INVALID_BINARY_URL_PATH_SCOPE` | agent | 422 | Signed URL path scope is invalid | @@ -201,6 +202,7 @@ Exit code policy: - `AGENT_SESSION_ID_INVALID` - `AGENT_SESSION_NOT_FOUND` - `AGENT_USER_ID_INVALID` +- `AGENT_UPSTREAM_CONNECTION_ERROR` ## Compatibility Strategy diff --git a/infra/scripts/app.sh b/infra/scripts/app.sh index 229bb2a..6df57a4 100755 --- a/infra/scripts/app.sh +++ b/infra/scripts/app.sh @@ -160,14 +160,14 @@ ${SOCIAL_WEB__HOST:-0.0.0.0} --port 
${WEB_PORT} --workers \ ${SOCIAL_WEB__WORKERS:-2} --log-level ${UVICORN_LOG_LEVEL}" WORKER_AGENT_CMD="cd '$ROOT_DIR' && PYTHONPATH=backend/src SOCIAL_RUNTIME__SERVICE_NAME=worker-agent uv run taskiq worker core.taskiq.app:worker_agent_broker core.agentscope.runtime.tasks --workers ${SOCIAL_WORKER__GROUPS__AGENT__CONCURRENCY:-2}" - WORKER_AUTOMATION_CMD="cd '$ROOT_DIR' && PYTHONPATH=backend/src SOCIAL_RUNTIME__SERVICE_NAME=worker-automation uv run taskiq worker core.taskiq.app:worker_automation_broker core.agentscope.runtime.tasks --workers ${SOCIAL_WORKER__GROUPS__AUTOMATION__CONCURRENCY:-1}" + WORKER_GENERAL_CMD="cd '$ROOT_DIR' && PYTHONPATH=backend/src SOCIAL_RUNTIME__SERVICE_NAME=worker-general uv run taskiq worker core.taskiq.app:worker_general_broker core.agentscope.runtime.tasks --workers ${SOCIAL_WORKER__GROUPS__GENERAL__CONCURRENCY:-1}" SCHEDULER_CMD="cd '$ROOT_DIR' && PYTHONPATH=backend/src SOCIAL_RUNTIME__SERVICE_NAME=scheduler uv run python -m core.runtime.cli automation-scheduler" echo "Starting tmux workers in session '$SESSION_NAME'..." 
tmux new-session -d -s "$SESSION_NAME" -n web "bash -lc \"$WEB_CMD; echo '[web] exited'; exec bash\"" tmux new-window -t "$SESSION_NAME" -n worker-agent "bash -lc \"$WORKER_AGENT_CMD; echo '[worker-agent] exited'; exec bash\"" - tmux new-window -t "$SESSION_NAME" -n worker-automation "bash -lc \"$WORKER_AUTOMATION_CMD; echo '[worker-automation] exited'; exec bash\"" + tmux new-window -t "$SESSION_NAME" -n worker-general "bash -lc \"$WORKER_GENERAL_CMD; echo '[worker-general] exited'; exec bash\"" tmux new-window -t "$SESSION_NAME" -n scheduler "bash -lc \"$SCHEDULER_CMD; echo '[scheduler] exited'; exec bash\"" echo "" @@ -175,7 +175,7 @@ ${SOCIAL_WEB__WORKERS:-2} --log-level ${UVICORN_LOG_LEVEL}" echo "Log files will be created in logs/ directory:" echo " - web.log, web.error.log" echo " - worker-agent.log, worker-agent.error.log" - echo " - worker-automation.log, worker-automation.error.log" + echo " - worker-general.log, worker-general.error.log" echo " - scheduler.log, scheduler.error.log" echo "" echo "tmux attach -t $SESSION_NAME"