Files
eryao/backend/scripts/generate_feedback_report.py
qzl 6a2a9d2c87 feat(feedback): implement user feedback collection system with email reporting
Backend:
- Add user_feedback table with RLS policy
- Create feedback submission API (multipart/form-data)
- Implement xlsx report generation with embedded images
- Add scheduled email delivery via Feishu SMTP
- Create HTML email templates (daily_report, no_feedback)

Frontend:
- Add feedback screen with type selection and image picker
- Support anonymous submission via skipAuth flag
- Collect device info and app version

Protocol:
- Document feedback API contract and error codes
- Update http-error-codes.md with FEEDBACK_* codes
2026-04-20 12:49:54 +08:00

120 lines
4.0 KiB
Python

"""手动触发反馈报告生成并推送邮件
用法:
cd /home/qzl/Code/eryao/.worktrees/feat-user-feedback
PYTHONPATH=backend/src uv run python backend/scripts/generate_feedback_report.py [--all] [--no-email]
"""
from __future__ import annotations
import argparse
import asyncio
from datetime import datetime, timedelta
from pathlib import Path
import sys
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "src"))
from sqlalchemy import select
from core.config.settings import config
from core.db.session import AsyncSessionLocal
from models.user_feedback import UserFeedback
from v1.feedback.report import generate_feedback_report
from v1.feedback.tasks import send_feedback_report_email
async def _fetch_all_feedbacks() -> list[UserFeedback]:
async with AsyncSessionLocal() as session:
stmt = select(UserFeedback).order_by(UserFeedback.created_at.desc())
result = await session.execute(stmt)
return list(result.scalars().all())
async def main():
parser = argparse.ArgumentParser(
description="Generate feedback report and send email"
)
parser.add_argument(
"--all", action="store_true", help="Generate report for all feedbacks"
)
parser.add_argument("--no-email", action="store_true", help="Skip email sending")
args = parser.parse_args()
feedbacks = await _fetch_all_feedbacks()
report_path: Path | None = None
try:
if args.all:
if not feedbacks:
print("No feedbacks found in database")
return
reports_dir = Path(config.runtime.log_dir).parent / "reports"
report_path = await generate_feedback_report(
feedbacks, output_dir=reports_dir
)
print(f"\n=== 全量报告生成完成 ===")
else:
if not feedbacks:
print("No feedbacks found in database")
if not args.no_email:
print("\n发送无反馈通知邮件...")
now = datetime.now()
push_hour = 10
end_time = now.replace(
hour=push_hour, minute=0, second=0, microsecond=0
)
start_time = end_time - timedelta(days=1)
await send_feedback_report_email(
feedbacks=[],
start_time=start_time,
end_time=end_time,
push_hour=push_hour,
report_path=None,
)
print("无反馈通知邮件已发送")
return
reports_dir = Path(config.runtime.log_dir).parent / "reports"
report_path = await generate_feedback_report(
feedbacks, output_dir=reports_dir
)
print(f"\n=== 报告生成完成 ===")
print(f"文件路径: {report_path}")
print(f"文件大小: {report_path.stat().st_size:,} bytes")
if not args.no_email:
now = datetime.now()
push_hour = 10
end_time = now.replace(hour=push_hour, minute=0, second=0, microsecond=0)
start_time = end_time - timedelta(days=1)
print(f"\n发送邮件到: {config.feedback_report.email}")
print(
f"时间范围: {start_time.strftime('%Y-%m-%d %H:%M')} ~ {end_time.strftime('%Y-%m-%d %H:%M')}"
)
print(f"反馈数量: {len(feedbacks)}")
await send_feedback_report_email(
feedbacks=feedbacks,
start_time=start_time,
end_time=end_time,
push_hour=push_hour,
report_path=report_path,
)
print("邮件发送成功")
else:
print("\n跳过邮件发送 (--no-email)")
finally:
if report_path and report_path.exists():
report_path.unlink()
print(f"\n临时文件已清理: {report_path}")
if __name__ == "__main__":
asyncio.run(main())