iT邦幫忙

2025 iThome 鐵人賽

DAY 22
0

前言

今天我們要專注於監控和 log 分析相關,前陣子我們幾乎都只是針對 aws bedrock 和
Sagemaker 提到關於監控相關的執行和實作,今天我們以一個篇幅針對監控和相關操作
撰寫今天的內容,並且提及一些可觀測性架構,在實務需求中我們要考量模型的健康狀態
以及性能表現。

為什麼需要監控與日誌分析

監控和日誌分析扮演著關鍵角色

  • 及早發現問題: 在用戶發現之前就識別潛在問題
  • 性能優化: 了解瓶頸所在,針對性地優化
  • 成本控制: 監控資源使用,避免不必要的開支
  • 模型漂移檢測: 發現模型預測品質下降的徵兆
  • 合規性要求: 滿足稽核和法規要求

AWS 監控工具概覽

  • Amazon CloudWatch: AWS 的核心監控服務
  • AWS CloudTrail: API 呼叫追蹤和稽核
  • Amazon CloudWatch Logs: 集中式日誌管理
  • Amazon CloudWatch Insights: 日誌查詢和分析

實作

首先我們先開始設定基本監控,使用 cloudwatch 去實現這個目的
讓我們為 SageMaker 端點設定基本的 CloudWatch 監控

import boto3
from datetime import datetime, timedelta

# 初始化客戶端
cloudwatch = boto3.client('cloudwatch')
sagemaker = boto3.client('sagemaker')

def get_endpoint_metrics(endpoint_name, metric_name, period=300):
    """
    獲取端點指標
    
    Args:
        endpoint_name: 端點名稱
        metric_name: 指標名稱
        period: 時間週期(秒)
    """
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(hours=1)
    
    response = cloudwatch.get_metric_statistics(
        Namespace='AWS/SageMaker',
        MetricName=metric_name,
        Dimensions=[
            {
                'Name': 'EndpointName',
                'Value': endpoint_name
            },
        ],
        StartTime=start_time,
        EndTime=end_time,
        Period=period,
        Statistics=['Average', 'Maximum', 'Minimum']
    )
    
    return response['Datapoints']

# 關鍵指標
endpoint_name = 'your-endpoint-name'

# 獲取各項指標
invocations = get_endpoint_metrics(endpoint_name, 'Invocations')
model_latency = get_endpoint_metrics(endpoint_name, 'ModelLatency')
overhead_latency = get_endpoint_metrics(endpoint_name, 'OverheadLatency')

print(f"呼叫次數: {len(invocations)} 個數據點")
print(f"模型延遲: {len(model_latency)} 個數據點")

創建自定義 Dashboard

def create_monitoring_dashboard(endpoint_name, dashboard_name):
    """創建 CloudWatch 儀表板"""
    
    dashboard_body = {
        "widgets": [
            {
                "type": "metric",
                "properties": {
                    "metrics": [
                        ["AWS/SageMaker", "ModelLatency", {"stat": "Average"}],
                        [".", ".", {"stat": "p99"}]
                    ],
                    "period": 300,
                    "stat": "Average",
                    "region": "<你的Region>",
                    "title": "模型延遲",
                    "yAxis": {
                        "left": {
                            "label": "毫秒"
                        }
                    }
                }
            },
            {
                "type": "metric",
                "properties": {
                    "metrics": [
                        ["AWS/SageMaker", "Invocations", {"stat": "Sum"}],
                        [".", "Invocation4XXErrors", {"stat": "Sum"}],
                        [".", "Invocation5XXErrors", {"stat": "Sum"}]
                    ],
                    "period": 300,
                    "stat": "Sum",
                    "region": "<你的Region>",
                    "title": "請求與錯誤",
                    "yAxis": {
                        "left": {
                            "label": "次數"
                        }
                    }
                }
            },
            {
                "type": "metric",
                "properties": {
                    "metrics": [
                        ["AWS/SageMaker", "CPUUtilization", {"stat": "Average"}],
                        [".", "MemoryUtilization", {"stat": "Average"}]
                    ],
                    "period": 300,
                    "stat": "Average",
                    "region": "<你的Region>",
                    "title": "資源使用率",
                    "yAxis": {
                        "left": {
                            "label": "百分比"
                        }
                    }
                }
            }
        ]
    }
    
    cloudwatch.put_dashboard(
        DashboardName=dashboard_name,
        DashboardBody=json.dumps(dashboard_body)
    )
    
    print(f"儀表板 '{dashboard_name}' 創建成功!")

# 創建儀表板
create_monitoring_dashboard(endpoint_name, 'SageMaker-Endpoint-Monitor')

當然這個可以在 AWS cloudwatch 頁面去實現手工創建,這裡就不演示

設定警報系統 - Cloudwatch alarms

