iT邦幫忙

2021 iThome 鐵人賽

DAY 25
0

Azure machine learning: Pipeline for data- 建立工作流程來收集資料

Pipeline,流水線或管線,顧名思義,就是讓程式碼,按照使用者安排的順序一一執行,Azure machine learning 也有提供這樣的服務,所以我們可以把前面幾篇文章所做的事情,全部交給流水線執行。第一件事情,要先解決資料更新的問題,取得最新資料,並且將其存進 datastore,然後註冊這份資料。在此,我們繼續以匯率為例,來示範如何用pipeline更新資料。

執行pipeline一樣至少需要兩份以上的 py 檔,將需要執行的任務,分別寫成不同的 py 檔,然後再整合在一起交由pipeline執行。

  • get_currency.py:之前在上傳資料時介紹過,這次要以維護資料的角度改寫。
  • run_pipeline_data.py:在本地端執行,便可把get_currency.py上傳到workspace執行。

安裝Python套件

請在本機端安裝

pip3.7 install azureml-pipeline

示範程式

get_currency.py

import argparse
from datetime import datetime
import os
import pickle
import pandas as pd
import investpy
from sklearn.preprocessing import MinMaxScaler


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--history", type=bool, default=False)
    parser.add_argument("--target_folder", type=str)
    parser.add_argument("--input_folder", type=str)
    args = parser.parse_args()
    return args

# 以 history 這個變數作為區隔的依據,history 為 True,則是在本地端執行;反之,則是利用`pipeline`執行
def main():
    args = parse_args()
    if args.history:
        if not os.path.isdir("currency"):
            os.system("mkdir currency")

        usd_twd = investpy.get_currency_cross_historical_data(
            "USD/TWD",
            from_date="01/01/1900",
            to_date=datetime.now().strftime("%d/%m/%Y"),
        )
        usd_twd.reset_index(inplace=True)
        usd_twd.to_csv("currency/usd_twd.csv", index=False)
        currency_data = usd_twd.Close.values.reshape(-1, 1)
        scaler = MinMaxScaler(feature_range=(0, 1))
        scaler.fit(currency_data)
        with open("currency/scaler.pickle", "wb") as f_h:
            pickle.dump(scaler, f_h)
        f_h.close()
        currency_data = usd_twd[
            (usd_twd.Date >= "2010-01-01") & (usd_twd.Date < "2021-01-01")
        ]
        currency_data.to_csv("currency/training_data.csv")
    # 以上都跟之前一模一樣
    
    else:
        # 目標是從 input_path 取得舊的資料,與最新資料結合,將更新的結果存進 path。
        path = os.path.join(args.target_folder, "usd_twd.csv")
        input_path = os.path.join(args.input_folder, "usd_twd.csv")
        history = pd.read_csv(input_path)

        recent = investpy.get_currency_cross_recent_data("USD/TWD")
        recent.reset_index(inplace=True)
        history = history.append(recent, ignore_index=True)
        history.drop_duplicates(subset="Date", keep="last", inplace=True)
        history.to_csv(path, index=False)
        # 將最近 2400 天的資料作為訓練用的資料
        history = history.tail(2400)
        history.to_csv(
            os.path.join(args.target_folder, "training_data.csv"), index=False
        )
        
        # 接著就必須要註冊資料,資料才會真的更新。
        run = Run.get_context()
        work_space = run.experiment.workspace
        datastore = work_space.get_default_datastore()
        dataset = Dataset.File.from_files(path=(datastore, 'currency'))
        dataset.register(work_space, name='currency')

if __name__ == "__main__":
    main()

要注意的是,雖然在這邊要做的事情是,讀取舊資料,合併新資料後,更新資料。但是,輸入的資料夾路徑和輸出的資料夾路徑不能為同一個路徑,否則就會得到錯誤訊息:Graph shouldn't have cycles。可以利用OutputFileDatasetConfig這個class解決這個問題,讓資料先暫存,之後再推向 datastore。

run_pipeline_data.py

import os
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import PythonScriptStep
from azureml.core.runconfig import RunConfiguration
from azureml.core import Workspace, Experiment, Dataset
from azureml.core.authentication import InteractiveLoginAuthentication
from azureml.pipeline.core import Pipeline


def main():
    # 起手式,一定要先取得 workspace 權限
    interactive_auth = InteractiveLoginAuthentication(tenant_id=os.getenv("TENANT_ID"))
    work_space = Workspace.from_config(auth=interactive_auth)
    datastore = work_space.get_default_datastore()
    # 設定 input folder 
    input_folder = (
        Dataset.File.from_files(path=(datastore, "currency"))
        .as_named_input("input_folder")
        .as_mount()
    )
    # 設定 output 路徑
    dataset = (
        OutputFileDatasetConfig(name="usd_twd", destination=(datastore, "currency"))
        .as_upload(overwrite=True)
        .register_on_complete(name="currency")
    )
    
    # 選擇之前註冊的環境
    aml_run_config = RunConfiguration()
    environment = work_space.environments["train_lstm"]
    aml_run_config.environment = environment
    
    # 設定管線中的步驟,把會用到的 py 檔、輸入和輸出的資料夾帶入
    get_currency = PythonScriptStep(
        name="get_currency",
        script_name="get_currency.py",
        compute_target="cpu-cluster",
        runconfig=aml_run_config,
        arguments=[
            "--target_folder",
            dataset,
            "--input",
            input_folder,
        ],
        allow_reuse=True,
    )
    # pipeline 的本質還是實驗,所以需要建立實驗,再把 pipeline帶入
    experiment = Experiment(work_space, "get_currency")

    pipeline = Pipeline(workspace=work_space, steps=[get_currency])
    run = experiment.submit(pipeline)
    run.wait_for_completion(show_output=True)
    # 執行終了,發布 pipeline,以便可以重複使用 
    run.publish_pipeline(
        name="get_currency_pipeline",
        description="Get currency with pipeline",
        version="1.0",
    )


if __name__ == "__main__":
    main()

然後,執行python3.7 run_pipeline_data.py。執行的結果可以,直接到workspace的實驗頁面查詢。點選剛剛執行的管線,可以看到執行的過程被畫成流程圖。

點進步驟,再點選執行完的步驟,則會看到該實驗的各種細節,也方便後續除錯。


這篇算是讓我搞懂pipeline,下一篇要再加上模型訓練和服務部署,完成一條龍服務。


上一篇
Day 24 Azure machine learning: deploy service and inference- 模型訓練完就是要拿來用啊
下一篇
Day 26 Azure machine learning: Pipeline for model and service- 把工作通通串起來
系列文
我不太懂 AI,可是我會一點 Python 和 Azure30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言