"""手动触发反馈报告生成并推送邮件 用法: cd /home/qzl/Code/eryao/.worktrees/feat-user-feedback PYTHONPATH=backend/src uv run python backend/scripts/generate_feedback_report.py [--all] [--no-email] """ from __future__ import annotations import argparse import asyncio from datetime import datetime, timedelta from pathlib import Path import sys sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "src")) from sqlalchemy import select from core.config.settings import config from core.db.session import AsyncSessionLocal from models.user_feedback import UserFeedback from v1.feedback.report import generate_feedback_report from v1.feedback.tasks import send_feedback_report_email async def _fetch_all_feedbacks() -> list[UserFeedback]: async with AsyncSessionLocal() as session: stmt = select(UserFeedback).order_by(UserFeedback.created_at.desc()) result = await session.execute(stmt) return list(result.scalars().all()) async def main(): parser = argparse.ArgumentParser( description="Generate feedback report and send email" ) parser.add_argument( "--all", action="store_true", help="Generate report for all feedbacks" ) parser.add_argument("--no-email", action="store_true", help="Skip email sending") args = parser.parse_args() feedbacks = await _fetch_all_feedbacks() report_path: Path | None = None try: if args.all: if not feedbacks: print("No feedbacks found in database") return reports_dir = Path(config.runtime.log_dir).parent / "reports" report_path = await generate_feedback_report( feedbacks, output_dir=reports_dir ) print(f"\n=== 全量报告生成完成 ===") else: if not feedbacks: print("No feedbacks found in database") if not args.no_email: print("\n发送无反馈通知邮件...") now = datetime.now() push_hour = 10 end_time = now.replace( hour=push_hour, minute=0, second=0, microsecond=0 ) start_time = end_time - timedelta(days=1) await send_feedback_report_email( feedbacks=[], start_time=start_time, end_time=end_time, push_hour=push_hour, report_path=None, ) print("无反馈通知邮件已发送") return reports_dir = Path(config.runtime.log_dir).parent / "reports" report_path = await generate_feedback_report( feedbacks, output_dir=reports_dir ) print(f"\n=== 报告生成完成 ===") print(f"文件路径: {report_path}") print(f"文件大小: {report_path.stat().st_size:,} bytes") if not args.no_email: now = datetime.now() push_hour = 10 end_time = now.replace(hour=push_hour, minute=0, second=0, microsecond=0) start_time = end_time - timedelta(days=1) print(f"\n发送邮件到: {config.feedback_report.email}") print( f"时间范围: {start_time.strftime('%Y-%m-%d %H:%M')} ~ {end_time.strftime('%Y-%m-%d %H:%M')}" ) print(f"反馈数量: {len(feedbacks)}") await send_feedback_report_email( feedbacks=feedbacks, start_time=start_time, end_time=end_time, push_hour=push_hour, report_path=report_path, ) print("邮件发送成功") else: print("\n跳过邮件发送 (--no-email)") finally: if report_path and report_path.exists(): report_path.unlink() print(f"\n临时文件已清理: {report_path}") if __name__ == "__main__": asyncio.run(main())