AioTest 事件系统文档¶
目录¶
概述¶
AioTest 事件系统是一个强大的异步事件处理框架,支持基于优先级的事件处理器执行、线程安全操作、装饰器式注册等特性。
主要特性¶
- ✅ 优先级执行 - 支持按优先级顺序执行事件处理器
- ✅ 并发安全 - 使用
asyncio.Lock确保在异步上下文中的并发安全 - ✅ 装饰器注册 - 支持声明式的事件处理器注册
- ✅ 同步/异步混合 - 同时支持同步和异步事件处理器
- ✅ 批量触发 - 支持批量触发事件以提高性能
- ✅ 异常隔离 - 单个处理器的异常不会影响其他处理器
- ✅ 超时保护 - 内置处理器超时保护机制(默认 5 秒)
核心概念¶
EventHook¶
EventHook 是单个事件钩子,负责管理特定类型的事件处理器。
特点¶
- 维护一个处理器列表,每个处理器包含优先级和可调用对象
- 使用
asyncio.Lock确保异步上下文中的并发安全 - 支持装饰器和手动两种注册方式
- 注意:仅在单线程 asyncio 事件循环中安全,不支持多线程环境
Events¶
Events 是集中式事件系统管理器,负责创建和管理多个 EventHook 实例。
特点¶
- 自动创建事件钩子(通过
__getattr__) - 统一管理所有事件的生命周期
- 支持注册装饰器定义的处理器
预定义事件¶
系统预定义了以下核心测试生命周期事件:
| 事件名称 | 说明 | 触发时机 |
|---|---|---|
init_events |
初始化事件 | 系统初始化时 |
test_start |
测试开始事件 | 测试开始时 |
test_stop |
测试停止事件 | 测试停止时 |
test_quit |
测试退出事件 | 测试退出时 |
startup_completed |
启动完成事件 | 所有用户启动完成后 |
request_metrics |
请求指标事件 | 每次请求完成后 |
worker_request_metrics |
Worker 请求指标事件 | Master 接收 Worker 指标时 |
快速开始¶
方式一:使用装饰器注册(推荐)¶
from aiotest import test_start
@test_start.handler(priority=0)
async def my_test_start_handler(**kwargs):
print("测试开始!")
方式二:手动注册¶
from aiotest import test_start
async def my_handler(**kwargs):
print("测试开始!")
await test_start.add_handler(my_handler, priority=0)
运行示例¶
API 参考¶
EventHook¶
handler(priority: int = 0)¶
装饰器方法,用于声明式注册事件处理器。
参数¶
priority(int): 优先级,数值越大优先级越高,默认为 0
返回¶
装饰器函数
示例¶
async add_handler(handler: Callable, priority: int = 0) -> None¶
添加事件处理器。
参数¶
handler(Callable): 事件处理器函数priority(int): 处理器的优先级,数值越高越先执行,默认为 0
异常¶
TypeError: 如果 handler 不是可调用对象
示例¶
async remove_handler(handler: Callable) -> None¶
移除事件处理器。
参数¶
handler(Callable): 要移除的事件处理器函数
示例¶
async fire(**kwargs: Any) -> None¶
触发事件,执行所有注册的处理器。
参数¶
**kwargs(Any): 传递给事件处理器的参数
示例¶
async fire_batch(events: List[Dict[str, Any]]) -> None¶
批量触发事件,提高性能。
参数¶
events(List[Dict[str, Any]]): 事件字典列表
异常¶
RuntimeError: 如果任务数量超过限制(1000)
示例¶
events = [
{"user_id": 1, "action": "login"},
{"user_id": 2, "action": "logout"}
]
await event_hook.fire_batch(events)
async register_pending_handlers() -> int¶
注册所有通过装饰器定义的待注册处理器。
返回¶
注册的处理器数量
示例¶
Events¶
register(name: str) -> EventHook¶
注册新事件类型。
参数¶
name(str): 事件类型名称
返回¶
EventHook 实例
示例¶
async register_all_pending_handlers() -> int¶
注册所有事件钩子的待注册处理器。
返回¶
注册的处理器总数
示例¶
__getattr__(name: str) -> EventHook¶
获取事件钩子,如果不存在则自动注册。
参数¶
name(str): 事件钩子名称
返回¶
EventHook 实例
示例¶
使用示例¶
示例 1:基础装饰器注册¶
from aiotest import test_start, test_stop
@test_start.handler(priority=0)
async def on_test_start(**kwargs):
print("测试开始执行")
@test_stop.handler(priority=0)
async def on_test_stop(**kwargs):
print("测试执行结束")
示例 2:优先级执行¶
from aiotest import request_metrics
@request_metrics.handler(priority=10) # 最高优先级
async def log_slow_requests(**kwargs):
duration = kwargs.get('duration', 0)
if duration > 1000:
print(f"慢请求: {duration}ms")
@request_metrics.handler(priority=5) # 中等优先级
async def log_errors(**kwargs):
status = kwargs.get('status_code', 0)
if status >= 400:
print(f"错误请求: {status}")
@request_metrics.handler(priority=0) # 默认优先级
async def log_all_requests(**kwargs):
print(f"请求: {kwargs.get('request_id')}")
示例 3:手动注册处理器¶
from aiotest import test_start
async def setup_test(**kwargs):
print("设置测试环境")
async def initialize_data(**kwargs):
print("初始化测试数据")
# 在初始化阶段注册
await test_start.add_handler(setup_test, priority=10)
await test_start.add_handler(initialize_data, priority=5)
示例 4:动态创建自定义事件¶
from aiotest import events
# 创建自定义事件
user_event = events.register("user_login")
@user_event.handler(priority=0)
async def on_user_login(**kwargs):
user_id = kwargs.get('user_id')
print(f"用户 {user_id} 登录")
# 触发自定义事件
await user_event.fire(user_id=123, timestamp="2024-01-01")
示例 5:同步和异步处理器混合¶
from aiotest import test_start
# 异步处理器
@test_start.handler(priority=10)
async def async_setup(**kwargs):
await asyncio.sleep(0.1)
print("异步设置完成")
# 同步处理器
@test_start.handler(priority=5)
def sync_setup(**kwargs):
print("同步设置完成")
示例 6:批量触发事件¶
from aiotest import events
# 创建批量处理事件
batch_event = events.register("batch_process")
@batch_event.handler(priority=0)
async def process_item(**kwargs):
item_id = kwargs.get('item_id')
# 处理单个项目
print(f"处理项目 {item_id}")
# 批量触发
items = [{"item_id": i} for i in range(100)]
await batch_event.fire_batch(items)
示例 7:移除处理器¶
from aiotest import test_start
async def temp_handler(**kwargs):
print("临时处理器")
# 注册处理器
await test_start.add_handler(temp_handler, priority=0)
# 触发事件(会调用 temp_handler)
await test_start.fire()
# 移除处理器
await test_start.remove_handler(temp_handler)
# 触发事件(不会调用 temp_handler)
await test_start.fire()
最佳实践¶
1. 使用装饰器优先¶
推荐使用装饰器方式注册事件处理器,代码更清晰易读:
# ✅ 推荐
@test_start.handler(priority=0)
async def on_test_start(**kwargs):
print("测试开始")
# ❌ 不推荐(除非需要动态注册)
async def on_test_start(**kwargs):
print("测试开始")
await test_start.add_handler(on_test_start)
2. 合理设置优先级¶
根据业务逻辑设置合理的优先级:
# 验证逻辑应该先执行
@test_start.handler(priority=20)
async def validate(**kwargs):
if not validate_test(**kwargs):
raise ValueError("测试配置无效")
# 初始化操作中等优先级
@test_start.handler(priority=10)
async def setup(**kwargs):
setup_test_env(**kwargs)
# 日志记录最后执行
@test_start.handler(priority=0)
async def log(**kwargs):
logger.info("测试开始")
3. 处理器应该幂等¶
确保事件处理器可以安全地被多次调用:
# ✅ 好的实践 - 幂等
@request_metrics.handler(priority=0)
async def log_request(**kwargs):
request_id = kwargs.get('request_id')
logger.info(f"请求完成: {request_id}")
# ❌ 坏的实践 - 有副作用
@request_metrics.handler(priority=0)
async def log_request(**kwargs):
global counter
counter += 1 # 可能导致计数不准确
4. 使用参数传递上下文信息¶
通过 **kwargs 传递事件相关的上下文信息:
@request_metrics.handler(priority=10)
async def monitor_performance(**kwargs):
duration = kwargs.get('duration')
endpoint = kwargs.get('endpoint')
if duration > 1000:
logger.warning(f"慢请求: {endpoint}, 耗时: {duration}ms")
5. 处理器应该捕获自己的异常¶
虽然系统会捕获处理器异常,但建议在处理器内部处理预期内的异常:
@test_start.handler(priority=0)
async def setup_database(**kwargs):
try:
await connect_database()
except DatabaseError as e:
logger.error(f"数据库连接失败: {e}")
# 不要抛出异常,避免影响其他处理器
6. 避免长时间运行的处理器¶
系统默认超时时间为 5 秒,避免处理器执行时间过长:
# ❌ 坏的实践 - 可能超时
@test_start.handler(priority=0)
async def load_large_data(**kwargs):
data = await download_large_file() # 可能超过 5 秒
# ✅ 好的实践 - 使用后台任务
@test_start.handler(priority=0)
async def start_background_task(**kwargs):
asyncio.create_task(download_large_data())
7. 使用去重机制¶
系统会自动去重相同的处理器对象(使用 is 比较):
# 安全地多次注册同一个处理器
await test_start.add_handler(my_handler)
await test_start.add_handler(my_handler) # 不会重复注册
注意: 去重对 lambda 函数无效,每次调用 lambda 都会创建新对象:
# ❌ 会被重复注册 3 次
await test_start.add_handler(lambda **kwargs: print("1"))
await test_start.add_handler(lambda **kwargs: print("1"))
await test_start.add_handler(lambda **kwargs: print("1"))
# ✅ 使用命名函数可以正确去重
async def my_handler(**kwargs):
print("1")
await test_start.add_handler(my_handler)
await test_start.add_handler(my_handler) # 不会重复注册
常见问题¶
Q1: 装饰器和手动注册可以混合使用吗?¶
A: 可以。两种注册方式完全兼容,处理器会按优先级统一执行。
# 装饰器注册
@test_start.handler(priority=10)
async def handler1(**kwargs):
pass
# 手动注册
await test_start.add_handler(handler2, priority=5)
Q2: 处理器抛出异常会影响其他处理器吗?¶
A: 不会。系统会捕获每个处理器的异常,继续执行其他处理器。
@test_start.handler(priority=10)
async def handler_with_error(**kwargs):
raise ValueError("This won't affect others")
@test_start.handler(priority=5)
async def handler_normal(**kwargs):
print("This will still execute") # 仍然会执行
Q3: 如何确保处理器按特定顺序执行?¶
A: 使用优先级参数,数值越大越先执行。
@test_start.handler(priority=20)
async def first(**kwargs):
print("First")
@test_start.handler(priority=10)
async def second(**kwargs):
print("Second")
@test_start.handler(priority=0)
async def third(**kwargs):
print("Third")
Q4: 同步和异步处理器可以混用吗?¶
A: 可以。系统会自动识别处理器类型并正确执行。
@test_start.handler(priority=10)
async def async_handler(**kwargs):
await asyncio.sleep(0.1)
@test_start.handler(priority=5)
def sync_handler(**kwargs):
print("Sync handler")
Q5: 如何移除已注册的处理器?¶
A: 使用 remove_handler 方法,传入同一个处理器对象。
async def my_handler(**kwargs):
pass
await test_start.add_handler(my_handler)
await test_start.remove_handler(my_handler)
Q6: 批量触发事件有什么限制?¶
A: 单次批量触发的任务总数不能超过 1000,防止系统过载。
# ✅ 正常
events = [{"id": i} for i in range(500)]
await event_hook.fire_batch(events)
# ❌ 会抛出 RuntimeError
events = [{"id": i} for i in range(1010)]
await event_hook.fire_batch(events)
Q7: 如何获取当前已注册的处理器数量?¶
A: 可以直接访问 _handlers 属性(不推荐用于生产代码)。
event_hook = EventHook()
await event_hook.add_handler(handler1)
await event_hook.add_handler(handler2)
print(len(event_hook._handlers)) # 输出: 2
Q8: 处理器的超时时间可以修改吗?¶
A: 当前版本超时时间固定为 5 秒,如需修改需要修改源码中的 _safe_execute 方法。
async def _safe_execute(self, handler, **kwargs):
try:
# 可以修改这里的超时时间
await asyncio.wait_for(handler(**kwargs), timeout=5.0)
except asyncio.TimeoutError:
# ...
Q9: 可以在处理器中修改处理器列表吗?¶
A: 不推荐在处理器执行期间添加或移除处理器,可能导致不可预测的行为。建议在事件触发前完成所有处理器注册。
Q10: 事件系统是线程安全的吗?¶
A: 不是。事件系统使用 asyncio.Lock 只保证在单线程 asyncio 事件循环中的并发安全,不支持多线程环境。
如果需要在多线程环境中使用,请确保:
- 所有事件操作都在同一个线程中执行
- 或者使用
threading.Lock等多线程同步机制(需要自行实现)
技术说明¶
并发模型¶
AioTest 事件系统基于 asyncio 协程,设计用于单线程异步编程模型:
- 适用场景: 单线程、高并发的异步应用
- 不适用场景: 多线程环境或需要跨线程同步的场景
安全保证¶
在 asyncio 单线程事件循环中:
- ✅ 多个协程并发添加/移除处理器 - 安全
- ✅ 多个协程同时触发事件 - 安全
- ❌ 多线程同时操作事件系统 - 不安全
如果确实需要在多线程中使用,需要额外实现多线程同步机制。
性能考虑¶
处理器数量建议¶
- 推荐: 每个事件的处理器数量 < 20
- 警告: 每个事件的处理器数量 > 50 可能影响性能
- 限制: 单次批量触发任务数 ≤ 1000
性能优化建议¶
- 减少同步处理器 - 同步处理器会被放到执行器中运行,性能较低
- 合理使用批量触发 - 对于大量事件,使用
fire_batch提高性能 - 避免复杂计算 - 将复杂计算放到后台任务中
- 控制处理器数量 - 避免在一个事件上注册过多处理器
相关资源¶
- 完整示例代码: examples/events_example.py
- 测试用例: tests/test_events.py
- 项目主文档: README.md