refactor: 重命名 automation worker 为 general worker 并完善错误处理

This commit is contained in:
qzl
2026-04-01 17:24:52 +08:00
parent 0fe28a1c62
commit 24eda6ff51
19 changed files with 760 additions and 25 deletions
+2 -2
View File
@@ -30,9 +30,9 @@ SOCIAL_REDIS__DB=0
############
# Worker 队列分组配置
# agent: 常规异步任务
# automation: 批处理/重计算/可延迟任务
# general: 通用任务(analytics 写入 + automation 批处理
SOCIAL_WORKER__GROUPS__AGENT__CONCURRENCY=2
SOCIAL_WORKER__GROUPS__AUTOMATION__CONCURRENCY=2
SOCIAL_WORKER__GROUPS__GENERAL__CONCURRENCY=2
############
# Automation 调度器配置
+8 -1
View File
@@ -8,6 +8,7 @@ class LogEntry {
final String? funcName;
final int? lineNo;
final String? errorType;
final String? errorMessage;
final String? stackTrace;
final Map<String, dynamic>? extra;
@@ -19,6 +20,7 @@ class LogEntry {
this.funcName,
this.lineNo,
this.errorType,
this.errorMessage,
this.stackTrace,
this.extra,
});
@@ -31,6 +33,7 @@ class LogEntry {
if (funcName != null) 'func_name': funcName,
if (lineNo != null) 'line_no': lineNo,
if (errorType != null) 'error_type': errorType,
if (errorMessage != null) 'error_message': errorMessage,
if (stackTrace != null) 'stack_trace': stackTrace,
if (extra != null && extra!.isNotEmpty) 'extra': extra,
};
@@ -43,8 +46,9 @@ class LogEntry {
].join('');
final locationStr = location.isNotEmpty ? ' [$location]' : '';
final errorStr = errorType != null ? ' [$errorType]' : '';
final errorMsgStr = errorMessage != null ? ' $errorMessage' : '';
final extraStr = extra != null && extra!.isNotEmpty ? ' $extra' : '';
return '$ts ${level.name.toUpperCase().padRight(7)} [$module$locationStr]$errorStr $message$extraStr';
return '$ts ${level.name.toUpperCase().padRight(7)} [$module$locationStr]$errorStr $message$errorMsgStr$extraStr';
}
String toFileString() {
@@ -59,6 +63,9 @@ class LogEntry {
if (errorType != null) {
sb.writeln(' Error: $errorType');
}
if (errorMessage != null) {
sb.writeln(' ErrorMessage: $errorMessage');
}
if (stackTrace != null) {
sb.writeln(' StackTrace:');
sb.writeln(stackTrace);
+1
View File
@@ -154,6 +154,7 @@ class LogService {
funcName: funcName,
lineNo: lineNo,
errorType: error.runtimeType.toString(),
errorMessage: error.toString(),
stackTrace: stackTrace.toString(),
extra: extra,
),
+11 -1
View File
@@ -72,8 +72,18 @@ class Logger {
required StackTrace stackTrace,
Map<String, dynamic>? extra,
}) {
final entry = LogEntry(
timestamp: DateTime.now(),
level: LogLevel.error,
message: message,
module: module,
errorType: error.runtimeType.toString(),
errorMessage: error.toString(),
stackTrace: stackTrace.toString(),
extra: extra,
);
if (_isNoOp) {
debugPrint('[$module] ERROR: $message, error: $error');
debugPrint(entry.toConsoleString());
return;
}
_service!.error(
@@ -110,6 +110,7 @@ class ReminderSchedulerService {
>();
final androidGranted = await android?.requestNotificationsPermission();
await android?.requestExactAlarmsPermission();
final iosGranted = await ios?.requestPermissions(alert: true, sound: true);
final macosGranted = await macos?.requestPermissions(
alert: true,
@@ -57,6 +57,8 @@ String? mapErrorCodeToL10nKey(
return 'errorNotFound';
case 'AGENT_USER_ID_INVALID':
return 'errorGenericSafe';
case 'AGENT_UPSTREAM_CONNECTION_ERROR':
return 'errorNetwork';
case 'INVALID_BINARY_URL_HOST':
return 'errorAgentInvalidBinaryUrl';
case 'INVALID_BINARY_URL_BUCKET':
@@ -242,6 +244,8 @@ String? resolveErrorCodeMessage(
return L10n.current.errorGenericSafe;
case 'errorReLogin':
return L10n.current.errorReLogin;
case 'errorNetwork':
return L10n.current.errorNetwork;
default:
return null;
}
@@ -5,6 +5,7 @@ from typing import Any, Awaitable, Callable, Protocol
from ag_ui.core.types import RunAgentInput
from agentscope.message import Msg
from openai import APIConnectionError
from core.agentscope.runtime.runner import AgentScopeRunner
from core.logging import get_logger
from schemas.domain.automation import RuntimeConfig
@@ -106,6 +107,23 @@ class AgentScopeRuntimeOrchestrator:
},
)
raise
except APIConnectionError:
logger.warning(
"agentscope upstream connection failed",
thread_id=thread_id,
run_id=run_id,
)
await self._pipeline.emit(
session_id=thread_id,
event={
"type": "RUN_ERROR",
"threadId": thread_id,
"runId": run_id,
"message": "network error",
"code": "AGENT_UPSTREAM_CONNECTION_ERROR",
},
)
raise
except Exception:
logger.exception(
"agentscope runtime execution failed",
+3 -3
View File
@@ -26,7 +26,7 @@ from core.auth.models import CurrentUser
from core.config.settings import config
from core.db.session import AsyncSessionLocal
from core.logging import get_logger
from core.taskiq.app import worker_agent_broker, worker_automation_broker
from core.taskiq.app import worker_agent_broker, worker_general_broker
from schemas.agent.forwarded_props import (
RuntimeMode,
parse_forwarded_props_runtime_mode,
@@ -345,6 +345,6 @@ async def run_command_task_agent(command: dict[str, object]) -> dict[str, object
return await run_agentscope_task(command)
@worker_automation_broker.task(task_name="tasks.agentscope.run_command.automation")
async def run_command_task_automation(command: dict[str, object]) -> dict[str, object]:
@worker_general_broker.task(task_name="tasks.agentscope.run_command.general")
async def run_command_task_general(command: dict[str, object]) -> dict[str, object]:
return await run_agentscope_task(command)
+2 -2
View File
@@ -1,3 +1,3 @@
from core.taskiq.app import broker, worker_agent_broker, worker_automation_broker
from core.taskiq.app import broker, worker_agent_broker, worker_general_broker
__all__ = ["broker", "worker_agent_broker", "worker_automation_broker"]
__all__ = ["broker", "worker_agent_broker", "worker_general_broker"]
+3 -2
View File
@@ -12,6 +12,7 @@ log_service_banner(
environment=config.runtime.environment,
)
def _build_broker(queue_name: str) -> ListQueueBroker:
return ListQueueBroker(
url=config.taskiq_broker_url,
@@ -22,8 +23,8 @@ def _build_broker(queue_name: str) -> ListQueueBroker:
worker_agent_broker = _build_broker("agent")
worker_automation_broker = _build_broker("automation")
worker_general_broker = _build_broker("general")
broker = worker_agent_broker
__all__ = ["broker", "worker_agent_broker", "worker_automation_broker"]
__all__ = ["broker", "worker_agent_broker", "worker_general_broker"]
+1 -1
View File
@@ -142,7 +142,7 @@ class AgentService:
metadata=user_message_metadata,
)
queue = "automation" if runtime_mode == RuntimeMode.AUTOMATION else "agent"
queue = "general" if runtime_mode == RuntimeMode.AUTOMATION else "agent"
task_id = await self._queue.enqueue(
command={
"command": "run",
+2 -2
View File
@@ -5,13 +5,13 @@ import sys
import pytest
from core.taskiq.app import broker, worker_agent_broker, worker_automation_broker
from core.taskiq.app import broker, worker_agent_broker, worker_general_broker
def test_taskiq_broker_is_configured() -> None:
assert broker is not None
assert worker_agent_broker is broker
assert worker_automation_broker is not None
assert worker_general_broker is not None
def test_taskiq_app_configures_logging_on_import(
+2 -2
View File
@@ -29,9 +29,9 @@ SOCIAL_REDIS__DB=0
# Worker 队列分组配置
############
# agent: 常规异步任务
# automation: 批处理/重计算/可延迟任务
# general: 通用任务(analytics 写入 + automation 批处理
SOCIAL_WORKER__GROUPS__AGENT__CONCURRENCY=3
SOCIAL_WORKER__GROUPS__AUTOMATION__CONCURRENCY=1
SOCIAL_WORKER__GROUPS__GENERAL__CONCURRENCY=1
############
# Automation 调度器配置
+2 -2
View File
@@ -2,7 +2,7 @@
本目录是单机 `docker compose` 的生产交付包,架构为:
- 应用层:`web + worker-agent + worker-automation + scheduler + init-job`
- 应用层:`web + worker-agent + worker-general + scheduler + init-job`
- 中间件:`redis`
- 数据与认证:云 Supabase(通过环境变量访问)
- 反向代理:由服务器侧 nginx 托管(不在本目录编排)
@@ -81,7 +81,7 @@ cp deploy/.env.prod.example deploy/.env.prod
### 2) 启动常驻服务
```bash
docker compose --env-file deploy/.env.prod -f deploy/docker-compose.prod.yml up -d redis web worker-agent worker-automation scheduler
docker compose --env-file deploy/.env.prod -f deploy/docker-compose.prod.yml up -d redis web worker-agent worker-general scheduler
```
### 3) 执行一次性 bootstrap
+4 -4
View File
@@ -80,21 +80,21 @@ services:
- ../logs:/app/logs
- ./static/releases:/app/deploy/static/releases:ro
worker-automation:
worker-general:
image: ${SOCIAL_BACKEND_IMAGE:-social-app-backend:prod}
container_name: social-prod-worker-automation
container_name: social-prod-worker-general
restart: unless-stopped
env_file:
- ./.env.prod
environment:
- PYTHONPATH=/app/backend/src
- PYTHONDONTWRITEBYTECODE=1
- SOCIAL_RUNTIME__SERVICE_NAME=worker-automation
- SOCIAL_RUNTIME__SERVICE_NAME=worker-general
- SOCIAL_RUNTIME__ENVIRONMENT=${SOCIAL_RUNTIME__ENVIRONMENT:-prod}
- SOCIAL_REDIS__HOST=redis
- SOCIAL_REDIS__PORT=6379
command: >
sh -c '.venv/bin/taskiq worker core.taskiq.app:worker_automation_broker core.agentscope.runtime.tasks --workers ${SOCIAL_WORKER__GROUPS__AUTOMATION__CONCURRENCY:-1}'
sh -c '.venv/bin/taskiq worker core.taskiq.app:worker_general_broker core.agentscope.runtime.tasks --workers ${SOCIAL_WORKER__GROUPS__GENERAL__CONCURRENCY:-1}'
depends_on:
redis:
condition: service_healthy
+691
View File
@@ -0,0 +1,691 @@
# Analytics 数据埋点与可视化方案
## 1. 概述
为 Flutter 移动应用实现数据埋点系统,采集用户行为数据,通过异步方式发送至后端存储,并提供可视化网站展示。
### 目标
- 监测每日用户登录时间
- 记录 Agent 对话次数
- 统计每个页面的点击次数和停留时长
- 统一数据格式,参照 OpenTelemetry 简化版规范
- 数据持久化到本地文件,供可视化网站读取
---
## 2. 系统架构
```
┌─────────────────┐ 异步 POST ┌─────────────────┐ taskiq ┌─────────────────────────┐
│ Flutter App │ ───────────────► │ FastAPI │ ──────────► │ backend/data/analytics/ │
│ (埋点 SDK) │ /v1/analytics │ /v1/analytics │ async │ {YYYY-MM-DD}.jsonl │
└─────────────────┘ └─────────────────┘ └─────────────────────────┘
│ │
│ │ taskiq worker (general queue)
▼ ▼
┌─────────────────┐
│ Redis Queue │
└─────────────────┘
```
---
## 3. 数据格式规范
### 3.1 统一事件信封结构
所有埋点事件使用统一顶层结构:
```json
{
"event_id": "01JQ2G5Q3N6Y5N8R7M4K2P1T9A",
"event_type": "session.login",
"timestamp": "2026-04-01T10:30:00.123Z",
"user_id": "user_123",
"device_id": "install_a93f5d7c21",
"session_id": "sess_7c2d5e8f91",
"platform": "android",
"app_version": "1.0.0",
"app_build": "100",
"env": "prod",
"page_name": "login",
"trace_id": "trace_8d2f6c1a",
"request_id": null,
"attributes": {},
"metrics": {},
"context": {
"network_type": "wifi",
"os_version": "Android 14",
"device_model": "Xiaomi 13",
"locale": "zh-CN",
"timezone": "Asia/Taipei"
}
}
```
### 3.2 顶层字段定义
| 字段 | 类型 | 必填 | 示例 | 说明 |
|------|------|------|------|------|
| `event_id` | string | 是 | `01JQ2G...` | 事件唯一 IDULID/UUID,用于幂等去重 |
| `event_type` | string | 是 | `page.view` | 事件类型,dot 命名法 |
| `timestamp` | string | 是 | `2026-04-01T10:30:00.123Z` | 事件发生时间,UTC ISO8601 |
| `user_id` | string | 是 | `user_123` | 用户 ID(必填) |
| `device_id` | string | 是 | `install_xxx` | 设备标识 |
| `session_id` | string | 是 | `sess_xxx` | App 一次启动会话 ID |
| `platform` | string | 是 | `android` | android / ios / web |
| `app_version` | string | 是 | `1.0.0` | App 版本号 |
| `app_build` | string \| null | 否 | `100` | 构建号 |
| `env` | string | 是 | `prod` | dev / staging / prod |
| `page_name` | string \| null | 否 | `home` | 当前页面名 |
| `trace_id` | string \| null | 否 | `trace_xxx` | 用于串联日志、接口、错误 |
| `request_id` | string \| null | 否 | `req_xxx` | 与某次请求关联时可传 |
| `attributes` | object | 是 | `{}` | 离散属性、分类信息 |
| `metrics` | object | 是 | `{}` | 可聚合数值指标 |
| `context` | object | 否 | `{...}` | 客户端环境上下文 |
### 3.3 attributes 规范
只放离散属性、分类信息、上下文标签。
**允许内容:** string, boolean, number, null, 一层简单 object 或 string list
**适合放这里:**
- `method`: "password"
- `page_from`: "home"
- `conversation_id`: "conv_123"
- `element_id`: "send_button"
- `logout_reason`: "manual"
**不适合放这里(应进 metrics):**
- 停留时长、点击次数、响应时间、消息数、会话时长
### 3.4 metrics 规范
只放数值型、可聚合、可统计指标。
**允许内容:** int, float
**适合放这里:**
- `stay_duration_ms`: 停留时长(毫秒)
- `click_count`: 点击次数
- `response_time_ms`: 响应耗时
- `message_count`: 消息数量
- `session_duration_s`: 会话时长(秒)
### 3.5 context 规范
通用环境上下文:
```json
{
"network_type": "wifi",
"os_version": "Android 14",
"device_model": "Xiaomi 13",
"locale": "zh-CN",
"timezone": "Asia/Taipei"
}
```
| 字段 | 类型 | 说明 |
|------|------|------|
| `network_type` | string \| null | wifi / cellular / offline / unknown |
| `os_version` | string \| null | 操作系统版本 |
| `device_model` | string \| null | 设备型号 |
| `locale` | string \| null | 当前语言地区 |
| `timezone` | string \| null | 时区标识 |
### 3.6 事件类型定义
| event_type | 触发时机 | attributes | metrics |
|------------|---------|------------|---------|
| `session.login` | 用户登录成功 | `method` (password/phone_code/oauth) | - |
| `session.logout` | 用户登出 | `reason` (manual/expired/kickout) | `session_duration_s` |
| `agent.chat_completed` | Agent 对话完成 | `conversation_id`, `scenario` | `message_count`, `response_time_ms` |
| `page.view` | 页面退出时 | `page_from` | `stay_duration_ms`, `click_count` |
| `ui.click` | 元素点击时 | `element_id`, `element_type` | - |
### 3.7 事件数据结构
#### session.login
```json
{
"event_id": "01JQ2G5Q3N6Y5N8R7M4K2P1T9A",
"event_type": "session.login",
"timestamp": "2026-04-01T10:30:00.123Z",
"user_id": "user_123",
"device_id": "install_a93f5d7c21",
"session_id": "sess_7c2d5e8f91",
"platform": "android",
"app_version": "1.0.0",
"app_build": "100",
"env": "prod",
"page_name": "login",
"trace_id": "trace_login_001",
"request_id": "req_login_001",
"attributes": { "method": "password" },
"metrics": {},
"context": { "network_type": "wifi", "os_version": "Android 14", "device_model": "Xiaomi 13", "locale": "zh-CN", "timezone": "Asia/Taipei" }
}
```
#### session.logout
```json
{
"event_id": "01JQ2G6A7X9N2T4R8K1M5P3Q7B",
"event_type": "session.logout",
"timestamp": "2026-04-01T12:00:00.000Z",
"user_id": "user_123",
"device_id": "install_a93f5d7c21",
"session_id": "sess_7c2d5e8f91",
"platform": "android",
"app_version": "1.0.0",
"app_build": "100",
"env": "prod",
"page_name": "settings",
"trace_id": "trace_logout_001",
"request_id": null,
"attributes": { "reason": "manual" },
"metrics": { "session_duration_s": 5400 },
"context": { "network_type": "wifi", "os_version": "Android 14", "device_model": "Xiaomi 13", "locale": "zh-CN", "timezone": "Asia/Taipei" }
}
```
#### agent.chat_completed
```json
{
"event_id": "01JQ2G7B9V4K8N3R1T6M2P5Q8C",
"event_type": "agent.chat_completed",
"timestamp": "2026-04-01T13:20:15.456Z",
"user_id": "user_123",
"device_id": "install_a93f5d7c21",
"session_id": "sess_7c2d5e8f91",
"platform": "android",
"app_version": "1.0.0",
"app_build": "100",
"env": "prod",
"page_name": "chat",
"trace_id": "trace_agent_001",
"request_id": "req_agent_001",
"attributes": { "conversation_id": "conv_987", "scenario": "assistant" },
"metrics": { "message_count": 4, "response_time_ms": 1320 },
"context": { "network_type": "wifi", "os_version": "Android 14", "device_model": "Xiaomi 13", "locale": "zh-CN", "timezone": "Asia/Taipei" }
}
```
#### page.view
```json
{
"event_id": "01JQ2G8C1M7R4T9K2P6N5Q3D8E",
"event_type": "page.view",
"timestamp": "2026-04-01T14:05:30.000Z",
"user_id": "user_123",
"device_id": "install_a93f5d7c21",
"session_id": "sess_7c2d5e8f91",
"platform": "android",
"app_version": "1.0.0",
"app_build": "100",
"env": "prod",
"page_name": "home",
"trace_id": "trace_page_home_001",
"request_id": null,
"attributes": { "page_from": "login" },
"metrics": { "stay_duration_ms": 18234, "click_count": 7 },
"context": { "network_type": "wifi", "os_version": "Android 14", "device_model": "Xiaomi 13", "locale": "zh-CN", "timezone": "Asia/Taipei" }
}
```
#### ui.click
```json
{
"event_id": "01JQ2G9D4P8N1R6T3K5M7Q2E9F",
"event_type": "ui.click",
"timestamp": "2026-04-01T14:00:12.000Z",
"user_id": "user_123",
"device_id": "install_a93f5d7c21",
"session_id": "sess_7c2d5e8f91",
"platform": "android",
"app_version": "1.0.0",
"app_build": "100",
"env": "prod",
"page_name": "home",
"trace_id": "trace_click_001",
"request_id": null,
"attributes": { "element_id": "create_task_button", "element_type": "button" },
"metrics": {},
"context": { "network_type": "wifi", "os_version": "Android 14", "device_model": "Xiaomi 13", "locale": "zh-CN", "timezone": "Asia/Taipei" }
}
```
---
## 4. 批量上报请求结构
### 4.1 请求体
```json
{
"client_time": "2026-04-01T14:10:00.000Z",
"sdk_version": "1.0.0",
"events": [
{ /* */ }
]
}
```
### 4.2 字段定义
| 字段 | 类型 | 必填 | 说明 |
|------|------|------|------|
| `client_time` | string | 否 | 本次上报请求发起时间 |
| `sdk_version` | string | 否 | 埋点 SDK 版本 |
| `events` | array | 是 | 事件数组,建议 1~100 条 |
---
## 5. 后端落盘结构
### 5.1 路径结构
按日期分桶,便于后续聚合查询:
```
backend/data/analytics/
├── 2026-04-01.jsonl
├── 2026-04-02.jsonl
└── 2026-04-03.jsonl
```
### 5.2 JSONL 单行格式
每行一个完整事件对象:
```jsonl
{"event_id":"01JQ2G5Q3N6Y5N8R7M4K2P1T9A","event_type":"session.login","timestamp":"2026-04-01T10:30:00.123Z",...}
{"event_id":"01JQ2G6A7X9N2T4R8K1M5P3Q7B","event_type":"session.logout","timestamp":"2026-04-01T12:00:00.000Z",...}
```
---
## 6. Python 类型定义
```python
from typing import Any, Literal
from pydantic import BaseModel, Field
from datetime import datetime
class AnalyticsContext(BaseModel):
network_type: str | None = None
os_version: str | None = None
device_model: str | None = None
locale: str | None = None
timezone: str | None = None
class AnalyticsEvent(BaseModel):
event_id: str
event_type: str
timestamp: datetime
user_id: str
device_id: str
session_id: str
platform: Literal["android", "ios", "web"]
app_version: str
app_build: str | None = None
env: Literal["dev", "staging", "prod"]
page_name: str | None = None
trace_id: str | None = None
request_id: str | None = None
attributes: dict[str, Any] = Field(default_factory=dict)
metrics: dict[str, int | float] = Field(default_factory=dict)
context: AnalyticsContext | None = None
class AnalyticsBatchRequest(BaseModel):
client_time: datetime | None = None
sdk_version: str | None = None
events: list[AnalyticsEvent]
```
---
## 7. Flutter 埋点 SDK 设计
### 4.1 目录结构
```
apps/lib/core/analytics/
├── tracker.dart # 埋点入口,单例
├── config.dart # 配置(endpoint、flush 策略)
├── events/ # 事件定义
│ ├── base_event.dart # 基础事件类
│ ├── login_event.dart
│ ├── logout_event.dart
│ ├── conversation_event.dart
│ └── page_view_event.dart
├── queue/
│ └── event_queue.dart # 内存队列
└── sender.dart # 异步 HTTP 发送器
```
### 4.2 核心流程
1. **事件采集**:调用 `AnalyticsTracker.track(event)` 记录事件
2. **队列缓冲**:事件先入内存队列
3. **批量发送**:队列满(50条)或定时(30秒)触发批量发送
4. **失败重试**:发送失败则落盘本地,下次启动时重试
### 4.3 初始化
`apps/lib/main.dart` 的 App 初始化阶段:
```dart
await AnalyticsTracker.init(
endpoint: '${Env.apiBaseUrl}/v1/analytics/events',
deviceId: deviceInfo.id,
);
```
### 4.4 页面停留时长计算
- `page.view` 事件在页面 `initState` 时发送 `enter_time`
- 在页面 `dispose` 时补充 `stay_duration_ms`
- 点击次数由各页面手动调用 `trackClick(pageName, elementId)`
### 4.5 Agent 对话统计
- 在 Agent 对话完成回调中触发 `agent.conversation` 事件
- `message_count` 为用户发送的消息数
- `response_time_ms` 为首次响应耗时
---
## 5. 后端接收路由
### 5.1 路由定义
**端点:** `POST /v1/analytics/events`
### 5.2 请求格式
```json
{
"events": [
{ /* */ },
{ /* */ }
]
}
```
### 5.3 响应格式
```json
{
"received": 10,
"queued": true
}
```
### 5.4 异步写入流程
```
请求 → 内存队列缓冲 → [队列满100条 或 超时5秒] → taskiq 任务 → 批量写入文件
```
- API 接收事件后立即放入内存队列,返回 202 Accepted
- 后台 taskiq worker 消费队列,批量写入 JSONL 文件
- 使用文件锁保证多 worker 并发写入安全
### 5.5 Taskiq 任务定义
```python
@taskify
async def write_analytics_events(batch: list[AnalyticsEvent], date: str):
"""批量写入事件到 JSONL 文件"""
# 按 event_type 分组写入对应文件
# 每条事件追加到当日文件末尾
```
### 5.6 文件写入
路径:`backend/data/analytics/{event_type}/{YYYY-MM-DD}.jsonl`
写入策略:
- 每条事件追加到当日文件末尾
- 使用文件锁保证并发写入安全
- 目录不存在时自动创建
### 5.7 目录结构
```
backend/data/analytics/
├── session.login/
│ ├── 2026-04-01.jsonl
│ └── 2026-04-02.jsonl
├── session.logout/
│ └── 2026-04-01.jsonl
├── agent.conversation/
│ └── 2026-04-01.jsonl
└── page.view/
└── 2026-04-01.jsonl
```
---
## 6. 可视化网站
### 6.1 技术栈
- **框架**React 18 + Vite
- **图表**ECharts
- **样式**TailwindCSS
- **HTTP**Axios
### 6.2 项目结构
```
web/
├── src/
│ ├── main.jsx
│ ├── App.jsx
│ ├── pages/
│ │ ├── Login.jsx
│ │ └── Dashboard.jsx
│ ├── components/
│ │ ├── StatCard.jsx
│ │ ├── LoginTrendChart.jsx
│ │ ├── PageClickChart.jsx
│ │ └── StayDurationChart.jsx
│ ├── services/
│ │ └── api.js
│ └── styles/
├── public/
├── package.json
├── vite.config.js
└── .env
```
### 6.3 访问方式
与后端打包在一起,通过子路径访问:
```
http://域名/analytics/ # 静态网站
http://域名/api/v1/analytics/ # API 路由
```
实现方式:FastAPI mount 静态文件目录
```python
from fastapi.staticfiles import StaticFiles
app.mount("/analytics", StaticFiles(directory="web/dist", html=True), name="analytics")
```
### 6.4 认证方式
**推荐:后端验证密码,返回 JWT Token**
1. 前端登录页输入密码
2. 调用 `POST /api/v1/analytics/login` 验证
3. 后端读取 `.env``ANALYTICS_PASSWORD` 验证
4. 验证成功返回 JWT Token,前端存 sessionStorage
5. 后续请求带 Token,后端验证
### 6.5 页面设计
**登录页 (`/login`)**
- 简单密码输入框
- 调用后端 API 验证
- 登录成功后写入 sessionStorage
**仪表盘 (`/dashboard`)**
- 顶部:日期范围选择器(默认最近7天)
- 统计卡片:
- DAU(当日独立用户数)
- 累计对话次数
- 累计页面点击量
- 平均停留时长
- 图表:
- 折线图:每日登录趋势
- 柱状图:各页面点击量 Top 10
- 热力图:页面停留时长分布
### 6.6 数据读取
- 前端通过 `GET /api/v1/analytics/summary` 获取聚合数据
- 后端解析 `backend/data/analytics/*.jsonl` 文件并聚合
- 提供 `GET /api/v1/analytics/daily` 等查询接口
---
## 7. 环境变量
### 7.1 backend/.env 新增
```env
# Analytics 数据存储路径
ANALYTICS_DATA_PATH=backend/data/analytics
# 可视化网站密码(用于 /api/v1/analytics/login 验证)
ANALYTICS_PASSWORD=your-secure-password
```
### 7.2 后端登录验证 API
**端点:** `POST /api/v1/analytics/login`
**请求:**
```json
{
"password": "your-password"
}
```
**响应(成功):**
```json
{
"token": "jwt-token-here"
}
```
**响应(失败):**
```json
{
"error": "invalid_password"
}
```
### 7.3 Web 环境变量
web 构建时可通过环境变量注入 API 地址:
```env
VITE_API_BASE_URL=http://localhost:5775
```
注:密码不在前端存储,改为后端验证方式。
---
## 8. Worker Queue 重构
### 8.1 背景
现有 `automation` 队列仅用于将任务转发到 `agent` 队列,职责单一。为支持 analytics 异步写入,将 `automation` 重构为 `general`,用于承载所有非实时任务。
### 8.2 重构范围
| 文件 | 改动内容 |
|------|---------|
| `backend/src/core/taskiq/app.py` | `worker_automation_broker``worker_general_broker`queue name `"automation"``"general"` |
| `backend/src/core/taskiq/__init__.py` | 更新 export 名称 |
| `backend/src/core/agentscope/runtime/tasks.py` | import 和 `@worker_automation_broker.task``@worker_general_broker.task` |
| `backend/src/v1/agent/service.py:145` | `queue = "automation"``queue = "general"` |
| `backend/tests/unit/core/taskiq/test_app.py` | 更新引用 |
| `infra/scripts/app.sh` | `WORKER_AUTOMATION_CMD``WORKER_GENERAL_CMD`tmux window 改名 |
| `deploy/docker-compose.prod.yml` | worker-automation service → worker-general |
### 8.3 环境变量改动
```env
# 当前
SOCIAL_WORKER__GROUPS__AUTOMATION__CONCURRENCY=1
# 改为
SOCIAL_WORKER__GROUPS__GENERAL__CONCURRENCY=1
```
涉及文件:
- `.env`
- `.env.example`
- `deploy/.env.prod`
- `deploy/.env.prod.example`
---
## 9. 实施计划
| 阶段 | 任务 | 产出 |
|------|------|------|
| 0 | Worker Queue 重构 (`automation``general`) | 所有 queue 相关文件 |
| 1 | 创建 `docs/plans/2026-04-01-analytics-design.md` | 设计文档 |
| 2 | 实现后端 `POST /v1/analytics/events` 路由 | `backend/src/v1/analytics/` |
| 3 | 实现 Flutter 埋点 SDK | `apps/lib/core/analytics/` |
| 4 | 集成埋点到现有 App 页面 | 修改 `home_screen``auth` 等 |
| 5 | 搭建 React 项目框架 | `web/` |
| 6 | 实现 Dashboard 页面和图表 | ECharts 组件 |
| 7 | 端到端测试验证 | 测试报告 |
---
## 10. 风险与限制
- **数据丢失**:批量发送失败时落盘本地,最多保留次日重试
- **并发写入**:多实例部署时需使用分布式锁,当前设计仅支持单实例
- **密码安全**:简单密码校验,生产环境建议使用 JWT
---
## 11. 后续扩展方向
- 支持更多事件类型(分享、搜索、错误)
- 接入 OpenTelemetry Collector
- 接入 ClickHouse 进行 OLAP 查询
- 支持数据导出功能
@@ -73,6 +73,7 @@ When creating/modifying/deprecating any code, this table must be updated in the
| `AGENT_SESSION_ID_INVALID` | agent | 422 | Session ID is not a valid UUID |
| `AGENT_SESSION_NOT_FOUND` | agent | 404 | Agent chat session does not exist |
| `AGENT_USER_ID_INVALID` | agent | 422 | User ID is not a valid UUID |
| `AGENT_UPSTREAM_CONNECTION_ERROR` | agent | 503 | Upstream AI service connection failed (network/proxy issue) |
| `INVALID_BINARY_URL_HOST` | agent | 422 | Signed URL host is invalid |
| `INVALID_BINARY_URL_BUCKET` | agent | 422 | Signed URL bucket is invalid |
| `INVALID_BINARY_URL_PATH_SCOPE` | agent | 422 | Signed URL path scope is invalid |
@@ -201,6 +202,7 @@ Exit code policy:
- `AGENT_SESSION_ID_INVALID`
- `AGENT_SESSION_NOT_FOUND`
- `AGENT_USER_ID_INVALID`
- `AGENT_UPSTREAM_CONNECTION_ERROR`
## Compatibility Strategy
+3 -3
View File
@@ -160,14 +160,14 @@ ${SOCIAL_WEB__HOST:-0.0.0.0} --port ${WEB_PORT} --workers \
${SOCIAL_WEB__WORKERS:-2} --log-level ${UVICORN_LOG_LEVEL}"
WORKER_AGENT_CMD="cd '$ROOT_DIR' && PYTHONPATH=backend/src SOCIAL_RUNTIME__SERVICE_NAME=worker-agent uv run taskiq worker core.taskiq.app:worker_agent_broker core.agentscope.runtime.tasks --workers ${SOCIAL_WORKER__GROUPS__AGENT__CONCURRENCY:-2}"
WORKER_AUTOMATION_CMD="cd '$ROOT_DIR' && PYTHONPATH=backend/src SOCIAL_RUNTIME__SERVICE_NAME=worker-automation uv run taskiq worker core.taskiq.app:worker_automation_broker core.agentscope.runtime.tasks --workers ${SOCIAL_WORKER__GROUPS__AUTOMATION__CONCURRENCY:-1}"
WORKER_GENERAL_CMD="cd '$ROOT_DIR' && PYTHONPATH=backend/src SOCIAL_RUNTIME__SERVICE_NAME=worker-general uv run taskiq worker core.taskiq.app:worker_general_broker core.agentscope.runtime.tasks --workers ${SOCIAL_WORKER__GROUPS__GENERAL__CONCURRENCY:-1}"
SCHEDULER_CMD="cd '$ROOT_DIR' && PYTHONPATH=backend/src SOCIAL_RUNTIME__SERVICE_NAME=scheduler uv run python -m core.runtime.cli automation-scheduler"
echo "Starting tmux workers in session '$SESSION_NAME'..."
tmux new-session -d -s "$SESSION_NAME" -n web "bash -lc \"$WEB_CMD; echo '[web] exited'; exec bash\""
tmux new-window -t "$SESSION_NAME" -n worker-agent "bash -lc \"$WORKER_AGENT_CMD; echo '[worker-agent] exited'; exec bash\""
tmux new-window -t "$SESSION_NAME" -n worker-automation "bash -lc \"$WORKER_AUTOMATION_CMD; echo '[worker-automation] exited'; exec bash\""
tmux new-window -t "$SESSION_NAME" -n worker-general "bash -lc \"$WORKER_GENERAL_CMD; echo '[worker-general] exited'; exec bash\""
tmux new-window -t "$SESSION_NAME" -n scheduler "bash -lc \"$SCHEDULER_CMD; echo '[scheduler] exited'; exec bash\""
echo ""
@@ -175,7 +175,7 @@ ${SOCIAL_WEB__WORKERS:-2} --log-level ${UVICORN_LOG_LEVEL}"
echo "Log files will be created in logs/ directory:"
echo " - web.log, web.error.log"
echo " - worker-agent.log, worker-agent.error.log"
echo " - worker-automation.log, worker-automation.error.log"
echo " - worker-general.log, worker-general.error.log"
echo " - scheduler.log, scheduler.error.log"
echo ""
echo "tmux attach -t $SESSION_NAME"