11.1 单元测试:pytest + httpx.AsyncClient
在 FastAPI 异步项目中,我们通常使用 pytest 作为测试框架,配合 httpx.AsyncClient 来模拟 HTTP 请求。这种方式能真实地测试整个请求-响应流程,包括路由、依赖注入、数据库操作等。
测试工具方法表
| 功能名称 | 调用方法 | 说明 |
|---|---|---|
| 异步客户端 | httpx.AsyncClient(app=app, base_url="http://test") | 创建一个直接调用 FastAPI 应用的客户端,无需启动服务器 |
| 固定装置 | @pytest.fixture | 用于提供测试所需的资源,如数据库会话、应用实例等 |
| 异步测试函数 | @pytest.mark.asyncio | 标记测试函数为异步,确保 pytest 能正确运行 |
下面是一个简单的测试示例,测试我们的 Hello World 接口:
python
# tests/test_main.py
import pytest # 导入 pytest 框架
from httpx import AsyncClient # 导入异步 HTTP 客户端
from app.main import app # 导入我们的 FastAPI 应用实例
@pytest.mark.asyncio # 标记这个测试函数是异步的
async def test_hello_world():
"""
测试根路径的 Hello World 接口
"""
# 创建一个异步客户端,直接连接到我们的应用
async with AsyncClient(app=app, base_url="http://test") as ac:
# 发送 GET 请求到根路径
response = await ac.get("/")
# 断言响应状态码是 200
assert response.status_code == 200
# 断言响应 JSON 内容符合预期
assert response.json() == {"message": "Hello World"}这段代码展示了如何编写一个基本的异步测试。通过 AsyncClient,我们可以直接调用应用而不需要启动实际的服务器,这使得测试快速且可靠。
11.2 测试数据库:使用内存 SQLite 隔离测试
在测试过程中,我们不希望污染生产数据库,因此通常会使用一个独立的测试数据库。对于快速测试,内存中的 SQLite 数据库是个不错的选择,因为它启动快、隔离性好,测试结束后自动销毁。
测试数据库方法表
| 功能名称 | 调用方法 | 说明 |
|---|---|---|
| 内存数据库引擎 | create_async_engine("sqlite+aiosqlite:///:memory:") | 创建一个内存中的 SQLite 数据库引擎 |
| 表结构创建 | Base.metadata.create_all(engine) | 在测试开始前创建所有表结构 |
| 会话覆盖 | app.dependency_overrides[get_db] = override_get_db | 覆盖应用中的数据库依赖,使用测试数据库 |
首先,我们需要创建一个测试专用的数据库依赖:
python
# tests/conftest.py
import pytest # 导入 pytest
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession # 导入异步引擎和会话
from sqlalchemy.orm import sessionmaker # 导入会话工厂
from app.core.database import Base # 导入我们的模型基类
from app.main import app # 导入应用
from app.core.database import get_db # 导入原始的数据库依赖
# 创建内存 SQLite 引擎
SQLALCHEMY_DATABASE_URL = "sqlite+aiosqlite:///:memory:"
engine = create_async_engine(SQLALCHEMY_DATABASE_URL, echo=True)
TestingSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
@pytest.fixture(scope="session", autouse=True)
async def setup_test_db():
"""
在测试会话开始时创建所有表
"""
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield
# 测试结束后清理(内存数据库会自动清理)
@pytest.fixture
async def db_session():
"""
为每个测试提供一个新的数据库会话
"""
async with TestingSessionLocal() as session:
yield session
async def override_get_db():
"""
覆盖原始的 get_db 依赖
"""
async with TestingSessionLocal() as session:
yield session
# 在测试期间覆盖数据库依赖
app.dependency_overrides[get_db] = override_get_db这样,所有使用 Depends(get_db) 的路由在测试时都会自动使用内存数据库。
11.3 调试技巧:日志输出 SQL 语句、Redis 操作
调试异步应用时,查看 SQL 语句和 Redis 操作非常重要。我们可以通过配置日志级别来实现这一点。
调试方法表
| 功能名称 | 调用方法 | 说明 |
|---|---|---|
| SQL 日志 | create_async_engine(..., echo=True) | 在引擎创建时启用 SQL 日志输出 |
| SQLAlchemy 日志 | 设置 logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO) | 控制 SQLAlchemy 引擎日志级别 |
| Redis 日志 | 在 Redis 客户端初始化时设置 decode_responses=True 并启用调试日志 | 查看 Redis 命令和响应 |
在开发环境中,我们可以在数据库配置中启用 SQL 日志:
python
# app/core/database.py
from sqlalchemy.ext.asyncio import create_async_engine
import os
# 在开发环境中启用 SQL 日志
DATABASE_URL = os.getenv("DATABASE_URL")
engine = create_async_engine(
DATABASE_URL,
echo=True if os.getenv("ENV") == "development" else False, # 根据环境变量决定是否输出 SQL
pool_pre_ping=True,
)对于 Redis,我们可以在初始化客户端时添加一些调试信息:
python
# app/core/redis.py
import redis.asyncio as redis
import logging
logger = logging.getLogger(__name__)
class RedisClient:
def __init__(self, host: str, port: int, db: int):
self.client = redis.Redis(host=host, port=port, db=db, decode_responses=True)
async def get(self, key: str):
logger.debug(f"Redis GET: {key}") # 记录 GET 操作
return await self.client.get(key)
async def set(self, key: str, value: str, expire: int = None):
logger.debug(f"Redis SET: {key} = {value}, expire={expire}") # 记录 SET 操作
return await self.client.set(key, value, ex=expire)11.4 性能分析:使用 async-profiler 或日志计时
性能分析对于优化异步应用至关重要。我们可以使用简单的日志计时来测量关键操作的执行时间。
性能分析方法表
| 功能名称 | 调用方法 | 说明 |
|---|---|---|
| 简单计时装饰器 | 自定义 @timing_decorator | 测量函数执行时间并记录日志 |
| 异步上下文管理器 | 自定义 async with timing_context() | 测量代码块执行时间 |
| 专业分析工具 | py-spy 或 async-profiler | 生成火焰图等详细性能分析 |
下面是一个简单的异步计时装饰器:
python
# app/core/utils.py
import time
import functools
import asyncio
import logging
logger = logging.getLogger(__name__)
def timing_decorator(func):
"""
异步函数计时装饰器
"""
@functools.wraps(func)
async def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = await func(*args, **kwargs)
return result
finally:
end_time = time.time()
logger.info(f"{func.__name__} took {end_time - start_time:.4f} seconds")
return wrapper
# 使用示例
@timing_decorator
async def slow_database_operation(user_id: int):
"""
模拟一个慢速数据库操作
"""
await asyncio.sleep(0.1) # 模拟延迟
return {"user_id": user_id, "status": "processed"}对于更复杂的性能分析,可以使用 py-spy 工具:
bash
# 安装 py-spy
pip install py-spy
# 在运行应用时进行采样分析
py-spy top --pid <your-app-pid>
# 生成火焰图
py-spy record -o profile.svg --pid <your-app-pid>这些工具可以帮助你识别性能瓶颈,特别是在处理大量并发请求时。