feat: 添加 Analytics 分析功能(行为追踪、错误码、协议更新)

This commit is contained in:
qzl
2026-04-02 11:52:23 +08:00
parent b101826de5
commit 7b6dbe72c3
24 changed files with 682 additions and 52 deletions
+10
View File
@@ -1,6 +1,7 @@
from __future__ import annotations
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any, AsyncGenerator
from fastapi import FastAPI, HTTPException, Request
@@ -8,6 +9,7 @@ from fastapi.exceptions import RequestValidationError
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from starlette.staticfiles import StaticFiles
from starlette.exceptions import HTTPException as StarletteHTTPException
from core.config.settings import config
@@ -59,6 +61,14 @@ app.add_middleware(
)
app.include_router(mobile_router)
_analytics_web_dir = Path(__file__).resolve().parent / "v1" / "analytics" / "web"
app.mount(
"/analytics",
StaticFiles(directory=_analytics_web_dir, html=True),
name="analytics-web",
)
logger.info(
"Web application initialized",
environment=config.runtime.environment,
+3 -3
View File
@@ -48,12 +48,12 @@ class TaskiqQueueClient:
def _select_queue_task(command: dict[str, object]) -> Any:
from core.agentscope.runtime.tasks import (
run_command_task_agent,
run_command_task_automation,
run_command_task_general,
)
queue = str(command.get("queue", "agent")).strip().lower()
if queue == "automation":
return run_command_task_automation
if queue == "general":
return run_command_task_general
return run_command_task_agent
async def enqueue(
+133 -3
View File
@@ -1,6 +1,16 @@
from fastapi import APIRouter, HTTPException, status
import base64
import hashlib
import hmac
import json
import re
import time
from pathlib import Path
from fastapi import APIRouter, Header, status
from fastapi.responses import PlainTextResponse
from core.config.settings import config
from core.http.errors import ApiProblemError
from core.logging import get_logger
from v1.analytics.schemas import (
AnalyticsBatchRequest,
@@ -15,6 +25,94 @@ from v1.analytics.tasks import write_analytics_events
logger = get_logger("v1.analytics.router")
router = APIRouter(prefix="/analytics", tags=["analytics"])
_TOKEN_TTL_SECONDS = 300
_DATE_PATTERN = re.compile(r"^\d{4}-\d{2}-\d{2}$")
def _get_signing_secret() -> bytes:
return config.analytics.password.encode("utf-8")
def _issue_access_token() -> str:
expires_at = int(time.time()) + _TOKEN_TTL_SECONDS
payload = {"exp": expires_at}
payload_bytes = json.dumps(payload, separators=(",", ":")).encode("utf-8")
signature = hmac.new(_get_signing_secret(), payload_bytes, hashlib.sha256).digest()
return (
base64.urlsafe_b64encode(payload_bytes).decode("utf-8")
+ "."
+ base64.urlsafe_b64encode(signature).decode("utf-8")
)
def _parse_bearer_token(authorization: str | None) -> str:
if authorization is None:
raise ApiProblemError(
status_code=status.HTTP_401_UNAUTHORIZED,
code="ANALYTICS_AUTH_HEADER_MISSING",
detail="Missing authorization header",
)
if not authorization.startswith("Bearer "):
raise ApiProblemError(
status_code=status.HTTP_401_UNAUTHORIZED,
code="ANALYTICS_AUTH_SCHEME_INVALID",
detail="Invalid authorization scheme",
)
token = authorization.removeprefix("Bearer ").strip()
if not token:
raise ApiProblemError(
status_code=status.HTTP_401_UNAUTHORIZED,
code="ANALYTICS_AUTH_TOKEN_MISSING",
detail="Missing token",
)
return token
def _verify_access_token(token: str) -> None:
parts = token.split(".")
if len(parts) != 2:
raise ApiProblemError(
status_code=status.HTTP_401_UNAUTHORIZED,
code="ANALYTICS_TOKEN_MALFORMED",
detail="Malformed token",
)
payload_b64, signature_b64 = parts
try:
payload_bytes = base64.urlsafe_b64decode(payload_b64.encode("utf-8"))
provided_signature = base64.urlsafe_b64decode(signature_b64.encode("utf-8"))
except Exception as exc:
raise ApiProblemError(
status_code=status.HTTP_401_UNAUTHORIZED,
code="ANALYTICS_TOKEN_MALFORMED",
detail="Malformed token",
) from exc
expected_signature = hmac.new(
_get_signing_secret(), payload_bytes, hashlib.sha256
).digest()
if not hmac.compare_digest(provided_signature, expected_signature):
raise ApiProblemError(
status_code=status.HTTP_401_UNAUTHORIZED,
code="ANALYTICS_TOKEN_SIGNATURE_INVALID",
detail="Invalid token signature",
)
try:
payload = json.loads(payload_bytes)
except json.JSONDecodeError as exc:
raise ApiProblemError(
status_code=status.HTTP_401_UNAUTHORIZED,
code="ANALYTICS_TOKEN_PAYLOAD_INVALID",
detail="Malformed token payload",
) from exc
expires_at = payload.get("exp")
if not isinstance(expires_at, int) or int(time.time()) > expires_at:
raise ApiProblemError(
status_code=status.HTTP_401_UNAUTHORIZED,
code="ANALYTICS_TOKEN_EXPIRED",
detail="Token expired",
)
@router.post("/events", response_model=AnalyticsBatchResponse)
@@ -35,10 +133,42 @@ async def login(request: AnalyticsLoginRequest) -> AnalyticsLoginResponse:
"""Analytics Dashboard 登录"""
if request.password != config.analytics.password:
logger.warning("Analytics login failed: invalid password")
raise HTTPException(
raise ApiProblemError(
status_code=status.HTTP_401_UNAUTHORIZED,
code="ANALYTICS_LOGIN_PASSWORD_INVALID",
detail="Invalid password",
)
logger.info("Analytics login success")
return AnalyticsLoginResponse(success=True)
return AnalyticsLoginResponse(
success=True,
data_base_url="/api/v1/analytics/data",
token=_issue_access_token(),
)
@router.get("/data/{date}", response_class=PlainTextResponse)
async def read_day_events(
date: str,
authorization: str | None = Header(default=None),
) -> PlainTextResponse:
token = _parse_bearer_token(authorization)
_verify_access_token(token)
if not _DATE_PATTERN.match(date):
raise ApiProblemError(
status_code=status.HTTP_400_BAD_REQUEST,
code="ANALYTICS_DATE_FORMAT_INVALID",
detail="Invalid date format",
)
file_path = Path(config.analytics.data_path) / f"{date}.jsonl"
if not file_path.exists() or not file_path.is_file():
raise ApiProblemError(
status_code=status.HTTP_404_NOT_FOUND,
code="ANALYTICS_FILE_NOT_FOUND",
detail="Analytics file not found",
)
content = file_path.read_text(encoding="utf-8")
return PlainTextResponse(content=content, media_type="application/x-ndjson")
+2
View File
@@ -52,3 +52,5 @@ class AnalyticsLoginRequest(BaseModel):
class AnalyticsLoginResponse(BaseModel):
success: bool
data_base_url: str
token: str
+22 -1
View File
@@ -204,6 +204,8 @@
const dailyTable = document.getElementById("dailyTable");
const AUTH_KEY = "analytics_logged_in";
const DATA_BASE_URL_KEY = "analytics_data_base_url";
const AUTH_TOKEN_KEY = "analytics_auth_token";
function formatDate(date) {
const y = date.getUTCFullYear();
@@ -240,8 +242,11 @@
}
async function fetchDayEvents(date) {
const res = await fetch(`/analytics-data/${date}.jsonl`, {
const dataBaseUrl = sessionStorage.getItem(DATA_BASE_URL_KEY) || "/api/v1/analytics/data";
const token = sessionStorage.getItem(AUTH_TOKEN_KEY);
const res = await fetch(`${dataBaseUrl}/${date}`, {
method: "GET",
headers: token ? { Authorization: `Bearer ${token}` } : {},
});
if (res.status === 404) {
return [];
@@ -397,7 +402,17 @@
if (!res.ok) {
throw new Error("密码错误");
}
const payload = await res.json();
const dataBaseUrl = typeof payload.data_base_url === "string" && payload.data_base_url
? payload.data_base_url
: "/api/v1/analytics/data";
const token = typeof payload.token === "string" ? payload.token : "";
if (!token) {
throw new Error("登录响应缺少 token");
}
sessionStorage.setItem(AUTH_KEY, "1");
sessionStorage.setItem(DATA_BASE_URL_KEY, dataBaseUrl);
sessionStorage.setItem(AUTH_TOKEN_KEY, token);
}
function enterDashboard() {
@@ -407,6 +422,8 @@
function exitDashboard() {
sessionStorage.removeItem(AUTH_KEY);
sessionStorage.removeItem(DATA_BASE_URL_KEY);
sessionStorage.removeItem(AUTH_TOKEN_KEY);
dashboard.classList.add("hidden");
loginCard.classList.remove("hidden");
}
@@ -436,6 +453,10 @@
startDateInput.value = formatDate(start);
endDateInput.value = formatDate(today);
if (sessionStorage.getItem(AUTH_KEY) === "1") {
if (!sessionStorage.getItem(AUTH_TOKEN_KEY)) {
exitDashboard();
return;
}
enterDashboard();
loadData();
}