今天我們將探討如何在AWS上實現Auto Scaling和高可用性設計,
確保我們的AI應用能夠穩定可靠地服務用戶。通常在系統用量較高的時候,
我們會考慮以 autoscaling 的方式來部署 AI 應用。
開始我們的嘗試吧!
以下服務場景可以將高可用性設計納入我們的系統:
EC2 Auto Scaling
Application Auto Scaling
SageMaker Auto Scaling
Target Tracking Scaling
維持特定指標在目標值附近
例如 :
CPU使用率保持在70%
請求數量維持在每實例1000次/分鐘
Step Scaling
根據CloudWatch告警觸發,階梯式增減容量
Scheduled Scaling
根據預期的流量模式定時調整,適合有規律的業務場景。
設定 autoscaling 策略
import boto3
from datetime import datetime
# Initialize AWS service clients (region/credentials come from the environment).
sagemaker_client = boto3.client('sagemaker')
autoscaling_client = boto3.client('application-autoscaling')
cloudwatch_client = boto3.client('cloudwatch')
# Endpoint configuration: Application Auto Scaling addresses a SageMaker
# variant through this 'endpoint/<name>/variant/<variant>' resource id.
endpoint_name = 'bedrock-qa-endpoint'
variant_name = 'AllTraffic'
resource_id = f'endpoint/{endpoint_name}/variant/{variant_name}'
# Register the endpoint variant as a scalable target (1 to 10 instances).
response = autoscaling_client.register_scalable_target(
    ServiceNamespace='sagemaker',
    ResourceId=resource_id,
    ScalableDimension='sagemaker:variant:DesiredInstanceCount',
    MinCapacity=1,   # minimum instance count
    MaxCapacity=10,  # maximum instance count
    RoleARN='arn:aws:iam::YOUR_ACCOUNT:role/SageMakerAutoScalingRole'
)
# Configure a Target Tracking scaling policy: keep the predefined
# invocations-per-instance metric near the target value.
response = autoscaling_client.put_scaling_policy(
    PolicyName='SageMakerEndpointInvocationScaling',
    ServiceNamespace='sagemaker',
    ResourceId=resource_id,
    ScalableDimension='sagemaker:variant:DesiredInstanceCount',
    PolicyType='TargetTrackingScaling',
    TargetTrackingScalingPolicyConfiguration={
        'TargetValue': 1000.0,  # target: 1000 invocations per instance per minute
        'PredefinedMetricSpecification': {
            'PredefinedMetricType': 'SageMakerVariantInvocationsPerInstance'
        },
        'ScaleInCooldown': 300,  # scale-in cooldown (seconds)
        'ScaleOutCooldown': 60   # scale-out cooldown (seconds)
    }
)
print(f"Auto Scaling 策略已設定: {response['PolicyARN']}")
基於自定義指標擴展
# Latency-based Auto Scaling for a SageMaker endpoint.
def create_latency_based_scaling(endpoint_name, target_latency_ms=500):
    """
    Configure target-tracking auto scaling based on model inference latency.

    Args:
        endpoint_name: Name of the SageMaker endpoint to scale.
        target_latency_ms: Desired average per-request ModelLatency, in
            milliseconds. Converted internally because CloudWatch publishes
            the AWS/SageMaker ModelLatency metric in MICROSECONDS — the
            original Unit='Milliseconds' / ms target matched no data points.

    Returns:
        The put_scaling_policy response dict (includes 'PolicyARN').
    """
    resource_id = f'endpoint/{endpoint_name}/variant/AllTraffic'
    # Track the metric in its native unit (microseconds).
    response = autoscaling_client.put_scaling_policy(
        PolicyName='ModelLatencyScaling',
        ServiceNamespace='sagemaker',
        ResourceId=resource_id,
        ScalableDimension='sagemaker:variant:DesiredInstanceCount',
        PolicyType='TargetTrackingScaling',
        TargetTrackingScalingPolicyConfiguration={
            'TargetValue': target_latency_ms * 1000.0,  # ms -> microseconds
            'CustomizedMetricSpecification': {
                'MetricName': 'ModelLatency',
                'Namespace': 'AWS/SageMaker',
                'Statistic': 'Average',
                'Unit': 'Microseconds',
                'Dimensions': [
                    {'Name': 'EndpointName', 'Value': endpoint_name},
                    {'Name': 'VariantName', 'Value': 'AllTraffic'}
                ]
            },
            'ScaleInCooldown': 600,   # conservative scale-in (seconds)
            'ScaleOutCooldown': 300   # scale-out cooldown (seconds)
        }
    )
    return response