def create_latency_alarm(endpoint_name, threshold_ms=1000):
    """創建延遲告警"""
    
    alarm_name = f'{endpoint_name}-high-latency'
    
    cloudwatch.put_metric_alarm(
        AlarmName=alarm_name,
        ComparisonOperator='GreaterThanThreshold',
        EvaluationPeriods=2,
        MetricName='ModelLatency',
        Namespace='AWS/SageMaker',
        Period=300,
        Statistic='Average',
        Threshold=threshold_ms,
        ActionsEnabled=True,
        AlarmDescription=f'當模型延遲超過 {threshold_ms}ms 時觸發',
        Dimensions=[
            {
                'Name': 'EndpointName',
                'Value': endpoint_name
            },
        ],
        TreatMissingData='notBreaching'
    )
    
    print(f"延遲告警 '{alarm_name}' 創建成功!")

def create_error_alarm(endpoint_name, error_threshold=10):
    """創建錯誤率告警"""
    
    alarm_name = f'{endpoint_name}-high-errors'
    
    cloudwatch.put_metric_alarm(
        AlarmName=alarm_name,
        ComparisonOperator='GreaterThanThreshold',
        EvaluationPeriods=1,
        MetricName='Invocation5XXErrors',
        Namespace='AWS/SageMaker',
        Period=300,
        Statistic='Sum',
        Threshold=error_threshold,
        ActionsEnabled=True,
        AlarmDescription=f'當 5XX 錯誤超過 {error_threshold} 次時觸發',
        Dimensions=[
            {
                'Name': 'EndpointName',
                'Value': endpoint_name
            },
        ]
    )
    
    print(f"錯誤告警 '{alarm_name}' 創建成功!")

# 創建告警
create_latency_alarm(endpoint_name, threshold_ms=500)
create_error_alarm(endpoint_name, error_threshold=5)

這裏針對關鍵指標進行警報

設定 SNS

aws 有個服務為 SNS 可以針對警報發出通知,像是寄信之類的操作
可以直接針對 cloudwatch 進行整合

def setup_alarm_notification(alarm_name, sns_topic_arn):
    """為告警設定 SNS 通知"""
    
    cloudwatch.put_metric_alarm(
        AlarmName=alarm_name,
        AlarmActions=[sns_topic_arn],
        # ... 其他告警設定
    )

# 創建 SNS 主題
sns = boto3.client('sns')

topic_response = sns.create_topic(Name='sagemaker-alerts')
topic_arn = topic_response['TopicArn']

# 訂閱 Email
sns.subscribe(
    TopicArn=topic_arn,
    Protocol='email',
    Endpoint='your-email@example.com'
)

print(f"SNS 主題創建成功: {topic_arn}")

啟用 Bedrock 呼叫 log 分析

def enable_bedrock_logging(model_id, log_group_name):
    """啟用 Bedrock 模型呼叫日誌"""
    
    bedrock = boto3.client('bedrock')
    logs = boto3.client('logs')
    
    # 創建 CloudWatch Logs 群組
    try:
        logs.create_log_group(logGroupName=log_group_name)
        print(f"日誌群組 '{log_group_name}' 創建成功")
    except logs.exceptions.ResourceAlreadyExistsException:
        print(f"日誌群組 '{log_group_name}' 已存在")
    
    # 設定日誌保留期限
    logs.put_retention_policy(
        logGroupName=log_group_name,
        retentionInDays=7
    )
    
    return log_group_name

log_group = enable_bedrock_logging(
    model_id='anthropic.claude-v2',
    log_group_name='/aws/bedrock/claude-invocations'
)

cloud watch log insight 分析

def analyze_bedrock_logs(log_group_name, hours=24):
    """分析 Bedrock 呼叫日誌"""
    
    logs = boto3.client('logs')
    
    # 查詢語句
    query = """
    fields @timestamp, @message
    | filter @message like /error/
    | stats count() as error_count by bin(5m)
    | sort @timestamp desc
    """
    
    start_time = int((datetime.now() - timedelta(hours=hours)).timestamp())
    end_time = int(datetime.now().timestamp())
    
    # 執行查詢
    response = logs.start_query(
        logGroupName=log_group_name,
        startTime=start_time,
        endTime=end_time,
        queryString=query
    )
    
    query_id = response['queryId']
    
    # 等待查詢完成
    import time
    while True:
        result = logs.get_query_results(queryId=query_id)
        status = result['status']
        
        if status == 'Complete':
            return result['results']
        elif status == 'Failed':
            raise Exception("查詢失敗")
        
        time.sleep(1)

# 執行分析
results = analyze_bedrock_logs('/aws/bedrock/claude-invocations', hours=1)

for result in results[:10]:  # 顯示前 10 筆
    print(result)

這裏有常用查詢

