Today we will learn how to use A/B testing and experiment management to evaluate model performance scientifically, and how to manage the whole experiment workflow with AWS tools. The business scenario is simple: an old model version is already serving traffic, a new model version is ready, and we need a sound way to compare the two and decide whether the new one is actually better. Today's topic is aimed at exactly this kind of problem.
step 1 : Set up SageMaker Experiments
First, we need to set up an experiment tracking system.
import boto3
from sagemaker.experiments.experiment import Experiment
from sagemaker.experiments.trial import Trial
from datetime import datetime

# Initialize the SageMaker client
sagemaker_client = boto3.client('sagemaker')

# Create the experiment
experiment_name = f'bedrock-model-comparison-{datetime.now().strftime("%Y%m%d-%H%M%S")}'
experiment = Experiment.create(
    experiment_name=experiment_name,
    description='A/B testing for comparing model versions'
)

print(f"Experiment created: {experiment_name}")
step 2 : Build and deploy multiple model variants
Deploy the different versions with SageMaker Multi-Model Endpoints or separate endpoints; a single-endpoint alternative using production variants is sketched after this step's code.
from sagemaker.model import Model
from sagemaker.predictor import Predictor

# Deploy model A (control group)
model_a_name = 'text-generation-model-v1'
model_a = Model(
    image_uri='your-container-image-uri',
    model_data='s3://your-bucket/model-v1/model.tar.gz',
    role='your-sagemaker-role',
    name=model_a_name
)
predictor_a = model_a.deploy(
    initial_instance_count=1,
    instance_type='ml.g4dn.xlarge',
    endpoint_name='model-a-endpoint'
)

# Deploy model B (treatment group)
model_b_name = 'text-generation-model-v2'
model_b = Model(
    image_uri='your-container-image-uri',
    model_data='s3://your-bucket/model-v2/model.tar.gz',
    role='your-sagemaker-role',
    name=model_b_name
)
predictor_b = model_b.deploy(
    initial_instance_count=1,
    instance_type='ml.g4dn.xlarge',
    endpoint_name='model-b-endpoint'
)

print("Both model versions have been deployed")
step 3 : Implement the traffic-splitting logic
Create a Lambda function that handles traffic allocation for the A/B test.
import json
import random
import boto3
import hashlib
from datetime import datetime

# Lambda function code
def lambda_handler(event, context):
    """
    A/B testing traffic-splitting function.
    Hashing the user ID ensures the same user always sees the same variant.
    """
    # Parse the request
    body = json.loads(event['body'])
    user_id = body.get('user_id', 'anonymous')
    prompt = body.get('prompt', '')

    # Traffic allocation ratios (could be read from environment variables)
    model_a_traffic = 0.5  # 50% of traffic to model A
    model_b_traffic = 0.5  # 50% of traffic to model B

    # Use a hash of the user ID to decide the assignment
    user_hash = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    traffic_split = (user_hash % 100) / 100.0

    # Decide which model to use
    if traffic_split < model_a_traffic:
        model_variant = 'A'
        endpoint_name = 'model-a-endpoint'
    else:
        model_variant = 'B'
        endpoint_name = 'model-b-endpoint'

    # Invoke the SageMaker endpoint
    runtime = boto3.client('sagemaker-runtime')
    response = runtime.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType='application/json',
        Body=json.dumps({'prompt': prompt})
    )
    result = json.loads(response['Body'].read())

    # Record the experiment data
    log_experiment_data(user_id, model_variant, prompt, result)

    return {
        'statusCode': 200,
        'body': json.dumps({
            'model_variant': model_variant,
            'result': result,
            'timestamp': datetime.now().isoformat()
        })
    }

def log_experiment_data(user_id, model_variant, prompt, result):
    """
    Record experiment data to CloudWatch Logs.
    """
    logs_client = boto3.client('logs')
    log_data = {
        'user_id': user_id,
        'model_variant': model_variant,
        'prompt': prompt,
        'response_length': len(str(result)),
        'timestamp': datetime.now().isoformat()
    }
    # Note: the log group and log stream must already exist (see the setup sketch below)
    logs_client.put_log_events(
        logGroupName='/aws/lambda/ab-testing',
        logStreamName='experiment-logs',
        logEvents=[{
            'timestamp': int(datetime.now().timestamp() * 1000),
            'message': json.dumps(log_data)
        }]
    )
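One caveat: put_log_events fails if the log group or stream does not exist yet. A minimal one-time setup sketch for the names used above:

import boto3

logs_client = boto3.client('logs')

# Create the log group and stream once; ignore the error if they already exist
try:
    logs_client.create_log_group(logGroupName='/aws/lambda/ab-testing')
except logs_client.exceptions.ResourceAlreadyExistsException:
    pass
try:
    logs_client.create_log_stream(
        logGroupName='/aws/lambda/ab-testing',
        logStreamName='experiment-logs'
    )
except logs_client.exceptions.ResourceAlreadyExistsException:
    pass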
step 4 : Track experiment metrics
Build a Python script to track and record the experiment metrics.
from sagemaker.experiments.trial import Trial
from sagemaker.analytics import ExperimentAnalytics
from datetime import datetime
import time

def create_trial_for_model(experiment_name, model_variant):
    """
    Create a Trial for each model variant.
    """
    trial_name = f'trial-{model_variant}-{int(time.time())}'
    trial = Trial.create(
        trial_name=trial_name,
        experiment_name=experiment_name
    )
    return trial

def log_metrics_to_trial(trial, metrics_dict):
    """
    Log metrics to the Trial.
    (Depending on your SageMaker SDK version, metric logging may live on the
    trial component / Run API rather than on Trial itself.)
    """
    for metric_name, metric_value in metrics_dict.items():
        trial.log_metric(
            metric_name=metric_name,
            value=metric_value,
            timestamp=datetime.now()
        )

# Usage example
trial_a = create_trial_for_model(experiment_name, 'A')
trial_b = create_trial_for_model(experiment_name, 'B')

# Simulated metrics to record
metrics_a = {
    'latency_ms': 250,
    'user_satisfaction': 4.2,
    'completion_rate': 0.85,
    'error_rate': 0.02
}
metrics_b = {
    'latency_ms': 180,
    'user_satisfaction': 4.5,
    'completion_rate': 0.92,
    'error_rate': 0.01
}

log_metrics_to_trial(trial_a, metrics_a)
log_metrics_to_trial(trial_b, metrics_b)
step 5 : Analyze the experiment results
import pandas as pd
import numpy as np
from scipy import stats

def analyze_experiment_results(experiment_name):
    """
    Analyze the experiment results and run a significance test.
    """
    # Use SageMaker Experiments analytics
    analytics = ExperimentAnalytics(
        experiment_name=experiment_name
    )

    # Fetch the data
    df = analytics.dataframe()

    # Group by model variant
    model_a_data = df[df['TrialComponentName'].str.contains('trial-A')]
    model_b_data = df[df['TrialComponentName'].str.contains('trial-B')]

    # Compute the key metrics
    results = {
        'Model A': {
            'avg_latency': model_a_data['latency_ms'].mean(),
            'avg_satisfaction': model_a_data['user_satisfaction'].mean(),
            'completion_rate': model_a_data['completion_rate'].mean(),
            'error_rate': model_a_data['error_rate'].mean()
        },
        'Model B': {
            'avg_latency': model_b_data['latency_ms'].mean(),
            'avg_satisfaction': model_b_data['user_satisfaction'].mean(),
            'completion_rate': model_b_data['completion_rate'].mean(),
            'error_rate': model_b_data['error_rate'].mean()
        }
    }

    # Run a t-test for statistical significance
    t_stat, p_value = stats.ttest_ind(
        model_a_data['user_satisfaction'],
        model_b_data['user_satisfaction']
    )

    print("=== Experiment Results Analysis ===")
    print("\nModel A averages:")
    for metric, value in results['Model A'].items():
        print(f"  {metric}: {value:.3f}")
    print("\nModel B averages:")
    for metric, value in results['Model B'].items():
        print(f"  {metric}: {value:.3f}")
    print("\nStatistical significance test:")
    print(f"  t-statistic: {t_stat:.3f}")
    print(f"  p-value: {p_value:.3f}")
    if p_value < 0.05:
        print("  Conclusion: the difference is statistically significant ✓")
    else:
        print("  Conclusion: the difference is not statistically significant")

    return results, p_value

# Run the analysis
results, p_value = analyze_experiment_results(experiment_name)
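The t-test above only makes sense once each trial contains per-request satisfaction scores rather than a single average. For binary metrics such as task completion, a two-proportion z-test is the more natural choice; here is a minimal sketch using statsmodels with placeholder counts (not real results):

import numpy as np
from statsmodels.stats.proportion import proportions_ztest

# Placeholder counts: completed requests vs. total requests per variant
completions = np.array([850, 920])      # Model A, Model B
observations = np.array([1000, 1000])

z_stat, p_val = proportions_ztest(count=completions, nobs=observations)
print(f"Completion-rate z-test: z = {z_stat:.3f}, p = {p_val:.3f}")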
step 6 : Visualize the results
Use Python to draw comparison charts.
import matplotlib.pyplot as plt
import seaborn as sns

def visualize_experiment_results(results):
    """
    Visualize the experiment results.
    """
    # Set the plot style
    sns.set_style("whitegrid")

    # Create the subplots
    fig, axes = plt.subplots(2, 2, figsize=(12, 10))
    fig.suptitle('A/B Testing Results Comparison', fontsize=16, fontweight='bold')

    metrics = ['avg_latency', 'avg_satisfaction', 'completion_rate', 'error_rate']
    titles = ['Average Latency (ms)', 'User Satisfaction', 'Completion Rate', 'Error Rate']

    for idx, (metric, title) in enumerate(zip(metrics, titles)):
        ax = axes[idx // 2, idx % 2]
        values = [results['Model A'][metric], results['Model B'][metric]]
        colors = ['#3498db', '#e74c3c']

        bars = ax.bar(['Model A', 'Model B'], values, color=colors, alpha=0.7)
        ax.set_title(title, fontsize=12, fontweight='bold')
        ax.set_ylabel('Value')

        # Annotate each bar with its value
        for bar in bars:
            height = bar.get_height()
            ax.text(bar.get_x() + bar.get_width()/2., height,
                    f'{height:.3f}',
                    ha='center', va='bottom')

    plt.tight_layout()
    plt.savefig('ab_testing_results.png', dpi=300, bbox_inches='tight')
    print("Chart saved as ab_testing_results.png")

# Run the visualization
visualize_experiment_results(results)
Experiment configuration

class ExperimentConfig:
    """
    Experiment configuration class.
    """
    def __init__(self):
        # Traffic allocation
        self.traffic_allocation = {
            'control': 0.5,    # control group
            'treatment': 0.5   # treatment group
        }
        # Minimum sample size
        self.minimum_sample_size = 1000
        # Experiment duration
        self.duration_days = 7
        # Key metrics
        self.primary_metrics = ['user_satisfaction', 'task_completion']
        self.secondary_metrics = ['latency', 'error_rate']
        # Statistical significance threshold
        self.significance_level = 0.05

    def validate_experiment_readiness(self, current_sample_size):
        """
        Check whether the experiment has collected enough data to analyze.
        """
        if current_sample_size < self.minimum_sample_size:
            return False, f"Insufficient sample size: {current_sample_size}/{self.minimum_sample_size}"
        return True, "Experiment ready for analysis"
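One way to sanity-check minimum_sample_size is a power calculation. The sketch below uses statsmodels and assumes we want to detect a lift in completion rate from 85% to 90% with 80% power; the baseline and target rates are illustrative, not measured values.

import math
from statsmodels.stats.power import NormalIndPower
from statsmodels.stats.proportion import proportion_effectsize

config = ExperimentConfig()

# Assumed baseline and target completion rates for the power calculation
effect_size = proportion_effectsize(0.85, 0.90)

required_n = NormalIndPower().solve_power(
    effect_size=effect_size,
    alpha=config.significance_level,  # 0.05
    power=0.8,                        # conventional 80% power
    ratio=1.0                         # equal traffic split
)
print(f"Required sample size per variant: {math.ceil(required_n)}")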
Monitor experiment progress
import boto3
from datetime import datetime, timedelta

def monitor_experiment_health(experiment_name):
    """
    Monitor the health of the experiment.
    """
    cloudwatch = boto3.client('cloudwatch')

    # Metrics for the past 24 hours
    end_time = datetime.now()
    start_time = end_time - timedelta(hours=24)

    # Adjust these to the metrics your endpoints actually emit; SageMaker endpoints
    # publish metrics such as Invocations, ModelLatency and Invocation5XXErrors
    # under AWS/SageMaker, with EndpointName and VariantName dimensions.
    metrics_to_monitor = [
        'ErrorRate',
        'Latency',
        'RequestCount'
    ]

    health_report = {}
    for metric_name in metrics_to_monitor:
        response = cloudwatch.get_metric_statistics(
            Namespace='AWS/SageMaker',
            MetricName=metric_name,
            Dimensions=[
                {'Name': 'EndpointName', 'Value': 'model-a-endpoint'}
            ],
            StartTime=start_time,
            EndTime=end_time,
            Period=3600,
            Statistics=['Average', 'Maximum']
        )
        health_report[metric_name] = response['Datapoints']

    # Check for anomalies
    alerts = []
    if health_report.get('ErrorRate'):
        avg_error = sum(d['Average'] for d in health_report['ErrorRate']) / len(health_report['ErrorRate'])
        if avg_error > 0.05:  # error rate above 5%
            alerts.append(f"Warning: error rate is too high ({avg_error:.2%})")

    return health_report, alerts

# Periodic monitoring
health_report, alerts = monitor_experiment_health(experiment_name)
if alerts:
    print("⚠️ Anomalies detected:")
    for alert in alerts:
        print(f"  - {alert}")
Automated decision-making
class ExperimentDecisionEngine:
    """
    Experiment decision engine.
    """
    def __init__(self, config):
        self.config = config

    def should_stop_experiment(self, results, p_value):
        """
        Decide whether the experiment should be stopped.
        """
        # Check statistical significance
        if p_value < self.config.significance_level:
            return True, "Statistical significance reached"
        # Check for a clearly negative impact
        if results['Model B']['error_rate'] > results['Model A']['error_rate'] * 1.5:
            return True, "Treatment group error rate is too high"
        return False, "Continue the experiment"

    def recommend_rollout_strategy(self, results, p_value):
        """
        Recommend a rollout strategy.
        """
        if p_value >= self.config.significance_level:
            return "Keep the current model", "Difference is not significant"

        improvement = (
            results['Model B']['avg_satisfaction'] -
            results['Model A']['avg_satisfaction']
        ) / results['Model A']['avg_satisfaction']

        if improvement > 0.1:  # more than 10% improvement
            return "Full rollout", f"Significant improvement of {improvement:.1%}"
        elif improvement > 0:
            return "Gradual rollout", f"Small improvement of {improvement:.1%}"
        else:
            return "Roll back", f"Performance regression of {improvement:.1%}"

# Use the decision engine
config = ExperimentConfig()
engine = ExperimentDecisionEngine(config)

should_stop, reason = engine.should_stop_experiment(results, p_value)
strategy, explanation = engine.recommend_rollout_strategy(results, p_value)

print(f"Experiment decision: {strategy}")
print(f"Reason: {explanation}")
A/B testing prompt strategies with Bedrock

import boto3
import json
from datetime import datetime

class BedrockABTest:
    """
    A/B testing for Bedrock models.
    """
    def __init__(self):
        self.bedrock = boto3.client('bedrock-runtime')

    def invoke_model_variant(self, variant, prompt):
        """
        Invoke a specific model variant.
        """
        # Define the per-variant configurations
        model_configs = {
            'A': {
                'modelId': 'anthropic.claude-3-sonnet-20240229-v1:0',
                'system_prompt': 'You are a professional assistant.'
            },
            'B': {
                'modelId': 'anthropic.claude-3-sonnet-20240229-v1:0',
                'system_prompt': 'You are a friendly and professional AI assistant who provides detailed explanations.'
            }
        }
        config = model_configs[variant]

        # Build the request body
        body = {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 1000,
            "system": config['system_prompt'],
            "messages": [
                {
                    "role": "user",
                    "content": prompt
                }
            ]
        }

        # Call Bedrock
        response = self.bedrock.invoke_model(
            modelId=config['modelId'],
            body=json.dumps(body)
        )
        result = json.loads(response['body'].read())
        return result

# Test different prompt strategies
ab_test = BedrockABTest()

# Collect user feedback
def collect_user_feedback(variant, response, user_rating):
    """
    Record user feedback.
    """
    feedback = {
        'variant': variant,
        'response_length': len(response['content'][0]['text']),
        'user_rating': user_rating,
        'timestamp': datetime.now().isoformat()
    }
    # Log to CloudWatch (the log group and stream must already exist)
    logs_client = boto3.client('logs')
    logs_client.put_log_events(
        logGroupName='/aws/bedrock/ab-testing',
        logStreamName='user-feedback',
        logEvents=[{
            'timestamp': int(datetime.now().timestamp() * 1000),
            'message': json.dumps(feedback)
        }]
    )
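A quick usage sketch tying the two helpers together; the prompt is arbitrary and the rating is a placeholder that would normally come from your application's feedback UI.

import random

prompt = 'Explain the difference between A/B testing and a canary release.'
variant = random.choice(['A', 'B'])   # or reuse the hash-based split from step 3

response = ab_test.invoke_model_variant(variant, prompt)
print(response['content'][0]['text'][:200])   # preview the generated answer

collect_user_feedback(variant, response, user_rating=4)   # placeholder rating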
Estimate the experiment cost

class CostOptimizer:
    """
    Cost optimizer.
    """
    def __init__(self):
        # Cost Explorer client (available if you want to pull actual billed costs)
        self.ce = boto3.client('ce')

    def estimate_experiment_cost(self, instance_type, duration_days, requests_per_day):
        """
        Estimate the cost of the experiment.
        """
        # Example on-demand hosting prices (e.g. ml.g4dn.xlarge at roughly $0.736/hour);
        # check current SageMaker pricing for your region
        instance_costs = {
            'ml.g4dn.xlarge': 0.736,
            'ml.g5.xlarge': 1.006
        }
        hourly_cost = instance_costs.get(instance_type, 0.736)
        instance_cost = hourly_cost * 24 * duration_days * 2  # two endpoints

        # Inference cost (rough estimate: about $0.10 per 1,000 requests)
        inference_cost = (requests_per_day * duration_days / 1000) * 0.10

        total_cost = instance_cost + inference_cost

        print("=== Experiment Cost Estimate ===")
        print(f"Instance type: {instance_type}")
        print(f"Experiment duration (days): {duration_days}")
        print(f"Requests per day: {requests_per_day:,}")
        print(f"Instance cost: ${instance_cost:.2f}")
        print(f"Inference cost: ${inference_cost:.2f}")
        print(f"Total cost: ${total_cost:.2f}")

        return total_cost

# Use the cost estimator
optimizer = CostOptimizer()
estimated_cost = optimizer.estimate_experiment_cost(
    instance_type='ml.g4dn.xlarge',
    duration_days=7,
    requests_per_day=10000
)