Pipeline
,流水線或管線,顧名思義,就是讓程式碼,按照使用者安排的順序一一執行,Azure machine learning 也有提供這樣的服務,所以我們可以把前面幾篇文章所做的事情,全部交給流水線執行。第一件事情,要先解決資料更新的問題,取得最新資料,並且將其存進 datastore,然後註冊這份資料。在此,我們繼續以匯率為例,來示範如何用pipeline
更新資料。
執行pipeline
一樣至少需要兩份以上的 py 檔,將需要執行的任務,分別寫成不同的 py 檔,然後再整合在一起交由pipeline
執行。
get_currency.py
:之前在上傳資料時介紹過,這次要以維護資料的角度改寫。run_pipeline_data.py
:在本地端執行,便可把get_currency.py
上傳到workspace
執行。Python
套件請在本機端安裝
pip3.7 install azureml-pipeline
get_currency.py
import argparse
from datetime import datetime
import os
import pickle
import pandas as pd
import investpy
from sklearn.preprocessing import MinMaxScaler
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--history", type=bool, default=False)
parser.add_argument("--target_folder", type=str)
parser.add_argument("--input_folder", type=str)
args = parser.parse_args()
return args
# 以 history 這個變數作為區隔的依據,history 為 True,則是在本地端執行;反之,則是利用`pipeline`執行
def main():
args = parse_args()
if args.history:
if not os.path.isdir("currency"):
os.system("mkdir currency")
usd_twd = investpy.get_currency_cross_historical_data(
"USD/TWD",
from_date="01/01/1900",
to_date=datetime.now().strftime("%d/%m/%Y"),
)
usd_twd.reset_index(inplace=True)
usd_twd.to_csv("currency/usd_twd.csv", index=False)
currency_data = usd_twd.Close.values.reshape(-1, 1)
scaler = MinMaxScaler(feature_range=(0, 1))
scaler.fit(currency_data)
with open("currency/scaler.pickle", "wb") as f_h:
pickle.dump(scaler, f_h)
f_h.close()
currency_data = usd_twd[
(usd_twd.Date >= "2010-01-01") & (usd_twd.Date < "2021-01-01")
]
currency_data.to_csv("currency/training_data.csv")
# 以上都跟之前一模一樣
else:
# 目標是從 input_path 取得舊的資料,與最新資料結合,將更新的結果存進 path。
path = os.path.join(args.target_folder, "usd_twd.csv")
input_path = os.path.join(args.input_folder, "usd_twd.csv")
history = pd.read_csv(input_path)
recent = investpy.get_currency_cross_recent_data("USD/TWD")
recent.reset_index(inplace=True)
history = history.append(recent, ignore_index=True)
history.drop_duplicates(subset="Date", keep="last", inplace=True)
history.to_csv(path, index=False)
# 將最近 2400 天的資料作為訓練用的資料
history = history.tail(2400)
history.to_csv(
os.path.join(args.target_folder, "training_data.csv"), index=False
)
# 接著就必須要註冊資料,資料才會真的更新。
run = Run.get_context()
work_space = run.experiment.workspace
datastore = work_space.get_default_datastore()
dataset = Dataset.File.from_files(path=(datastore, 'currency'))
dataset.register(work_space, name='currency')
if __name__ == "__main__":
main()
要注意的是,雖然在這邊要做的事情是,讀取舊資料,合併新資料後,更新資料。但是,輸入的資料夾路徑和輸出的資料夾路徑不能為同一個路徑,否則就會得到錯誤訊息:Graph shouldn't have cycles。可以利用OutputFileDatasetConfig
這個class
解決這個問題,讓資料先暫存,之後再推向 datastore。
run_pipeline_data.py
import os
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import PythonScriptStep
from azureml.core.runconfig import RunConfiguration
from azureml.core import Workspace, Experiment, Dataset
from azureml.core.authentication import InteractiveLoginAuthentication
from azureml.pipeline.core import Pipeline
def main():
# 起手式,一定要先取得 workspace 權限
interactive_auth = InteractiveLoginAuthentication(tenant_id=os.getenv("TENANT_ID"))
work_space = Workspace.from_config(auth=interactive_auth)
datastore = work_space.get_default_datastore()
# 設定 input folder
input_folder = (
Dataset.File.from_files(path=(datastore, "currency"))
.as_named_input("input_folder")
.as_mount()
)
# 設定 output 路徑
dataset = (
OutputFileDatasetConfig(name="usd_twd", destination=(datastore, "currency"))
.as_upload(overwrite=True)
.register_on_complete(name="currency")
)
# 選擇之前註冊的環境
aml_run_config = RunConfiguration()
environment = work_space.environments["train_lstm"]
aml_run_config.environment = environment
# 設定管線中的步驟,把會用到的 py 檔、輸入和輸出的資料夾帶入
get_currency = PythonScriptStep(
name="get_currency",
script_name="get_currency.py",
compute_target="cpu-cluster",
runconfig=aml_run_config,
arguments=[
"--target_folder",
dataset,
"--input",
input_folder,
],
allow_reuse=True,
)
# pipeline 的本質還是實驗,所以需要建立實驗,再把 pipeline帶入
experiment = Experiment(work_space, "get_currency")
pipeline = Pipeline(workspace=work_space, steps=[get_currency])
run = experiment.submit(pipeline)
run.wait_for_completion(show_output=True)
# 執行終了,發布 pipeline,以便可以重複使用
run.publish_pipeline(
name="get_currency_pipeline",
description="Get currency with pipeline",
version="1.0",
)
if __name__ == "__main__":
main()
然後,執行python3.7 run_pipeline_data.py
。執行的結果可以,直接到workspace
的實驗頁面查詢。點選剛剛執行的管線,可以看到執行的過程被畫成流程圖。
點進步驟
,再點選執行完的步驟,則會看到該實驗的各種細節,也方便後續除錯。
這篇算是讓我搞懂pipeline
,下一篇要再加上模型訓練和服務部署,完成一條龍服務。