# 查詢 1: Token 使用統計
token_usage_query = """
fields @timestamp, inputTokens, outputTokens
| stats sum(inputTokens) as total_input, 
        sum(outputTokens) as total_output,
        avg(inputTokens) as avg_input,
        avg(outputTokens) as avg_output
"""

# 查詢 2: 延遲分析
latency_query = """
fields @timestamp, latency
| stats avg(latency) as avg_latency,
        max(latency) as max_latency,
        pct(latency, 95) as p95_latency,
        pct(latency, 99) as p99_latency
by bin(5m)
"""

# 查詢 3: 錯誤率趨勢
error_rate_query = """
fields @timestamp, statusCode
| stats count() as total,
        sum(statusCode >= 400) as errors
by bin(15m)
| fields bin, errors / total * 100 as error_rate
"""

自定義指標與 log

def log_custom_metric(metric_name, value, unit='None', dimensions=None):
    """推送自定義指標到 CloudWatch"""
    
    metric_data = {
        'MetricName': metric_name,
        'Value': value,
        'Unit': unit,
        'Timestamp': datetime.utcnow()
    }
    
    if dimensions:
        metric_data['Dimensions'] = dimensions
    
    cloudwatch.put_metric_data(
        Namespace='CustomAI/Metrics',
        MetricData=[metric_data]
    )

# 範例: 記錄模型預測信心度
def track_prediction_confidence(endpoint_name, confidence_score):
    """追蹤預測信心度"""
    
    log_custom_metric(
        metric_name='PredictionConfidence',
        value=confidence_score,
        unit='None',
        dimensions=[
            {'Name': 'EndpointName', 'Value': endpoint_name}
        ]
    )

# 使用範例
track_prediction_confidence('my-endpoint', 0.95)

結構化日誌記錄

import json
import logging

# 設定結構化日誌
class StructuredLogger:
    def __init__(self, log_group_name):
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)
        
        # 設定 CloudWatch handler (實際需要 watchtower 套件)
        # handler = watchtower.CloudWatchLogHandler(log_group=log_group_name)
        # self.logger.addHandler(handler)
    
    def log_inference(self, endpoint_name, input_data, output, latency):
        """記錄推論請求"""
        
        log_entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'event_type': 'inference',
            'endpoint_name': endpoint_name,
            'input_length': len(str(input_data)),
            'output_length': len(str(output)),
            'latency_ms': latency,
            'status': 'success'
        }
        
        self.logger.info(json.dumps(log_entry))
    
    def log_error(self, endpoint_name, error_message, error_type):
        """記錄錯誤"""
        
        log_entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'event_type': 'error',
            'endpoint_name': endpoint_name,
            'error_type': error_type,
            'error_message': error_message,
            'status': 'failed'
        }
        
        self.logger.error(json.dumps(log_entry))

# 使用範例
logger = StructuredLogger('/aws/sagemaker/inference-logs')
logger.log_inference('my-endpoint', {'text': 'sample'}, {'result': 0.9}, 150)

成本追蹤

aws 有 cost explorer

可以參考 這裏進行整合

def get_service_costs(service_name, days=7):
    """獲取服務成本"""
    
    ce = boto3.client('ce')  # Cost Explorer
    
    end_date = datetime.now().date()
    start_date = end_date - timedelta(days=days)
    
    response = ce.get_cost_and_usage(
        TimePeriod={
            'Start': start_date.strftime('%Y-%m-%d'),
            'End': end_date.strftime('%Y-%m-%d')
        },
        Granularity='DAILY',
        Metrics=['UnblendedCost'],
        Filter={
            'Dimensions': {
                'Key': 'SERVICE',
                'Values': [service_name]
            }
        }
    )
    
    return response['ResultsByTime']

# 獲取 SageMaker 成本
sagemaker_costs = get_service_costs('Amazon SageMaker')
print("SageMaker 每日成本:")
for item in sagemaker_costs:
    date = item['TimePeriod']['Start']
    cost = item['Total']['UnblendedCost']['Amount']
    print(f"{date}: ${float(cost):.2f}")

# 獲取 Bedrock 成本
bedrock_costs = get_service_costs('Amazon Bedrock')

一樣我們這裡做 alarm 做成本告警,告訴團隊『你花太多錢了😎』

