
2025 iThome 鐵人賽

DAY 18

Introduction

Real-world machine learning involves many scenarios and usually needs several pipelines to handle them:
the path from data processing through model training to deployment often spans many steps,
and some of them may even require human intervention.
In this post we explore how to use SageMaker Pipeline to automate the entire ML workflow,
making our machine learning projects more efficient and reproducible.

What is SageMaker Pipeline?

SageMaker Pipeline is a CI/CD service purpose-built for machine learning workflows.

Its key capabilities:

  • Automated ML workflows: the complete flow from data preparation to model deployment
  • Better reproducibility: every run uses the same steps and parameters
  • Version control: tracks the parameters and results of every execution
  • Parallel processing: independent steps can run in parallel for better efficiency
  • Conditional execution: decides whether certain steps run based on conditions

Pipeline components

  1. Steps

The step types we will use include:

  • ProcessingStep: data processing
  • TrainingStep: model training
  • TuningStep: hyperparameter tuning
  • ModelStep: model registration
  • ConditionStep: conditional branching
  • TransformStep: batch transform
  2. Parameters

Values that can be passed in dynamically at execution time, making the Pipeline more flexible (see the sketch after this list).

  3. Properties

Outputs produced when a step runs, which later steps can consume as inputs.
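
To make these two ideas concrete, here is a minimal sketch. It uses names that are only defined later in this post (the pipeline object, step_process, and the TrainingInstanceType parameter):

# Parameters: override a default at execution time instead of editing code
execution = pipeline.start(
    parameters={"TrainingInstanceType": "ml.c5.2xlarge"}
)

# Properties: feed one step's output into the next; this resolves to the
# S3 URI of the processing step's "train" output at run time
train_s3_uri = step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri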

Now that we have a rough picture of the components, let's move on to the implementation.

Implementation: building a complete ML Pipeline

First, set up the environment.

import boto3
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.workflow.parameters import ParameterFloat, ParameterInteger, ParameterString
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.properties import PropertyFile
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

# Initialization
sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
bucket = sagemaker_session.default_bucket()
prefix = 'sagemaker-pipeline-demo'
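
If you want to iterate on the pipeline definition without launching managed jobs, recent SDK versions (roughly sagemaker 2.88+) also offer a local mode. This is an optional sketch, and it requires Docker on the machine running it:

# Optional: a local session for fast, free iteration (assumes sagemaker>=2.88)
from sagemaker.workflow.pipeline_context import LocalPipelineSession

local_session = LocalPipelineSession()
# Pass local_session in place of sagemaker_session when constructing the
# processors, estimators, and Pipeline below to execute everything locally.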

Define the Pipeline parameters

# Define adjustable parameters
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)

training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)

training_instance_count = ParameterInteger(
    name="TrainingInstanceCount",
    default_value=1
)

input_data = ParameterString(
    name="InputData",
    default_value=f"s3://{bucket}/{prefix}/input/data.csv"
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval"
)

# A numeric parameter, so it can be compared against the accuracy metric later
accuracy_threshold = ParameterFloat(
    name="AccuracyThreshold",
    default_value=0.75
)

Create the data processing step

# Create the SKLearn processor
sklearn_processor = SKLearnProcessor(
    framework_version='1.0-1',
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name='sklearn-processor',
    role=role,
    sagemaker_session=sagemaker_session
)

# Define the processing step
step_process = ProcessingStep(
    name="DataProcessing",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(
            source=input_data,
            destination="/opt/ml/processing/input"
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="train",
            source="/opt/ml/processing/train",
            destination=f"s3://{bucket}/{prefix}/train"
        ),
        ProcessingOutput(
            output_name="validation",
            source="/opt/ml/processing/validation",
            destination=f"s3://{bucket}/{prefix}/validation"
        ),
        ProcessingOutput(
            output_name="test",
            source="/opt/ml/processing/test",
            destination=f"s3://{bucket}/{prefix}/test"
        )
    ],
    code="preprocessing.py" # 等等會有範例
)

Here is an example preprocessing.py:

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import os

if __name__ == "__main__":
    # Read the data
    input_path = "/opt/ml/processing/input/data.csv"
    df = pd.read_csv(input_path)
    
    # Data cleaning and feature engineering
    # Drop rows with missing values
    df = df.dropna()
    
    # Feature selection (assume the last column is the target variable)
    X = df.iloc[:, :-1]
    y = df.iloc[:, -1]
    
    # Split the dataset: 70% train, 15% validation, 15% test
    X_train, X_temp, y_train, y_temp = train_test_split(
        X, y, test_size=0.3, random_state=42
    )
    X_val, X_test, y_val, y_test = train_test_split(
        X_temp, y_temp, test_size=0.5, random_state=42
    )
    
    # Make sure the output directories exist
    for subdir in ("train", "validation", "test"):
        os.makedirs(f"/opt/ml/processing/{subdir}", exist_ok=True)

    # Save the processed splits
    train_df = pd.concat([X_train, y_train], axis=1)
    val_df = pd.concat([X_val, y_val], axis=1)
    test_df = pd.concat([X_test, y_test], axis=1)
    
    train_df.to_csv("/opt/ml/processing/train/train.csv", index=False, header=False)
    val_df.to_csv("/opt/ml/processing/validation/validation.csv", index=False, header=False)
    test_df.to_csv("/opt/ml/processing/test/test.csv", index=False, header=False)

Create the model training step

# Use XGBoost as an example
from sagemaker.image_uris import retrieve

xgboost_image = retrieve('xgboost', region, version='1.5-1')

xgb_estimator = Estimator(
    image_uri=xgboost_image,
    instance_type=training_instance_type,
    instance_count=training_instance_count,
    output_path=f"s3://{bucket}/{prefix}/output",
    role=role,
    sagemaker_session=sagemaker_session
)

xgb_estimator.set_hyperparameters(
    objective='binary:logistic',
    num_round=100,
    max_depth=5,
    eta=0.2,
    subsample=0.8,
    colsample_bytree=0.8
)

# Define the training step
step_train = TrainingStep(
    name="ModelTraining",
    estimator=xgb_estimator,
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv"
        )
    }
)
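
The component list earlier also mentioned TuningStep. This walkthrough trains with fixed hyperparameters, but if you wanted the pipeline to search over them instead, the substitution might look like the following sketch (the objective metric and ranges are illustrative; validation:auc is one of the metrics emitted by the built-in XGBoost container):

from sagemaker.tuner import ContinuousParameter, HyperparameterTuner, IntegerParameter
from sagemaker.workflow.steps import TuningStep

tuner = HyperparameterTuner(
    estimator=xgb_estimator,
    objective_metric_name="validation:auc",
    hyperparameter_ranges={
        "eta": ContinuousParameter(0.05, 0.5),
        "max_depth": IntegerParameter(3, 9),
    },
    max_jobs=6,
    max_parallel_jobs=2,
)

# Takes the same inputs as the training step above
step_tune = TuningStep(
    name="HyperparameterTuning",
    tuner=tuner,
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
            content_type="text/csv",
        ),
    },
)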

Create the model evaluation step

# Create the evaluation processor
eval_processor = SKLearnProcessor(
    framework_version='1.0-1',
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name='eval-processor',
    role=role,
    sagemaker_session=sagemaker_session
)

# Define a property file for the evaluation results
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json"
)

# Define the evaluation step
step_eval = ProcessingStep(
    name="ModelEvaluation",
    processor=eval_processor,
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model"
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs[
                "test"
            ].S3Output.S3Uri,
            destination="/opt/ml/processing/test"
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source="/opt/ml/processing/evaluation",
            destination=f"s3://{bucket}/{prefix}/evaluation"
        )
    ],
    code="evaluation.py",
    property_files=[evaluation_report]
)

Next, create evaluation.py:

import json
import tarfile
import pickle
import pandas as pd
import xgboost as xgb
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import os

if __name__ == "__main__":
    # Load the model
    model_path = "/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path="/opt/ml/processing/model")
    
    # The built-in XGBoost container saves the Booster as a pickled file
    # named "xgboost-model"
    model = pickle.load(open("/opt/ml/processing/model/xgboost-model", "rb"))
    
    # Load the test data
    test_path = "/opt/ml/processing/test/test.csv"
    test_df = pd.read_csv(test_path, header=None)
    
    X_test = test_df.iloc[:, :-1]
    y_test = test_df.iloc[:, -1]
    
    # Predict
    dtest = xgb.DMatrix(X_test)
    predictions = model.predict(dtest)
    predictions_binary = [1 if p > 0.5 else 0 for p in predictions]
    
    # Compute the evaluation metrics
    accuracy = accuracy_score(y_test, predictions_binary)
    precision = precision_score(y_test, predictions_binary)
    recall = recall_score(y_test, predictions_binary)
    f1 = f1_score(y_test, predictions_binary)
    
    # Save the evaluation report
    report = {
        "metrics": {
            "accuracy": accuracy,
            "precision": precision,
            "recall": recall,
            "f1_score": f1
        }
    }
    
    output_dir = "/opt/ml/processing/evaluation"
    os.makedirs(output_dir, exist_ok=True)
    
    with open(f"{output_dir}/evaluation.json", "w") as f:
        json.dump(report, f)
    
    print(f"Model Accuracy: {accuracy}")

Create the condition step and model registration

from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel

# Define the accuracy condition
cond_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="metrics.accuracy"
    ),
    right=accuracy_threshold
)

# Define the model registration step
step_register = RegisterModel(
    name="RegisterModel",
    estimator=xgb_estimator,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name="xgboost-model-package-group",
    approval_status=model_approval_status,
    model_metrics=ModelMetrics(
        model_statistics=MetricsSource(
            s3_uri=f"{step_eval.properties.ProcessingOutputConfig.Outputs['evaluation'].S3Output.S3Uri}/evaluation.json",
            content_type="application/json"
        )
    )
)

# Define the condition step
step_cond = ConditionStep(
    name="CheckAccuracy",
    conditions=[cond_gte],
    if_steps=[step_register],
    else_steps=[]
)
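
Two notes on this block. First, else_steps is empty, so a model that misses the threshold simply ends the run as Succeeded without registering anything. If you would rather have the run fail loudly, the SDK's FailStep can go in the else branch; a sketch:

from sagemaker.workflow.fail_step import FailStep

step_fail = FailStep(
    name="AccuracyBelowThreshold",
    error_message="Model accuracy did not meet the configured threshold"
)

# then in the ConditionStep above: else_steps=[step_fail]

Second, RegisterModel comes from the older step_collections module; newer SDK versions steer you toward the ModelStep listed in the components section. A rough equivalent, assuming a PipelineSession, would be:

from sagemaker.model import Model
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.pipeline_context import PipelineSession

pipeline_session = PipelineSession()

model = Model(
    image_uri=xgboost_image,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    role=role,
    sagemaker_session=pipeline_session  # ModelStep requires a PipelineSession
)

step_register_v2 = ModelStep(
    name="RegisterModelV2",
    step_args=model.register(
        content_types=["text/csv"],
        response_types=["text/csv"],
        inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
        transform_instances=["ml.m5.xlarge"],
        model_package_group_name="xgboost-model-package-group",
        approval_status=model_approval_status
    )
)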

Build and run the Pipeline

# Assemble the Pipeline
pipeline = Pipeline(
    name="MLWorkflowPipeline",
    parameters=[
        processing_instance_type,
        training_instance_type,
        training_instance_count,
        input_data,
        model_approval_status,
        accuracy_threshold
    ],
    steps=[step_process, step_train, step_eval, step_cond],
    sagemaker_session=sagemaker_session
)

# Create or update the Pipeline
pipeline.upsert(role_arn=role)

# Start an execution
execution = pipeline.start()

# Check the execution status
execution.describe()

# Wait for the execution to finish
execution.wait()

# List the step results
execution.list_steps()

Monitoring Pipeline executions

# View the Pipeline execution history
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = "MLWorkflowPipeline"
pipeline = Pipeline(name=pipeline_name)

# List all executions
executions = pipeline.list_executions()

for execution_summary in executions['PipelineExecutionSummaries'][:5]:
    print(f"Execution ARN: {execution_summary['PipelineExecutionArn']}")
    print(f"Status: {execution_summary['PipelineExecutionStatus']}")
    print(f"Start time: {execution_summary['StartTime']}")
    print("---")

Some best practices

  • Parameterized design
# Use parameters to make the Pipeline more flexible
model_prefix = ParameterString(
    name="ModelPrefix",
    default_value="my-model"
)

# Pipeline parameters are only resolved at run time, so they cannot be
# embedded in a plain f-string; use Join to build the S3 path instead
from sagemaker.workflow.functions import Join

output_path = Join(on="/", values=["s3:/", bucket, model_prefix, "output"])
  • Error handling
# Add a retry policy for transient failures
from sagemaker.workflow.retry import StepExceptionTypeEnum, StepRetryPolicy

retry_policy = StepRetryPolicy(
    exception_types=[
        StepExceptionTypeEnum.SERVICE_FAULT,
        StepExceptionTypeEnum.THROTTLING
    ],
    max_attempts=3,
    interval_seconds=10,
    backoff_rate=2.0
)

step_process_with_retry = ProcessingStep(
    name="DataProcessingWithRetry",
    processor=sklearn_processor,
    # ... other arguments
    retry_policies=[retry_policy]
)
  • Caching
from sagemaker.workflow.steps import CacheConfig

# Enable step caching (24 hours)
cache_config = CacheConfig(
    enable_caching=True,
    expire_after="PT24H"  # ISO 8601 duration format
)

step_train_cached = TrainingStep(
    name="ModelTrainingCached",
    estimator=xgb_estimator,
    inputs=training_inputs,  # the same inputs dict used for step_train
    cache_config=cache_config
)
  • Parallel execution
# Independent steps run in parallel automatically
step_process_1 = ProcessingStep(name="Process1", ...)
step_process_2 = ProcessingStep(name="Process2", ...)
step_merge = ProcessingStep(
    name="MergeResults",
    depends_on=[step_process_1, step_process_2],  # wait for both steps to finish
    # ... other arguments
)

Visualizing the Pipeline in SageMaker Studio

SageMaker Studio provides a visual interface where you can:

  • view the Pipeline structure as a graph
  • monitor the execution status of each step
  • inspect each step's inputs and outputs
  • check error logs
  • compare results across executions

Integrating with CI/CD

Use AWS CodePipeline to trigger the SageMaker Pipeline, for example from a Lambda function:

import boto3

codepipeline = boto3.client('codepipeline')
sagemaker_client = boto3.client('sagemaker')

def lambda_handler(event, context):
    # Get the job ID from the CodePipeline event
    job_id = event['CodePipeline.job']['id']
    
    try:
        # Start the SageMaker Pipeline
        response = sagemaker_client.start_pipeline_execution(
            PipelineName='MLWorkflowPipeline',
            PipelineExecutionDisplayName='CodePipeline-Triggered',
            PipelineParameters=[
                {
                    'Name': 'InputData',
                    'Value': 's3://your-bucket/latest-data.csv'
                }
            ]
        )
        
        # Report success back to CodePipeline
        codepipeline.put_job_success_result(jobId=job_id)
        
    except Exception as e:
        # Report the failure back to CodePipeline
        codepipeline.put_job_failure_result(
            jobId=job_id,
            failureDetails={'message': str(e)}
        )
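
If you only need runs on a fixed schedule rather than tied to a code change, EventBridge can also start the pipeline directly, without a Lambda in between. A minimal sketch; the rule name, pipeline ARN, and role ARN below are placeholders you would replace:

import boto3

events = boto3.client('events')

# A rule that fires once a day (hypothetical rule name)
events.put_rule(
    Name='daily-ml-pipeline',
    ScheduleExpression='rate(1 day)'
)

# Point the rule at the SageMaker Pipeline
events.put_targets(
    Rule='daily-ml-pipeline',
    Targets=[{
        'Id': 'ml-pipeline-target',
        'Arn': 'arn:aws:sagemaker:<region>:<account-id>:pipeline/<pipeline-name>',  # placeholder
        'RoleArn': 'arn:aws:iam::<account-id>:role/<eventbridge-invoke-role>',      # placeholder
        'SageMakerPipelineParameters': {
            'PipelineParameterList': [
                {'Name': 'InputData', 'Value': 's3://your-bucket/latest-data.csv'}
            ]
        }
    }]
)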

Done!


立即登入留言