Real-world machine learning rarely involves just one scenario; a project usually needs multiple pipelines to handle them. From data processing and model training through to deployment there are typically many steps, and some of them may even require human intervention. In this article we'll explore how to use SageMaker Pipelines to automate the entire ML workflow, making our machine learning projects more efficient and reproducible.
SageMaker Pipelines is a CI/CD service purpose-built for machine learning workflows. Its main building blocks, and the ones we'll use below, are:
- Steps: the units of work that make up the workflow; here we'll use processing, training, condition, and model-registration steps.
- Parameters: values that can be passed in dynamically at execution time, making the pipeline more flexible.
- Properties: the outputs a step produces after it runs, which can be wired in as inputs to subsequent steps.
With these components in mind, let's start building.
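To give a feel for what Parameters and Properties look like in code (a minimal preview only; the full, working definitions follow in the implementation below):
# A parameter is resolved when an execution starts, not when the pipeline is defined
instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.xlarge")
# A property is a lazy reference to a step's output, e.g. the S3 URI of a
# processing step's "train" output, consumable by a later training step
train_uri = step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri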
import boto3
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.workflow.parameters import ParameterFloat, ParameterInteger, ParameterString
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.functions import Join, JsonGet
from sagemaker.workflow.properties import PropertyFile
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
# Initialization: session, region, execution role, and default bucket
sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
bucket = sagemaker_session.default_bucket()
prefix = 'sagemaker-pipeline-demo'
# Define tunable pipeline parameters
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_count = ParameterInteger(
    name="TrainingInstanceCount",
    default_value=1
)
input_data = ParameterString(
    name="InputData",
    default_value=f"s3://{bucket}/{prefix}/input/data.csv"
)
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval"
)
# Use a float (not a string) so the accuracy comparison later is numeric
accuracy_threshold = ParameterFloat(
    name="AccuracyThreshold",
    default_value=0.75
)
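Every one of these defaults can be overridden per run without touching the pipeline definition. A quick sketch, assuming the pipeline object we build at the end of this walkthrough:
execution = pipeline.start(
    parameters={
        "TrainingInstanceType": "ml.m5.2xlarge",  # a bigger box for this run only
        "AccuracyThreshold": 0.8                  # raise the bar for registration
    }
)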
# Create the SKLearn processor
sklearn_processor = SKLearnProcessor(
    framework_version='1.0-1',
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name='sklearn-processor',
    role=role,
    sagemaker_session=sagemaker_session
)
# Define the processing step
step_process = ProcessingStep(
    name="DataProcessing",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(
            source=input_data,
            destination="/opt/ml/processing/input"
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="train",
            source="/opt/ml/processing/train",
            destination=f"s3://{bucket}/{prefix}/train"
        ),
        ProcessingOutput(
            output_name="validation",
            source="/opt/ml/processing/validation",
            destination=f"s3://{bucket}/{prefix}/validation"
        ),
        ProcessingOutput(
            output_name="test",
            source="/opt/ml/processing/test",
            destination=f"s3://{bucket}/{prefix}/test"
        )
    ],
    code="preprocessing.py"  # example shown below
)
Here's an example preprocessing.py:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import os
if __name__ == "__main__":
    # Read the raw data
    input_path = "/opt/ml/processing/input/data.csv"
    df = pd.read_csv(input_path)

    # Data cleaning and feature engineering: drop rows with missing values
    df = df.dropna()

    # Feature selection (assume the last column is the target variable)
    X = df.iloc[:, :-1]
    y = df.iloc[:, -1]

    # Split into train / validation / test sets (70% / 15% / 15%)
    X_train, X_temp, y_train, y_temp = train_test_split(
        X, y, test_size=0.3, random_state=42
    )
    X_val, X_test, y_val, y_test = train_test_split(
        X_temp, y_temp, test_size=0.5, random_state=42
    )

    # Make sure the output directories exist
    for d in ("/opt/ml/processing/train", "/opt/ml/processing/validation", "/opt/ml/processing/test"):
        os.makedirs(d, exist_ok=True)

    # Save the processed splits. SageMaker's built-in XGBoost expects CSV input
    # with no header and the label in the FIRST column, so put y before X.
    train_df = pd.concat([y_train, X_train], axis=1)
    val_df = pd.concat([y_val, X_val], axis=1)
    test_df = pd.concat([y_test, X_test], axis=1)
    train_df.to_csv("/opt/ml/processing/train/train.csv", index=False, header=False)
    val_df.to_csv("/opt/ml/processing/validation/validation.csv", index=False, header=False)
    test_df.to_csv("/opt/ml/processing/test/test.csv", index=False, header=False)
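Before wiring the script into the pipeline, it can be worth dry-running it as a plain Processing job. A minimal sketch, assuming the same bucket/prefix layout; note it needs its own processor with a concrete instance type, because pipeline parameters like processing_instance_type only resolve inside a pipeline execution:
debug_processor = SKLearnProcessor(
    framework_version='1.0-1',
    instance_type='ml.m5.xlarge',  # literal value: pipeline parameters don't resolve here
    instance_count=1,
    role=role
)
debug_processor.run(
    code="preprocessing.py",
    inputs=[ProcessingInput(source=f"s3://{bucket}/{prefix}/input/data.csv",
                            destination="/opt/ml/processing/input")],
    outputs=[ProcessingOutput(source="/opt/ml/processing/train",
                              destination=f"s3://{bucket}/{prefix}/debug/train")]
)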
# Use XGBoost as the example algorithm
from sagemaker.image_uris import retrieve
xgboost_image = retrieve('xgboost', region, version='1.5-1')
xgb_estimator = Estimator(
    image_uri=xgboost_image,
    instance_type=training_instance_type,
    instance_count=training_instance_count,
    output_path=f"s3://{bucket}/{prefix}/output",
    role=role,
    sagemaker_session=sagemaker_session
)
xgb_estimator.set_hyperparameters(
    objective='binary:logistic',
    num_round=100,
    max_depth=5,
    eta=0.2,
    subsample=0.8,
    colsample_bytree=0.8
)
# Define the training step
step_train = TrainingStep(
    name="ModelTraining",
    estimator=xgb_estimator,
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv"
        )
    }
)
# Create the evaluation processor
eval_processor = SKLearnProcessor(
    framework_version='1.0-1',
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name='eval-processor',
    role=role,
    sagemaker_session=sagemaker_session
)
# Property file so later steps can read values out of evaluation.json
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json"
)
# Define the evaluation step
step_eval = ProcessingStep(
    name="ModelEvaluation",
    processor=eval_processor,
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model"
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs[
                "test"
            ].S3Output.S3Uri,
            destination="/opt/ml/processing/test"
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source="/opt/ml/processing/evaluation",
            destination=f"s3://{bucket}/{prefix}/evaluation"
        )
    ],
    code="evaluation.py",
    property_files=[evaluation_report]
)
Next, create evaluation.py:
import json
import tarfile
import pickle
import pandas as pd
import xgboost as xgb
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import os
if __name__ == "__main__":
    # Load the trained model artifact
    model_path = "/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path="/opt/ml/processing/model")
    model = pickle.load(open("/opt/ml/processing/model/xgboost-model", "rb"))

    # Load the test data (no header; label in the first column, matching preprocessing.py)
    test_path = "/opt/ml/processing/test/test.csv"
    test_df = pd.read_csv(test_path, header=None)
    y_test = test_df.iloc[:, 0]
    X_test = test_df.iloc[:, 1:]

    # Predict
    dtest = xgb.DMatrix(X_test)
    predictions = model.predict(dtest)
    predictions_binary = [1 if p > 0.5 else 0 for p in predictions]

    # Compute evaluation metrics
    accuracy = accuracy_score(y_test, predictions_binary)
    precision = precision_score(y_test, predictions_binary)
    recall = recall_score(y_test, predictions_binary)
    f1 = f1_score(y_test, predictions_binary)

    # Save the evaluation report
    report = {
        "metrics": {
            "accuracy": accuracy,
            "precision": precision,
            "recall": recall,
            "f1_score": f1
        }
    }
    output_dir = "/opt/ml/processing/evaluation"
    os.makedirs(output_dir, exist_ok=True)
    with open(f"{output_dir}/evaluation.json", "w") as f:
        json.dump(report, f)
    print(f"Model Accuracy: {accuracy}")
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel
# Condition: register the model only if accuracy >= threshold
cond_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="metrics.accuracy"
    ),
    right=accuracy_threshold
)
# Define the model registration step
step_register = RegisterModel(
    name="RegisterModel",
    estimator=xgb_estimator,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name="xgboost-model-package-group",
    approval_status=model_approval_status,
    model_metrics=ModelMetrics(
        model_statistics=MetricsSource(
            # Step properties can't be embedded in an f-string; build the URI with Join
            s3_uri=Join(on="/", values=[
                step_eval.properties.ProcessingOutputConfig.Outputs["evaluation"].S3Output.S3Uri,
                "evaluation.json"
            ]),
            content_type="application/json"
        )
    )
)
# Define the condition step
step_cond = ConditionStep(
    name="CheckAccuracy",
    conditions=[cond_gte],
    if_steps=[step_register],
    else_steps=[]
)
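With else_steps=[] the pipeline simply ends (successfully) when the accuracy bar isn't met. If you'd rather have the whole run fail loudly in that case, one option is a FailStep; a sketch, assuming a recent SageMaker Python SDK that includes it:
from sagemaker.workflow.fail_step import FailStep

step_fail = FailStep(
    name="AccuracyBelowThreshold",
    error_message=Join(on=" ", values=["Model accuracy is below", accuracy_threshold])
)
# ...then pass else_steps=[step_fail] to the ConditionStep above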
# Assemble the pipeline
pipeline = Pipeline(
    name="MLWorkflowPipeline",
    parameters=[
        processing_instance_type,
        training_instance_type,
        training_instance_count,
        input_data,
        model_approval_status,
        accuracy_threshold
    ],
    steps=[step_process, step_train, step_eval, step_cond],
    sagemaker_session=sagemaker_session
)
# Create or update the pipeline definition
pipeline.upsert(role_arn=role)
# Start an execution
execution = pipeline.start()
# Inspect the execution status
execution.describe()
# Block until the execution finishes
execution.wait()
# Inspect the per-step results
execution.list_steps()
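list_steps() returns the entries from the ListPipelineExecutionSteps API, so a compact status summary is just a loop over it, for example:
for step in execution.list_steps():
    print(f"{step['StepName']}: {step['StepStatus']}")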
# View the pipeline's execution history
from sagemaker.workflow.pipeline import Pipeline
pipeline_name = "MLWorkflowPipeline"
pipeline = Pipeline(name=pipeline_name)
# List the five most recent executions
executions = pipeline.list_executions()
for execution_summary in executions['PipelineExecutionSummaries'][:5]:
    print(f"Execution ARN: {execution_summary['PipelineExecutionArn']}")
    print(f"Status: {execution_summary['PipelineExecutionStatus']}")
    print(f"Start time: {execution_summary['StartTime']}")
    print("---")
# Use parameters to make the pipeline more flexible
model_prefix = ParameterString(
    name="ModelPrefix",
    default_value="my-model"
)
# Use the parameter in a step. Parameters are resolved at run time, so they
# can't be embedded in an f-string; build the path with Join instead.
from sagemaker.workflow.functions import Join
output_path = Join(on="/", values=[f"s3://{bucket}", model_prefix, "output"])
# Add a retry policy to a processing step
from sagemaker.workflow.retry import StepRetryPolicy, StepExceptionTypeEnum
retry_policy = StepRetryPolicy(
    exception_types=[
        StepExceptionTypeEnum.SERVICE_FAULT,
        StepExceptionTypeEnum.THROTTLING
    ],
    max_attempts=3,
    interval_seconds=10,
    backoff_rate=2.0
)
step_process_with_retry = ProcessingStep(
    name="DataProcessingWithRetry",
    processor=sklearn_processor,
    # ... other arguments as before
    retry_policies=[retry_policy]
)
from sagemaker.workflow.steps import CacheConfig
# Enable step caching (results reused for 24 hours)
cache_config = CacheConfig(
    enable_caching=True,
    expire_after="PT24H"  # ISO 8601 duration
)
step_train_cached = TrainingStep(
    name="ModelTrainingCached",
    estimator=xgb_estimator,
    inputs=training_inputs,  # the same inputs dict used for step_train above
    cache_config=cache_config
)
# Independent steps run in parallel automatically
step_process_1 = ProcessingStep(name="Process1", ...)
step_process_2 = ProcessingStep(name="Process2", ...)
step_merge = ProcessingStep(
    name="MergeResults",
    depends_on=[step_process_1, step_process_2],  # wait for both steps to finish
    # ... other arguments
)
Triggering a SageMaker Pipeline from AWS CodePipeline (via a Lambda function):
import boto3

codepipeline = boto3.client('codepipeline')
sagemaker_client = boto3.client('sagemaker')

def lambda_handler(event, context):
    # Get the job ID handed over by CodePipeline
    job_id = event['CodePipeline.job']['id']
    try:
        # Start the SageMaker Pipeline
        response = sagemaker_client.start_pipeline_execution(
            PipelineName='MLWorkflowPipeline',
            PipelineExecutionDisplayName='CodePipeline-Triggered',
            PipelineParameters=[
                {
                    'Name': 'InputData',
                    'Value': 's3://your-bucket/latest-data.csv'
                }
            ]
        )
        # Report success back to CodePipeline
        codepipeline.put_job_success_result(jobId=job_id)
    except Exception as e:
        # Report failure back to CodePipeline ('type' is required by the API)
        codepipeline.put_job_failure_result(
            jobId=job_id,
            failureDetails={'type': 'JobFailed', 'message': str(e)}
        )
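Instead of hardcoding the data location in the Lambda, the S3 URI could come from the CodePipeline action itself. A sketch that reads it from the invoke event's standard UserParameters field (the fallback URI here is just illustrative):
config = event['CodePipeline.job']['data']['actionConfiguration']['configuration']
input_uri = config.get('UserParameters', 's3://your-bucket/latest-data.csv')
# ...then pass input_uri as the 'InputData' pipeline parameter above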
Done!