
Introduction

Today we will look at how to implement Auto Scaling and a high-availability design on AWS so that our AI application can serve users reliably. When usage climbs, auto scaling is usually a key part of how we deploy an AI application.
Let's give it a try!

Achieving High Availability with Auto Scaling

The following scenarios are where building high availability into our system pays off:

  • Traffic spikes: a news push drives a 10x surge in users
  • Zone failure: an Availability Zone goes down due to a natural disaster
  • Cost optimization: full capacity is still running during late-night traffic troughs
  • Inference latency: a single instance cannot keep up with concurrent requests

AWS Auto Scaling: Core Concepts and Types

EC2 Auto Scaling

  • Automatically adjusts the number of EC2 instances
  • Suited to self-hosted inference services (a minimal sketch follows this overview)

Application Auto Scaling

  • Supports many AWS services
  • Including ECS, Lambda, SageMaker, and more

SageMaker Auto Scaling

  • Designed specifically for SageMaker endpoints
  • Scales automatically based on inference load
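
The EC2 path is not covered further in this post, but as a point of reference, here is a minimal sketch of a target tracking policy on an EC2 Auto Scaling group. It assumes an existing group (the name 'ai-inference-asg' is hypothetical) and default boto3 credentials and region:

import boto3

asg_client = boto3.client('autoscaling')

# Keep the group's average CPU utilization around 70%
asg_client.put_scaling_policy(
    AutoScalingGroupName='ai-inference-asg',  # hypothetical group name
    PolicyName='cpu-target-tracking',
    PolicyType='TargetTrackingScaling',
    TargetTrackingConfiguration={
        'PredefinedMetricSpecification': {
            'PredefinedMetricType': 'ASGAverageCPUUtilization'
        },
        'TargetValue': 70.0
    }
)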

Scaling Strategies

Target Tracking Scaling
Keeps a chosen metric close to a target value; this is the strategy used for the SageMaker endpoint below.

For example:

Keep CPU utilization around 70%
Keep request volume at about 1,000 requests per instance per minute

Step Scaling

Triggered by CloudWatch alarms; capacity is added or removed in steps. A minimal sketch follows.
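
Step Scaling is not demonstrated elsewhere in this post, so here is a minimal sketch for a SageMaker variant, assuming the scalable target has already been registered (as in the next section) and that a CloudWatch alarm (not shown) invokes the returned policy ARN; the endpoint name is the same placeholder used below:

import boto3

autoscaling_client = boto3.client('application-autoscaling')

# Step scaling: adjust capacity in steps according to how far the alarm
# metric breaches its threshold.
step_policy = autoscaling_client.put_scaling_policy(
    PolicyName='InvocationStepScaling',
    ServiceNamespace='sagemaker',
    ResourceId='endpoint/bedrock-qa-endpoint/variant/AllTraffic',
    ScalableDimension='sagemaker:variant:DesiredInstanceCount',
    PolicyType='StepScaling',
    StepScalingPolicyConfiguration={
        'AdjustmentType': 'ChangeInCapacity',
        'Cooldown': 300,
        'MetricAggregationType': 'Average',
        'StepAdjustments': [
            # breach up to 20 above the alarm threshold: add 1 instance
            {'MetricIntervalLowerBound': 0.0,
             'MetricIntervalUpperBound': 20.0,
             'ScalingAdjustment': 1},
            # breach more than 20 above the threshold: add 3 instances
            {'MetricIntervalLowerBound': 20.0,
             'ScalingAdjustment': 3}
        ]
    }
)
# Wire step_policy['PolicyARN'] up as a CloudWatch alarm action.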

Scheduled Scaling

Adjusts capacity on a schedule based on expected traffic patterns; well suited to workloads with regular business hours. A concrete example appears in the cost optimization section below.

Hands-On: SageMaker Endpoint Auto Scaling

Configuring an Auto Scaling Policy

import boto3
from datetime import datetime

# Initialize clients
sagemaker_client = boto3.client('sagemaker')
autoscaling_client = boto3.client('application-autoscaling')
cloudwatch_client = boto3.client('cloudwatch')

# Endpoint configuration
endpoint_name = 'bedrock-qa-endpoint'
variant_name = 'AllTraffic'
resource_id = f'endpoint/{endpoint_name}/variant/{variant_name}'

# Register the scalable target
response = autoscaling_client.register_scalable_target(
    ServiceNamespace='sagemaker',
    ResourceId=resource_id,
    ScalableDimension='sagemaker:variant:DesiredInstanceCount',
    MinCapacity=1,  # minimum number of instances
    MaxCapacity=10,  # maximum number of instances
    RoleARN='arn:aws:iam::YOUR_ACCOUNT:role/SageMakerAutoScalingRole'
)

# Configure a Target Tracking Scaling policy
response = autoscaling_client.put_scaling_policy(
    PolicyName='SageMakerEndpointInvocationScaling',
    ServiceNamespace='sagemaker',
    ResourceId=resource_id,
    ScalableDimension='sagemaker:variant:DesiredInstanceCount',
    PolicyType='TargetTrackingScaling',
    TargetTrackingScalingPolicyConfiguration={
        'TargetValue': 1000.0,  # target: 1000 invocations per instance per minute
        'PredefinedMetricSpecification': {
            'PredefinedMetricType': 'SageMakerVariantInvocationsPerInstance'
        },
        'ScaleInCooldown': 300,  # scale-in cooldown (seconds)
        'ScaleOutCooldown': 60   # scale-out cooldown (seconds)
    }
)

print(f"Auto Scaling 策略已設定: {response['PolicyARN']}")

Scaling on a Custom Metric

# Create auto scaling based on model inference latency
def create_latency_based_scaling(endpoint_name, target_latency_ms=500):
    """
    Configure auto scaling based on model inference latency.
    """
    resource_id = f'endpoint/{endpoint_name}/variant/AllTraffic'
    
    # Configure a scaling policy based on the ModelLatency metric
    response = autoscaling_client.put_scaling_policy(
        PolicyName='ModelLatencyScaling',
        ServiceNamespace='sagemaker',
        ResourceId=resource_id,
        ScalableDimension='sagemaker:variant:DesiredInstanceCount',
        PolicyType='TargetTrackingScaling',
        TargetTrackingScalingPolicyConfiguration={
            'TargetValue': target_latency_ms * 1000,  # ModelLatency is reported in microseconds
            'CustomizedMetricSpecification': {
                'MetricName': 'ModelLatency',
                'Namespace': 'AWS/SageMaker',
                'Statistic': 'Average',
                'Unit': 'Microseconds',
                'Dimensions': [
                    {
                        'Name': 'EndpointName',
                        'Value': endpoint_name
                    },
                    {
                        'Name': 'VariantName',
                        'Value': 'AllTraffic'
                    }
                ]
            },
            'ScaleInCooldown': 600,
            'ScaleOutCooldown': 300
        }
    )
    
    return response

# Apply latency-based scaling
create_latency_based_scaling('bedrock-qa-endpoint', target_latency_ms=500)

Integrating with Our Earlier Monitoring: Tracking Scaling Activity

from datetime import datetime, timedelta, timezone

def monitor_scaling_activities(endpoint_name, hours=1):
    """
    Monitor recent Auto Scaling activities.
    """
    resource_id = f'endpoint/{endpoint_name}/variant/AllTraffic'
    cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
    
    response = autoscaling_client.describe_scaling_activities(
        ServiceNamespace='sagemaker',
        ResourceId=resource_id,
        MaxResults=50
    )
    
    print(f"\nScaling activities in the last {hours} hour(s):")
    print("-" * 80)
    
    for activity in response['ScalingActivities']:
        start_time = activity['StartTime']
        if start_time < cutoff:
            continue  # only report activities within the requested window
        
        status = activity['StatusCode']
        description = activity['Description']
        
        print(f"Time: {start_time}")
        print(f"Status: {status}")
        print(f"Description: {description}")
        print("-" * 80)

# Run the monitor
monitor_scaling_activities('bedrock-qa-endpoint')

High-Availability Architecture Design: Multi-AZ Deployment

SageMaker automatically spreads an endpoint's instances across Availability Zones once it runs two or more instances, so provisioning several instances (three here) gives us zone-level redundancy.

def create_multi_az_endpoint(model_name, endpoint_config_name, endpoint_name):
    """
    Create a highly available endpoint spread across multiple Availability Zones.
    """
    
    # Create the endpoint configuration - multiple instances spread across AZs
    endpoint_config = sagemaker_client.create_endpoint_config(
        EndpointConfigName=endpoint_config_name,
        ProductionVariants=[
            {
                'VariantName': 'AllTraffic',
                'ModelName': model_name,
                'InitialInstanceCount': 3,  # at least 3 instances for cross-AZ redundancy
                'InstanceType': 'ml.m5.xlarge',
                'InitialVariantWeight': 1.0
            }
        ],
        DataCaptureConfig={
            'EnableCapture': True,
            'InitialSamplingPercentage': 100,
            'DestinationS3Uri': 's3://my-bucket/data-capture',
            'CaptureOptions': [
                {'CaptureMode': 'Input'},
                {'CaptureMode': 'Output'}
            ]
        }
    )
    
    # Create the endpoint
    endpoint = sagemaker_client.create_endpoint(
        EndpointName=endpoint_name,
        EndpointConfigName=endpoint_config_name
    )
    
    return endpoint

# Deploy the multi-AZ endpoint
create_multi_az_endpoint(
    model_name='my-ai-model',
    endpoint_config_name='multi-az-config-v1',
    endpoint_name='high-availability-endpoint'
)

Health Checks and Automatic Recovery

import time

def implement_health_check(endpoint_name, check_interval=60):
    """
    Implement a health-check mechanism for the endpoint.
    """
    
    def check_endpoint_health():
        try:
            # Check the endpoint status
            response = sagemaker_client.describe_endpoint(
                EndpointName=endpoint_name
            )
            
            status = response['EndpointStatus']
            
            if status != 'InService':
                print(f"⚠️  端點狀態異常: {status}")
                return False
            
            # Run a test inference
            runtime_client = boto3.client('sagemaker-runtime')
            test_response = runtime_client.invoke_endpoint(
                EndpointName=endpoint_name,
                ContentType='application/json',
                Body='{"text": "health check test"}'
            )
            
            if test_response['ResponseMetadata']['HTTPStatusCode'] == 200:
                print(f"✓ 端點健康檢查通過 - {datetime.now()}")
                return True
            else:
                print(f"✗ 端點回應異常")
                return False
                
        except Exception as e:
            print(f"✗ 健康檢查失敗: {str(e)}")
            return False
    
    # Continuous monitoring loop
    while True:
        is_healthy = check_endpoint_health()
        
        if not is_healthy:
            # Trigger an alert
            send_alert(f"Endpoint {endpoint_name} failed its health check")
        
        time.sleep(check_interval)

def send_alert(message):
    """
    Send an alert notification via SNS.
    """
    sns_client = boto3.client('sns')
    sns_client.publish(
        TopicArn='arn:aws:sns:ap-northeast-1:ACCOUNT:alerts',
        Subject='SageMaker endpoint alert',
        Message=message
    )
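
This checker blocks in an endless loop, so in practice it would run as its own process, container, or scheduled job rather than inside a request path. A minimal way to start it (reusing the placeholder endpoint name from above):

# Check the endpoint every 60 seconds; this call never returns.
implement_health_check('bedrock-qa-endpoint', check_interval=60)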

Traffic Splitting: Blue/Green Deployment

def implement_blue_green_deployment(
    endpoint_name,
    old_model_name,
    new_model_name,
    old_config_name,
    new_config_name
):
    """
    Implement a blue/green deployment strategy.
    """
    
    # Phase 1: create the new endpoint configuration
    sagemaker_client.create_endpoint_config(
        EndpointConfigName=new_config_name,
        ProductionVariants=[
            {
                'VariantName': 'BlueVariant',
                'ModelName': old_model_name,
                'InitialInstanceCount': 2,
                'InstanceType': 'ml.m5.xlarge',
                'InitialVariantWeight': 0.9  # 90% of traffic
            },
            {
                'VariantName': 'GreenVariant',
                'ModelName': new_model_name,
                'InitialInstanceCount': 1,
                'InstanceType': 'ml.m5.xlarge',
                'InitialVariantWeight': 0.1  # 10% of traffic for testing
            }
        ]
    )
    
    # Phase 2: update the endpoint to use the new configuration
    sagemaker_client.update_endpoint(
        EndpointName=endpoint_name,
        EndpointConfigName=new_config_name
    )
    
    print("藍綠部署已啟動,新版本接收10%流量")
    
    # Phase 3: monitor and shift traffic gradually
    def gradual_traffic_shift():
        weights = [
            (0.9, 0.1),  # initial split
            (0.7, 0.3),  # after 30 minutes
            (0.5, 0.5),  # after 1 hour
            (0.2, 0.8),  # after 2 hours
            (0.0, 1.0)   # final full cutover
        ]
        
        for blue_weight, green_weight in weights:
            # Update traffic weights
            sagemaker_client.update_endpoint_weights_and_capacities(
                EndpointName=endpoint_name,
                DesiredWeightsAndCapacities=[
                    {
                        'VariantName': 'BlueVariant',
                        'DesiredWeight': blue_weight
                    },
                    {
                        'VariantName': 'GreenVariant',
                        'DesiredWeight': green_weight
                    }
                ]
            )
            
            print(f"流量切換: Blue={blue_weight*100}%, Green={green_weight*100}%")
            
            # Wait and monitor metrics before the next shift
            time.sleep(1800)  # wait 30 minutes
    
    return gradual_traffic_shift

# Run the blue/green deployment
deploy_fn = implement_blue_green_deployment(
    endpoint_name='production-endpoint',
    old_model_name='model-v1',
    new_model_name='model-v2',
    old_config_name='config-v1',
    new_config_name='config-v2'
)

Auto Scaling for Lambda + Bedrock

Lambda Concurrency Controls

lambda_client = boto3.client('lambda')

def configure_lambda_autoscaling(function_name):
    """
    Configure concurrency limits for a Lambda function.
    """
    
    # Set reserved concurrency
    response = lambda_client.put_function_concurrency(
        FunctionName=function_name,
        ReservedConcurrentExecutions=100  # reserve 100 concurrent executions
    )
    )
    
    # Configure provisioned concurrency (reduces cold starts). Note: provisioned
    # concurrency cannot target $LATEST; it must point to a published version or
    # an alias (here a hypothetical alias named 'live').
    lambda_client.put_provisioned_concurrency_config(
        FunctionName=function_name,
        Qualifier='live',
        ProvisionedConcurrentExecutions=10  # keep 10 warm execution environments
    )
    
    print(f"Lambda Auto Scaling 配置完成")
    return response
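
For context, here is a minimal sketch of the kind of handler these limits protect, assuming the function's execution role has bedrock:InvokeModel permission and that the chosen model ID (hypothetical here) supports the Bedrock Converse API. Each concurrent execution holds one in-flight Bedrock request, so the reserved concurrency above also caps the rate of Bedrock calls:

import json
import boto3

bedrock_runtime = boto3.client('bedrock-runtime')

def lambda_handler(event, context):
    # Expect a JSON body like {"question": "..."} from API Gateway
    body = json.loads(event.get('body') or '{}')
    question = body.get('question', '')

    response = bedrock_runtime.converse(
        modelId='anthropic.claude-3-haiku-20240307-v1:0',  # hypothetical model ID
        messages=[{'role': 'user', 'content': [{'text': question}]}]
    )

    answer = response['output']['message']['content'][0]['text']
    return {'statusCode': 200, 'body': json.dumps({'answer': answer})}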

API Gateway Throttling

apigateway_client = boto3.client('apigateway')

def configure_api_throttling(api_id, stage_name):
    """
    Configure throttling limits on an API Gateway stage.
    """
    
    response = apigateway_client.update_stage(
        restApiId=api_id,
        stageName=stage_name,
        patchOperations=[
            {
                'op': 'replace',
                'path': '/*/*/throttling/rateLimit',  # default method-level throttling
                'value': '10000'  # 10,000 requests per second
            },
            {
                'op': 'replace',
                'path': '/*/*/throttling/burstLimit',
                'value': '5000'  # burst of 5,000 requests
            }
        ]
    )
    
    return response

Cost Optimization Strategies

Scheduled Scaling for Cost Control

def implement_cost_aware_scaling(endpoint_name):
    """
    Implement a cost-aware, schedule-based scaling strategy.
    """
    
    # Scale up for business hours
    autoscaling_client.put_scheduled_action(
        ServiceNamespace='sagemaker',
        ScheduledActionName='ScaleUpForBusinessHours',
        ResourceId=f'endpoint/{endpoint_name}/variant/AllTraffic',
        ScalableDimension='sagemaker:variant:DesiredInstanceCount',
        Schedule='cron(0 8 ? * MON-FRI *)',  # 8 AM Monday-Friday (UTC)
        ScalableTargetAction={
            'MinCapacity': 3,
            'MaxCapacity': 10
        }
    )
    
    # Scale down for off-hours
    autoscaling_client.put_scheduled_action(
        ServiceNamespace='sagemaker',
        ScheduledActionName='ScaleDownForOffHours',
        ResourceId=f'endpoint/{endpoint_name}/variant/AllTraffic',
        ScalableDimension='sagemaker:variant:DesiredInstanceCount',
        Schedule='cron(0 20 * * ? *)',  # 8 PM every day (UTC)
        ScalableTargetAction={
            'MinCapacity': 1,
            'MaxCapacity': 3
        }
    )

Monitoring Dashboard (Cost Tracking)

import json

def create_cost_monitoring_dashboard():
    """
    Create a CloudWatch dashboard for monitoring endpoint cost and performance.
    """
    
    dashboard_body = {
        "widgets": [
            {
                "type": "metric",
                "properties": {
                    "metrics": [
                        ["AWS/SageMaker", "ModelLatency", {"stat": "Average"}],
                        [".", "Invocations", {"stat": "Sum"}],
                        [".", "ModelSetupTime", {"stat": "Average"}]
                    ],
                    "period": 300,
                    "stat": "Average",
                    "region": "ap-northeast-1",
                    "title": "SageMaker 端點效能"
                }
            },
            {
                "type": "metric",
                "properties": {
                    "metrics": [
                        ["AWS/ApplicationELB", "TargetResponseTime", {"stat": "Average"}],
                        [".", "RequestCount", {"stat": "Sum"}]
                    ],
                    "period": 300,
                    "stat": "Average",
                    "region": "ap-northeast-1",
                    "title": "應用程式負載"
                }
            }
        ]
    }
    
    cloudwatch_client.put_dashboard(
        DashboardName='AI-Application-Monitoring',
        DashboardBody=json.dumps(dashboard_body)  # DashboardBody must be a JSON string
    )
    
    print("成本監控儀表板已建立")

Disaster Recovery and Backup Strategy

Cross-Region Backup

def implement_cross_region_backup(
    source_region,
    target_region,
    model_name,
    s3_model_path
):
    """
    Implement cross-region model backup.
    """
    
    # Copy the model artifacts to the target region
    s3_client = boto3.client('s3')
    
    # Copy the S3 model file across regions
    source_bucket = s3_model_path.split('/')[2]
    source_key = '/'.join(s3_model_path.split('/')[3:])
    target_bucket = f"{source_bucket}-{target_region}"
    
    copy_source = {
        'Bucket': source_bucket,
        'Key': source_key
    }
    
    s3_target_client = boto3.client('s3', region_name=target_region)
    s3_target_client.copy_object(
        CopySource=copy_source,
        Bucket=target_bucket,
        Key=source_key
    )
    
    # Create the model in the target region
    sagemaker_target = boto3.client('sagemaker', region_name=target_region)
    sagemaker_target.create_model(
        ModelName=f"{model_name}-backup",
        PrimaryContainer={
            'Image': 'your-ecr-image',
            'ModelDataUrl': f's3://{target_bucket}/{source_key}'
        },
        ExecutionRoleArn='arn:aws:iam::ACCOUNT:role/SageMakerRole'
    )
    
    print(f"模型已備份到 {target_region} 區域")

Automatic Failover

def setup_failover_mechanism(primary_endpoint, backup_endpoint):
    """
    Set up an automatic failover mechanism.
    """
    
    def invoke_with_failover(payload):
        runtime_client = boto3.client('sagemaker-runtime')
        
        try:
            # Try the primary endpoint first
            response = runtime_client.invoke_endpoint(
                EndpointName=primary_endpoint,
                ContentType='application/json',
                Body=payload
            )
            return response
            
        except Exception as e:
            print(f"主端點失敗,切換到備份端點: {str(e)}")
            
            # 自動切換到備份
            try:
                response = runtime_client.invoke_endpoint(
                    EndpointName=backup_endpoint,
                    ContentType='application/json',
                    Body=payload
                )
                
                # Send an alert
                send_alert(f"Automatically failed over to backup endpoint {backup_endpoint}")
                return response
                
            except Exception as backup_error:
                print(f"備份端點也失敗: {str(backup_error)}")
                raise
    
    return invoke_with_failover
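
Usage is a one-time wrap followed by normal invocations (the endpoint names below are placeholders):

# Wrap once, then call as a drop-in replacement for invoke_endpoint
invoke = setup_failover_mechanism('production-endpoint', 'backup-endpoint')
result = invoke('{"text": "hello"}')
print(result['Body'].read())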

Today we worked through auto scaling and high availability from several different angles. That's a wrap!

