Today we look at another set of scenarios. In real projects we face different inference requirements: some must respond to user requests immediately, while others can be processed offline in bulk.
Today's topics are Real-time Inference and Batch Processing, and how to choose between them based on the needs of your project.
Real-time Inference
Typical use cases: user-facing requests that need an immediate response, such as analyzing the sentiment of a message as soon as it arrives.
Batch Processing
Typical use cases: scoring large volumes of offline data where results are not needed right away.
Let's start by deploying a real-time endpoint with the SageMaker Python SDK.
import boto3
import sagemaker
from sagemaker.huggingface import HuggingFaceModel
# Initialize the SageMaker session and execution role
sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
# Configure the HuggingFace model
huggingface_model = HuggingFaceModel(
model_data='s3://your-bucket/model.tar.gz',
role=role,
transformers_version='4.26',
pytorch_version='1.13',
py_version='py39',
env={
'HF_TASK': 'text-classification',
'HF_MODEL_ID': 'distilbert-base-uncased'
}
)
# Deploy a real-time endpoint
predictor = huggingface_model.deploy(
initial_instance_count=2,
instance_type='ml.m5.xlarge',
endpoint_name='sentiment-analysis-endpoint'
)
# Run real-time inference
result = predictor.predict({
'inputs': "This product is amazing!"
})
print(result)
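When you are done experimenting, remember to delete the endpoint so the instance stops accruing charges; the sections below assume it is still running. A minimal teardown sketch:
# Remove the real-time endpoint and its endpoint configuration
predictor.delete_endpoint()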
(Scale the endpoint automatically based on traffic demand)
import boto3
# Initialize the Application Auto Scaling client
autoscaling_client = boto3.client('application-autoscaling')
# Register the endpoint variant as a scalable target
response = autoscaling_client.register_scalable_target(
ServiceNamespace='sagemaker',
ResourceId=f'endpoint/sentiment-analysis-endpoint/variant/AllTraffic',
ScalableDimension='sagemaker:variant:DesiredInstanceCount',
MinCapacity=1,
MaxCapacity=10
)
# Configure a target-tracking scaling policy
autoscaling_client.put_scaling_policy(
PolicyName='SageMakerEndpointInvocationScalingPolicy',
ServiceNamespace='sagemaker',
ResourceId=f'endpoint/sentiment-analysis-endpoint/variant/AllTraffic',
ScalableDimension='sagemaker:variant:DesiredInstanceCount',
PolicyType='TargetTrackingScaling',
TargetTrackingScalingPolicyConfiguration={
        'TargetValue': 750.0,  # Target invocations per minute per instance
'PredefinedMetricSpecification': {
'PredefinedMetricType': 'SageMakerVariantInvocationsPerInstance'
},
'ScaleInCooldown': 300,
'ScaleOutCooldown': 60
}
)
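To verify that the policy takes effect, you can query recent scaling activity for the variant through the same Application Auto Scaling client; a small sketch:
# List recent scaling activities for the endpoint variant
activities = autoscaling_client.describe_scaling_activities(
    ServiceNamespace='sagemaker',
    ResourceId='endpoint/sentiment-analysis-endpoint/variant/AllTraffic'
)
for activity in activities['ScalingActivities']:
    print(activity['Description'], activity['StatusCode'])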
(Add a cache layer to handle repeated requests)
import json
import hashlib
from datetime import datetime, timedelta
class CachedPredictor:
def __init__(self, predictor, cache_ttl_seconds=300):
self.predictor = predictor
self.cache = {}
self.cache_ttl = timedelta(seconds=cache_ttl_seconds)
def _get_cache_key(self, input_data):
"""生成快取鍵值"""
input_str = json.dumps(input_data, sort_keys=True)
return hashlib.md5(input_str.encode()).hexdigest()
def predict(self, input_data):
"""帶快取的推理"""
cache_key = self._get_cache_key(input_data)
        # Check the cache first
if cache_key in self.cache:
cached_result, timestamp = self.cache[cache_key]
if datetime.now() - timestamp < self.cache_ttl:
print(f"Cache hit for key: {cache_key}")
return cached_result
        # Cache miss: run the actual inference and store the result
result = self.predictor.predict(input_data)
self.cache[cache_key] = (result, datetime.now())
return result
# Example usage
cached_predictor = CachedPredictor(predictor, cache_ttl_seconds=600)
result = cached_predictor.predict({'inputs': "This is great!"})
(Processing large volumes of offline data)
from sagemaker.huggingface import HuggingFaceModel
# Reuse the trained model
huggingface_model = HuggingFaceModel(
model_data='s3://your-bucket/model.tar.gz',
role=role,
transformers_version='4.26',
pytorch_version='1.13',
py_version='py39'
)
# Create a batch transformer
batch_transformer = huggingface_model.transformer(
instance_count=3,
instance_type='ml.m5.xlarge',
strategy='MultiRecord',
max_payload=1, # MB
max_concurrent_transforms=8,
output_path='s3://your-bucket/batch-output/'
)
# Run the batch transform job
batch_transformer.transform(
data='s3://your-bucket/batch-input/',
content_type='application/json',
split_type='Line',
join_source='Input'
)
# Wait for the job to finish
batch_transformer.wait()
(Batch input must follow a specific format, typically JSON Lines)
import boto3
import json
def prepare_batch_input(texts, output_file):
"""準備批次推理的輸入檔案"""
with open(output_file, 'w') as f:
for text in texts:
input_data = {'inputs': text}
f.write(json.dumps(input_data) + '\n')
# Example usage
sample_texts = [
"This product exceeded my expectations!",
"Terrible quality, would not recommend.",
"Average product, nothing special.",
"Best purchase I've made this year!"
]
prepare_batch_input(sample_texts, 'batch_input.jsonl')
# Upload to S3
s3_client = boto3.client('s3')
s3_client.upload_file(
'batch_input.jsonl',
'your-bucket',
'batch-input/batch_input.jsonl'
)
import boto3
import json
s3_client = boto3.client('s3')
def process_batch_output(output_file):
"""處理批次推理的輸出結果"""
results = []
with open(output_file, 'r') as f:
for line in f:
result = json.loads(line)
results.append(result)
return results
# Download the output from S3
s3_client.download_file(
'your-bucket',
'batch-output/batch_input.jsonl.out',
'batch_output.jsonl'
)
# Process the results
results = process_batch_output('batch_output.jsonl')
for i, result in enumerate(results):
print(f"Sample {i+1}: {result}")
Combine with AWS Lambda to build a more flexible inference architecture
import json
import boto3
def lambda_handler(event, context):
"""
Lambda 函數作為 SageMaker 端點的前端
處理請求預處理和結果後處理
"""
runtime = boto3.client('sagemaker-runtime')
    # Extract the input text from the event
input_text = event.get('text', '')
    # Pre-process into the endpoint's request payload
payload = {
'inputs': input_text,
'parameters': {
'max_length': 128,
'temperature': 0.7
}
}
try:
        # Invoke the SageMaker endpoint
response = runtime.invoke_endpoint(
EndpointName='sentiment-analysis-endpoint',
ContentType='application/json',
Body=json.dumps(payload)
)
        # Parse the response
result = json.loads(response['Body'].read().decode())
        # Post-process the result
processed_result = {
'statusCode': 200,
'body': json.dumps({
'input': input_text,
'prediction': result,
                'request_id': context.aws_request_id
})
}
except Exception as e:
processed_result = {
'statusCode': 500,
'body': json.dumps({
'error': str(e)
})
}
return processed_result
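Once the function is deployed, you can exercise the whole chain by invoking it with a test event. A minimal sketch, assuming the function was deployed under the (hypothetical) name sentiment-lambda:
import json
import boto3
# Invoke the Lambda with a sample event; 'sentiment-lambda' is a placeholder name
lambda_client = boto3.client('lambda')
response = lambda_client.invoke(
    FunctionName='sentiment-lambda',
    InvocationType='RequestResponse',
    Payload=json.dumps({'text': 'This product is amazing!'})
)
print(json.loads(response['Payload'].read()))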
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
# Create the processor
sklearn_processor = ScriptProcessor(
    image_uri='1342143124.dkr.ecr.<your-region>.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3',
role=role,
instance_type='ml.m5.xlarge',
instance_count=5,
command=['python3']
)
# Run batch pre-processing
sklearn_processor.run(
code='preprocessing.py',
inputs=[
ProcessingInput(
source='s3://your-bucket/raw-data/',
destination='/opt/ml/processing/input'
)
],
outputs=[
ProcessingOutput(
source='/opt/ml/processing/output',
destination='s3://your-bucket/processed-data/'
)
],
arguments=['--batch-size', '1000']
)
# preprocessing.py
import argparse
import os
import pandas as pd
from sklearn.preprocessing import StandardScaler
import joblib
def preprocess_batch(input_path, output_path, batch_size):
"""批次預處理資料"""
    # Collect all input files
all_files = [os.path.join(input_path, f) for f in os.listdir(input_path)]
for file in all_files:
        # Read large files in chunks
for chunk in pd.read_csv(file, chunksize=batch_size):
            # Run pre-processing on the chunk
processed = preprocess_data(chunk)
            # Save the processed data
output_file = os.path.join(
output_path,
f"processed_{os.path.basename(file)}"
)
processed.to_csv(output_file, mode='a', index=False)
def preprocess_data(df):
"""實際的預處理邏輯"""
    # Drop rows with missing values
df = df.dropna()
    # Standardize numeric features
numeric_cols = df.select_dtypes(include=['float64', 'int64']).columns
scaler = StandardScaler()
df[numeric_cols] = scaler.fit_transform(df[numeric_cols])
return df
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--batch-size', type=int, default=1000)
args = parser.parse_args()
input_path = '/opt/ml/processing/input'
output_path = '/opt/ml/processing/output'
preprocess_batch(input_path, output_path, args.batch_size)
Real-time inference cost optimization
# 1. Use Serverless Inference for intermittent traffic
from sagemaker.serverless import ServerlessInferenceConfig
serverless_config = ServerlessInferenceConfig(
memory_size_in_mb=2048,
max_concurrency=10
)
predictor = huggingface_model.deploy(
serverless_inference_config=serverless_config,
endpoint_name='serverless-sentiment-endpoint'
)
# 2. Use Multi-Model Endpoints to share resources
from sagemaker.multidatamodel import MultiDataModel
mme = MultiDataModel(
name='multi-model-endpoint',
model_data_prefix='s3://your-bucket/models/',
model=huggingface_model,
sagemaker_session=sagemaker_session
)
mme_predictor = mme.deploy(
initial_instance_count=1,
instance_type='ml.m5.xlarge'
)
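With a multi-model endpoint, each request names the model artifact it wants, and SageMaker loads it on demand from model_data_prefix. A minimal sketch, assuming a (hypothetical) archive model-a.tar.gz has already been uploaded under that prefix:
# Route the request to a specific model artifact; 'model-a.tar.gz' is a placeholder
result = mme_predictor.predict(
    {'inputs': "This product is amazing!"},
    target_model='model-a.tar.gz'
)
print(result)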
Batch processing cost optimization
Use Spot Instances to reduce cost
# Run the batch transform on Spot instances to lower cost
batch_transformer = huggingface_model.transformer(
instance_count=5,
instance_type='ml.m5.xlarge',
use_spot_instances=True,
    max_wait_time=7200,  # 2 hours
    max_run_time=3600,  # 1 hour
output_path='s3://your-bucket/batch-output/'
)
Monitor with CloudWatch
import boto3
cloudwatch = boto3.client('cloudwatch')
# Create a latency alarm for the real-time endpoint
cloudwatch.put_metric_alarm(
AlarmName='HighModelLatency',
ComparisonOperator='GreaterThanThreshold',
EvaluationPeriods=2,
MetricName='ModelLatency',
Namespace='AWS/SageMaker',
Period=60,
Statistic='Average',
    Threshold=1000000.0,  # 1 second (ModelLatency is reported in microseconds)
ActionsEnabled=True,
    AlarmActions=['arn:aws:sns:<your-region>:123456789012:alerts'],
Dimensions=[
{
'Name': 'EndpointName',
'Value': 'sentiment-analysis-endpoint'
},
{
'Name': 'VariantName',
'Value': 'AllTraffic'
}
]
)
# Create a failure alarm for batch transform jobs
cloudwatch.put_metric_alarm(
AlarmName='BatchTransformFailures',
ComparisonOperator='GreaterThanThreshold',
EvaluationPeriods=1,
MetricName='TransformJobsFailed',
Namespace='AWS/SageMaker',
Period=300,
Statistic='Sum',
Threshold=0,
ActionsEnabled=True,
    AlarmActions=['arn:aws:sns:<your-region>:123456789012:alerts']
)
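Besides alarms, the same metrics can be pulled ad hoc for a quick health check; a small sketch querying the endpoint's invocation count over the past hour:
from datetime import datetime, timedelta
# Query the Invocations metric for the endpoint over the last hour
stats = cloudwatch.get_metric_statistics(
    Namespace='AWS/SageMaker',
    MetricName='Invocations',
    Dimensions=[
        {'Name': 'EndpointName', 'Value': 'sentiment-analysis-endpoint'},
        {'Name': 'VariantName', 'Value': 'AllTraffic'}
    ],
    StartTime=datetime.utcnow() - timedelta(hours=1),
    EndTime=datetime.utcnow(),
    Period=300,
    Statistics=['Sum']
)
for point in stats['Datapoints']:
    print(point['Timestamp'], point['Sum'])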
Day 21 done. Feeling a bit worn out XD, but let's keep pushing!!