# Apply the latency-based scaling policy to the QA endpoint.
create_latency_based_scaling('bedrock-qa-endpoint', target_latency_ms=500)
整合前面的監控,監控 scaling 活動
def monitor_scaling_activities(endpoint_name, hours=1):
    """
    Print recent Auto Scaling activities for an endpoint.

    Fix: `hours` was previously unused — the function printed up to 50
    activities regardless of age while claiming a time window. Activities
    older than the look-back window are now skipped.

    Args:
        endpoint_name: SageMaker endpoint whose variant activities to show.
        hours: Look-back window in hours.
    """
    from datetime import timedelta, timezone  # top of file only imports datetime

    resource_id = f'endpoint/{endpoint_name}/variant/AllTraffic'
    response = autoscaling_client.describe_scaling_activities(
        ServiceNamespace='sagemaker',
        ResourceId=resource_id,
        MaxResults=50
    )
    # boto3 returns timezone-aware StartTime values; compare against an
    # aware UTC cutoff.
    cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
    print(f"\n最近 {hours} 小時的擴展活動:")
    print("-" * 80)
    for activity in response['ScalingActivities']:
        start_time = activity['StartTime']
        if start_time < cutoff:
            continue  # older than the requested window
        status = activity['StatusCode']
        description = activity['Description']
        print(f"時間: {start_time}")
        print(f"狀態: {status}")
        print(f"描述: {description}")
        print("-" * 80)
# Run the monitor once for the QA endpoint (default 1-hour window).
monitor_scaling_activities('bedrock-qa-endpoint')
def create_multi_az_endpoint(model_name, endpoint_config_name, endpoint_name):
    """
    Create a highly available SageMaker endpoint with multiple instances.

    Args:
        model_name: Existing SageMaker model to serve.
        endpoint_config_name: Name for the new endpoint configuration.
        endpoint_name: Name for the new endpoint.

    Returns:
        The create_endpoint response dict.
    """
    # One production variant with several instances; running multiple
    # instances lets SageMaker spread them across availability zones.
    variant = {
        'VariantName': 'AllTraffic',
        'ModelName': model_name,
        'InitialInstanceCount': 3,  # at least 3 instances
        'InstanceType': 'ml.m5.xlarge',
        'InitialVariantWeight': 1.0,
    }
    # Capture 100% of request/response payloads to S3 for later analysis.
    capture_config = {
        'EnableCapture': True,
        'InitialSamplingPercentage': 100,
        'DestinationS3Uri': 's3://my-bucket/data-capture',
        'CaptureOptions': [{'CaptureMode': 'Input'}, {'CaptureMode': 'Output'}],
    }
    sagemaker_client.create_endpoint_config(
        EndpointConfigName=endpoint_config_name,
        ProductionVariants=[variant],
        DataCaptureConfig=capture_config,
    )
    # Stand up the endpoint from the configuration just created.
    return sagemaker_client.create_endpoint(
        EndpointName=endpoint_name,
        EndpointConfigName=endpoint_config_name,
    )
# Deploy the multi-AZ endpoint.
create_multi_az_endpoint(
    model_name='my-ai-model',
    endpoint_config_name='multi-az-config-v1',
    endpoint_name='high-availability-endpoint'
)
import time
def implement_health_check(endpoint_name, check_interval=60):
    """
    Continuously health-check a SageMaker endpoint.

    Verifies the endpoint is InService and answers a test inference; every
    failed check sends an SNS alert via send_alert(). This call BLOCKS
    forever (infinite loop) — run it in a dedicated thread or process.

    Args:
        endpoint_name: Endpoint to monitor.
        check_interval: Seconds to sleep between checks.
    """
    def check_endpoint_health():
        # Returns True when the endpoint is InService and the test call
        # returns HTTP 200; False (after printing a message) otherwise.
        try:
            # Check endpoint status first — a non-InService endpoint
            # cannot be invoked.
            response = sagemaker_client.describe_endpoint(
                EndpointName=endpoint_name
            )
            status = response['EndpointStatus']
            if status != 'InService':
                print(f"⚠️ 端點狀態異常: {status}")
                return False
            # Issue a lightweight test inference.
            # NOTE(review): assumes the model accepts this JSON shape —
            # confirm against the deployed model's input contract.
            runtime_client = boto3.client('sagemaker-runtime')
            test_response = runtime_client.invoke_endpoint(
                EndpointName=endpoint_name,
                ContentType='application/json',
                Body='{"text": "health check test"}'
            )
            if test_response['ResponseMetadata']['HTTPStatusCode'] == 200:
                print(f"✓ 端點健康檢查通過 - {datetime.now()}")
                return True
            else:
                print(f"✗ 端點回應異常")
                return False
        except Exception as e:
            # Broad catch is deliberate: any failure counts as unhealthy.
            print(f"✗ 健康檢查失敗: {str(e)}")
            return False

    # Poll forever; alert on each failed check.
    while True:
        is_healthy = check_endpoint_health()
        if not is_healthy:
            # Trigger an alert (SNS publisher defined later in this file).
            send_alert(f"端點 {endpoint_name} 健康檢查失敗")
        time.sleep(check_interval)
