
2025 iThome 鐵人賽

DAY 19

Introduction

Today we will learn how to use A/B testing and experiment management to evaluate model performance scientifically, and how to use AWS tools to manage the whole experiment workflow. The motivating business scenario is a common one: a new model has been trained while the old version is still serving production traffic, and we need to determine whether the new model actually outperforms the old one before rolling it out. That is exactly the kind of question today's topic is meant to answer.

Why A/B Testing?

  • Objective evaluation: validate model improvements with real user data
  • Risk control: roll out a new model gradually to limit potential impact
  • Continuous optimization: improve the model through data-driven decisions
  • Business value: measure the model's impact on business metrics directly

Experiment Management Architecture on AWS

  • SageMaker Experiments: track and manage ML experiments
  • SageMaker Model Monitor: monitor model performance
  • CloudWatch: collect and analyze metrics
  • Lambda + API Gateway: implement the traffic-splitting logic

Building the A/B Testing System

Step 1: Set up SageMaker Experiments

First, we need to set up experiment tracking.

import boto3
from sagemaker.experiments.experiment import Experiment
from datetime import datetime

# Initialize the SageMaker client
sagemaker_client = boto3.client('sagemaker')

# Create the experiment
# Note: depending on your SageMaker Python SDK version, the Experiment/Trial
# classes may instead come from the standalone smexperiments package.
experiment_name = f'bedrock-model-comparison-{datetime.now().strftime("%Y%m%d-%H%M%S")}'

experiment = Experiment.create(
    experiment_name=experiment_name,
    description='A/B testing for comparing model versions'
)

print(f"Experiment created: {experiment_name}")

Step 2: Build and deploy multiple model variants

Deploy the two versions as separate SageMaker endpoints (as below), or host them behind a single endpoint with multiple production variants (see the sketch after the deployment code).

from sagemaker.model import Model
from sagemaker.predictor import Predictor

# Deploy model A (control group)
model_a_name = 'text-generation-model-v1'
model_a = Model(
    image_uri='your-container-image-uri',
    model_data='s3://your-bucket/model-v1/model.tar.gz',
    role='your-sagemaker-role',
    name=model_a_name
)

predictor_a = model_a.deploy(
    initial_instance_count=1,
    instance_type='ml.g4dn.xlarge',
    endpoint_name='model-a-endpoint'
)

# Deploy model B (treatment group)
model_b_name = 'text-generation-model-v2'
model_b = Model(
    image_uri='your-container-image-uri',
    model_data='s3://your-bucket/model-v2/model.tar.gz',
    role='your-sagemaker-role',
    name=model_b_name
)

predictor_b = model_b.deploy(
    initial_instance_count=1,
    instance_type='ml.g4dn.xlarge',
    endpoint_name='model-b-endpoint'
)

print("Both model versions have been deployed")

Step 3: Implement the traffic-splitting logic

Create a Lambda function to handle the A/B testing traffic split.

import json
import random
import boto3
import hashlib
from datetime import datetime

# Lambda handler code
def lambda_handler(event, context):
    """
    A/B testing traffic-split function.
    Hashing the user ID ensures the same user always sees the same variant.
    """
    
    # Parse the request
    body = json.loads(event['body'])
    user_id = body.get('user_id', 'anonymous')
    prompt = body.get('prompt', '')
    
    # Traffic allocation (could also be read from environment variables);
    # model B implicitly receives the remaining 1 - model_a_traffic share
    model_a_traffic = 0.5  # 50% of traffic goes to model A
    
    # Hash the user ID to get a stable bucket in [0, 1)
    user_hash = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    traffic_split = (user_hash % 100) / 100.0
    
    # Decide which model to use
    if traffic_split < model_a_traffic:
        model_variant = 'A'
        endpoint_name = 'model-a-endpoint'
    else:
        model_variant = 'B'
        endpoint_name = 'model-b-endpoint'
    
    # Invoke the SageMaker endpoint
    runtime = boto3.client('sagemaker-runtime')
    
    response = runtime.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType='application/json',
        Body=json.dumps({'prompt': prompt})
    )
    
    result = json.loads(response['Body'].read())
    
    # Record experiment data
    log_experiment_data(user_id, model_variant, prompt, result)
    
    return {
        'statusCode': 200,
        'body': json.dumps({
            'model_variant': model_variant,
            'result': result,
            'timestamp': datetime.now().isoformat()
        })
    }

def log_experiment_data(user_id, model_variant, prompt, result):
    """
    Write experiment data to CloudWatch Logs.
    The log group and log stream must already exist (create them once with
    create_log_group / create_log_stream or via infrastructure-as-code).
    """
    cloudwatch = boto3.client('logs')
    
    log_data = {
        'user_id': user_id,
        'model_variant': model_variant,
        'prompt': prompt,
        'response_length': len(str(result)),
        'timestamp': datetime.now().isoformat()
    }
    
    cloudwatch.put_log_events(
        logGroupName='/aws/lambda/ab-testing',
        logStreamName='experiment-logs',
        logEvents=[{
            'timestamp': int(datetime.now().timestamp() * 1000),
            'message': json.dumps(log_data)
        }]
    )
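
Because the assignment comes from a hash of the user ID rather than a random draw, it is deterministic: the same user always lands in the same bucket. A quick local sanity check of just the bucketing logic (no AWS calls involved) might look like this:

import hashlib

# Local sanity check of the hash-based bucketing
def assign_variant(user_id, model_a_traffic=0.5):
    user_hash = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    return 'A' if (user_hash % 100) / 100.0 < model_a_traffic else 'B'

# The same user ID always maps to the same variant across invocations
assert assign_variant('user-123') == assign_variant('user-123')

# Over many users the split should be close to the configured ratio
sample = [assign_variant(f'user-{i}') for i in range(10000)]
print('Share of A:', sample.count('A') / len(sample))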

Step 4: Track experiment metrics

Write a Python script to track and record the experiment metrics.

from sagemaker.experiments.trial import Trial
from sagemaker.analytics import ExperimentAnalytics
from datetime import datetime
import time

def create_trial_for_model(experiment_name, model_variant):
    """
    Create a Trial for each model variant.
    """
    trial_name = f'trial-{model_variant}-{int(time.time())}'
    
    trial = Trial.create(
        trial_name=trial_name,
        experiment_name=experiment_name
    )
    
    return trial

def log_metrics_to_trial(trial, metrics_dict):
    """
    Log metrics against the trial.
    Note: depending on your SDK version, metrics may need to be logged through
    a trial component (e.g. a Tracker or Run object) rather than the Trial itself.
    """
    for metric_name, metric_value in metrics_dict.items():
        trial.log_metric(
            metric_name=metric_name,
            value=metric_value,
            timestamp=datetime.now()
        )

# Usage example
trial_a = create_trial_for_model(experiment_name, 'A')
trial_b = create_trial_for_model(experiment_name, 'B')

# Simulated metric values for each variant
metrics_a = {
    'latency_ms': 250,
    'user_satisfaction': 4.2,
    'completion_rate': 0.85,
    'error_rate': 0.02
}

metrics_b = {
    'latency_ms': 180,
    'user_satisfaction': 4.5,
    'completion_rate': 0.92,
    'error_rate': 0.01
}

log_metrics_to_trial(trial_a, metrics_a)
log_metrics_to_trial(trial_b, metrics_b)

Step 5: Analyze the experiment results

import pandas as pd
from scipy import stats
from sagemaker.analytics import ExperimentAnalytics

def analyze_experiment_results(experiment_name):
    """
    Analyze the experiment results and run a statistical test.
    """
    # Use SageMaker Experiments analytics
    analytics = ExperimentAnalytics(
        experiment_name=experiment_name
    )
    
    # Fetch the data
    df = analytics.dataframe()
    
    # Group by model variant
    model_a_data = df[df['TrialComponentName'].str.contains('trial-A')]
    model_b_data = df[df['TrialComponentName'].str.contains('trial-B')]
    
    # Compute the key metrics
    results = {
        'Model A': {
            'avg_latency': model_a_data['latency_ms'].mean(),
            'avg_satisfaction': model_a_data['user_satisfaction'].mean(),
            'completion_rate': model_a_data['completion_rate'].mean(),
            'error_rate': model_a_data['error_rate'].mean()
        },
        'Model B': {
            'avg_latency': model_b_data['latency_ms'].mean(),
            'avg_satisfaction': model_b_data['user_satisfaction'].mean(),
            'completion_rate': model_b_data['completion_rate'].mean(),
            'error_rate': model_b_data['error_rate'].mean()
        }
    }
    
    # Run a t-test on the primary metric to check significance
    t_stat, p_value = stats.ttest_ind(
        model_a_data['user_satisfaction'],
        model_b_data['user_satisfaction']
    )
    
    print("=== 實驗結果分析 ===")
    print(f"\nModel A 平均表現:")
    for metric, value in results['Model A'].items():
        print(f"  {metric}: {value:.3f}")
    
    print(f"\nModel B 平均表現:")
    for metric, value in results['Model B'].items():
        print(f"  {metric}: {value:.3f}")
    
    print(f"\n統計顯著性檢定:")
    print(f"  t-statistic: {t_stat:.3f}")
    print(f"  p-value: {p_value:.3f}")
    
    if p_value < 0.05:
        print("  結論: 差異具有統計顯著性 ✓")
    else:
        print("  結論: 差異不具統計顯著性")
    
    return results, p_value

# 執行分析
results, p_value = analyze_experiment_results(experiment_name)
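
The t-test above compares mean satisfaction scores. For rate metrics such as completion_rate or error_rate, a two-proportion z-test is usually the better fit, since those are success counts over a number of requests. A minimal sketch with statsmodels, using illustrative counts:

from statsmodels.stats.proportion import proportions_ztest

# Completed tasks out of total requests per variant (illustrative numbers)
successes = [850, 920]   # model A, model B
totals = [1000, 1000]

z_stat, p_value_prop = proportions_ztest(count=successes, nobs=totals)
print(f"z-statistic: {z_stat:.3f}, p-value: {p_value_prop:.3f}")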

Step 6: Visualize the results

Plot the comparison charts with Python.

import matplotlib.pyplot as plt
import seaborn as sns

def visualize_experiment_results(results):
    """
    Visualize the experiment results.
    """
    # Set the plot style
    sns.set_style("whitegrid")
    
    # Create the subplots
    fig, axes = plt.subplots(2, 2, figsize=(12, 10))
    fig.suptitle('A/B Testing Results Comparison', fontsize=16, fontweight='bold')
    
    metrics = ['avg_latency', 'avg_satisfaction', 'completion_rate', 'error_rate']
    titles = ['Average Latency (ms)', 'User Satisfaction', 'Completion Rate', 'Error Rate']
    
    for idx, (metric, title) in enumerate(zip(metrics, titles)):
        ax = axes[idx // 2, idx % 2]
        
        values = [results['Model A'][metric], results['Model B'][metric]]
        colors = ['#3498db', '#e74c3c']
        
        bars = ax.bar(['Model A', 'Model B'], values, color=colors, alpha=0.7)
        ax.set_title(title, fontsize=12, fontweight='bold')
        ax.set_ylabel('Value')
        
        # Annotate each bar with its value
        for bar in bars:
            height = bar.get_height()
            ax.text(bar.get_x() + bar.get_width()/2., height,
                   f'{height:.3f}',
                   ha='center', va='bottom')
    
    plt.tight_layout()
    plt.savefig('ab_testing_results.png', dpi=300, bbox_inches='tight')
    print("圖表已保存為 ab_testing_results.png")

# 執行視覺化
visualize_experiment_results(results)

Experiment Management

Experiment design principles can be captured in a configuration class:
class ExperimentConfig:
    """
    Experiment configuration.
    """
    def __init__(self):
        # Traffic allocation
        self.traffic_allocation = {
            'control': 0.5,      # control group
            'treatment': 0.5     # treatment group
        }
        
        # Minimum sample size
        self.minimum_sample_size = 1000
        
        # Experiment duration
        self.duration_days = 7
        
        # Key metrics
        self.primary_metrics = ['user_satisfaction', 'task_completion']
        self.secondary_metrics = ['latency', 'error_rate']
        
        # Statistical significance threshold
        self.significance_level = 0.05
        
    def validate_experiment_readiness(self, current_sample_size):
        """
        Check whether the experiment has collected enough data to analyze.
        """
        if current_sample_size < self.minimum_sample_size:
            return False, f"Insufficient sample size: {current_sample_size}/{self.minimum_sample_size}"
        return True, "Experiment is ready"
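
The minimum_sample_size above is a fixed placeholder. In practice it should come from a power analysis based on the smallest effect you care to detect. A minimal sketch with statsmodels, where the effect size and power targets are illustrative assumptions:

from statsmodels.stats.power import TTestIndPower

# Smallest standardized effect (Cohen's d) we want to be able to detect
effect_size = 0.2
alpha = 0.05   # significance level
power = 0.8    # probability of detecting the effect if it exists

# Required sample size per variant
n_per_group = TTestIndPower().solve_power(
    effect_size=effect_size, alpha=alpha, power=power
)
print(f"Required sample size per variant: {int(n_per_group) + 1}")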

Monitoring Experiment Progress

import boto3
from datetime import datetime, timedelta

def monitor_experiment_health(experiment_name):
    """
    Monitor the health of the experiment endpoints.
    """
    cloudwatch = boto3.client('cloudwatch')
    
    # Metrics for the past 24 hours
    end_time = datetime.now()
    start_time = end_time - timedelta(hours=24)
    
    # Placeholder metric names; the metrics SageMaker endpoints actually publish
    # in the AWS/SageMaker namespace include Invocations, ModelLatency and
    # Invocation4XXErrors/Invocation5XXErrors (with EndpointName and VariantName
    # dimensions), so adjust these names and dimensions accordingly.
    metrics_to_monitor = [
        'ErrorRate',
        'Latency',
        'RequestCount'
    ]
    
    health_report = {}
    
    for metric_name in metrics_to_monitor:
        response = cloudwatch.get_metric_statistics(
            Namespace='AWS/SageMaker',
            MetricName=metric_name,
            Dimensions=[
                {'Name': 'EndpointName', 'Value': 'model-a-endpoint'}
            ],
            StartTime=start_time,
            EndTime=end_time,
            Period=3600,
            Statistics=['Average', 'Maximum']
        )
        
        health_report[metric_name] = response['Datapoints']
    
    # Check for anomalies
    alerts = []
    
    if health_report.get('ErrorRate'):
        avg_error = sum(d['Average'] for d in health_report['ErrorRate']) / len(health_report['ErrorRate'])
        if avg_error > 0.05:  # error rate above 5%
            alerts.append(f"Warning: error rate too high ({avg_error:.2%})")
    
    return health_report, alerts

# Run the check periodically
health_report, alerts = monitor_experiment_health(experiment_name)
if alerts:
    print("⚠️  Anomalies detected:")
    for alert in alerts:
        print(f"  - {alert}")

Automated Decision-Making

class ExperimentDecisionEngine:
    """
    Experiment decision engine.
    """
    def __init__(self, config):
        self.config = config
        
    def should_stop_experiment(self, results, p_value):
        """
        Decide whether the experiment should be stopped.
        """
        # Check statistical significance
        if p_value < self.config.significance_level:
            return True, "Statistical significance reached"
        
        # Check for a clear negative impact
        if results['Model B']['error_rate'] > results['Model A']['error_rate'] * 1.5:
            return True, "Treatment group error rate too high"
        
        return False, "Continue the experiment"
    
    def recommend_rollout_strategy(self, results, p_value):
        """
        Recommend a rollout strategy.
        """
        if p_value >= self.config.significance_level:
            return "Keep the current model", "Difference is not significant"
        
        improvement = (
            results['Model B']['avg_satisfaction'] - 
            results['Model A']['avg_satisfaction']
        ) / results['Model A']['avg_satisfaction']
        
        if improvement > 0.1:  # improvement above 10%
            return "Full rollout", f"Significant improvement of {improvement:.1%}"
        elif improvement > 0:
            return "Gradual rollout", f"Modest improvement of {improvement:.1%}"
        else:
            return "Roll back", f"Performance regression of {improvement:.1%}"

# Use the decision engine
config = ExperimentConfig()
engine = ExperimentDecisionEngine(config)

should_stop, reason = engine.should_stop_experiment(results, p_value)
strategy, explanation = engine.recommend_rollout_strategy(results, p_value)

print(f"Stop experiment: {should_stop} ({reason})")
print(f"Experiment decision: {strategy}")
print(f"Reason: {explanation}")

Integrating A/B Testing with Bedrock

import boto3
import json
from datetime import datetime

class BedrockABTest:
    """
    A/B testing for Bedrock models.
    """
    def __init__(self):
        self.bedrock = boto3.client('bedrock-runtime')
        
    def invoke_model_variant(self, variant, prompt):
        """
        Invoke a specific model variant.
        """
        # Define each variant's configuration; here both variants use the same
        # model ID and differ only in the system prompt
        model_configs = {
            'A': {
                'modelId': 'anthropic.claude-3-sonnet-20240229-v1:0',
                'system_prompt': 'You are a professional assistant.'
            },
            'B': {
                'modelId': 'anthropic.claude-3-sonnet-20240229-v1:0',
                'system_prompt': 'You are a friendly, professional AI assistant who provides detailed explanations.'
            }
        }
        
        config = model_configs[variant]
        
        # Build the request body
        body = {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 1000,
            "system": config['system_prompt'],
            "messages": [
                {
                    "role": "user",
                    "content": prompt
                }
            ]
        }
        
        # Invoke Bedrock
        response = self.bedrock.invoke_model(
            modelId=config['modelId'],
            body=json.dumps(body)
        )
        
        result = json.loads(response['body'].read())
        return result

# Try out the different prompt strategies
ab_test = BedrockABTest()

# Collect user feedback
def collect_user_feedback(variant, response, user_rating):
    """
    Record user feedback for a given variant.
    """
    feedback = {
        'variant': variant,
        'response_length': len(response['content'][0]['text']),
        'user_rating': user_rating,
        'timestamp': datetime.now().isoformat()
    }
    
    # Write to CloudWatch Logs (the log group and stream must already exist)
    cloudwatch = boto3.client('logs')
    cloudwatch.put_log_events(
        logGroupName='/aws/bedrock/ab-testing',
        logStreamName='user-feedback',
        logEvents=[{
            'timestamp': int(datetime.now().timestamp() * 1000),
            'message': json.dumps(feedback)
        }]
    )
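
Putting the pieces together, a single interaction might look like the sketch below: assign a variant with the same hash-based rule as before, invoke it, and record the user's rating. The user ID and rating here are illustrative.

import hashlib

user_id = 'user-123'
prompt = 'Explain what A/B testing is in two sentences.'

# Same sticky, hash-based assignment as in the Lambda function
variant = 'A' if int(hashlib.md5(user_id.encode()).hexdigest(), 16) % 100 < 50 else 'B'

response = ab_test.invoke_model_variant(variant, prompt)
print(response['content'][0]['text'])

# Suppose the user rates the answer 4 out of 5
collect_user_feedback(variant, response, user_rating=4)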

Cost Optimization Considerations

class CostOptimizer:
    """
    Experiment cost estimator.
    """
    def __init__(self):
        # Cost Explorer client, available for querying actual spend (not used below)
        self.ce = boto3.client('ce')
        
    def estimate_experiment_cost(self, instance_type, duration_days, requests_per_day):
        """
        Estimate the cost of the experiment.
        """
        # Hourly instance cost (e.g. ml.g4dn.xlarge: $0.736/hour; check current pricing)
        instance_costs = {
            'ml.g4dn.xlarge': 0.736,
            'ml.g5.xlarge': 1.006
        }
        
        hourly_cost = instance_costs.get(instance_type, 0.736)
        instance_cost = hourly_cost * 24 * duration_days * 2  # two endpoints
        
        # Inference cost (assumed roughly $0.10 per 1,000 requests)
        inference_cost = (requests_per_day * duration_days / 1000) * 0.10
        
        total_cost = instance_cost + inference_cost
        
        print(f"=== Experiment Cost Estimate ===")
        print(f"Instance type: {instance_type}")
        print(f"Experiment duration (days): {duration_days}")
        print(f"Requests per day: {requests_per_day:,}")
        print(f"Instance cost: ${instance_cost:.2f}")
        print(f"Inference cost: ${inference_cost:.2f}")
        print(f"Total cost: ${total_cost:.2f}")
        
        return total_cost

# Use the cost estimator
optimizer = CostOptimizer()
estimated_cost = optimizer.estimate_experiment_cost(
    instance_type='ml.g4dn.xlarge',
    duration_days=7,
    requests_per_day=10000
)
