Skip to content

13.1 功能清单:注册 → 登录 → 发送通知(入队) → 查看通知列表

在综合项目实战中,我们要构建一个用户通知中心系统。这个系统包含四个核心功能模块:

  • 用户注册:新用户填写邮箱和密码进行注册
  • 用户登录:已注册用户通过邮箱密码登录获取 JWT token
  • 发送通知:登录用户可触发发送通知操作,该操作会被放入 Redis 队列异步处理
  • 查看通知列表:用户可查询自己收到的所有通知记录

这套流程完整覆盖了认证、数据库操作、异步任务和缓存等关键技术点,是前面章节知识的集大成者。

13.2 数据库设计:users 表 + notifications 表(含 status 字段)

我们需要两张核心表来支撑通知中心的功能:

users 表

  • id: 主键,自增整数
  • email: 唯一索引,用于登录
  • password_hash: 加密后的密码
  • created_at: 创建时间戳

notifications 表

  • id: 主键,自增整数
  • user_id: 外键关联 users.id
  • title: 通知标题
  • content: 通知内容
  • status: 状态字段(pending/processing/sent/failed)
  • created_at: 创建时间
  • updated_at: 最后更新时间

实例方法表格

功能名称实例调用方法具体功能、注意事项、必需参数/可选参数
创建用户表User.__table__.create(engine)在数据库中创建 users 表,需先初始化引擎
创建通知表Notification.__table__.create(engine)在数据库中创建 notifications 表,需确保外键约束正确
添加用户session.add(User(email=..., password_hash=...))必需参数:email, password_hash;email 需唯一
添加通知session.add(Notification(user_id=..., title=..., content=...))必需参数:user_id, title, content;status 默认为 'pending'

下面是一个完整的模型定义示例:

python
# app/models.py
from sqlalchemy import String, Integer, ForeignKey, DateTime, Text, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from typing import Optional

class Base(DeclarativeBase):
    """声明式基类"""
    pass

class User(Base):
    """用户模型"""
    __tablename__ = "users"
    
    # 主键定义
    id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
    # 邮箱字段,唯一约束
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    # 密码哈希存储
    password_hash: Mapped[str] = mapped_column(String(255))
    # 创建时间戳,自动设置为当前时间
    created_at: Mapped[DateTime] = mapped_column(DateTime, default=func.now())
    
    # 反向关系,关联通知
    notifications: Mapped[list["Notification"]] = relationship(
        "Notification", 
        back_populates="user",
        cascade="all, delete-orphan"
    )

class Notification(Base):
    """通知模型"""
    __tablename__ = "notifications"
    
    # 主键定义
    id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
    # 外键关联用户
    user_id: Mapped[int] = mapped_column(Integer, ForeignKey("users.id"))
    # 通知标题
    title: Mapped[str] = mapped_column(String(255))
    # 通知内容
    content: Mapped[str] = mapped_column(Text)
    # 状态字段,默认为 pending
    status: Mapped[str] = mapped_column(String(20), default="pending")
    # 创建时间
    created_at: Mapped[DateTime] = mapped_column(DateTime, default=func.now())
    # 更新时间,自动更新
    updated_at: Mapped[DateTime] = mapped_column(DateTime, default=func.now(), onupdate=func.now())
    
    # 正向关系,关联用户
    user: Mapped["User"] = relationship("User", back_populates="notifications")

这段代码定义了两个相互关联的模型,使用 SQLAlchemy 2.0 的现代语法。注意 relationship 中的 back_populates 参数建立了双向关系,而 cascade="all, delete-orphan" 确保当用户被删除时,其所有通知也会被级联删除。

13.3 后台 Worker:消费队列 → 调用 SMTP 发送邮件 → 更新状态

后台 Worker 是通知中心的核心组件,它负责从 Redis 队列中取出任务并执行实际的通知发送工作。

Worker 的工作流程如下:

  1. 监听 Redis 列表的 BLPOP 操作,阻塞等待新任务
  2. 收到任务后解析 JSON 数据
  3. 根据通知类型执行相应操作(本例中为发送邮件)
  4. 更新数据库中通知的状态为成功或失败

实例方法表格

