iT邦幫忙

2025 iThome 鐵人賽

DAY 12
1

Locust 的 Event Hook(事件鉤子) 是一套強大的機制,讓我們可以在測試生命週期的特定時刻執行自定義程式碼。透過 Event Hook,我們可以實現進階監控、自定義統計、外部系統整合等功能。

什麼是 Event Hook?

Event Hook 是 Locust 提供的回調機制,當特定事件發生時(如測試開始、請求完成、錯誤發生),Locust 會自動調用我們預先註冊的處理函數。

Event Hook 的應用場景

  • 自定義統計數據收集
  • 外部監控系統整合(如 Prometheus、InfluxDB)
  • 測試過程中的動態配置調整
  • 故障自動處理和恢復
  • 測試結果的實時分析和警報

基本 Event Hook 使用

1. 導入 events 模組

from locust import HttpUser, task, between, events
import time
import logging

2. 基本事件監聽器

# 測試初始化事件
@events.init.add_listener
def on_locust_init(environment, **kwargs):
    """Locust 初始化時觸發"""
    print("🚀 Locust 正在初始化...")
    print(f"Runner 類型: {environment.runner.__class__.__name__}")

# 測試開始事件
@events.test_start.add_listener
def on_test_start(environment, **kwargs):
    """測試開始時觸發"""
    print("✅ 測試開始執行")
    print(f"目標主機: {environment.host}")
    
    # 記錄測試開始時間
    environment.test_start_time = time.time()

# 測試結束事件
@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
    """測試結束時觸發"""
    print("🛑 測試執行結束")
    
    # 計算總測試時間
    if hasattr(environment, 'test_start_time'):
        duration = time.time() - environment.test_start_time
        print(f"總測試時間: {duration:.2f} 秒")

# 請求成功事件
@events.request.add_listener
def on_request(request_type, name, response_time, response_length, **kwargs):
    """每個請求完成時觸發"""
    print(f"✅ 請求成功: {request_type} {name} - {response_time}ms")

# 請求失敗事件
@events.request.add_listener
def on_request_failure(request_type, name, response_time, response_length, exception, **kwargs):
    """請求失敗時觸發"""
    print(f"❌ 請求失敗: {request_type} {name} - {exception}")

3. 基本測試用戶類

class EventHookUser(HttpUser):
    wait_time = between(1, 3)
    host = "http://httpbin.org"
    
    @task
    def test_get(self):
        """測試 GET 請求"""
        self.client.get("/get")
    
    @task
    def test_post(self):
        """測試 POST 請求"""
        self.client.post("/post", json={"data": "test"})
    
    @task
    def test_status_codes(self):
        """測試不同狀態碼"""
        # 隨機測試不同的 HTTP 狀態碼
        import random
        status_code = random.choice([200, 404, 500])
        self.client.get(f"/status/{status_code}")

最佳實踐

1. Event Hook 設計原則

# ✅ 好的做法:輕量級處理
@events.request.add_listener
def lightweight_handler(request_type, name, response_time, **kwargs):
    # 快速記錄,避免阻塞測試執行
    metrics_queue.put((request_type, name, response_time))

# ❌ 避免的做法:重量級處理
@events.request.add_listener
def heavy_handler(request_type, name, response_time, **kwargs):
    # 避免在事件處理器中進行重量級操作
    complex_calculation(response_time)  # 這會影響測試性能
    send_to_external_api(data)         # 這會導致網路延遲

2. 錯誤處理

@events.request.add_listener
def safe_event_handler(request_type, name, response_time, **kwargs):
    try:
        # 事件處理邏輯
        process_request_data(request_type, name, response_time)
    except Exception as e:
        # 記錄錯誤但不中斷測試
        logger.error(f"Event handler error: {e}")

3. 性能考量

import threading
from queue import Queue

# 使用背景線程處理重量級任務
class BackgroundProcessor:
    def __init__(self):
        self.queue = Queue()
        self.worker_thread = threading.Thread(target=self._process_worker)
        self.worker_thread.daemon = True
        self.is_running = False
    
    def start(self):
        self.is_running = True
        self.worker_thread.start()
    
    def stop(self):
        self.is_running = False
    
    def add_task(self, task_data):
        self.queue.put(task_data)
    
    def _process_worker(self):
        while self.is_running:
            try:
                if not self.queue.empty():
                    task = self.queue.get_nowait()
                    self._process_heavy_task(task)
                else:
                    time.sleep(0.1)
            except Exception as e:
                logger.error(f"Background processor error: {e}")
    
    def _process_heavy_task(self, task_data):
        # 執行重量級任務
        pass

# 全局處理器
background_processor = BackgroundProcessor()

@events.test_start.add_listener
def start_background_processor(environment, **kwargs):
    background_processor.start()

@events.test_stop.add_listener
def stop_background_processor(environment, **kwargs):
    background_processor.stop()

@events.request.add_listener
def queue_heavy_processing(request_type, name, response_time, **kwargs):
    # 將重量級任務加入隊列,不阻塞測試執行
    background_processor.add_task({
        "type": request_type,
        "name": name,
        "response_time": response_time,
        "timestamp": time.time()
    })

總結

今天我們深入學習了 Locust Event Hook 的完整應用:

  1. 基本概念:理解 Event Hook 的工作原理和應用場景
  2. 基礎應用:掌握常見事件的監聽和處理方法
  3. 進階功能:實現自定義統計、外部系統整合、動態負載調整
  4. 分散式支持:處理分散式測試中的特殊事件
  5. 最佳實踐:性能優化、錯誤處理、背景處理等技巧

透過 Event Hook,我們可以:

  • 擴展 Locust 功能:實現原生功能無法滿足的需求
  • 整合外部系統:連接監控平台、數據庫、API 等
  • 自動化決策:根據測試結果動態調整測試行為
  • 深度分析:收集和分析更詳細的測試數據

Event Hook 是 Locust 高階應用的核心技術,掌握它能讓你的性能測試更智能、更全面、更符合實際需求。


上一篇
Day11 - Locust 配置文件管理 (Configuration Files)
下一篇
Day13 - Locust Custom Clients 自訂客戶端
系列文
Vibe Coding 後的挑戰:Locust x Loki 負載及監控13
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言