def send_alert(message):
    """Publish an alert message to the operations SNS topic."""
    topic_arn = 'arn:aws:sns:ap-northeast-1:ACCOUNT:alerts'
    boto3.client('sns').publish(
        TopicArn=topic_arn,
        Subject='SageMaker 端點告警',
        Message=message,
    )
def implement_blue_green_deployment(
    endpoint_name,
    old_model_name,
    new_model_name,
    old_config_name,
    new_config_name
):
    """
    Start a blue/green deployment on an existing endpoint.

    Creates a two-variant endpoint configuration (Blue = current model at
    90% traffic, Green = candidate at 10%), points the endpoint at it, and
    returns a callable that walks the traffic split over to Green in steps.

    Args:
        endpoint_name: Endpoint being upgraded.
        old_model_name: Model currently serving traffic (Blue variant).
        new_model_name: Candidate model (Green variant).
        old_config_name: NOTE(review): currently unused — kept for
            interface compatibility / future rollback logic.
        new_config_name: Name for the new two-variant endpoint config.

    Returns:
        gradual_traffic_shift: zero-argument function that performs the
        phased traffic shift (blocks ~2.5 hours; sleeps between steps).
    """
    # Phase 1: create the new endpoint configuration with both variants.
    sagemaker_client.create_endpoint_config(
        EndpointConfigName=new_config_name,
        ProductionVariants=[
            {
                'VariantName': 'BlueVariant',
                'ModelName': old_model_name,
                'InitialInstanceCount': 2,
                'InstanceType': 'ml.m5.xlarge',
                'InitialVariantWeight': 0.9  # 90% of traffic
            },
            {
                'VariantName': 'GreenVariant',
                'ModelName': new_model_name,
                'InitialInstanceCount': 1,
                'InstanceType': 'ml.m5.xlarge',
                'InitialVariantWeight': 0.1  # 10% canary traffic
            }
        ]
    )
    # Phase 2: point the endpoint at the new configuration.
    # NOTE(review): update_endpoint is asynchronous; the weight updates
    # below may fail while the endpoint is still Updating — wait for
    # InService before calling the returned function.
    sagemaker_client.update_endpoint(
        EndpointName=endpoint_name,
        EndpointConfigName=new_config_name
    )
    print("藍綠部署已啟動,新版本接收10%流量")
    # Phase 3: monitor and shift traffic step by step.
    def gradual_traffic_shift():
        # (blue_weight, green_weight) schedule; the first entry re-applies
        # the initial 90/10 split.
        weights = [
            (0.9, 0.1),  # initial
            (0.7, 0.3),  # after 30 minutes
            (0.5, 0.5),  # after 1 hour
            (0.2, 0.8),  # after 2 hours
            (0.0, 1.0)   # final full cut-over
        ]
        for blue_weight, green_weight in weights:
            # Update the live traffic weights.
            sagemaker_client.update_endpoint_weights_and_capacities(
                EndpointName=endpoint_name,
                DesiredWeightsAndCapacities=[
                    {
                        'VariantName': 'BlueVariant',
                        'DesiredWeight': blue_weight
                    },
                    {
                        'VariantName': 'GreenVariant',
                        'DesiredWeight': green_weight
                    }
                ]
            )
            print(f"流量切換: Blue={blue_weight*100}%, Green={green_weight*100}%")
            # Wait and watch metrics before the next step.
            # NOTE(review): also sleeps 30 minutes after the final step.
            time.sleep(1800)  # wait 30 minutes
    return gradual_traffic_shift
# Start the blue/green deployment; call deploy_fn() afterwards to run the
# gradual traffic shift.
deploy_fn = implement_blue_green_deployment(
    endpoint_name='production-endpoint',
    old_model_name='model-v1',
    new_model_name='model-v2',
    old_config_name='config-v1',
    new_config_name='config-v2'
)
Lambda 併發控制
lambda_client = boto3.client('lambda')

