Skip to content

AioTest 最佳实践指南

本指南提供了 AioTest 负载测试的最佳实践,帮助您设计高效、可靠的负载测试。

📋 目录


负载测试设计

1.1 确定测试目标

在设计负载测试之前,明确测试目标至关重要:

性能基准测试

  • 目标:建立系统性能基准
  • 策略:逐步增加负载,记录性能指标
  • 示例:从10用户开始,每30秒增加10用户,直到500用户

容量规划测试

  • 目标:确定系统最大承载能力
  • 策略:持续增加负载直到系统饱和
  • 示例:从100用户开始,每分钟增加50用户,直到响应时间超过阈值

稳定性测试

  • 目标:验证系统长时间运行的稳定性
  • 策略:保持中等负载运行较长时间
  • 示例:保持200用户负载运行24小时

压力测试

  • 目标:发现系统极限和故障点
  • 策略:快速增加负载到极限
  • 示例:从500用户开始,每10秒增加100用户

1.2 设计真实的负载模式

避免使用不切实际的负载模式:

好的实践

class RealisticLoadShape(LoadUserShape):
    def tick(self):
        run_time = self.get_run_time()

        # 模拟真实用户行为:缓慢增长,波动
        if run_time < 300:  # 前5分钟
            return (50, 2)   # 缓慢启动
        elif run_time < 1800:  # 5-30分钟
            # 模拟业务高峰期
            import math
            base = 100
            wave = 50 * math.sin(2 * math.pi * (run_time - 300) / 300)
            return (int(base + wave), 5)
        else:
            return None

避免的实践

# 避免:瞬间达到最大负载

class BadLoadShape(LoadUserShape):
    def tick(self):
        return (10000, 1000)  # 不切实际的瞬间高负载

1.3 设计多样化的用户行为

创建不同类型的用户,模拟真实场景:

class BrowsingUser(HttpUser):
    """浏览型用户:主要进行读取操作"""
    weight = 3
    host = "https://api.example.com"
            wait_time = (2, 5)
            execution_mode = ExecutionMode.CONCURRENT

    @weight(5)
    async def test_view_products(self):
        async with self.client.get("/products") as resp:
            assert resp.status == 200

    @weight(3)
    async def test_view_product_detail(self):
        async with self.client.get("/products/123") as resp:
            assert resp.status == 200

class ShoppingUser(HttpUser):
    """购物型用户:包含写入操作"""
    weight = 1
    host = "https://api.example.com"
            wait_time = (1, 3)
            execution_mode = ExecutionMode.CONCURRENT

    @weight(2)
    async def test_add_to_cart(self):
        data = {"product_id": 123, "quantity": 1}
        async with self.client.post("/cart", json=data) as resp:
            assert resp.status == 201

    @weight(1)
    async def test_checkout(self):
        async with self.client.post("/checkout") as resp:
            assert resp.status == 200

class AdminUser(HttpUser):
    """管理型用户:执行管理操作"""
    weight = 1
    host = "https://api.example.com"
    wait_time = (5, 10)

    async def test_view_analytics(self):
        async with self.client.get("/admin/analytics") as resp:
            assert resp.status == 200

参数调优

2.1 用户数量调优

原则:从少量用户开始,逐步增加

class OptimizedLoadShape(LoadUserShape):
    def __init__(self):
        super().__init__()
        self.stages = [
            (60, 10, 2),    # 1分钟:10用户,每秒2个
            (120, 50, 5),   # 2分钟:50用户,每秒5个
            (180, 100, 10),  # 3分钟:100用户,每秒10个
            (240, 200, 20),  # 4分钟:200用户,每秒20个
            (300, 500, 50),  # 5分钟:500用户,每秒50个
        ]

    def tick(self):
        run_time = self.get_run_time()
        for duration, user_count, rate in self.stages:
            if run_time < duration:
                return (user_count, rate)
        return None

调优建议

  • 初始阶段:10-50用户,验证系统稳定性
  • 中等阶段:50-200用户,观察性能变化
  • 高负载阶段:200-500用户,测试系统极限
  • 极限阶段:500+用户,发现系统瓶颈

2.2 等待时间调优

原则:根据实际业务场景设置合理的等待时间

class RealisticWaitTimeUser(HttpUser):
    """根据业务场景设置等待时间"""

    # 固定等待时间:适合稳定的测试场景
    wait_time = 2.0

    # 随机等待时间:模拟真实用户行为
    wait_time = (1.0, 3.0)

    # 动态等待时间:根据系统负载调整
    def get_wait_time(self):
        import random
        # 根据时间调整:白天等待时间短,夜间等待时间长
        import datetime
        hour = datetime.datetime.now().hour
        if 9 <= hour <= 18:  # 工作时间
            return random.uniform(0.5, 2.0)
        else:  # 非工作时间
            return random.uniform(2.0, 5.0)

    wait_time = get_wait_time

调优建议

  • 保守估计:设置较长的等待时间,避免系统过载
  • 激进测试:设置较短的等待时间,测试系统极限
  • 真实模拟:根据实际用户行为设置等待时间分布

2.3 并发控制调优

原则:根据系统资源和业务需求调整并发度

