feat(feedback): implement user feedback collection system with email reporting

Backend:
- Add user_feedback table with RLS policy
- Create feedback submission API (multipart/form-data)
- Implement xlsx report generation with embedded images
- Add scheduled email delivery via Feishu SMTP
- Create HTML email templates (daily_report, no_feedback)

Frontend:
- Add feedback screen with type selection and image picker
- Support anonymous submission via skipAuth flag
- Collect device info and app version

Protocol:
- Document feedback API contract and error codes
- Update http-error-codes.md with FEEDBACK_* codes
This commit is contained in:
qzl
2026-04-20 12:49:54 +08:00
parent 913ed26f8d
commit 6a2a9d2c87
46 changed files with 4768 additions and 9 deletions
@@ -0,0 +1,118 @@
"""create user_feedback table
Revision ID: 20260417_0001
Revises: 20260416_0003
Create Date: 2026-04-17
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB, UUID
revision: str = "20260417_0001"
down_revision: Union[str, Sequence[str], None] = "20260416_0003"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table(
"user_feedback",
sa.Column(
"id",
UUID(as_uuid=True),
server_default=sa.text("gen_random_uuid()"),
primary_key=True,
),
sa.Column(
"user_id",
UUID(as_uuid=True),
sa.ForeignKey("auth.users.id", ondelete="SET NULL"),
nullable=True,
),
sa.Column(
"feedback_type",
sa.String(20),
nullable=False,
server_default="other",
),
sa.Column("content", sa.Text, nullable=False),
sa.Column(
"images",
JSONB,
nullable=False,
server_default=sa.text("'[]'::jsonb"),
),
sa.Column(
"device_info",
JSONB,
nullable=False,
server_default=sa.text("'{}'::jsonb"),
),
sa.Column("app_version", sa.String(20), nullable=False),
sa.Column("os_version", sa.String(50), nullable=False),
sa.Column(
"status",
sa.String(20),
nullable=False,
server_default="pending",
),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.func.now(),
nullable=False,
),
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
server_default=sa.func.now(),
nullable=False,
),
)
op.create_index("ix_user_feedback_user_id", "user_feedback", ["user_id"])
op.create_index("ix_user_feedback_created_at", "user_feedback", ["created_at"])
op.create_index("ix_user_feedback_status", "user_feedback", ["status"])
op.execute("COMMENT ON TABLE user_feedback IS '用户反馈表'")
op.execute(
"COMMENT ON COLUMN user_feedback.user_id IS "
"'用户ID,NULL表示匿名(勾选不上传我的个人信息)'"
)
op.execute(
"COMMENT ON COLUMN user_feedback.feedback_type IS "
"'反馈类型: bug/suggestion/other'"
)
op.execute("COMMENT ON COLUMN user_feedback.content IS '反馈内容,最多500字'")
op.execute(
"COMMENT ON COLUMN user_feedback.images IS '图片Storage路径列表,最多3张'"
)
op.execute(
"COMMENT ON COLUMN user_feedback.device_info IS "
"'设备信息JSON,匿名时照样采集(不涉及隐私)'"
)
op.execute(
"COMMENT ON COLUMN user_feedback.status IS '处理状态: pending/processed'"
)
op.execute("ALTER TABLE public.user_feedback ENABLE ROW LEVEL SECURITY")
op.execute("""
CREATE POLICY "Service role full access on user_feedback"
ON public.user_feedback
FOR ALL
TO service_role
USING (true)
WITH CHECK (true)
""")
def downgrade() -> None:
op.execute(
'DROP POLICY IF EXISTS "Service role full access on user_feedback" ON public.user_feedback'
)
op.execute("ALTER TABLE public.user_feedback DISABLE ROW LEVEL SECURITY")
op.drop_table("user_feedback")
+119
View File
@@ -0,0 +1,119 @@
"""手动触发反馈报告生成并推送邮件
用法:
cd /home/qzl/Code/eryao/.worktrees/feat-user-feedback
PYTHONPATH=backend/src uv run python backend/scripts/generate_feedback_report.py [--all] [--no-email]
"""
from __future__ import annotations
import argparse
import asyncio
from datetime import datetime, timedelta
from pathlib import Path
import sys
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "src"))
from sqlalchemy import select
from core.config.settings import config
from core.db.session import AsyncSessionLocal
from models.user_feedback import UserFeedback
from v1.feedback.report import generate_feedback_report
from v1.feedback.tasks import send_feedback_report_email
async def _fetch_all_feedbacks() -> list[UserFeedback]:
async with AsyncSessionLocal() as session:
stmt = select(UserFeedback).order_by(UserFeedback.created_at.desc())
result = await session.execute(stmt)
return list(result.scalars().all())
async def main():
parser = argparse.ArgumentParser(
description="Generate feedback report and send email"
)
parser.add_argument(
"--all", action="store_true", help="Generate report for all feedbacks"
)
parser.add_argument("--no-email", action="store_true", help="Skip email sending")
args = parser.parse_args()
feedbacks = await _fetch_all_feedbacks()
report_path: Path | None = None
try:
if args.all:
if not feedbacks:
print("No feedbacks found in database")
return
reports_dir = Path(config.runtime.log_dir).parent / "reports"
report_path = await generate_feedback_report(
feedbacks, output_dir=reports_dir
)
print(f"\n=== 全量报告生成完成 ===")
else:
if not feedbacks:
print("No feedbacks found in database")
if not args.no_email:
print("\n发送无反馈通知邮件...")
now = datetime.now()
push_hour = 10
end_time = now.replace(
hour=push_hour, minute=0, second=0, microsecond=0
)
start_time = end_time - timedelta(days=1)
await send_feedback_report_email(
feedbacks=[],
start_time=start_time,
end_time=end_time,
push_hour=push_hour,
report_path=None,
)
print("无反馈通知邮件已发送")
return
reports_dir = Path(config.runtime.log_dir).parent / "reports"
report_path = await generate_feedback_report(
feedbacks, output_dir=reports_dir
)
print(f"\n=== 报告生成完成 ===")
print(f"文件路径: {report_path}")
print(f"文件大小: {report_path.stat().st_size:,} bytes")
if not args.no_email:
now = datetime.now()
push_hour = 10
end_time = now.replace(hour=push_hour, minute=0, second=0, microsecond=0)
start_time = end_time - timedelta(days=1)
print(f"\n发送邮件到: {config.feedback_report.email}")
print(
f"时间范围: {start_time.strftime('%Y-%m-%d %H:%M')} ~ {end_time.strftime('%Y-%m-%d %H:%M')}"
)
print(f"反馈数量: {len(feedbacks)}")
await send_feedback_report_email(
feedbacks=feedbacks,
start_time=start_time,
end_time=end_time,
push_hour=push_hour,
report_path=report_path,
)
print("邮件发送成功")
else:
print("\n跳过邮件发送 (--no-email)")
finally:
if report_path and report_path.exists():
report_path.unlink()
print(f"\n临时文件已清理: {report_path}")
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,26 @@
"""手动触发 worker-general 定时任务:生成反馈报告
用法:
cd /home/qzl/Code/eryao/.worktrees/feat-user-feedback
PYTHONPATH=backend/src uv run python backend/scripts/trigger_feedback_report.py
"""
from __future__ import annotations
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "src"))
from core.taskiq.app import worker_general_broker
from v1.feedback.tasks import generate_daily_feedback_report
def main():
task = generate_daily_feedback_report.kiq()
result = worker_general_broker.wait_result(task, timeout=120)
print(f"Task result: {result.return_value}")
if __name__ == "__main__":
main()
+25
View File
@@ -153,8 +153,13 @@ class StorageSettings(BaseModel):
bucket: str = Field(default="avatars", min_length=3, max_length=63)
max_size_mb: int = Field(default=2, ge=1, le=10)
class FeedbackSettings(BaseModel):
bucket: str = Field(default="feedback-images", min_length=3, max_length=63)
max_size_mb: int = Field(default=5, ge=1, le=20)
attachment: AttachmentSettings = Field(default_factory=AttachmentSettings)
avatar: AvatarSettings = Field(default_factory=AvatarSettings)
feedback: FeedbackSettings = Field(default_factory=FeedbackSettings)
class LlmSettings(BaseModel):
@@ -235,6 +240,22 @@ def _resolve_env_file() -> str:
PROJECT_ROOT = _resolve_project_root()
class FeedbackReportSettings(BaseModel):
email: str = Field(default="support@example.com", description="客服邮箱")
cron: str = Field(default="0 10 * * *", description="报告生成cron表达式")
enabled: bool = Field(default=False, description="是否启用报告推送")
class EmailSettings(BaseModel):
host: str = Field(default="smtp.feishu.cn", description="SMTP 服务器地址")
port: int = Field(default=465, ge=1, le=65535, description="SMTP 端口")
username: str = Field(default="", description="SMTP 用户名")
password: SecretStr = Field(default=SecretStr(""), description="SMTP 密码")
use_ssl: bool = Field(default=True, description="是否使用 SSL")
from_address: str = Field(default="noreply@example.com", description="发件人地址")
from_name: str = Field(default="Eryao Feedback", description="发件人显示名称")
class Settings(BaseSettings):
runtime: RuntimeSettings = RuntimeSettings()
cors: CorsSettings = CorsSettings()
@@ -250,6 +271,10 @@ class Settings(BaseSettings):
taskiq: TaskiqSettings = Field(default_factory=TaskiqSettings)
agent_runtime: AgentRuntimeSettings = Field(default_factory=AgentRuntimeSettings)
points_policy: PointsPolicySettings = Field(default_factory=PointsPolicySettings)
feedback_report: FeedbackReportSettings = Field(
default_factory=FeedbackReportSettings
)
email: EmailSettings = Field(default_factory=EmailSettings)
@computed_field
@property
View File
+74
View File
@@ -0,0 +1,74 @@
from __future__ import annotations
from dataclasses import dataclass, field
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import aiosmtplib
from structlog import get_logger
from core.config.settings import config
logger = get_logger("core.email.sender")
@dataclass
class EmailAttachment:
filename: str
content: bytes
content_type: str = "application/octet-stream"
@dataclass
class EmailMessage:
to: str
subject: str
body_html: str
attachments: list[EmailAttachment] = field(default_factory=list)
class EmailSender:
def __init__(self) -> None:
self._settings = config.email
async def send(self, message: EmailMessage) -> bool:
msg = MIMEMultipart()
msg["From"] = f"{self._settings.from_name} <{self._settings.from_address}>"
msg["To"] = message.to
msg["Subject"] = message.subject
msg.attach(MIMEText(message.body_html, "html", "utf-8"))
for attachment in message.attachments:
part = MIMEBase("application", "octet-stream")
part.set_payload(attachment.content)
encoders.encode_base64(part)
part.add_header(
"Content-Disposition",
f"attachment; filename*=UTF-8''{attachment.filename}",
)
msg.attach(part)
try:
await aiosmtplib.send(
msg,
hostname=self._settings.host,
port=self._settings.port,
username=self._settings.username,
password=self._settings.password.get_secret_value(),
use_tls=self._settings.use_ssl,
)
logger.info("Email sent", to=message.to, subject=message.subject)
return True
except Exception as e:
logger.error(
"Failed to send email",
to=message.to,
subject=message.subject,
error=str(e),
)
raise
email_sender = EmailSender()
+16
View File
@@ -0,0 +1,16 @@
from __future__ import annotations
from pathlib import Path
from string import Template
from structlog import get_logger
logger = get_logger("core.email.template_loader")
_TEMPLATES_DIR = Path(__file__).parent / "templates"
def load_template(category: str, name: str) -> Template:
template_path = _TEMPLATES_DIR / category / name
content = template_path.read_text(encoding="utf-8")
return Template(content)
@@ -0,0 +1,68 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
</head>
<body style="margin: 0; padding: 0; background-color: #f4f5f7; font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, 'Helvetica Neue', Arial, sans-serif;">
<table width="100%" cellpadding="0" cellspacing="0" style="background-color: #f4f5f7; padding: 32px 0;">
<tr>
<td align="center">
<table width="600" cellpadding="0" cellspacing="0" style="background-color: #ffffff; border-radius: 8px; overflow: hidden; box-shadow: 0 1px 3px rgba(0,0,0,0.1);">
<tr>
<td style="background-color: #4472C4; padding: 24px 32px;">
<h1 style="margin: 0; color: #ffffff; font-size: 20px; font-weight: 600;">
用户反馈日报
</h1>
</td>
</tr>
<tr>
<td style="padding: 24px 32px;">
<p style="margin: 0 0 16px; color: #333; font-size: 14px; line-height: 1.6;">
您好,
</p>
<p style="margin: 0 0 16px; color: #333; font-size: 14px; line-height: 1.6;">
以下是 <strong>${start_date} ${start_hour}:00</strong>
<strong>${end_date} ${end_hour}:00</strong> 期间的用户反馈汇总。
</p>
<table width="100%" cellpadding="0" cellspacing="0" style="margin: 20px 0;">
<tr>
<td style="padding: 16px; background-color: #f0f4ff; border-radius: 6px; text-align: center; width: 33%;">
<p style="margin: 0; font-size: 28px; font-weight: 700; color: #4472C4;">${total_count}</p>
<p style="margin: 4px 0 0; font-size: 12px; color: #666;">反馈总数</p>
</td>
<td style="padding: 16px; background-color: #fff3e0; border-radius: 6px; text-align: center; width: 33%;">
<p style="margin: 0; font-size: 28px; font-weight: 700; color: #ed6c02;">${bug_count}</p>
<p style="margin: 4px 0 0; font-size: 12px; color: #666;">问题反馈</p>
</td>
<td style="padding: 16px; background-color: #e8f5e9; border-radius: 6px; text-align: center; width: 33%;">
<p style="margin: 0; font-size: 28px; font-weight: 700; color: #2e7d32;">${suggestion_count}</p>
<p style="margin: 4px 0 0; font-size: 12px; color: #666;">功能建议</p>
</td>
</tr>
</table>
<p style="margin: 16px 0 0; color: #666; font-size: 13px; line-height: 1.6;">
详细反馈内容及截图请查看附件中的 xlsx 报告。
</p>
</td>
</tr>
<tr>
<td style="padding: 16px 32px; border-top: 1px solid #eee; background-color: #fafafa;">
<p style="margin: 0; font-size: 11px; color: #999; text-align: center;">
此邮件由 Eryao 反馈系统自动发送,请勿直接回复。<br>
报告生成时间:${generated_at}
</p>
</td>
</tr>
</table>
</td>
</tr>
</table>
</body>
</html>
@@ -0,0 +1,62 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
</head>
<body style="margin: 0; padding: 0; background-color: #f4f5f7; font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, 'Helvetica Neue', Arial, sans-serif;">
<table width="100%" cellpadding="0" cellspacing="0" style="background-color: #f4f5f7; padding: 32px 0;">
<tr>
<td align="center">
<table width="600" cellpadding="0" cellspacing="0" style="background-color: #ffffff; border-radius: 8px; overflow: hidden; box-shadow: 0 1px 3px rgba(0,0,0,0.1);">
<tr>
<td style="background-color: #95a5a6; padding: 24px 32px;">
<h1 style="margin: 0; color: #ffffff; font-size: 20px; font-weight: 600;">
用户反馈日报
</h1>
</td>
</tr>
<tr>
<td style="padding: 32px;">
<p style="margin: 0 0 16px; color: #333; font-size: 14px; line-height: 1.6;">
您好,
</p>
<p style="margin: 0 0 16px; color: #333; font-size: 14px; line-height: 1.6;">
<strong>${start_date} ${start_hour}:00</strong>
<strong>${end_date} ${end_hour}:00</strong> 期间暂无用户反馈。
</p>
<table width="100%" cellpadding="0" cellspacing="0" style="margin: 20px 0;">
<tr>
<td style="padding: 24px; background-color: #f8f9fa; border-radius: 6px; text-align: center;">
<p style="margin: 0; font-size: 32px;">📭</p>
<p style="margin: 8px 0 0; font-size: 14px; color: #999;">
今日无反馈数据
</p>
</td>
</tr>
</table>
<p style="margin: 16px 0 0; color: #999; font-size: 12px; line-height: 1.6;">
如有反馈,系统将在下一个报告周期自动推送。
</p>
</td>
</tr>
<tr>
<td style="padding: 16px 32px; border-top: 1px solid #eee; background-color: #fafafa;">
<p style="margin: 0; font-size: 11px; color: #999; text-align: center;">
此邮件由 Eryao 反馈系统自动发送,请勿直接回复。<br>
报告生成时间:${generated_at}
</p>
</td>
</tr>
</table>
</td>
</tr>
</table>
</body>
</html>
+43
View File
@@ -0,0 +1,43 @@
from __future__ import annotations
import uuid
from sqlalchemy import Index, String, Text, text
from sqlalchemy.dialects.postgresql import JSONB, UUID
from sqlalchemy.orm import Mapped, mapped_column
from core.db.base import Base, TimestampMixin
class UserFeedback(TimestampMixin, Base):
__tablename__ = "user_feedback"
__table_args__ = (
Index("ix_user_feedback_user_id", "user_id"),
Index("ix_user_feedback_created_at", "created_at"),
Index("ix_user_feedback_status", "status"),
)
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
server_default=text("gen_random_uuid()"),
primary_key=True,
)
user_id: Mapped[uuid.UUID | None] = mapped_column(
UUID(as_uuid=True),
nullable=True,
)
feedback_type: Mapped[str] = mapped_column(
String(20), nullable=False, server_default="other"
)
content: Mapped[str] = mapped_column(Text, nullable=False)
images: Mapped[list[str]] = mapped_column(
JSONB, nullable=False, server_default=text("'[]'::jsonb"), default=list
)
device_info: Mapped[dict] = mapped_column(
JSONB, nullable=False, server_default=text("'{}'::jsonb"), default=dict
)
app_version: Mapped[str] = mapped_column(String(20), nullable=False)
os_version: Mapped[str] = mapped_column(String(50), nullable=False)
status: Mapped[str] = mapped_column(
String(20), nullable=False, server_default="pending"
)
+2
View File
@@ -110,6 +110,7 @@ class SupabaseService(BaseServiceProvider):
buckets = [
(config.storage.attachment.bucket, False),
(config.storage.avatar.bucket, True),
(config.storage.feedback.bucket, False),
]
def _check_and_create() -> None:
@@ -170,6 +171,7 @@ class SupabaseService(BaseServiceProvider):
allowed_buckets = {
config.storage.attachment.bucket,
config.storage.avatar.bucket,
config.storage.feedback.bucket,
}
if bucket not in allowed_buckets:
raise RuntimeError("Invalid storage bucket")
View File
+49
View File
@@ -0,0 +1,49 @@
from __future__ import annotations
import asyncio
from typing import Annotated
from uuid import UUID
from fastapi import Depends, Header
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth.models import CurrentUser
from core.db import get_db
from services.base.supabase import supabase_service
from v1.feedback.repository import FeedbackRepository
from v1.feedback.service import FeedbackService
async def get_optional_user(
authorization: str | None = Header(default=None),
) -> CurrentUser | None:
if not authorization:
return None
scheme, _, token = authorization.partition(" ")
if scheme.lower() != "bearer" or not token:
return None
try:
client = supabase_service.get_client()
response = await asyncio.to_thread(client.auth.get_user, token)
user = getattr(response, "user", None)
user_id = getattr(user, "id", None)
if not isinstance(user_id, str) or not user_id:
return None
return CurrentUser(
id=UUID(user_id),
email=getattr(user, "email", None),
role=getattr(user, "role", None),
)
except Exception:
return None
def get_feedback_service(
session: Annotated[AsyncSession, Depends(get_db)],
) -> FeedbackService:
return FeedbackService(
repository=FeedbackRepository(session=session),
storage=supabase_service,
)
+220
View File
@@ -0,0 +1,220 @@
from __future__ import annotations
from datetime import datetime, timezone
from io import BytesIO
from pathlib import Path
from tempfile import gettempdir
from openpyxl import Workbook
from openpyxl.drawing.image import Image as XLImage
from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
from openpyxl.utils import get_column_letter
from structlog import get_logger
from core.config.settings import config
from models.user_feedback import UserFeedback
from services.base.supabase import supabase_service
logger = get_logger("v1.feedback.report")
IMAGE_PREVIEW_WIDTH_PX = 100
IMAGE_PREVIEW_HEIGHT_PX = 100
IMAGE_COL_WIDTH = 15
IMAGE_ROW_HEIGHT = 80
async def download_image_from_storage(storage_path: str) -> bytes:
bucket_name = config.storage.feedback.bucket
try:
return await supabase_service.download_bytes(
bucket=bucket_name, path=storage_path
)
except Exception as e:
logger.warning("Failed to download image", path=storage_path, error=str(e))
raise
def _embed_image_in_cell(ws, img_data: bytes, row: int, col: int) -> bool:
try:
img_buffer = BytesIO(img_data)
xl_img = XLImage(img_buffer)
xl_img.width = IMAGE_PREVIEW_WIDTH_PX
xl_img.height = IMAGE_PREVIEW_HEIGHT_PX
xl_img.anchor = f"{get_column_letter(col)}{row}"
ws.add_image(xl_img)
return True
except Exception as e:
logger.warning("Failed to embed image", row=row, col=col, error=str(e))
return False
async def generate_feedback_report(
feedbacks: list[UserFeedback],
*,
output_dir: Path | None = None,
filename_prefix: str = "feedback_report",
) -> Path:
wb = Workbook()
ws = wb.active
assert ws is not None
ws.title = "用户反馈"
header_font = Font(bold=True, color="FFFFFF", size=11)
header_fill = PatternFill(
start_color="4472C4", end_color="4472C4", fill_type="solid"
)
header_alignment = Alignment(horizontal="center", vertical="center", wrap_text=True)
thin_border = Border(
left=Side(style="thin", color="D9D9D9"),
right=Side(style="thin", color="D9D9D9"),
top=Side(style="thin", color="D9D9D9"),
bottom=Side(style="thin", color="D9D9D9"),
)
center_alignment = Alignment(horizontal="center", vertical="center")
headers = [
"序号",
"提交时间",
"反馈类型",
"用户身份",
"设备信息",
"App版本",
"系统版本",
"反馈内容",
"图片数量",
"状态",
"截图1",
"截图2",
"截图3",
]
col_widths = [
6,
18,
12,
12,
25,
10,
14,
50,
10,
10,
IMAGE_COL_WIDTH,
IMAGE_COL_WIDTH,
IMAGE_COL_WIDTH,
]
for col, (header, width) in enumerate(zip(headers, col_widths), start=1):
cell = ws.cell(row=1, column=col, value=header)
cell.font = header_font
cell.fill = header_fill
cell.alignment = header_alignment
cell.border = thin_border
ws.column_dimensions[get_column_letter(col)].width = width
ws.row_dimensions[1].height = 25
ws.freeze_panes = "A2"
type_colors = {
"bug": PatternFill(start_color="FCE4D6", end_color="FCE4D6", fill_type="solid"),
"suggestion": PatternFill(
start_color="E2EFDA", end_color="E2EFDA", fill_type="solid"
),
"other": PatternFill(
start_color="FFF2CC", end_color="FFF2CC", fill_type="solid"
),
}
for row_idx, fb in enumerate(feedbacks, start=2):
ws.cell(row=row_idx, column=1, value=row_idx - 1).alignment = center_alignment
ws.cell(row=row_idx, column=2, value=fb.created_at.strftime("%Y-%m-%d %H:%M"))
type_cell = ws.cell(row=row_idx, column=3, value=fb.feedback_type)
type_cell.alignment = center_alignment
type_cell.fill = type_colors.get(fb.feedback_type, PatternFill())
if fb.user_id:
ws.cell(row=row_idx, column=4, value=f"用户:{str(fb.user_id)[:8]}...")
else:
ws.cell(row=row_idx, column=4, value="匿名").alignment = center_alignment
device_info = fb.device_info or {}
ws.cell(
row=row_idx,
column=5,
value=f"{device_info.get('platform', '-')} / {device_info.get('model', '-')}",
)
ws.cell(
row=row_idx, column=6, value=fb.app_version
).alignment = center_alignment
ws.cell(row=row_idx, column=7, value=fb.os_version)
content_cell = ws.cell(row=row_idx, column=8, value=fb.content)
content_cell.alignment = Alignment(vertical="top", wrap_text=True)
ws.cell(
row=row_idx, column=9, value=len(fb.images) if fb.images else 0
).alignment = center_alignment
status_cell = ws.cell(row=row_idx, column=10, value=fb.status)
status_cell.alignment = center_alignment
if fb.status == "pending":
status_cell.fill = PatternFill(
start_color="FFF2CC", end_color="FFF2CC", fill_type="solid"
)
elif fb.status == "processed":
status_cell.fill = PatternFill(
start_color="C6EFCE", end_color="C6EFCE", fill_type="solid"
)
ws.row_dimensions[row_idx].height = IMAGE_ROW_HEIGHT
images = fb.images or []
for img_idx in range(3):
img_col = 11 + img_idx
img_cell = ws.cell(row=row_idx, column=img_col, value="")
img_cell.border = thin_border
if img_idx < len(images):
try:
img_data = await download_image_from_storage(images[img_idx])
success = _embed_image_in_cell(ws, img_data, row_idx, img_col)
if not success:
img_cell.value = "加载失败"
img_cell.alignment = center_alignment
except Exception:
img_cell.value = "加载失败"
img_cell.alignment = center_alignment
else:
img_cell.value = "-"
img_cell.alignment = center_alignment
for col in range(1, 11):
ws.cell(row=row_idx, column=col).border = thin_border
ws.auto_filter.ref = (
f"A1:{get_column_letter(len(headers))}{max(2, len(feedbacks) + 1)}"
)
if output_dir is None:
output_dir = Path(gettempdir())
output_dir.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
report_path = output_dir / f"{filename_prefix}_{timestamp}.xlsx"
wb.save(report_path)
logger.info(
"Feedback report generated", path=str(report_path), count=len(feedbacks)
)
return report_path
async def cleanup_report_file(report_path: Path) -> None:
try:
if report_path.exists():
report_path.unlink()
logger.info("Report file cleaned up", path=str(report_path))
except Exception as e:
logger.warning(
"Failed to cleanup report file", path=str(report_path), error=str(e)
)
+40
View File
@@ -0,0 +1,40 @@
from __future__ import annotations
from dataclasses import dataclass
from uuid import UUID
from sqlalchemy.ext.asyncio import AsyncSession
from models.user_feedback import UserFeedback
@dataclass
class FeedbackRepository:
session: AsyncSession
async def create_feedback(
self,
*,
user_id: UUID | None,
feedback_type: str,
content: str,
images: list[str],
device_info: dict,
app_version: str,
os_version: str,
) -> UserFeedback:
feedback = UserFeedback(
user_id=user_id,
feedback_type=feedback_type,
content=content,
images=images,
device_info=device_info,
app_version=app_version,
os_version=os_version,
)
self.session.add(feedback)
await self.session.flush()
return feedback
async def save(self) -> None:
await self.session.commit()
+58
View File
@@ -0,0 +1,58 @@
from __future__ import annotations
import json
from typing import Annotated
from fastapi import APIRouter, Depends, File, Form, UploadFile
from core.auth.models import CurrentUser
from core.http.errors import ApiProblemError, problem_payload
from v1.feedback.dependencies import get_feedback_service, get_optional_user
from v1.feedback.schemas import FeedbackCreateResponse
from v1.feedback.service import FeedbackService
router = APIRouter(prefix="/feedback", tags=["feedback"])
@router.post("", response_model=FeedbackCreateResponse, status_code=201)
async def create_feedback(
feedback_type: Annotated[str, Form(...)],
content: Annotated[str, Form(...)],
device_info: Annotated[str, Form(...)],
app_version: Annotated[str, Form(...)],
os_version: Annotated[str, Form(...)],
images: list[UploadFile] = File(default=[]),
user: CurrentUser | None = Depends(get_optional_user),
service: FeedbackService = Depends(get_feedback_service),
) -> FeedbackCreateResponse:
if len(images) > 3:
raise ApiProblemError(
status_code=400,
detail=problem_payload(
code="FEEDBACK_TOO_MANY_IMAGES",
detail="Maximum 3 images allowed",
),
)
try:
device_info_dict = json.loads(device_info)
except (json.JSONDecodeError, TypeError) as exc:
raise ApiProblemError(
status_code=422,
detail=problem_payload(
code="REQUEST_VALIDATION_ERROR",
detail="Invalid device_info JSON",
),
) from exc
user_id = user.id if isinstance(user, CurrentUser) else None
return await service.submit_feedback(
feedback_type=feedback_type,
content=content,
device_info=device_info_dict,
app_version=app_version,
os_version=os_version,
images=images,
user_id=user_id,
)
+15
View File
@@ -0,0 +1,15 @@
from __future__ import annotations
from typing import Literal
from pydantic import BaseModel, ConfigDict
FeedbackType = Literal["bug", "suggestion", "other"]
FeedbackStatus = Literal["pending", "processed"]
class FeedbackCreateResponse(BaseModel):
model_config = ConfigDict(extra="forbid")
id: str
created_at: str
+165
View File
@@ -0,0 +1,165 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
from uuid import UUID
from fastapi import UploadFile
from structlog import get_logger
from core.config.settings import config
from core.http.errors import ApiProblemError, problem_payload
from services.base.supabase import SupabaseService
from v1.feedback.repository import FeedbackRepository
from v1.feedback.schemas import FeedbackCreateResponse
logger = get_logger("v1.feedback.service")
_MAX_IMAGES = 3
_ALLOWED_CONTENT_TYPES = {"image/jpeg", "image/png"}
_ALLOWED_FEEDBACK_TYPES = {"bug", "suggestion", "other"}
@dataclass
class FeedbackService:
repository: FeedbackRepository
storage: SupabaseService
async def submit_feedback(
self,
*,
feedback_type: str,
content: str,
device_info: dict,
app_version: str,
os_version: str,
images: list[UploadFile],
user_id: UUID | None,
) -> FeedbackCreateResponse:
self._validate_feedback_type(feedback_type)
self._validate_content(content)
self._validate_images(images)
image_paths: list[str] = []
if images:
image_paths = await self._upload_images(images)
feedback = await self.repository.create_feedback(
user_id=user_id,
feedback_type=feedback_type,
content=content,
images=image_paths,
device_info=device_info,
app_version=app_version,
os_version=os_version,
)
await self.repository.save()
logger.info(
"Feedback submitted",
feedback_id=str(feedback.id),
user_id=str(user_id) if user_id else "anonymous",
image_count=len(image_paths),
)
return FeedbackCreateResponse(
id=str(feedback.id),
created_at=feedback.created_at.isoformat(),
)
def _validate_feedback_type(self, feedback_type: str) -> None:
if feedback_type not in _ALLOWED_FEEDBACK_TYPES:
raise ApiProblemError(
status_code=422,
detail=problem_payload(
code="REQUEST_VALIDATION_ERROR",
detail=f"Invalid feedback_type: {feedback_type}",
),
)
def _validate_content(self, content: str) -> None:
stripped = content.strip()
if not stripped:
raise ApiProblemError(
status_code=400,
detail=problem_payload(
code="FEEDBACK_CONTENT_EMPTY",
detail="Feedback content must not be empty",
),
)
if len(stripped) > 500:
raise ApiProblemError(
status_code=400,
detail=problem_payload(
code="FEEDBACK_CONTENT_TOO_LONG",
detail="Feedback content exceeds 500 characters",
),
)
def _validate_images(self, images: list[UploadFile]) -> None:
if len(images) > _MAX_IMAGES:
raise ApiProblemError(
status_code=400,
detail=problem_payload(
code="FEEDBACK_TOO_MANY_IMAGES",
detail="Maximum 3 images allowed",
),
)
for image in images:
content_type = (image.content_type or "").lower().strip()
if content_type and content_type not in _ALLOWED_CONTENT_TYPES:
raise ApiProblemError(
status_code=400,
detail=problem_payload(
code="FEEDBACK_INVALID_IMAGE_TYPE",
detail=f"Unsupported image type: {content_type}",
),
)
async def _upload_images(self, images: list[UploadFile]) -> list[str]:
bucket = config.storage.feedback.bucket
max_bytes = config.storage.feedback.max_size_mb * 1024 * 1024
timestamp_prefix = datetime.now(timezone.utc).strftime("%Y-%m-%d")
paths: list[str] = []
for i, image in enumerate(images):
content = await image.read()
if len(content) > max_bytes:
raise ApiProblemError(
status_code=400,
detail=problem_payload(
code="FEEDBACK_IMAGE_TOO_LARGE",
detail=f"Image too large: {image.filename}",
),
)
content_type = image.content_type or "image/jpeg"
ext = "jpg" if content_type == "image/jpeg" else "png"
storage_path = (
f"{timestamp_prefix}/{datetime.now(timezone.utc).timestamp()}_{i}.{ext}"
)
try:
await self.storage.upload_bytes(
bucket=bucket,
path=storage_path,
content=content,
content_type=content_type,
)
except Exception as exc:
logger.exception(
"Feedback image upload failed",
path=storage_path,
filename=image.filename,
)
raise ApiProblemError(
status_code=500,
detail=problem_payload(
code="FEEDBACK_SUBMIT_FAILED",
detail="Failed to upload feedback images",
),
) from exc
paths.append(storage_path)
return paths
+226
View File
@@ -0,0 +1,226 @@
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from pathlib import Path
from sqlalchemy import select
from structlog import get_logger
from taskiq_redis import RedisScheduleSource
from core.config.settings import config
from core.db.session import AsyncSessionLocal
from core.email.sender import EmailAttachment, EmailMessage, EmailSender
from core.email.template_loader import load_template
from core.taskiq.app import worker_general_broker
from models.user_feedback import UserFeedback
from v1.feedback.report import generate_feedback_report
logger = get_logger("v1.feedback.tasks")
async def _fetch_pending_feedbacks_by_time_range(
start_time: datetime, end_time: datetime
) -> list[UserFeedback]:
async with AsyncSessionLocal() as session:
stmt = (
select(UserFeedback)
.where(UserFeedback.created_at >= start_time)
.where(UserFeedback.created_at < end_time)
.where(UserFeedback.status == "pending")
.order_by(UserFeedback.created_at.desc())
)
result = await session.execute(stmt)
return list(result.scalars().all())
async def _mark_feedbacks_processed(feedback_ids: list) -> None:
if not feedback_ids:
return
async with AsyncSessionLocal() as session:
stmt = select(UserFeedback).where(UserFeedback.id.in_(feedback_ids))
result = await session.execute(stmt)
for fb in result.scalars().all():
fb.status = "processed"
fb.updated_at = datetime.now(timezone.utc)
await session.commit()
logger.info("Feedbacks marked as processed", count=len(feedback_ids))
def _build_report_email_html(
feedbacks: list[UserFeedback],
start_time: datetime,
end_time: datetime,
push_hour: int,
) -> str:
template = load_template("feedback", "daily_report.html")
return template.substitute(
start_date=start_time.strftime("%Y-%m-%d"),
start_hour=str(push_hour),
end_date=end_time.strftime("%Y-%m-%d"),
end_hour=str(push_hour),
total_count=len(feedbacks),
bug_count=sum(1 for fb in feedbacks if fb.feedback_type == "bug"),
suggestion_count=sum(1 for fb in feedbacks if fb.feedback_type == "suggestion"),
generated_at=datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
)
def _build_no_feedback_email_html(
start_time: datetime,
end_time: datetime,
push_hour: int,
) -> str:
template = load_template("feedback", "no_feedback.html")
return template.substitute(
start_date=start_time.strftime("%Y-%m-%d"),
start_hour=str(push_hour),
end_date=end_time.strftime("%Y-%m-%d"),
end_hour=str(push_hour),
generated_at=datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
)
async def _send_feedback_email(
feedbacks: list[UserFeedback],
start_time: datetime,
end_time: datetime,
push_hour: int,
report_path: Path | None = None,
) -> bool:
sender = EmailSender()
if feedbacks:
body_html = _build_report_email_html(feedbacks, start_time, end_time, push_hour)
subject = f"用户反馈日报 - {start_time.strftime('%Y-%m-%d')}"
else:
body_html = _build_no_feedback_email_html(start_time, end_time, push_hour)
subject = f"用户反馈日报(无反馈)- {start_time.strftime('%Y-%m-%d')}"
attachments: list[EmailAttachment] = []
if report_path is not None and report_path.exists():
attachments.append(
EmailAttachment(
filename=report_path.name,
content=report_path.read_bytes(),
content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
)
)
message = EmailMessage(
to=config.feedback_report.email,
subject=subject,
body_html=body_html,
attachments=attachments,
)
await sender.send(message)
logger.info(
"Feedback report email sent",
to=config.feedback_report.email,
has_feedback=len(feedbacks) > 0,
has_attachment=len(attachments) > 0,
)
return True
# type: ignore reportArgumentType for taskiq decorator
@worker_general_broker.on_event("startup") # type: ignore[arg-type]
async def _register_feedback_report_schedule() -> None:
if not config.feedback_report.enabled:
logger.info("Feedback report scheduling disabled")
return
schedule_source = RedisScheduleSource(
url=config.taskiq_broker_url,
prefix="schedule:feedback",
)
await schedule_source.startup()
await generate_daily_feedback_report.schedule_by_cron(
source=schedule_source,
cron=config.feedback_report.cron,
)
logger.info(
"Feedback report cron registered",
cron=config.feedback_report.cron,
)
@worker_general_broker.task(task_name="tasks.feedback.generate_daily_report")
async def generate_daily_feedback_report() -> str | None:
if not config.feedback_report.enabled:
logger.info("Feedback report is disabled, skipping")
return None
now = datetime.now(timezone.utc)
push_hour = 10
end_time = now.replace(hour=push_hour, minute=0, second=0, microsecond=0)
start_time = end_time - timedelta(days=1)
feedbacks = await _fetch_pending_feedbacks_by_time_range(start_time, end_time)
logger.info(
"Feedback query result",
count=len(feedbacks),
start=start_time,
end=end_time,
)
report_path: Path | None = None
try:
if feedbacks:
reports_dir = Path(config.runtime.log_dir).parent / "reports"
report_path = await generate_feedback_report(
feedbacks, output_dir=reports_dir
)
logger.info("Report generated", path=str(report_path))
await _mark_feedbacks_processed([fb.id for fb in feedbacks])
await _send_feedback_email(
feedbacks=feedbacks,
start_time=start_time,
end_time=end_time,
push_hour=push_hour,
report_path=report_path,
)
finally:
if report_path and report_path.exists():
report_path.unlink()
logger.info("Report file cleaned up", path=str(report_path))
return str(report_path) if report_path else None
async def generate_all_feedback_report() -> Path:
async with AsyncSessionLocal() as session:
stmt = select(UserFeedback).order_by(UserFeedback.created_at.desc())
result = await session.execute(stmt)
feedbacks = list(result.scalars().all())
if not feedbacks:
raise ValueError("No feedbacks to report")
logger.info("Generating all feedback report", count=len(feedbacks))
reports_dir = Path(config.runtime.log_dir).parent / "reports"
report_path = await generate_feedback_report(feedbacks, output_dir=reports_dir)
await _mark_feedbacks_processed([fb.id for fb in feedbacks])
return report_path
async def send_feedback_report_email(
feedbacks: list[UserFeedback],
start_time: datetime,
end_time: datetime,
push_hour: int,
report_path: Path | None = None,
) -> bool:
return await _send_feedback_email(
feedbacks=feedbacks,
start_time=start_time,
end_time=end_time,
push_hour=push_hour,
report_path=report_path,
)
+2
View File
@@ -4,6 +4,7 @@ from fastapi import APIRouter
from v1.agent.router import router as agent_router
from v1.auth.router import router as auth_router
from v1.feedback.router import router as feedback_router
from v1.invite.router import router as invite_router
from v1.notifications.router import router as notifications_router
from v1.points.router import router as points_router
@@ -13,6 +14,7 @@ from v1.users.router import router as users_router
router = APIRouter(prefix="/api/v1")
router.include_router(auth_router)
router.include_router(agent_router)
router.include_router(feedback_router)
router.include_router(invite_router)
router.include_router(notifications_router)
router.include_router(points_router)
@@ -0,0 +1,43 @@
from __future__ import annotations
import os
import time
import pytest
def pytest_configure(config): # noqa: ARG001
config.addinivalue_line(
"markers", "integration: integration test requiring live backend"
)
def pytest_collection_modifyitems(items):
for item in items:
if "integration" in item.nodeid:
item.add_marker(pytest.mark.integration)
@pytest.fixture(scope="session")
def api_base_url() -> str:
return os.environ.get("ERYAO_TEST_BASE_URL", "http://localhost:5775")
@pytest.fixture
def unique_test_email() -> str:
base_email = os.environ.get("ERYAO_TEST__EMAIL", "test@example.com").strip().lower()
if "@" in base_email:
name, domain = base_email.split("@", 1)
else:
name, domain = base_email, "example.com"
return f"{name}+fb{int(time.time() * 1000)}@{domain}"
@pytest.fixture
def test_verify_code() -> str:
return os.environ.get("ERYAO_TEST__CODE", "123456")
@pytest.fixture
def test_identity(unique_test_email: str, test_verify_code: str) -> dict[str, str]:
return {"email": unique_test_email, "code": test_verify_code}
@@ -0,0 +1,159 @@
from __future__ import annotations
from collections.abc import AsyncIterator
import httpx
import pytest
import json
@pytest.fixture
async def feedback_client(api_base_url: str) -> AsyncIterator[httpx.AsyncClient]:
async with httpx.AsyncClient(base_url=api_base_url, timeout=30.0) as client:
try:
health = await client.get("/health")
if health.status_code != 200:
pytest.skip(f"API not ready: /health={health.status_code}")
except Exception as exc:
pytest.skip(f"API unavailable: {exc}")
yield client
@pytest.fixture
async def authed_feedback_client(
feedback_client: httpx.AsyncClient,
test_identity: dict[str, str],
) -> httpx.AsyncClient:
otp_response = await feedback_client.post(
"/api/v1/auth/otp",
json={"email": test_identity["email"]},
)
if otp_response.status_code not in (200, 204):
pytest.skip(f"OTP request failed: {otp_response.status_code}")
verify_response = await feedback_client.post(
"/api/v1/auth/verify",
json={
"email": test_identity["email"],
"code": test_identity["code"],
},
)
if verify_response.status_code != 200:
pytest.skip(f"Auth verify failed: {verify_response.status_code}")
token = verify_response.json().get("access_token") or verify_response.json().get(
"session", {}
).get("access_token")
if not token:
pytest.skip("No access token in auth response")
feedback_client.headers["Authorization"] = f"Bearer {token}"
return feedback_client
class TestFeedbackSubmitAnonymous:
@pytest.mark.asyncio
async def test_submit_feedback_anonymous_success(
self, feedback_client: httpx.AsyncClient
):
response = await feedback_client.post(
"/api/v1/feedback",
data={
"feedback_type": "bug",
"content": "App crashes when opening settings",
"device_info": json.dumps({"platform": "ios", "model": "iPhone 15"}),
"app_version": "1.0.0",
"os_version": "iOS 17.0",
},
)
assert response.status_code == 201
body = response.json()
assert "id" in body
assert "created_at" in body
@pytest.mark.asyncio
async def test_submit_feedback_with_auth(
self, authed_feedback_client: httpx.AsyncClient
):
response = await authed_feedback_client.post(
"/api/v1/feedback",
data={
"feedback_type": "suggestion",
"content": "Please add dark mode",
"device_info": json.dumps({"platform": "android", "model": "Pixel 8"}),
"app_version": "1.0.0",
"os_version": "Android 14",
},
)
assert response.status_code == 201
body = response.json()
assert "id" in body
assert "created_at" in body
@pytest.mark.asyncio
async def test_submit_feedback_content_empty(
self, feedback_client: httpx.AsyncClient
):
response = await feedback_client.post(
"/api/v1/feedback",
data={
"feedback_type": "bug",
"content": "",
"device_info": json.dumps({"platform": "ios", "model": "iPhone 15"}),
"app_version": "1.0.0",
"os_version": "iOS 17.0",
},
)
assert response.status_code == 400
assert response.json().get("code") == "FEEDBACK_CONTENT_EMPTY"
@pytest.mark.asyncio
async def test_submit_feedback_content_too_long(
self, feedback_client: httpx.AsyncClient
):
response = await feedback_client.post(
"/api/v1/feedback",
data={
"feedback_type": "bug",
"content": "x" * 501,
"device_info": json.dumps({"platform": "ios", "model": "iPhone 15"}),
"app_version": "1.0.0",
"os_version": "iOS 17.0",
},
)
assert response.status_code == 400
assert response.json().get("code") == "FEEDBACK_CONTENT_TOO_LONG"
@pytest.mark.asyncio
async def test_submit_feedback_invalid_device_info(
self, feedback_client: httpx.AsyncClient
):
response = await feedback_client.post(
"/api/v1/feedback",
data={
"feedback_type": "bug",
"content": "Test content",
"device_info": "not-json",
"app_version": "1.0.0",
"os_version": "iOS 17.0",
},
)
assert response.status_code == 422
@pytest.mark.asyncio
async def test_submit_feedback_with_image(self, feedback_client: httpx.AsyncClient):
fake_image = b"\xff\xd8\xff\xe0" + b"\x00" * 100
response = await feedback_client.post(
"/api/v1/feedback",
data={
"feedback_type": "bug",
"content": "Screenshot of the issue",
"device_info": json.dumps({"platform": "ios", "model": "iPhone 15"}),
"app_version": "1.0.0",
"os_version": "iOS 17.0",
},
files=[("images", ("screenshot.jpg", fake_image, "image/jpeg"))],
)
assert response.status_code == 201
body = response.json()
assert "id" in body
+349
View File
@@ -0,0 +1,349 @@
from __future__ import annotations
from datetime import datetime
from uuid import UUID, uuid4
import pytest
from core.http.errors import ApiProblemError
from v1.feedback.schemas import FeedbackCreateResponse
class TestFeedbackCreateResponse:
def test_valid_response(self):
resp = FeedbackCreateResponse(
id=str(uuid4()),
created_at="2026-04-17T10:30:00Z",
)
assert isinstance(resp.id, str)
assert resp.created_at == "2026-04-17T10:30:00Z"
def test_extra_fields_forbidden(self):
with pytest.raises(Exception):
FeedbackCreateResponse(
id=str(uuid4()),
created_at="2026-04-17T10:30:00Z",
unexpected_field="value",
)
class _FakeUserFeedback:
def __init__(
self,
*,
id: UUID,
user_id: UUID | None,
feedback_type: str,
content: str,
images: list[str],
device_info: dict,
app_version: str,
os_version: str,
status: str = "pending",
) -> None:
self.id = id
self.user_id = user_id
self.feedback_type = feedback_type
self.content = content
self.images = images
self.device_info = device_info
self.app_version = app_version
self.os_version = os_version
self.status = status
self.created_at = datetime.now()
self.updated_at = datetime.now()
class _FakeFeedbackRepository:
def __init__(self) -> None:
self._records: list[_FakeUserFeedback] = []
self._committed = False
async def create_feedback(
self,
*,
user_id: UUID | None,
feedback_type: str,
content: str,
images: list[str],
device_info: dict,
app_version: str,
os_version: str,
) -> _FakeUserFeedback:
record = _FakeUserFeedback(
id=uuid4(),
user_id=user_id,
feedback_type=feedback_type,
content=content,
images=images,
device_info=device_info,
app_version=app_version,
os_version=os_version,
)
self._records.append(record)
return record
async def save(self) -> None:
self._committed = True
class _FakeUploadFile:
def __init__(
self,
*,
filename: str = "test.jpg",
content_type: str = "image/jpeg",
content: bytes = b"fake-image-data",
) -> None:
self.filename = filename
self.content_type = content_type
self._content = content
self._read = False
async def read(self) -> bytes:
self._read = True
return self._content
class _FakeStorage:
def __init__(self) -> None:
self.uploaded: list[dict] = []
async def upload_bytes(
self,
*,
bucket: str,
path: str,
content: bytes,
content_type: str,
) -> str:
self.uploaded.append(
{
"bucket": bucket,
"path": path,
"content_type": content_type,
"size": len(content),
}
)
return path
@pytest.fixture
def fake_repo() -> _FakeFeedbackRepository:
return _FakeFeedbackRepository()
@pytest.fixture
def fake_storage() -> _FakeStorage:
return _FakeStorage()
class TestFeedbackServiceValidation:
@pytest.mark.asyncio
async def test_submit_feedback_success_no_images(
self, fake_repo: _FakeFeedbackRepository, fake_storage: _FakeStorage
):
from v1.feedback.service import FeedbackService
service = FeedbackService(repository=fake_repo, storage=fake_storage)
result = await service.submit_feedback(
feedback_type="bug",
content="App crashes on launch",
device_info={"platform": "ios", "model": "iPhone 15"},
app_version="1.0.0",
os_version="iOS 17.0",
images=[],
user_id=uuid4(),
)
assert isinstance(result, FeedbackCreateResponse)
assert result.id
assert result.created_at
assert len(fake_repo._records) == 1
assert fake_repo._records[0].feedback_type == "bug"
assert fake_repo._records[0].images == []
@pytest.mark.asyncio
async def test_submit_feedback_anonymous(
self, fake_repo: _FakeFeedbackRepository, fake_storage: _FakeStorage
):
from v1.feedback.service import FeedbackService
service = FeedbackService(repository=fake_repo, storage=fake_storage)
await service.submit_feedback(
feedback_type="suggestion",
content="Add dark mode",
device_info={"platform": "android", "model": "Pixel 8"},
app_version="1.0.0",
os_version="Android 14",
images=[],
user_id=None,
)
assert fake_repo._records[0].user_id is None
@pytest.mark.asyncio
async def test_submit_feedback_with_images(
self, fake_repo: _FakeFeedbackRepository, fake_storage: _FakeStorage
):
from v1.feedback.service import FeedbackService
service = FeedbackService(repository=fake_repo, storage=fake_storage)
images = [
_FakeUploadFile(filename="screenshot1.jpg", content_type="image/jpeg"),
_FakeUploadFile(filename="screenshot2.png", content_type="image/png"),
]
await service.submit_feedback(
feedback_type="bug",
content="UI glitch",
device_info={"platform": "ios", "model": "iPhone 15"},
app_version="1.0.0",
os_version="iOS 17.0",
images=images, # type: ignore[arg-type]
user_id=uuid4(),
)
assert len(fake_storage.uploaded) == 2
assert len(fake_repo._records[0].images) == 2
@pytest.mark.asyncio
async def test_content_empty_raises(
self, fake_repo: _FakeFeedbackRepository, fake_storage: _FakeStorage
):
from v1.feedback.service import FeedbackService
service = FeedbackService(repository=fake_repo, storage=fake_storage)
with pytest.raises(ApiProblemError) as exc_info:
await service.submit_feedback(
feedback_type="bug",
content=" ",
device_info={"platform": "ios", "model": "iPhone 15"},
app_version="1.0.0",
os_version="iOS 17.0",
images=[],
user_id=None,
)
assert exc_info.value.code == "FEEDBACK_CONTENT_EMPTY"
@pytest.mark.asyncio
async def test_content_too_long_raises(
self, fake_repo: _FakeFeedbackRepository, fake_storage: _FakeStorage
):
from v1.feedback.service import FeedbackService
service = FeedbackService(repository=fake_repo, storage=fake_storage)
with pytest.raises(ApiProblemError) as exc_info:
await service.submit_feedback(
feedback_type="bug",
content="x" * 501,
device_info={"platform": "ios", "model": "iPhone 15"},
app_version="1.0.0",
os_version="iOS 17.0",
images=[],
user_id=None,
)
assert exc_info.value.code == "FEEDBACK_CONTENT_TOO_LONG"
@pytest.mark.asyncio
async def test_too_many_images_raises(
self, fake_repo: _FakeFeedbackRepository, fake_storage: _FakeStorage
):
from v1.feedback.service import FeedbackService
service = FeedbackService(repository=fake_repo, storage=fake_storage)
images = [_FakeUploadFile() for _ in range(4)]
with pytest.raises(ApiProblemError) as exc_info:
await service.submit_feedback(
feedback_type="bug",
content="Test",
device_info={"platform": "ios", "model": "iPhone 15"},
app_version="1.0.0",
os_version="iOS 17.0",
images=images, # type: ignore[arg-type]
user_id=None,
)
assert exc_info.value.code == "FEEDBACK_TOO_MANY_IMAGES"
@pytest.mark.asyncio
async def test_invalid_image_type_raises(
self, fake_repo: _FakeFeedbackRepository, fake_storage: _FakeStorage
):
from v1.feedback.service import FeedbackService
service = FeedbackService(repository=fake_repo, storage=fake_storage)
images = [_FakeUploadFile(content_type="image/gif")]
with pytest.raises(ApiProblemError) as exc_info:
await service.submit_feedback(
feedback_type="bug",
content="Test",
device_info={"platform": "ios", "model": "iPhone 15"},
app_version="1.0.0",
os_version="iOS 17.0",
images=images, # type: ignore[arg-type]
user_id=None,
)
assert exc_info.value.code == "FEEDBACK_INVALID_IMAGE_TYPE"
@pytest.mark.asyncio
async def test_image_too_large_raises(
self, fake_repo: _FakeFeedbackRepository, fake_storage: _FakeStorage
):
from v1.feedback.service import FeedbackService
service = FeedbackService(repository=fake_repo, storage=fake_storage)
large_content = b"x" * (6 * 1024 * 1024)
images = [_FakeUploadFile(content=large_content, content_type="image/jpeg")]
with pytest.raises(ApiProblemError) as exc_info:
await service.submit_feedback(
feedback_type="bug",
content="Test",
device_info={"platform": "ios", "model": "iPhone 15"},
app_version="1.0.0",
os_version="iOS 17.0",
images=images, # type: ignore[arg-type]
user_id=None,
)
assert exc_info.value.code == "FEEDBACK_IMAGE_TOO_LARGE"
@pytest.mark.asyncio
async def test_invalid_feedback_type_raises(
self, fake_repo: _FakeFeedbackRepository, fake_storage: _FakeStorage
):
from v1.feedback.service import FeedbackService
service = FeedbackService(repository=fake_repo, storage=fake_storage)
with pytest.raises(ApiProblemError) as exc_info:
await service.submit_feedback(
feedback_type="invalid_type",
content="Test",
device_info={"platform": "ios", "model": "iPhone 15"},
app_version="1.0.0",
os_version="iOS 17.0",
images=[],
user_id=None,
)
assert exc_info.value.code == "REQUEST_VALIDATION_ERROR"
@pytest.mark.asyncio
async def test_storage_upload_failure_raises_submit_failed(
self, fake_repo: _FakeFeedbackRepository
):
from v1.feedback.service import FeedbackService
class _FailingStorage:
async def upload_bytes(self, **kwargs: object) -> str:
raise RuntimeError("Storage unavailable")
service = FeedbackService(
repository=fake_repo,
storage=_FailingStorage(), # type: ignore[arg-type]
)
images = [_FakeUploadFile()]
with pytest.raises(ApiProblemError) as exc_info:
await service.submit_feedback(
feedback_type="bug",
content="Test",
device_info={"platform": "ios", "model": "iPhone 15"},
app_version="1.0.0",
os_version="iOS 17.0",
images=images, # type: ignore[arg-type]
user_id=None,
)
assert exc_info.value.code == "FEEDBACK_SUBMIT_FAILED"