Skip to content

11.1 单元测试:pytest + httpx.AsyncClient

在 FastAPI 异步项目中,我们通常使用 pytest 作为测试框架,配合 httpx.AsyncClient 来模拟 HTTP 请求。这种方式能真实地测试整个请求-响应流程,包括路由、依赖注入、数据库操作等。

测试工具方法表

功能名称调用方法说明
异步客户端httpx.AsyncClient(app=app, base_url="http://test")创建一个直接调用 FastAPI 应用的客户端,无需启动服务器
固定装置@pytest.fixture用于提供测试所需的资源,如数据库会话、应用实例等
异步测试函数@pytest.mark.asyncio标记测试函数为异步,确保 pytest 能正确运行

下面是一个简单的测试示例,测试我们的 Hello World 接口:

python
# tests/test_main.py
import pytest  # 导入 pytest 框架
from httpx import AsyncClient  # 导入异步 HTTP 客户端
from app.main import app  # 导入我们的 FastAPI 应用实例


@pytest.mark.asyncio  # 标记这个测试函数是异步的
async def test_hello_world():
    """
    测试根路径的 Hello World 接口
    """
    # 创建一个异步客户端,直接连接到我们的应用
    async with AsyncClient(app=app, base_url="http://test") as ac:
        # 发送 GET 请求到根路径
        response = await ac.get("/")
        # 断言响应状态码是 200
        assert response.status_code == 200
        # 断言响应 JSON 内容符合预期
        assert response.json() == {"message": "Hello World"}

这段代码展示了如何编写一个基本的异步测试。通过 AsyncClient,我们可以直接调用应用而不需要启动实际的服务器,这使得测试快速且可靠。

11.2 测试数据库:使用内存 SQLite 隔离测试

在测试过程中,我们不希望污染生产数据库,因此通常会使用一个独立的测试数据库。对于快速测试,内存中的 SQLite 数据库是个不错的选择,因为它启动快、隔离性好,测试结束后自动销毁。

测试数据库方法表

功能名称调用方法说明
内存数据库引擎create_async_engine("sqlite+aiosqlite:///:memory:")创建一个内存中的 SQLite 数据库引擎
表结构创建Base.metadata.create_all(engine)在测试开始前创建所有表结构
会话覆盖app.dependency_overrides[get_db] = override_get_db覆盖应用中的数据库依赖,使用测试数据库

首先,我们需要创建一个测试专用的数据库依赖:

python
# tests/conftest.py
import pytest  # 导入 pytest
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession  # 导入异步引擎和会话
from sqlalchemy.orm import sessionmaker  # 导入会话工厂
from app.core.database import Base  # 导入我们的模型基类
from app.main import app  # 导入应用
from app.core.database import get_db  # 导入原始的数据库依赖


# 创建内存 SQLite 引擎
SQLALCHEMY_DATABASE_URL = "sqlite+aiosqlite:///:memory:"
engine = create_async_engine(SQLALCHEMY_DATABASE_URL, echo=True)
TestingSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)


@pytest.fixture(scope="session", autouse=True)
async def setup_test_db():
    """
    在测试会话开始时创建所有表
    """
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield
    # 测试结束后清理(内存数据库会自动清理)


@pytest.fixture
async def db_session():
    """
    为每个测试提供一个新的数据库会话
    """
    async with TestingSessionLocal() as session:
        yield session


async def override_get_db():
    """
    覆盖原始的 get_db 依赖
    """
    async with TestingSessionLocal() as session:
        yield session


# 在测试期间覆盖数据库依赖
app.dependency_overrides[get_db] = override_get_db

这样,所有使用 Depends(get_db) 的路由在测试时都会自动使用内存数据库。

11.3 调试技巧:日志输出 SQL 语句、Redis 操作

调试异步应用时,查看 SQL 语句和 Redis 操作非常重要。我们可以通过配置日志级别来实现这一点。

调试方法表

功能名称调用方法说明
SQL 日志create_async_engine(..., echo=True)在引擎创建时启用 SQL 日志输出
SQLAlchemy 日志设置 logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)控制 SQLAlchemy 引擎日志级别
Redis 日志在 Redis 客户端初始化时设置 decode_responses=True 并启用调试日志查看 Redis 命令和响应

在开发环境中,我们可以在数据库配置中启用 SQL 日志:

python
# app/core/database.py
from sqlalchemy.ext.asyncio import create_async_engine
import os

# 在开发环境中启用 SQL 日志
DATABASE_URL = os.getenv("DATABASE_URL")
engine = create_async_engine(
    DATABASE_URL,
    echo=True if os.getenv("ENV") == "development" else False,  # 根据环境变量决定是否输出 SQL
    pool_pre_ping=True,
)

对于 Redis,我们可以在初始化客户端时添加一些调试信息:

python
# app/core/redis.py
import redis.asyncio as redis
import logging

logger = logging.getLogger(__name__)

class RedisClient:
    def __init__(self, host: str, port: int, db: int):
        self.client = redis.Redis(host=host, port=port, db=db, decode_responses=True)
    
    async def get(self, key: str):
        logger.debug(f"Redis GET: {key}")  # 记录 GET 操作
        return await self.client.get(key)
    
    async def set(self, key: str, value: str, expire: int = None):
        logger.debug(f"Redis SET: {key} = {value}, expire={expire}")  # 记录 SET 操作
        return await self.client.set(key, value, ex=expire)

11.4 性能分析:使用 async-profiler 或日志计时

性能分析对于优化异步应用至关重要。我们可以使用简单的日志计时来测量关键操作的执行时间。

性能分析方法表

功能名称调用方法说明
简单计时装饰器自定义 @timing_decorator测量函数执行时间并记录日志
异步上下文管理器自定义 async with timing_context()测量代码块执行时间
专业分析工具py-spyasync-profiler生成火焰图等详细性能分析

下面是一个简单的异步计时装饰器:

python
# app/core/utils.py
import time
import functools
import asyncio
import logging

logger = logging.getLogger(__name__)

def timing_decorator(func):
    """
    异步函数计时装饰器
    """
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        start_time = time.time()
        try:
            result = await func(*args, **kwargs)
            return result
        finally:
            end_time = time.time()
            logger.info(f"{func.__name__} took {end_time - start_time:.4f} seconds")
    return wrapper

# 使用示例
@timing_decorator
async def slow_database_operation(user_id: int):
    """
    模拟一个慢速数据库操作
    """
    await asyncio.sleep(0.1)  # 模拟延迟
    return {"user_id": user_id, "status": "processed"}

对于更复杂的性能分析,可以使用 py-spy 工具:

bash
# 安装 py-spy
pip install py-spy

# 在运行应用时进行采样分析
py-spy top --pid <your-app-pid>

# 生成火焰图
py-spy record -o profile.svg --pid <your-app-pid>

这些工具可以帮助你识别性能瓶颈,特别是在处理大量并发请求时。