13.1 功能清单:注册 → 登录 → 发送通知(入队) → 查看通知列表
在综合项目实战中,我们要构建一个用户通知中心系统。这个系统包含四个核心功能模块:
- 用户注册:新用户填写邮箱和密码进行注册
- 用户登录:已注册用户通过邮箱密码登录获取 JWT token
- 发送通知:登录用户可触发发送通知操作,该操作会被放入 Redis 队列异步处理
- 查看通知列表:用户可查询自己收到的所有通知记录
这套流程完整覆盖了认证、数据库操作、异步任务和缓存等关键技术点,是前面章节知识的集大成者。
13.2 数据库设计:users 表 + notifications 表(含 status 字段)
我们需要两张核心表来支撑通知中心的功能:
users 表
- id: 主键,自增整数
- email: 唯一索引,用于登录
- password_hash: 加密后的密码
- created_at: 创建时间戳
notifications 表
- id: 主键,自增整数
- user_id: 外键关联 users.id
- title: 通知标题
- content: 通知内容
- status: 状态字段(pending/processing/sent/failed)
- created_at: 创建时间
- updated_at: 最后更新时间
实例方法表格
| 功能名称 | 实例调用方法 | 具体功能、注意事项、必需参数/可选参数 |
|---|---|---|
| 创建用户表 | User.__table__.create(engine) | 在数据库中创建 users 表,需先初始化引擎 |
| 创建通知表 | Notification.__table__.create(engine) | 在数据库中创建 notifications 表,需确保外键约束正确 |
| 添加用户 | session.add(User(email=..., password_hash=...)) | 必需参数:email, password_hash;email 需唯一 |
| 添加通知 | session.add(Notification(user_id=..., title=..., content=...)) | 必需参数:user_id, title, content;status 默认为 'pending' |
下面是一个完整的模型定义示例:
python
# app/models.py
from sqlalchemy import String, Integer, ForeignKey, DateTime, Text, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from typing import Optional
class Base(DeclarativeBase):
"""声明式基类"""
pass
class User(Base):
"""用户模型"""
__tablename__ = "users"
# 主键定义
id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
# 邮箱字段,唯一约束
email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
# 密码哈希存储
password_hash: Mapped[str] = mapped_column(String(255))
# 创建时间戳,自动设置为当前时间
created_at: Mapped[DateTime] = mapped_column(DateTime, default=func.now())
# 反向关系,关联通知
notifications: Mapped[list["Notification"]] = relationship(
"Notification",
back_populates="user",
cascade="all, delete-orphan"
)
class Notification(Base):
"""通知模型"""
__tablename__ = "notifications"
# 主键定义
id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
# 外键关联用户
user_id: Mapped[int] = mapped_column(Integer, ForeignKey("users.id"))
# 通知标题
title: Mapped[str] = mapped_column(String(255))
# 通知内容
content: Mapped[str] = mapped_column(Text)
# 状态字段,默认为 pending
status: Mapped[str] = mapped_column(String(20), default="pending")
# 创建时间
created_at: Mapped[DateTime] = mapped_column(DateTime, default=func.now())
# 更新时间,自动更新
updated_at: Mapped[DateTime] = mapped_column(DateTime, default=func.now(), onupdate=func.now())
# 正向关系,关联用户
user: Mapped["User"] = relationship("User", back_populates="notifications")这段代码定义了两个相互关联的模型,使用 SQLAlchemy 2.0 的现代语法。注意 relationship 中的 back_populates 参数建立了双向关系,而 cascade="all, delete-orphan" 确保当用户被删除时,其所有通知也会被级联删除。
13.3 后台 Worker:消费队列 → 调用 SMTP 发送邮件 → 更新状态
后台 Worker 是通知中心的核心组件,它负责从 Redis 队列中取出任务并执行实际的通知发送工作。
Worker 的工作流程如下:
- 监听 Redis 列表的
BLPOP操作,阻塞等待新任务 - 收到任务后解析 JSON 数据
- 根据通知类型执行相应操作(本例中为发送邮件)
- 更新数据库中通知的状态为成功或失败
实例方法表格
| 功能名称 | 实例调用方法 | 具体功能、注意事项、必需参数/可选参数 |
|---|---|---|
| 任务入队 | await redis.lpush("notification_queue", json.dumps(task)) | 将通知任务推入队列左侧,task 需包含必要信息 |
| 阻塞出队 | await redis.blpop("notification_queue", timeout=0) | 从队列右侧阻塞弹出任务,timeout=0 表示永久阻塞 |
| 发送邮件 | await send_email(to_email, subject, body) | 异步发送邮件函数,需配置 SMTP 服务器 |
| 更新状态 | await session.execute(update(Notification).where(...).values(status="sent")) | 更新通知状态,需在事务中执行 |
下面是一个完整的 Worker 实现:
python
# worker.py
import asyncio
import json
import logging
from datetime import datetime
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy import update
import redis.asyncio as redis
from aiosmtplib import send
from app.models import Notification
from app.core.config import settings
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 创建数据库引擎
engine = create_async_engine(settings.DATABASE_URL)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
# 创建 Redis 连接
redis_client = redis.Redis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, decode_responses=True)
async def send_email(to_email: str, subject: str, body: str) -> bool:
"""
异步发送邮件函数
Args:
to_email: 收件人邮箱
subject: 邮件主题
body: 邮件正文
Returns:
bool: 发送是否成功
"""
try:
# 使用 aiosmtplib 发送邮件
await send(
message=f"Subject: {subject}\n\n{body}",
hostname=settings.SMTP_HOST,
port=settings.SMTP_PORT,
start_tls=True,
username=settings.SMTP_USER,
password=settings.SMTP_PASSWORD,
sender=settings.SMTP_SENDER,
recipients=[to_email]
)
logger.info(f"邮件发送成功: {to_email}")
return True
except Exception as e:
logger.error(f"邮件发送失败: {e}")
return False
async def process_notification_task(task_data: dict) -> None:
"""
处理单个通知任务
Args:
task_data: 任务数据字典,包含 notification_id 等信息
"""
notification_id = task_data.get("notification_id")
if not notification_id:
logger.error("任务数据缺少 notification_id")
return
async with async_session() as session:
try:
# 查询通知记录
result = await session.execute(
select(Notification).where(Notification.id == notification_id)
)
notification = result.scalar_one_or_none()
if not notification:
logger.error(f"未找到通知记录: {notification_id}")
return
# 获取用户邮箱
user_email = notification.user.email
# 发送邮件
success = await send_email(
to_email=user_email,
subject=notification.title,
body=notification.content
)
# 更新通知状态
new_status = "sent" if success else "failed"
await session.execute(
update(Notification)
.where(Notification.id == notification_id)
.values(status=new_status, updated_at=datetime.utcnow())
)
await session.commit()
logger.info(f"通知处理完成: {notification_id}, 状态: {new_status}")
except Exception as e:
logger.error(f"处理通知任务时出错: {e}")
# 回滚事务
await session.rollback()
async def worker():
"""
主 Worker 循环
"""
logger.info("通知 Worker 启动...")
while True:
try:
# 从 Redis 队列阻塞获取任务
queue_item = await redis_client.blpop("notification_queue", timeout=0)
if queue_item:
_, task_json = queue_item
task_data = json.loads(task_json)
await process_notification_task(task_data)
except asyncio.CancelledError:
logger.info("Worker 被取消")
break
except Exception as e:
logger.error(f"Worker 错误: {e}")
# 短暂休眠避免错误风暴
await asyncio.sleep(1)
if __name__ == "__main__":
# 运行 Worker
asyncio.run(worker())这个 Worker 实现了完整的任务处理流程,包含了错误处理和日志记录。注意我们使用了 aiosmtplib 来异步发送邮件,这比传统的同步 SMTP 库更适合异步环境。
13.4 前端轮询:通过 API 获取最新通知(带 Redis 缓存)
为了让前端能够实时获取用户的最新通知,我们需要提供一个高效的 API 接口。为了提高性能,我们会使用 Redis 缓存通知列表,并设置合理的过期时间。
API 设计要点:
- 使用 JWT 认证确保只有登录用户能访问自己的通知
- 优先从 Redis 缓存读取,缓存未命中时才查询数据库
- 支持分页以处理大量通知的情况
- 返回统一格式的响应数据
实例方法表格
| 功能名称 | 实例调用方法 | 具体功能、注意事项、必需参数/可选参数 |
|---|---|---|
| 获取缓存 | await redis.get(f"notifications:{user_id}:{page}") | 从 Redis 获取缓存的通知列表,key 包含用户ID和页码 |
| 设置缓存 | await redis.setex(key, 300, json.dumps(notifications)) | 设置缓存,过期时间为300秒(5分钟) |
| 查询数据库 | await session.execute(select(Notification).where(...)) | 从数据库查询通知,按创建时间倒序排列 |
| 分页处理 | offset=(page-1)*page_size, limit=page_size | 实现分页查询,避免一次性加载过多数据 |
下面是一个完整的 API 实现:
python
# app/api/v1/notifications.py
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from typing import List, Optional
import json
import redis.asyncio as redis
from app.db.session import get_db
from app.core.security import get_current_user
from app.models import Notification, User
from app.schemas.notification import NotificationResponse
from app.core.config import settings
router = APIRouter()
# 初始化 Redis 连接
redis_client = redis.Redis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
decode_responses=True
)
@router.get(
"/notifications",
response_model=List[NotificationResponse],
summary="获取用户通知列表"
)
async def get_notifications(
page: int = Query(1, ge=1, description="页码"),
page_size: int = Query(10, ge=1, le=100, description="每页数量"),
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db)
):
"""
获取当前用户的通知列表
Args:
page: 页码,默认为1
page_size: 每页数量,默认为10,最大100
current_user: 当前认证用户
db: 数据库会话
Returns:
List[NotificationResponse]: 通知列表
"""
try:
# 构建缓存 key
cache_key = f"notifications:{current_user.id}:{page}:{page_size}"
# 尝试从 Redis 获取缓存
cached_data = await redis_client.get(cache_key)
if cached_data:
# 缓存命中,直接返回
notifications = json.loads(cached_data)
return [NotificationResponse(**item) for item in notifications]
# 缓存未命中,查询数据库
offset = (page - 1) * page_size
query = (
select(Notification)
.where(Notification.user_id == current_user.id)
.order_by(Notification.created_at.desc())
.offset(offset)
.limit(page_size)
)
result = await db.execute(query)
notifications = result.scalars().all()
# 转换为响应模型
response_data = [
NotificationResponse(
id=notification.id,
title=notification.title,
content=notification.content,
status=notification.status,
created_at=notification.created_at,
updated_at=notification.updated_at
)
for notification in notifications
]
# 存入 Redis 缓存,过期时间5分钟
await redis_client.setex(
cache_key,
300, # 5分钟过期
json.dumps([item.dict() for item in response_data])
)
return response_data
except Exception as e:
# 记录错误日志
print(f"获取通知列表时出错: {e}")
raise HTTPException(
status_code=500,
detail="获取通知列表失败"
)
# 通知发送接口
@router.post(
"/notifications/send",
summary="发送通知"
)
async def send_notification(
title: str,
content: str,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db)
):
"""
发送通知(实际是创建通知任务)
Args:
title: 通知标题
content: 通知内容
current_user: 当前认证用户
db: 数据库会话
Returns:
dict: 操作结果
"""
try:
# 创建通知记录
notification = Notification(
user_id=current_user.id,
title=title,
content=content,
status="pending"
)
db.add(notification)
await db.commit()
await db.refresh(notification)
# 将任务加入 Redis 队列
task_data = {
"notification_id": notification.id,
"user_id": current_user.id,
"timestamp": notification.created_at.isoformat()
}
await redis_client.lpush(
"notification_queue",
json.dumps(task_data)
)
# 清除相关缓存
# 这里可以清除第一页的缓存,因为新通知会出现在第一页
await redis_client.delete(f"notifications:{current_user.id}:1:10")
return {
"code": 200,
"message": "通知已提交处理",
"notification_id": notification.id
}
except Exception as e:
await db.rollback()
print(f"发送通知时出错: {e}")
raise HTTPException(
status_code=500,
detail="发送通知失败"
)这个实现包含了完整的缓存策略和错误处理。当用户发送新通知时,我们会主动清除第一页的缓存,确保用户刷新页面时能看到最新的通知。同时,所有数据库操作都包含在 try-except 块中,确保出现异常时能正确回滚事务。