iT邦幫忙

2021 iThome 鐵人賽

DAY 26
0
AI & Data

一起學習 Azure Machine Learning 系列 第 26

[DAY26] 用 Azure Machine Learning SDK 來做 Pipeline

DAY26 用 Azure Machine Learning SDK 來做 Pipeline

在 Azure Machine Learning 中,Pipelines 是機器學習工作的工作流程,流程中的每個工作都是一個步驟(step)。這裡的 Pipeline,和 Scikit-Learn 的 Pipeline 是不一樣的。在 Scikit-Learn 中是把資料轉換處理,而在 AML 中是實驗執行的步驟,當然可以把 Scikit-Learn 的 Pipeline 視為一個步驟,包在 AML Pipeline 中。

AML Pipeline 管線中常見的步驟種類包括:

  • PythonScriptStep:執行指定的 Python 程式碼。
  • DataTransferStep:使用 Azure Data Factory 在資料存放區之間複製資料。
  • DatabricksStep:在 Databricks 叢集上執行程式碼。
  • AdlaStep:在 Azure Data Lake Analytics 中執行 U-SQL 作業。
  • ParallelRunStep:在多個計算節點上以分散式工作的形式執行 Python 程式碼。

在一個 Pipeline 裡,這些 step 都可以被用上。舉例來說,我這個 Pipeline 可以用先跑一個 DatabricksStep,再跑3個 PythonScriptStep。

建立 Pipeline

  1. 我們實務上最常用到的是 PythonScriptStep。它就是把一段要執行的 script,包成一個 step。使用方式如下:
from azureml.pipeline.steps import PythonScriptStep
from azureml.pipeline.core import Pipeline
from azureml.core import Experiment, Workspace

ws = Workspace.from_config() 

# 第一步來資料前處理
step1 = PythonScriptStep(name = 'preprocess data',
                         source_directory = 'pipeline_script_folder',
                         script_name = 'preprocess_data.py',
                         compute_target = 'ironmancpu')

# 第二步訓練模型
step2 = PythonScriptStep(name = 'train model',
                         source_directory = 'pipeline_script_folder',
                         script_name = 'train_model.py',
                         compute_target = 'ironmancpu')

# 接著建立一個 pipeline,把步驟一二放進去
train_pipeline = Pipeline(workspace = ws, steps = [step1,step2])

# 接著提交實驗
experiment = Experiment(workspace = ws, name = 'pipeline-sdk')
pipeline_run = experiment.submit(train_pipeline)

# Pipeline 也可以發佈出去
published_pipeline = train_pipeline.publish(name='Pipeline_sdk',
                                          description='sdk build pipeline',
                                          version='1.0')
  1. AML 會自動幫你快取已經執行過的 Step,重複使用時就不會執行,用以提高效率。但是有時候可能參數變更、或是 script 改變等等的,不執行快取住的 Pipeline 就不好了。有下面兩種作法,程式碼參考如下:
# 法一:可以在 PythonScriptStep 裡設定 allow_reuse = False
step1 = PythonScriptStep(name = 'preprocess data',
                         source_directory = '.',
                         script_name = 'preprocess_data.py',
                         compute_target = 'aml-cluster',
                         allow_reuse = False)

# 法二:可以在 submit 實驗時,用 regenerate_outputs=True 強制執行所有的步驟。
pipeline_run = experiment.submit(train_pipeline, regenerate_outputs=True))

  1. 在步驟和步驟之間,常常會需要傳遞資料。例如說第一個步驟的資料前處理好後,把資料給第二個步驟訓練模型。這時候我們就要用到 OutputFileDatasetConfig。它可以將資料暫時儲存起來,傳給下一個步驟。

OutputFileDatasetConfig 在使用上有下列重點,並有兩段程式碼供參考:

  • Python script 要參數化,就像我們在講 ScriptRunConfig 那天的做法一樣。
  • 將 OutputFileDatasetConfig 做為參數來輸出或輸入。

Python script 要參數化的參考程式碼如下:

