Skip to content

10.1 JWT 生成与验证:使用 python-jose[cryptography]

JWT(JSON Web Token)是现代 Web 应用中广泛采用的身份认证方案。它无状态、可跨域、自带签名,非常适合 FastAPI 这类高性能异步框架。我们选用 python-jose 库来处理 JWT,因为它支持 JWS(签名)和 JWE(加密),且兼容主流算法。

首先安装依赖:

bash
pip install python-jose[cryptography]

接下来,我们需要定义密钥和算法。通常这些配置会放在 .env 文件中,通过 Pydantic Settings 加载(参考第2章)。

下面是一个完整的 JWT 工具模块示例:

python
# app/core/security.py

from datetime import datetime, timedelta
from jose import jwt, JWTError
from typing import Optional
from app.core.config import settings  # 假设你已按第2章配置了settings

# 定义常量
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30  # 访问令牌有效期30分钟

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """
    生成JWT访问令牌
    :param data: 要编码进token的载荷数据(如用户ID)
    :param expires_delta: 自定义过期时间(可选)
    :return: 编码后的JWT字符串
    """
    # 深拷贝原始数据,避免修改原字典
    to_encode = data.copy()
    
    # 设置过期时间
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    
    # 将过期时间加入载荷
    to_encode.update({"exp": expire})
    
    # 使用密钥和算法编码JWT
    encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

def verify_token(token: str) -> dict:
    """
    验证并解码JWT令牌
    :param token: 待验证的JWT字符串
    :return: 解码后的载荷字典
    :raises: JWTError 如果令牌无效或过期
    """
    try:
        # 使用密钥解码JWT
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM])
        return payload
    except JWTError:
        # 捕获所有JWT相关错误(签名无效、过期、格式错误等)
        raise JWTError("Invalid or expired token")

这节讲了如何用 python-jose 生成和验证 JWT 令牌,这是实现无状态认证的基础。通过设置过期时间和密钥签名,确保了令牌的安全性和时效性。

10.2 登录接口:验证密码 → 生成 token → 存入 Redis(可选)

登录接口的核心流程是:接收用户名和密码 → 查询数据库验证 → 生成 JWT → 返回给前端。为了支持单点登录或强制下线,我们可以将 token 存入 Redis,并设置与 JWT 相同的过期时间。

以下是登录接口的完整实现:

python
# app/api/auth.py

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from passlib.context import CryptContext
from app.db.session import get_db
from app.models.user import User
from app.schemas.auth import LoginRequest, TokenResponse
from app.core.security import create_access_token
from app.core.redis import redis_client  # 假设你已按第8章配置了Redis客户端
import json

# 密码哈希上下文(应全局单例)
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

router = APIRouter()

async def authenticate_user(db: AsyncSession, username: str, password: str) -> User | None:
    """
    验证用户凭据
    :param db: 数据库会话
    :param username: 用户名
    :param password: 明文密码
    :return: 用户对象或None
    """
    # 查询用户
    result = await db.execute(select(User).where(User.username == username))
    user = result.scalar_one_or_none()
    
    # 用户不存在或密码错误
    if not user or not pwd_context.verify(password, user.hashed_password):
        return None
    
    return user