class ConcurrentOptimizedUser(HttpUser):
    execution_mode = ExecutionMode.CONCURRENT

    # 保守设置:适合资源有限的系统
    max_concurrent_tasks = 5
    wait_time = 1.0

    # 激进设置:适合高性能系统
    max_concurrent_tasks = 50
    wait_time = 0.1

    # 自适应设置:根据系统负载动态调整
    def get_max_concurrent_tasks(self):
        # 根据时间调整并发度
        import datetime
        hour = datetime.datetime.now().hour
        if 9 <= hour <= 18:  # 高峰期
            return 50
        else:  # 低峰期
            return 20

    max_concurrent_tasks = get_max_concurrent_tasks

调优建议

  • 从低并发开始:5-10个并发任务
  • 逐步增加:每次增加5-10个并发任务
  • 监控系统资源:CPU、内存、网络使用率
  • 观察响应时间:确保响应时间在可接受范围内

性能优化

3.1 异步 I/O 优化

原则:充分利用异步 I/O,避免阻塞操作

class AsyncOptimizedUser(HttpUser):
    async def test_multiple_apis(self):
        """使用异步并发调用多个 API"""
        import asyncio

        # 好的实践:使用 asyncio.gather 并发调用
        tasks = [
            self.client.get("/api1"),
            self.client.get("/api2"),
            self.client.get("/api3")
        ]
        responses = await asyncio.gather(*tasks)

        for resp in responses:
            assert resp.status == 200

    # 避免的实践:顺序调用导致性能损失
    async def test_sequential_apis(self):
        # 不推荐:顺序调用
        await self.client.get("/api1")
        await self.client.get("/api2")
        await self.client.get("/api3")

3.2 连接池优化

原则:合理配置连接池,避免连接开销

class ConnectionPoolOptimizedUser(HttpUser):
    async def on_start(self):
        """配置连接池参数"""
        await super().on_start()

        # 配置连接池
        self.client.connector = aiohttp.TCPConnector(
            limit=100,              # 最大连接数
            limit_per_host=50,      # 每个主机的最大连接数
            force_close=False,        # 不强制关闭连接
            enable_cleanup_closed=True  # 清理已关闭的连接
        )

调优建议

  • 连接池大小:设置为最大并发任务数的1.5-2倍
  • 每主机连接数:根据目标服务器性能调整
  • 连接复用:启用连接复用,减少连接建立开销

3.3 内存管理优化

原则:及时清理资源,避免内存泄漏

class MemoryOptimizedUser(HttpUser):
    async def on_start(self):
        """初始化资源"""
        self.cache = {}
        self.db_connection = await self.create_db_connection()

    async def test_api(self):
        """使用缓存减少重复计算"""
        cache_key = "api_data"

        # 检查缓存
        if cache_key in self.cache:
            data = self.cache[cache_key]
        else:
            async with self.client.get("/api") as resp:
                data = await resp.json()

            # 更新缓存
            self.cache[cache_key] = data

            # 限制缓存大小
            if len(self.cache) > 100:
                # 清理最旧的缓存项
                oldest_key = next(iter(self.cache))
                del self.cache[oldest_key]

    async def on_stop(self):
        """清理资源"""
        # 清理缓存
        if hasattr(self, 'cache'):
            self.cache.clear()

        # 关闭数据库连接
        if hasattr(self, 'db_connection'):
            await self.db_connection.close()

调优建议

  • 定期清理:在 on_stop 中清理所有资源
  • 限制缓存大小:避免缓存占用过多内存
  • 使用弱引用:对于大型缓存,考虑使用弱引用

分布式测试最佳实践

4.1 Master-Worker 架构设计

原则:合理规划 Master 和 Worker 的数量和分布

# Master 节点配置

master_config = {
    'prometheus_port': 8089,
    'expect_workers': 3,  # 期望的 Worker 数量
}

# Worker 节点配置

worker_config = {
    'prometheus_port': 8090,  # 使用不同的端口
    'heartbeat_interval': 5.0,  # 心跳间隔
    'metrics_batch_size': 100,  # 指标批量大小
}

最佳实践

  • Master 节点:1个,负责协调和监控
  • Worker 节点:3-5个,根据测试规模调整
  • 网络配置:确保 Master 和 Worker 之间的网络稳定
  • 资源分配:每个 Worker 分配足够的资源

监控和调优

5.1 关键指标监控

原则:监控关键指标,及时发现性能问题

重要指标

  1. 响应时间

  2. 平均响应时间

  3. P95、P99 响应时间
  4. 响应时间分布

  5. 吞吐量

  6. 每秒请求数(RPS)

  7. 峰值吞吐量

  8. 错误率

  9. HTTP 错误率

  10. 业务错误率
  11. 超时率

  12. 资源使用

  13. CPU 使用率

  14. 内存使用量
  15. 网络带宽

5.2 性能瓶颈分析

原则:系统化分析性能瓶颈

分析步骤

  1. 识别瓶颈

  2. 检查响应时间增长趋势

  3. 分析错误率变化
  4. 观察资源使用情况

  5. 定位瓶颈

  6. CPU 瓶颈:CPU 使用率持续接近100%

  7. 内存瓶颈:内存使用量持续增长
  8. I/O 瓶颈:磁盘 I/O 或网络 I/O 饱和
  9. 数据库瓶颈:数据库查询响应时间长

提示: 负载测试是一个持续优化的过程,根据实际业务需求和系统特点不断调整测试策略。