功能名称实例调用方法具体功能、注意事项、必需参数/可选参数
任务入队await redis.lpush("notification_queue", json.dumps(task))将通知任务推入队列左侧,task 需包含必要信息
阻塞出队await redis.blpop("notification_queue", timeout=0)从队列右侧阻塞弹出任务,timeout=0 表示永久阻塞
发送邮件await send_email(to_email, subject, body)异步发送邮件函数,需配置 SMTP 服务器
更新状态await session.execute(update(Notification).where(...).values(status="sent"))更新通知状态,需在事务中执行

下面是一个完整的 Worker 实现:

python
# worker.py
import asyncio
import json
import logging
from datetime import datetime
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy import update
import redis.asyncio as redis
from aiosmtplib import send
from app.models import Notification
from app.core.config import settings

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# 创建数据库引擎
engine = create_async_engine(settings.DATABASE_URL)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

# 创建 Redis 连接
redis_client = redis.Redis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, decode_responses=True)

async def send_email(to_email: str, subject: str, body: str) -> bool:
    """
    异步发送邮件函数
    
    Args:
        to_email: 收件人邮箱
        subject: 邮件主题
        body: 邮件正文
        
    Returns:
        bool: 发送是否成功
    """
    try:
        # 使用 aiosmtplib 发送邮件
        await send(
            message=f"Subject: {subject}\n\n{body}",
            hostname=settings.SMTP_HOST,
            port=settings.SMTP_PORT,
            start_tls=True,
            username=settings.SMTP_USER,
            password=settings.SMTP_PASSWORD,
            sender=settings.SMTP_SENDER,
            recipients=[to_email]
        )
        logger.info(f"邮件发送成功: {to_email}")
        return True
    except Exception as e:
        logger.error(f"邮件发送失败: {e}")
        return False

async def process_notification_task(task_data: dict) -> None:
    """
    处理单个通知任务
    
    Args:
        task_data: 任务数据字典,包含 notification_id 等信息
    """
    notification_id = task_data.get("notification_id")
    if not notification_id:
        logger.error("任务数据缺少 notification_id")
        return
    
    async with async_session() as session:
        try:
            # 查询通知记录
            result = await session.execute(
                select(Notification).where(Notification.id == notification_id)
            )
            notification = result.scalar_one_or_none()
            
            if not notification:
                logger.error(f"未找到通知记录: {notification_id}")
                return
            
            # 获取用户邮箱
            user_email = notification.user.email
            
            # 发送邮件
            success = await send_email(
                to_email=user_email,
                subject=notification.title,
                body=notification.content
            )
            
            # 更新通知状态
            new_status = "sent" if success else "failed"
            await session.execute(
                update(Notification)
                .where(Notification.id == notification_id)
                .values(status=new_status, updated_at=datetime.utcnow())
            )
            await session.commit()
            
            logger.info(f"通知处理完成: {notification_id}, 状态: {new_status}")
            
        except Exception as e:
            logger.error(f"处理通知任务时出错: {e}")
            # 回滚事务
            await session.rollback()

async def worker():
    """
    主 Worker 循环
    """
    logger.info("通知 Worker 启动...")
    while True:
        try:
            # 从 Redis 队列阻塞获取任务
            queue_item = await redis_client.blpop("notification_queue", timeout=0)
            if queue_item:
                _, task_json = queue_item
                task_data = json.loads(task_json)
                await process_notification_task(task_data)
        except asyncio.CancelledError:
            logger.info("Worker 被取消")
            break
        except Exception as e:
            logger.error(f"Worker 错误: {e}")
            # 短暂休眠避免错误风暴
            await asyncio.sleep(1)

if __name__ == "__main__":
    # 运行 Worker
    asyncio.run(worker())

这个 Worker 实现了完整的任务处理流程,包含了错误处理和日志记录。注意我们使用了 aiosmtplib 来异步发送邮件,这比传统的同步 SMTP 库更适合异步环境。

13.4 前端轮询:通过 API 获取最新通知(带 Redis 缓存)

为了让前端能够实时获取用户的最新通知,我们需要提供一个高效的 API 接口。为了提高性能,我们会使用 Redis 缓存通知列表,并设置合理的过期时间。

API 设计要点:

  • 使用 JWT 认证确保只有登录用户能访问自己的通知
  • 优先从 Redis 缓存读取,缓存未命中时才查询数据库
  • 支持分页以处理大量通知的情况
  • 返回统一格式的响应数据

实例方法表格

