Locust 的 Event Hook(事件鉤子) 是一套強大的機制,讓我們可以在測試生命週期的特定時刻執行自定義程式碼。透過 Event Hook,我們可以實現進階監控、自定義統計、外部系統整合等功能。
Event Hook 是 Locust 提供的回調機制,當特定事件發生時(如測試開始、請求完成、錯誤發生),Locust 會自動調用我們預先註冊的處理函數。
from locust import HttpUser, task, between, events
import time
import logging
# 測試初始化事件
@events.init.add_listener
def on_locust_init(environment, **kwargs):
"""Locust 初始化時觸發"""
print("🚀 Locust 正在初始化...")
print(f"Runner 類型: {environment.runner.__class__.__name__}")
# 測試開始事件
@events.test_start.add_listener
def on_test_start(environment, **kwargs):
"""測試開始時觸發"""
print("✅ 測試開始執行")
print(f"目標主機: {environment.host}")
# 記錄測試開始時間
environment.test_start_time = time.time()
# 測試結束事件
@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
"""測試結束時觸發"""
print("🛑 測試執行結束")
# 計算總測試時間
if hasattr(environment, 'test_start_time'):
duration = time.time() - environment.test_start_time
print(f"總測試時間: {duration:.2f} 秒")
# 請求成功事件
@events.request.add_listener
def on_request(request_type, name, response_time, response_length, **kwargs):
"""每個請求完成時觸發"""
print(f"✅ 請求成功: {request_type} {name} - {response_time}ms")
# 請求失敗事件
@events.request.add_listener
def on_request_failure(request_type, name, response_time, response_length, exception, **kwargs):
"""請求失敗時觸發"""
print(f"❌ 請求失敗: {request_type} {name} - {exception}")
class EventHookUser(HttpUser):
wait_time = between(1, 3)
host = "http://httpbin.org"
@task
def test_get(self):
"""測試 GET 請求"""
self.client.get("/get")
@task
def test_post(self):
"""測試 POST 請求"""
self.client.post("/post", json={"data": "test"})
@task
def test_status_codes(self):
"""測試不同狀態碼"""
# 隨機測試不同的 HTTP 狀態碼
import random
status_code = random.choice([200, 404, 500])
self.client.get(f"/status/{status_code}")
# ✅ 好的做法:輕量級處理
@events.request.add_listener
def lightweight_handler(request_type, name, response_time, **kwargs):
# 快速記錄,避免阻塞測試執行
metrics_queue.put((request_type, name, response_time))
# ❌ 避免的做法:重量級處理
@events.request.add_listener
def heavy_handler(request_type, name, response_time, **kwargs):
# 避免在事件處理器中進行重量級操作
complex_calculation(response_time) # 這會影響測試性能
send_to_external_api(data) # 這會導致網路延遲
@events.request.add_listener
def safe_event_handler(request_type, name, response_time, **kwargs):
try:
# 事件處理邏輯
process_request_data(request_type, name, response_time)
except Exception as e:
# 記錄錯誤但不中斷測試
logger.error(f"Event handler error: {e}")
import threading
from queue import Queue
# 使用背景線程處理重量級任務
class BackgroundProcessor:
def __init__(self):
self.queue = Queue()
self.worker_thread = threading.Thread(target=self._process_worker)
self.worker_thread.daemon = True
self.is_running = False
def start(self):
self.is_running = True
self.worker_thread.start()
def stop(self):
self.is_running = False
def add_task(self, task_data):
self.queue.put(task_data)
def _process_worker(self):
while self.is_running:
try:
if not self.queue.empty():
task = self.queue.get_nowait()
self._process_heavy_task(task)
else:
time.sleep(0.1)
except Exception as e:
logger.error(f"Background processor error: {e}")
def _process_heavy_task(self, task_data):
# 執行重量級任務
pass
# 全局處理器
background_processor = BackgroundProcessor()
@events.test_start.add_listener
def start_background_processor(environment, **kwargs):
background_processor.start()
@events.test_stop.add_listener
def stop_background_processor(environment, **kwargs):
background_processor.stop()
@events.request.add_listener
def queue_heavy_processing(request_type, name, response_time, **kwargs):
# 將重量級任務加入隊列,不阻塞測試執行
background_processor.add_task({
"type": request_type,
"name": name,
"response_time": response_time,
"timestamp": time.time()
})
今天我們深入學習了 Locust Event Hook 的完整應用:
透過 Event Hook,我們可以:
Event Hook 是 Locust 高階應用的核心技術,掌握它能讓你的性能測試更智能、更全面、更符合實際需求。