def configure_lambda_autoscaling(function_name, qualifier=None):
    """
    Configure concurrency limits for a Lambda function.

    Fix: the original always passed Qualifier='$LATEST' to
    put_provisioned_concurrency_config, which the API rejects — provisioned
    concurrency can only target a published version or an alias. The step
    is now opt-in via `qualifier`.

    Args:
        function_name: Name of the Lambda function.
        qualifier: Published version number or alias name for provisioned
            concurrency; when None the provisioned-concurrency step is
            skipped.

    Returns:
        The put_function_concurrency response dict.
    """
    # Reserve concurrency so this function cannot exhaust the account pool.
    response = lambda_client.put_function_concurrency(
        FunctionName=function_name,
        ReservedConcurrentExecutions=100  # reserve 100 concurrent executions
    )
    # Provisioned concurrency pre-warms instances to reduce cold starts,
    # but requires a version/alias qualifier ('$LATEST' is not allowed).
    if qualifier is not None:
        lambda_client.put_provisioned_concurrency_config(
            FunctionName=function_name,
            Qualifier=qualifier,
            ProvisionedConcurrentExecutions=10  # keep 10 instances warm
        )
    print(f"Lambda Auto Scaling 配置完成")
    return response
API Gateway 的節流設定
apigateway_client = boto3.client('apigateway')

def configure_api_throttling(api_id, stage_name):
    """
    Apply stage-level throttling limits to an API Gateway REST API.

    Args:
        api_id: REST API identifier.
        stage_name: Deployment stage to patch.

    Returns:
        The update_stage response dict.
    """
    # Patch operations: steady-state rate and burst ceiling for the stage.
    throttle_patches = [
        {
            'op': 'replace',
            'path': '/throttle/rateLimit',
            'value': '10000'  # 10000 requests per second
        },
        {
            'op': 'replace',
            'path': '/throttle/burstLimit',
            'value': '5000'  # burst of 5000 requests
        },
    ]
    return apigateway_client.update_stage(
        restApiId=api_id,
        stageName=stage_name,
        patchOperations=throttle_patches,
    )
按需擴展成本監控(Scheduled)
def implement_cost_aware_scaling(endpoint_name):
    """
    Register scheduled scaling actions that follow business hours to save cost.

    Weekdays at 08:00 the capacity floor rises to 3 instances; every day at
    20:00 it drops back to 1-3.

    Fix: Application Auto Scaling's 6-field cron syntax does not allow '*'
    in both the day-of-month and day-of-week fields — one must be '?'. The
    original expressions ('cron(0 8 * * MON-FRI *)' and
    'cron(0 20 * * * *)') were therefore invalid.

    Args:
        endpoint_name: SageMaker endpoint to attach the schedules to.
    """
    resource_id = f'endpoint/{endpoint_name}/variant/AllTraffic'
    # Business-hours scale-up: Mon-Fri at 08:00.
    autoscaling_client.put_scheduled_action(
        ServiceNamespace='sagemaker',
        ScheduledActionName='ScaleUpForBusinessHours',
        ResourceId=resource_id,
        ScalableDimension='sagemaker:variant:DesiredInstanceCount',
        Schedule='cron(0 8 ? * MON-FRI *)',  # '?' in day-of-month
        ScalableTargetAction={
            'MinCapacity': 3,
            'MaxCapacity': 10
        }
    )
    # Off-hours scale-down: every day at 20:00.
    autoscaling_client.put_scheduled_action(
        ServiceNamespace='sagemaker',
        ScheduledActionName='ScaleDownForOffHours',
        ResourceId=resource_id,
        ScalableDimension='sagemaker:variant:DesiredInstanceCount',
        Schedule='cron(0 20 * * ? *)',  # '?' in day-of-week
        ScalableTargetAction={
            'MinCapacity': 1,
            'MaxCapacity': 3
        }
    )