功能名称实例调用方法具体功能、注意事项、必需参数/可选参数
获取缓存await redis.get(f"notifications:{user_id}:{page}")从 Redis 获取缓存的通知列表,key 包含用户ID和页码
设置缓存await redis.setex(key, 300, json.dumps(notifications))设置缓存,过期时间为300秒(5分钟)
查询数据库await session.execute(select(Notification).where(...))从数据库查询通知,按创建时间倒序排列
分页处理offset=(page-1)*page_size, limit=page_size实现分页查询,避免一次性加载过多数据

下面是一个完整的 API 实现:

python
# app/api/v1/notifications.py
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from typing import List, Optional
import json
import redis.asyncio as redis
from app.db.session import get_db
from app.core.security import get_current_user
from app.models import Notification, User
from app.schemas.notification import NotificationResponse
from app.core.config import settings

router = APIRouter()

# 初始化 Redis 连接
redis_client = redis.Redis(
    host=settings.REDIS_HOST, 
    port=settings.REDIS_PORT, 
    decode_responses=True
)

@router.get(
    "/notifications",
    response_model=List[NotificationResponse],
    summary="获取用户通知列表"
)
async def get_notifications(
    page: int = Query(1, ge=1, description="页码"),
    page_size: int = Query(10, ge=1, le=100, description="每页数量"),
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """
    获取当前用户的通知列表
    
    Args:
        page: 页码,默认为1
        page_size: 每页数量,默认为10,最大100
        current_user: 当前认证用户
        db: 数据库会话
        
    Returns:
        List[NotificationResponse]: 通知列表
    """
    try:
        # 构建缓存 key
        cache_key = f"notifications:{current_user.id}:{page}:{page_size}"
        
        # 尝试从 Redis 获取缓存
        cached_data = await redis_client.get(cache_key)
        if cached_data:
            # 缓存命中,直接返回
            notifications = json.loads(cached_data)
            return [NotificationResponse(**item) for item in notifications]
        
        # 缓存未命中,查询数据库
        offset = (page - 1) * page_size
        query = (
            select(Notification)
            .where(Notification.user_id == current_user.id)
            .order_by(Notification.created_at.desc())
            .offset(offset)
            .limit(page_size)
        )
        
        result = await db.execute(query)
        notifications = result.scalars().all()
        
        # 转换为响应模型
        response_data = [
            NotificationResponse(
                id=notification.id,
                title=notification.title,
                content=notification.content,
                status=notification.status,
                created_at=notification.created_at,
                updated_at=notification.updated_at
            )
            for notification in notifications
        ]
        
        # 存入 Redis 缓存,过期时间5分钟
        await redis_client.setex(
            cache_key,
            300,  # 5分钟过期
            json.dumps([item.dict() for item in response_data])
        )
        
        return response_data
        
    except Exception as e:
        # 记录错误日志
        print(f"获取通知列表时出错: {e}")
        raise HTTPException(
            status_code=500,
            detail="获取通知列表失败"
        )

# 通知发送接口
@router.post(
    "/notifications/send",
    summary="发送通知"
)
async def send_notification(
    title: str,
    content: str,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """
    发送通知(实际是创建通知任务)
    
    Args:
        title: 通知标题
        content: 通知内容
        current_user: 当前认证用户
        db: 数据库会话
        
    Returns:
        dict: 操作结果
    """
    try:
        # 创建通知记录
        notification = Notification(
            user_id=current_user.id,
            title=title,
            content=content,
            status="pending"
        )
        db.add(notification)
        await db.commit()
        await db.refresh(notification)
        
        # 将任务加入 Redis 队列
        task_data = {
            "notification_id": notification.id,
            "user_id": current_user.id,
            "timestamp": notification.created_at.isoformat()
        }
        await redis_client.lpush(
            "notification_queue",
            json.dumps(task_data)
        )
        
        # 清除相关缓存
        # 这里可以清除第一页的缓存,因为新通知会出现在第一页
        await redis_client.delete(f"notifications:{current_user.id}:1:10")
        
        return {
            "code": 200,
            "message": "通知已提交处理",
            "notification_id": notification.id
        }
        
    except Exception as e:
        await db.rollback()
        print(f"发送通知时出错: {e}")
        raise HTTPException(
            status_code=500,
            detail="发送通知失败"
        )

这个实现包含了完整的缓存策略和错误处理。当用户发送新通知时,我们会主动清除第一页的缓存,确保用户刷新页面时能看到最新的通知。同时,所有数据库操作都包含在 try-except 块中,确保出现异常时能正确回滚事务。