from azureml.core import Run
import argparse
import os

run = Run.get_context()

parser = argparse.ArgumentParser()
parser.add_argument('--raw-ds', type=str, dest='raw_dataset_id')

# 參數化輸出的資料夾
parser.add_argument('--out_folder', type=str, dest='folder')
args = parser.parse_args()
output_folder = args.folder

raw_df = run.input_datasets['raw_data'].to_pandas_dataframe()

prepped_df = raw_df[['col1', 'col2', 'col3']]

# 把處理好的資料存在要輸出的資料夾
os.makedirs(output_folder, exist_ok=True)
output_path = os.path.join(output_folder, 'prepared_data.csv')
prepped_df.to_csv(output_path)

OutputFileDatasetConfig 參考程式碼如下:

from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import PythonScriptStep
from azureml.core import Experiment, Workspace

ws = Workspace.from_config() 

raw_ds = Dataset.get_by_name(ws, 'raw_dataset')

# 建立 OutputFileDatasetConfig,以傳遞資料
prepared_data = OutputFileDatasetConfig('prepared_data')

step1 = PythonScriptStep(name = 'prepare data',
                         source_directory = 'pipeline_script_folder',
                         script_name = 'preprocess_data.py',
                         compute_target = 'aml-cluster',
                         # 在這裡要輸出前處理過的 data
                         arguments = ['--raw-ds', raw_ds.as_named_input('raw_data'),
                                      '--out_folder', prepped_data])

step2 = PythonScriptStep(name = 'train model',
                         source_directory = 'pipeline_script_folder',
                         script_name = 'train_model.py',
                         compute_target = 'aml-cluster',
                        # 在這裡要輸出前處理過的 data
                         arguments=['--training-data', prepped_data.as_input()])
  1. Pipeline 一樣也可以設計成丟參數進去給 PythonScriptStep 使用的型式,程式碼參考如下:
from azureml.pipeline.core.graph import PipelineParameter

length_param = PipelineParameter(name='data_length', default_value=100)

step1 = PythonScriptStep(name = 'prepare data',
                         source_directory = 'pipeline_script_folder',
                         script_name = 'preprocess_data.py',
                         compute_target = 'aml-cluster',
                         # 在這裡放 pipeline 要丟進來的參數
                         arguments = ['--raw-ds', raw_ds.as_named_input('raw_data'),
                                      '--length', data_length,
                                      '--out_folder', prepped_data])

  1. Pipeline 也可以建立排程定期間隔,時間一定就自動觸發。程式碼參考如下:
from azureml.pipeline.core import ScheduleRecurrence, Schedule

# frequency 可以是 "Minute"、"Hour"、"Day"、"Week" 或 "Month"。interval 是重跑排程之前,要等候的時間單位數。這裡是一天。
daily = ScheduleRecurrence(frequency='Day', interval=1)
schedule = Schedule.create( ws, name='Everyday',
                                description='天天跑',
                                pipeline_id='your pipeline id',
                                experiment_name='Training_Pipeline',
                                recurrence=daily)
  1. Pipeline 也可以被資料改變時觸發,程式碼參考如下:
from azureml.core import Datastore
from azureml.pipeline.core import Schedule

datastore = Datastore(workspace=ws, name='titanic')
pipeline_schedule = Schedule.create(ws, name='Reactive Training',
                                    description='資料改變時就跑',
                                    pipeline_id='your pipeline id',
                                    experiment_name='Training_Pipeline',
                                    datastore=datastore,
                                    path_on_datastore='data/training')

今天就是我們 Pipeline 的內容啦!不知不覺又破了六千字了真的有夠多。明天我們來講怎麼用 AML SDK 做 AutoML。


上一篇
[DAY25] 用 Azure Machine Learning SDK 註冊模型與部署
下一篇
[DAY27] 用 Azure Machine Learning SDK 來做 AutoML
系列文
一起學習 Azure Machine Learning 30

尚未有邦友留言

立即登入留言