feat: 添加用户行为分析功能

This commit is contained in:
qzl
2026-04-01 18:35:32 +08:00
parent 24eda6ff51
commit 2cdf075e92
25 changed files with 1321 additions and 5 deletions
+6
View File
@@ -89,6 +89,11 @@ class RuntimeSettings(BaseModel):
return self
class AnalyticsSettings(BaseModel):
data_path: str = "backend/data/analytics"
password: str = "analytics-secret"
class TaskiqSettings(BaseModel):
broker_url: str | None = None
result_backend_url: str | None = None
@@ -274,6 +279,7 @@ class Settings(BaseSettings):
taskiq: TaskiqSettings = TaskiqSettings()
database: DatabaseSettings = DatabaseSettings()
app_version: AppVersionSettings = AppVersionSettings()
analytics: AnalyticsSettings = AnalyticsSettings()
test: TestSettings = Field(default_factory=TestSettings)
@computed_field
+44
View File
@@ -0,0 +1,44 @@
from fastapi import APIRouter, HTTPException, status
from core.config.settings import config
from core.logging import get_logger
from v1.analytics.schemas import (
AnalyticsBatchRequest,
AnalyticsBatchResponse,
AnalyticsLoginRequest,
AnalyticsLoginResponse,
)
from v1.analytics.service import get_analytics_service
from v1.analytics.tasks import write_analytics_events
logger = get_logger("v1.analytics.router")
router = APIRouter(prefix="/analytics", tags=["analytics"])
@router.post("/events", response_model=AnalyticsBatchResponse)
async def receive_events(request: AnalyticsBatchRequest) -> AnalyticsBatchResponse:
"""接收埋点事件批次"""
service = get_analytics_service()
received = await service.enqueue_events(request)
events, date = service.get_and_clear_buffer()
if events:
await write_analytics_events(batch=events, date=date)
return AnalyticsBatchResponse(received=received, queued=True)
@router.post("/login", response_model=AnalyticsLoginResponse)
async def login(request: AnalyticsLoginRequest) -> AnalyticsLoginResponse:
"""Analytics Dashboard 登录"""
if request.password != config.analytics.password:
logger.warning("Analytics login failed: invalid password")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid password",
)
logger.info("Analytics login success")
return AnalyticsLoginResponse(success=True)
+54
View File
@@ -0,0 +1,54 @@
from datetime import datetime
from typing import Any, Literal
from pydantic import BaseModel, Field
class AnalyticsContext(BaseModel):
network_type: str | None = None
os_version: str | None = None
device_model: str | None = None
locale: str | None = None
timezone: str | None = None
class AnalyticsEvent(BaseModel):
event_id: str
event_type: str
timestamp: datetime
user_id: str
device_id: str
session_id: str
platform: Literal["android", "ios", "web"]
app_version: str
app_build: str | None = None
env: Literal["dev", "staging", "prod"]
page_name: str | None = None
trace_id: str | None = None
request_id: str | None = None
attributes: dict[str, Any] = Field(default_factory=dict)
metrics: dict[str, int | float] = Field(default_factory=dict)
context: AnalyticsContext | None = None
class AnalyticsBatchRequest(BaseModel):
client_time: datetime | None = None
sdk_version: str | None = None
events: list[AnalyticsEvent]
class AnalyticsBatchResponse(BaseModel):
received: int
queued: bool = True
class AnalyticsLoginRequest(BaseModel):
password: str
class AnalyticsLoginResponse(BaseModel):
success: bool
+60
View File
@@ -0,0 +1,60 @@
from datetime import datetime, timezone
from typing import Any
from core.logging import get_logger
from v1.analytics.schemas import AnalyticsBatchRequest
logger = get_logger("v1.analytics.service")
class AnalyticsService:
def __init__(self) -> None:
self._buffer: list[dict[str, Any]] = []
self._buffer_date: str | None = None
async def enqueue_events(self, request: AnalyticsBatchRequest) -> int:
"""接收事件并放入内存缓冲,返回接收数量"""
now = datetime.now(timezone.utc)
received_count = 0
for event in request.events:
event_dict = event.model_dump(mode="json")
self._buffer.append(event_dict)
received_count += 1
if self._buffer_date is None:
self._buffer_date = now.strftime("%Y-%m-%d")
logger.info(
"Analytics events received",
count=received_count,
buffer_size=len(self._buffer),
)
return received_count
def get_and_clear_buffer(self) -> tuple[list[dict[str, Any]], str]:
"""获取当前缓冲并清空,返回 (events, date)"""
if not self._buffer:
return [], self._buffer_date or datetime.now(timezone.utc).strftime(
"%Y-%m-%d"
)
events = self._buffer.copy()
date = self._buffer_date or datetime.now(timezone.utc).strftime("%Y-%m-%d")
self._buffer.clear()
self._buffer_date = None
return events, date
_analytics_service: AnalyticsService | None = None
def get_analytics_service() -> AnalyticsService:
global _analytics_service
if _analytics_service is None:
_analytics_service = AnalyticsService()
return _analytics_service
+43
View File
@@ -0,0 +1,43 @@
import json
from pathlib import Path
from core.config.settings import config
from core.logging import get_logger
from core.taskiq.app import worker_general_broker
logger = get_logger("v1.analytics.tasks")
def _get_analytics_data_path() -> Path:
return Path(config.analytics.data_path)
@worker_general_broker.task(task_name="v1.analytics.write_events")
async def write_analytics_events(batch: list[dict], date: str) -> dict:
"""批量写入事件到 JSONL 文件"""
data_path = _get_analytics_data_path()
data_path.mkdir(parents=True, exist_ok=True)
events_by_type: dict[str, list[str]] = {}
for event_dict in batch:
event_type = event_dict.get("event_type", "unknown")
if event_type not in events_by_type:
events_by_type[event_type] = []
events_by_type[event_type].append(json.dumps(event_dict, ensure_ascii=False))
for event_type, lines in events_by_type.items():
file_path = data_path / f"{date}.jsonl"
with open(file_path, "a", encoding="utf-8") as f:
for line in lines:
f.write(line + "\n")
logger.info(
"Analytics events written",
date=date,
total_count=len(batch),
types=list(events_by_type.keys()),
)
return {"written": len(batch), "date": date}
+445
View File
@@ -0,0 +1,445 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Analytics Dashboard</title>
<style>
:root {
--bg: #f3f4f6;
--card: #ffffff;
--text: #111827;
--muted: #6b7280;
--line: #e5e7eb;
--primary: #0f766e;
--danger: #b91c1c;
}
* { box-sizing: border-box; }
body {
margin: 0;
font-family: "Segoe UI", "PingFang SC", "Microsoft YaHei", sans-serif;
color: var(--text);
background: linear-gradient(180deg, #e6fffb 0%, var(--bg) 240px);
min-height: 100vh;
}
.container {
width: min(1100px, 92vw);
margin: 24px auto 40px;
}
.card {
background: var(--card);
border: 1px solid var(--line);
border-radius: 12px;
padding: 16px;
box-shadow: 0 6px 24px rgba(15, 118, 110, 0.08);
}
.grid {
display: grid;
gap: 12px;
}
.stats {
grid-template-columns: repeat(auto-fit, minmax(180px, 1fr));
}
.toolbar {
display: flex;
flex-wrap: wrap;
gap: 12px;
align-items: end;
margin-bottom: 12px;
}
.field {
display: flex;
flex-direction: column;
gap: 6px;
min-width: 170px;
}
input {
border: 1px solid #d1d5db;
border-radius: 8px;
padding: 8px 10px;
font-size: 14px;
}
button {
border: 0;
border-radius: 8px;
padding: 9px 14px;
font-size: 14px;
cursor: pointer;
color: #fff;
background: var(--primary);
}
button[disabled] { opacity: 0.6; cursor: not-allowed; }
.btn-ghost {
background: #374151;
}
.value {
margin-top: 8px;
font-size: 26px;
font-weight: 700;
}
.muted { color: var(--muted); }
.danger { color: var(--danger); }
table {
width: 100%;
border-collapse: collapse;
margin-top: 10px;
font-size: 14px;
}
th, td {
border-bottom: 1px solid var(--line);
padding: 8px;
text-align: left;
white-space: nowrap;
}
.chart-row {
display: flex;
align-items: center;
gap: 10px;
margin: 8px 0;
}
.bar {
height: 10px;
background: linear-gradient(90deg, #14b8a6, #0f766e);
border-radius: 999px;
min-width: 2px;
}
.hidden { display: none; }
</style>
</head>
<body>
<div class="container">
<div id="loginCard" class="card" style="max-width: 420px; margin: 100px auto;">
<h2 style="margin: 0 0 12px;">Analytics 登录</h2>
<p class="muted" style="margin-top: 0;">输入密码进入聚合分析页面</p>
<form id="loginForm">
<div class="field">
<label for="password">密码</label>
<input id="password" type="password" required />
</div>
<div style="margin-top: 12px; display: flex; gap: 10px;">
<button id="loginBtn" type="submit">登录</button>
</div>
<p id="loginError" class="danger" style="margin-bottom: 0;"></p>
</form>
</div>
<div id="dashboard" class="hidden">
<div style="display: flex; justify-content: space-between; align-items: center; margin-bottom: 12px;">
<h1 style="margin: 0;">Analytics 聚合看板</h1>
<button id="logoutBtn" class="btn-ghost">退出</button>
</div>
<div class="card" style="margin-bottom: 12px;">
<div class="toolbar">
<div class="field">
<label for="startDate">开始日期</label>
<input id="startDate" type="date" />
</div>
<div class="field">
<label for="endDate">结束日期</label>
<input id="endDate" type="date" />
</div>
<button id="loadBtn" type="button">加载数据</button>
</div>
<div id="status" class="muted"></div>
</div>
<div class="grid stats" id="summaryCards"></div>
<div class="card" style="margin-top: 12px;">
<h3 style="margin-top: 0;">按天趋势</h3>
<div id="dailyBars"></div>
</div>
<div class="card" style="margin-top: 12px; overflow-x: auto;">
<h3 style="margin-top: 0;">按天明细</h3>
<table>
<thead>
<tr>
<th>日期</th>
<th>DAU</th>
<th>登录数</th>
<th>对话完成数</th>
<th>平均停留(ms)</th>
</tr>
</thead>
<tbody id="dailyTable"></tbody>
</table>
</div>
</div>
</div>
<script>
const loginCard = document.getElementById("loginCard");
const dashboard = document.getElementById("dashboard");
const loginForm = document.getElementById("loginForm");
const loginBtn = document.getElementById("loginBtn");
const loginError = document.getElementById("loginError");
const logoutBtn = document.getElementById("logoutBtn");
const loadBtn = document.getElementById("loadBtn");
const startDateInput = document.getElementById("startDate");
const endDateInput = document.getElementById("endDate");
const statusEl = document.getElementById("status");
const summaryCards = document.getElementById("summaryCards");
const dailyBars = document.getElementById("dailyBars");
const dailyTable = document.getElementById("dailyTable");
const AUTH_KEY = "analytics_logged_in";
function formatDate(date) {
const y = date.getUTCFullYear();
const m = String(date.getUTCMonth() + 1).padStart(2, "0");
const d = String(date.getUTCDate()).padStart(2, "0");
return `${y}-${m}-${d}`;
}
function dateRange(startDate, endDate) {
const list = [];
const cursor = new Date(`${startDate}T00:00:00Z`);
const end = new Date(`${endDate}T00:00:00Z`);
while (cursor <= end) {
list.push(formatDate(cursor));
cursor.setUTCDate(cursor.getUTCDate() + 1);
}
return list;
}
function parseJsonl(text) {
if (!text.trim()) return [];
return text
.split("\n")
.map((line) => line.trim())
.filter(Boolean)
.map((line) => {
try {
return JSON.parse(line);
} catch {
return null;
}
})
.filter(Boolean);
}
async function fetchDayEvents(date) {
const res = await fetch(`/analytics-data/${date}.jsonl`, {
method: "GET",
});
if (res.status === 404) {
return [];
}
if (!res.ok) {
throw new Error(`读取 ${date} 失败: ${res.status}`);
}
return parseJsonl(await res.text());
}
function aggregateDay(events) {
const users = new Set();
let loginCount = 0;
let chatCount = 0;
let staySum = 0;
let stayCnt = 0;
for (const event of events) {
if (event.user_id) users.add(event.user_id);
if (event.event_type === "session.login") loginCount += 1;
if (event.event_type === "agent.chat_completed") chatCount += 1;
if (event.event_type === "page.view") {
const stay = event.metrics && event.metrics.stay_duration_ms;
if (typeof stay === "number") {
staySum += stay;
stayCnt += 1;
}
}
}
return {
dau: users.size,
loginCount,
chatCount,
avgStay: stayCnt ? staySum / stayCnt : 0,
};
}
function renderSummary(rows) {
const allUsers = new Set();
let totalLogins = 0;
let totalChats = 0;
let staySum = 0;
let stayCnt = 0;
rows.forEach((row) => {
row.users.forEach((u) => allUsers.add(u));
totalLogins += row.loginCount;
totalChats += row.chatCount;
staySum += row.staySum;
stayCnt += row.stayCnt;
});
const cards = [
{ label: "DAU(区间去重)", value: allUsers.size },
{ label: "总登录次数", value: totalLogins },
{ label: "总对话完成数", value: totalChats },
{ label: "平均停留(ms)", value: Math.round(stayCnt ? staySum / stayCnt : 0) },
];
summaryCards.innerHTML = cards
.map((card) => `<div class="card"><div class="muted">${card.label}</div><div class="value">${card.value}</div></div>`)
.join("");
}
function renderDaily(rows) {
const maxLogin = Math.max(1, ...rows.map((r) => r.loginCount));
dailyBars.innerHTML = rows
.map((r) => {
const width = Math.max(2, Math.round((r.loginCount / maxLogin) * 100));
return `<div class="chart-row"><div style="width:92px">${r.date}</div><div class="bar" style="width:${width}%"></div><div class="muted">登录 ${r.loginCount}</div></div>`;
})
.join("");
dailyTable.innerHTML = rows
.map(
(r) => `<tr>
<td>${r.date}</td>
<td>${r.dau}</td>
<td>${r.loginCount}</td>
<td>${r.chatCount}</td>
<td>${Math.round(r.avgStay)}</td>
</tr>`,
)
.join("");
}
async function loadData() {
const startDate = startDateInput.value;
const endDate = endDateInput.value;
if (!startDate || !endDate || startDate > endDate) {
statusEl.textContent = "请选择有效日期区间";
return;
}
loadBtn.disabled = true;
statusEl.textContent = "正在读取并聚合数据...";
try {
const dates = dateRange(startDate, endDate);
const rows = [];
for (const date of dates) {
const events = await fetchDayEvents(date);
const users = new Set();
let loginCount = 0;
let chatCount = 0;
let staySum = 0;
let stayCnt = 0;
for (const event of events) {
if (event.user_id) users.add(event.user_id);
if (event.event_type === "session.login") loginCount += 1;
if (event.event_type === "agent.chat_completed") chatCount += 1;
if (event.event_type === "page.view") {
const stay = event.metrics && event.metrics.stay_duration_ms;
if (typeof stay === "number") {
staySum += stay;
stayCnt += 1;
}
}
}
rows.push({
date,
users,
dau: users.size,
loginCount,
chatCount,
staySum,
stayCnt,
avgStay: stayCnt ? staySum / stayCnt : 0,
});
}
renderSummary(rows);
renderDaily(rows);
statusEl.textContent = `加载完成,共 ${rows.length}`;
} catch (err) {
statusEl.textContent = err.message || "加载失败";
} finally {
loadBtn.disabled = false;
}
}
async function login(password) {
const res = await fetch("/api/v1/analytics/login", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ password }),
});
if (!res.ok) {
throw new Error("密码错误");
}
sessionStorage.setItem(AUTH_KEY, "1");
}
function enterDashboard() {
loginCard.classList.add("hidden");
dashboard.classList.remove("hidden");
}
function exitDashboard() {
sessionStorage.removeItem(AUTH_KEY);
dashboard.classList.add("hidden");
loginCard.classList.remove("hidden");
}
loginForm.addEventListener("submit", async (event) => {
event.preventDefault();
loginError.textContent = "";
loginBtn.disabled = true;
try {
const password = document.getElementById("password").value;
await login(password);
enterDashboard();
await loadData();
} catch (err) {
loginError.textContent = err.message || "登录失败";
} finally {
loginBtn.disabled = false;
}
});
loadBtn.addEventListener("click", loadData);
logoutBtn.addEventListener("click", exitDashboard);
(function init() {
const today = new Date();
const start = new Date(today.getTime() - 7 * 24 * 60 * 60 * 1000);
startDateInput.value = formatDate(start);
endDateInput.value = formatDate(today);
if (sessionStorage.getItem(AUTH_KEY) === "1") {
enterDashboard();
loadData();
}
})();
</script>
</body>
</html>
+2
View File
@@ -3,6 +3,7 @@ from __future__ import annotations
from fastapi import APIRouter
from v1.agent.router import router as agent_router
from v1.analytics.router import router as analytics_router
from v1.app.router import router as app_router
from v1.automation_jobs.router import router as automation_jobs_router
from v1.auth.router import router as auth_router
@@ -18,6 +19,7 @@ router = APIRouter(prefix="/api/v1")
router.include_router(app_router)
router.include_router(auth_router)
router.include_router(agent_router)
router.include_router(analytics_router)
router.include_router(automation_jobs_router)
router.include_router(friendships_router)
router.include_router(memories_router)