def create_cost_monitoring_dashboard():
    """
    Create a CloudWatch dashboard tracking endpoint performance and load.

    Two metric widgets: SageMaker endpoint latency/invocations, and
    Application Load Balancer response time / request count.

    Fix: the original passed str(dashboard_body), which yields a
    single-quoted Python repr — not valid JSON, so CloudWatch rejects it.
    Use json.dumps instead.
    """
    import json  # local import: the file's top-level imports don't include json

    dashboard_body = {
        "widgets": [
            {
                "type": "metric",
                "properties": {
                    "metrics": [
                        ["AWS/SageMaker", "ModelLatency", {"stat": "Average"}],
                        [".", "Invocations", {"stat": "Sum"}],
                        [".", "ModelSetupTime", {"stat": "Average"}]
                    ],
                    "period": 300,
                    "stat": "Average",
                    "region": "ap-northeast-1",
                    "title": "SageMaker 端點效能"
                }
            },
            {
                "type": "metric",
                "properties": {
                    "metrics": [
                        ["AWS/ApplicationELB", "TargetResponseTime", {"stat": "Average"}],
                        [".", "RequestCount", {"stat": "Sum"}]
                    ],
                    "period": 300,
                    "stat": "Average",
                    "region": "ap-northeast-1",
                    "title": "應用程式負載"
                }
            }
        ]
    }
    # DashboardBody must be a valid JSON string.
    cloudwatch_client.put_dashboard(
        DashboardName='AI-Application-Monitoring',
        DashboardBody=json.dumps(dashboard_body, ensure_ascii=False)
    )
    print("成本監控儀表板已建立")
備份
def implement_cross_region_backup(
    source_region,
    target_region,
    model_name,
    s3_model_path
):
    """
    Back up a SageMaker model to another region.

    Copies the model artifact into an S3 bucket in the target region and
    registers a '<model_name>-backup' model there.

    Fixes: removed an unused local s3 client; replaced brittle
    split('/')[2]-style URI parsing with validated parsing.

    Args:
        source_region: Region the model currently lives in (informational;
            the copy source is derived from s3_model_path).
        target_region: Region to back up into.
        model_name: Name of the source model; the backup is '<name>-backup'.
        s3_model_path: Full S3 URI of the model artifact ('s3://bucket/key').

    Raises:
        ValueError: If s3_model_path is not an s3:// URI.
    """
    # Parse the S3 URI into bucket and key.
    if not s3_model_path.startswith('s3://'):
        raise ValueError(f"Expected an s3:// URI, got: {s3_model_path}")
    source_bucket, _, source_key = s3_model_path[len('s3://'):].partition('/')
    # Convention: backup bucket is '<source-bucket>-<region>'.
    # NOTE(review): the target bucket must already exist in target_region.
    target_bucket = f"{source_bucket}-{target_region}"
    copy_source = {
        'Bucket': source_bucket,
        'Key': source_key
    }
    # Cross-region copy with a client bound to the target region.
    s3_target_client = boto3.client('s3', region_name=target_region)
    s3_target_client.copy_object(
        CopySource=copy_source,
        Bucket=target_bucket,
        Key=source_key
    )
    # Register the model in the target region, pointing at the copied artifact.
    sagemaker_target = boto3.client('sagemaker', region_name=target_region)
    sagemaker_target.create_model(
        ModelName=f"{model_name}-backup",
        PrimaryContainer={
            'Image': 'your-ecr-image',
            'ModelDataUrl': f's3://{target_bucket}/{source_key}'
        },
        ExecutionRoleArn='arn:aws:iam::ACCOUNT:role/SageMakerRole'
    )
    print(f"模型已備份到 {target_region} 區域")
自動故障轉移
def setup_failover_mechanism(primary_endpoint, backup_endpoint):
    """
    Build an invoke function with automatic failover to a backup endpoint.

    Args:
        primary_endpoint: Endpoint tried first for every request.
        backup_endpoint: Endpoint used when the primary raises.

    Returns:
        invoke_with_failover(payload): invokes the primary endpoint and, on
        any exception, retries against the backup (sending an SNS alert on
        success); re-raises if the backup also fails.
    """
    def invoke_with_failover(payload):
        runtime = boto3.client('sagemaker-runtime')

        def call(endpoint):
            # Single JSON inference request against the given endpoint.
            return runtime.invoke_endpoint(
                EndpointName=endpoint,
                ContentType='application/json',
                Body=payload
            )

        try:
            # Primary first.
            return call(primary_endpoint)
        except Exception as e:
            print(f"主端點失敗,切換到備份端點: {str(e)}")
            try:
                result = call(backup_endpoint)
            except Exception as backup_error:
                print(f"備份端點也失敗: {str(backup_error)}")
                raise
            # Alert only after the backup call succeeded, matching the
            # original control flow.
            send_alert(f"已自動切換到備份端點 {backup_endpoint}")
            return result

    return invoke_with_failover
今天我們針對各種面向去處理 Autoscaling 考慮的部分!收工