def create_cost_alarm(budget_amount, service_name):
    """創建成本預算告警"""
    
    budgets = boto3.client('budgets')
    
    account_id = boto3.client('sts').get_caller_identity()['Account']
    
    budget = {
        'BudgetName': f'{service_name}-monthly-budget',
        'BudgetLimit': {
            'Amount': str(budget_amount),
            'Unit': 'USD'
        },
        'TimeUnit': 'MONTHLY',
        'BudgetType': 'COST',
        'CostFilters': {
            'Service': [service_name]
        }
    }
    
    notification = {
        'NotificationType': 'ACTUAL',
        'ComparisonOperator': 'GREATER_THAN',
        'Threshold': 80,  # 80% 的預算
        'ThresholdType': 'PERCENTAGE'
    }
    
    subscriber = {
        'SubscriptionType': 'EMAIL',
        'Address': 'your-email@example.com'
    }
    
    try:
        budgets.create_budget(
            AccountId=account_id,
            Budget=budget,
            NotificationsWithSubscribers=[
                {
                    'Notification': notification,
                    'Subscribers': [subscriber]
                }
            ]
        )
        print(f"成本預算告警設定成功!")
    except Exception as e:
        print(f"設定失敗: {e}")

# 為 SageMaker 設定月度預算 $500
create_cost_alarm(500, 'Amazon SageMaker')

整合監控 Dashboard

def create_comprehensive_dashboard():
    """創建綜合監控儀表板"""
    
    dashboard_name = 'AI-Services-Comprehensive-Monitor'
    
    dashboard_body = {
        "widgets": [
            # SageMaker 性能
            {
                "type": "metric",
                "properties": {
                    "title": "SageMaker 端點延遲",
                    "metrics": [
                        ["AWS/SageMaker", "ModelLatency", {"stat": "Average"}],
                        ["...", {"stat": "p99"}]
                    ],
                    "period": 300,
                    "region": "<你的Region>"
                }
            },
            # Bedrock Token 使用
            {
                "type": "log",
                "properties": {
                    "title": "Bedrock Token 使用趨勢",
                    "query": """
                        SOURCE '/aws/bedrock/model-invocations'
                        | fields @timestamp, inputTokens, outputTokens
                        | stats sum(inputTokens + outputTokens) as total_tokens by bin(1h)
                    """,
                    "region": "<你的Region>"
                }
            },
            # 錯誤率
            {
                "type": "metric",
                "properties": {
                    "title": "服務錯誤率",
                    "metrics": [
                        ["AWS/SageMaker", "Invocation4XXErrors"],
                        [".", "Invocation5XXErrors"]
                    ],
                    "period": 300,
                    "stat": "Sum",
                    "region": "<你的Region>"
                }
            },
            # 成本追蹤
            {
                "type": "metric",
                "properties": {
                    "title": "每日成本",
                    "metrics": [
                        ["AWS/Billing", "EstimatedCharges", 
                         {"ServiceName": "Amazon SageMaker"}],
                        ["...", {"ServiceName": "Amazon Bedrock"}]
                    ],
                    "period": 86400,
                    "stat": "Maximum",
                    "region": "<你的Region>"
                }
            }
        ]
    }
    
    cloudwatch.put_dashboard(
        DashboardName=dashboard_name,
        DashboardBody=json.dumps(dashboard_body)
    )
    
    print(f"綜合儀表板創建成功!")
    print(f"訪問網址: https://console.aws.amazon.com/cloudwatch/home?region=<你的Region>#dashboards:name={dashboard_name}")

create_comprehensive_dashboard()

health check(健康檢查)

def endpoint_health_check(endpoint_name):
    """端點健康檢查"""
    
    checks = {
        'endpoint_status': False,
        'low_latency': False,
        'low_errors': False,
        'sufficient_capacity': False
    }
    
    # 檢查端點狀態
    try:
        response = sagemaker.describe_endpoint(EndpointName=endpoint_name)
        checks['endpoint_status'] = response['EndpointStatus'] == 'InService'
    except Exception as e:
        print(f"無法獲取端點狀態: {e}")
        return checks
    
    # 檢查延遲
    latency_data = get_endpoint_metrics(endpoint_name, 'ModelLatency', period=300)
    if latency_data:
        avg_latency = sum(d['Average'] for d in latency_data) / len(latency_data)
        checks['low_latency'] = avg_latency < 500  # 小於 500ms
    
    # 檢查錯誤率
    error_data = get_endpoint_metrics(endpoint_name, 'Invocation5XXErrors', period=300)
    if error_data:
        total_errors = sum(d['Sum'] for d in error_data)
        checks['low_errors'] = total_errors < 10
    
    # 檢查容量
    invocations = get_endpoint_metrics(endpoint_name, 'Invocations', period=300)
    if invocations:
        checks['sufficient_capacity'] = True  # 簡化檢查
    
    # 輸出結果
    print(f"\n端點健康檢查: {endpoint_name}")
    print("=" * 50)
    for check, status in checks.items():
        status_icon = "✓" if status else "✗"
        print(f"{status_icon} {check}: {'通過' if status else '失敗'}")
    
    return all(checks.values())

# 執行健康檢查
is_healthy = endpoint_health_check('your-endpoint-name')

參考資料


上一篇
即時推理與批次處理架構
系列文
從零開始的AWS AI之路:用Bedrock與SageMaker打造智慧應用的30天實戰22
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言