@router.post("/login", response_model=TokenResponse)
async def login(
    login_data: LoginRequest,
    db: AsyncSession = Depends(get_db)
):
    """
    用户登录接口
    :param login_data: 包含username和password的请求体
    :param db: 数据库会话依赖
    :return: 包含access_token的响应
    """
    # 验证用户
    user = await authenticate_user(db, login_data.username, login_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    
    # 生成访问令牌
    access_token_expires = timedelta(minutes=30)
    access_token = create_access_token(
        data={"sub": str(user.id)},  # 通常用用户ID作为subject
        expires_delta=access_token_expires
    )
    
    # 可选:将token存入Redis以支持单点登录
    # key格式: user_token:{user_id}
    redis_key = f"user_token:{user.id}"
    await redis_client.setex(
        name=redis_key,
        time=int(access_token_expires.total_seconds()),
        value=access_token
    )
    
    return TokenResponse(access_token=access_token, token_type="bearer")

对应的 Pydantic 模型:

python
# app/schemas/auth.py

from pydantic import BaseModel

class LoginRequest(BaseModel):
    username: str
    password: str

class TokenResponse(Base Model):
    access_token: str
    token_type: str

这节实现了完整的登录流程,包括密码验证、JWT 生成和 Redis 存储。通过将 token 存入 Redis,我们可以实现更灵活的会话管理,比如强制用户下线。

10.3 依赖校验:get_current_user(token: str = Depends(oauth2_scheme))

FastAPI 的依赖注入系统让我们可以轻松地在每个需要认证的路由中自动验证用户身份。我们使用 OAuth2PasswordBearer 来定义一个标准的 Bearer Token 依赖。

首先定义 OAuth2 方案:

python
# app/core/security.py (追加)

from fastapi.security import OAuth2PasswordBearer

# 定义OAuth2方案,指定tokenUrl为/login
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/login")

然后创建获取当前用户的依赖函数:

python
# app/api/deps.py

from fastapi import Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from jose import JWTError
from app.db.session import get_db
from app.models.user import User
from app.core.security import oauth2_scheme, verify_token
from app.core.redis import redis_client

async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db)
) -> User:
    """
    获取当前认证用户
    :param token: 从Authorization头提取的Bearer token
    :param db: 数据库会话
    :return: 用户对象
    :raises: HTTPException 如果token无效或用户不存在
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    
    try:
        # 验证并解码token
        payload = verify_token(token)
        user_id: str = payload.get("sub")
        if user_id is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception
    
    # 可选:检查Redis中是否存在该token(防止token被提前撤销)
    redis_key = f"user_token:{user_id}"
    stored_token = await redis_client.get(redis_key)
    if stored_token != token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token revoked or invalid",
            headers={"WWW-Authenticate": "Bearer"},
        )
    
    # 查询用户
    result = await db.execute(select(User).where(User.id == int(user_id)))
    user = result.scalar_one_or_none()
    if user is None:
        raise credentials_exception
    
    return user

现在可以在任何需要认证的路由中使用这个依赖:

python
# app/api/users.py

from fastapi import APIRouter, Depends
from app.models.user import User
from app.api.deps import get_current_user

router = APIRouter()

@router.get("/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
    return {"username": current_user.username, "email": current_user.email}

这节展示了如何利用 FastAPI 的依赖注入系统实现自动化的用户认证。通过 get_current_user 依赖,我们可以在任何路由中轻松获取当前用户,而无需重复编写认证逻辑。

10.4 权限中间件:检查用户角色(admin/user)

除了基本的身份认证,我们还需要基于角色的访问控制(RBAC)。可以通过自定义依赖或中间件来实现。这里我们使用依赖方式,因为它更灵活且易于测试。

首先确保用户模型中有角色字段:

python
# app/models/user.py

from sqlalchemy import String, Enum
from app.db.base_class import Base

class UserRole(str, Enum):
    USER = "user"
    ADMIN = "admin"

class User(Base):
    __tablename__ = "users"
    
    id = mapped_column(Integer, primary_key=True, index=True)
    username = mapped_column(String(50), unique=True, index=True)
    email = mapped_column(String(100), unique=True, index=True)
    hashed_password = mapped_column(String(255))
    role = mapped_column(Enum(UserRole), default=UserRole.USER)  # 角色字段

然后创建角色验证依赖:

python
# app/api/deps.py (追加)

from enum import Enum
from fastapi import Depends, HTTPException, status
from app.models.user import User, UserRole

class RoleChecker:
    def __init__(self, allowed_roles: list[UserRole]):
        self.allowed_roles = allowed_roles

    def __call__(self, current_user: User = Depends(get_current_user)):
        if current_user.role not in self.allowed_roles:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Operation not permitted"
            )
        return current_user

# 创建预定义的角色检查器
allow_admin = RoleChecker([UserRole.ADMIN])
allow_user_and_admin = RoleChecker([UserRole.USER, UserRole.ADMIN])

在路由中使用角色检查器:

python
# app/api/admin.py

from fastapi import APIRouter, Depends
from app.api.deps import allow_admin
from app.models.user import User

router = APIRouter()

@router.get("/admin-only")
async def admin_dashboard(current_user: User = Depends(allow_admin)):
    return {"message": f"Welcome, admin {current_user.username}!"}

@router.delete("/users/{user_id}")
async def delete_user(user_id: int, current_user: User = Depends(allow_admin)):
    # 只有管理员可以删除用户
    return {"message": f"User {user_id} deleted by {current_user.username}"}

实例方法表格

功能名称实例调用方法具体功能、注意事项、必需参数/可选参数
生成JWT令牌create_access_token(data={"sub": "123"})必需参数:data(载荷数据);可选参数:expires_delta(自定义过期时间)
验证JWT令牌verify_token("eyJhbGciOiJIUzI1NiIs...")必需参数:token(JWT字符串);返回载荷字典或抛出JWTError
获取当前用户get_current_user(token=Depends(oauth2_scheme))作为FastAPI依赖使用;自动从Authorization头提取token并验证
角色权限检查RoleChecker([UserRole.ADMIN])初始化时传入允许的角色列表;作为依赖使用时自动检查当前用户角色

注意事项

  • JWT 密钥(SECRET_KEY)必须足够长且保密,生产环境中绝不能硬编码在代码中
  • Redis 中存储 token 主要用于支持令牌撤销(如用户登出),但会增加一次 Redis 查询开销
  • 角色检查依赖应该在 get_current_user 之后执行,因为需要先获取用户信息
  • 所有认证相关的异常都应该返回 401(未认证)或 403(无权限),遵循 HTTP 标准
  • 在高并发场景下,考虑使用缓存减少数据库查询(如缓存用户角色信息)