2021 iThome 鐵人賽

DAY 26
Azure machine learning: Pipeline for model and service - chaining all the work together

Continuing from the previous article: once you understand what a pipeline does, this one feels especially easy. Take the earlier model-training and service-deployment workflows, factor in future maintenance, and hand everything to a pipeline to execute. As described earlier, we prepare one .py file each for model training and for service deployment, and then run the pipeline from a third .py file.

Example code

train_lstm.py was introduced before. To run it through a pipeline, and with later reuse in mind, it gets a small makeover here. The additions are explained below:

train_lstm.py

import argparse
import os
import pickle
import numpy as np
from azureml.core.run import Run
from azureml.core.model import Model
import pandas as pd
from keras.models import Sequential, load_model
from keras.layers import Dense, LSTM, Dropout
from keras.preprocessing.sequence import TimeseriesGenerator
from keras.callbacks import TensorBoard, EarlyStopping


def data_generator(data, data_len=240):
    """
    generate data for training and validation
    """
    generator = TimeseriesGenerator(
        data=data, targets=range(data.shape[0]), length=data_len, batch_size=1, stride=1
    )
    x_all = []
    for i in generator:
        x_all.append(i[0][0])
    x_all = np.array(x_all)
    y_all = data[range(data_len, len(x_all) + data_len)]
    rate = 0.4
    x_train = x_all[: int(len(x_all) * (1 - rate))]
    y_train = y_all[: int(y_all.shape[0] * (1 - rate))]
    x_val = x_all[int(len(x_all) * (1 - rate)) :]
    y_val = y_all[int(y_all.shape[0] * (1 - rate)) :]
    return x_train, y_train, x_val, y_val


def parse_args():
    """
    Parse arguments
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--target_folder", type=str, help="Path to the training data")
    parser.add_argument(
        "--experiment",
        # argparse's type=bool treats any non-empty string (even "False")
        # as True, so parse the flag explicitly
        type=lambda x: str(x).lower() == "true",
        default=False,
        help="Just run an experiment, there is no pipeline",
    )
    parser.add_argument(
        "--log_folder", type=str, help="Path to the log", default="./logs"
    )
    args = parser.parse_args()
    return args

# For later training runs, load the best-performing registered model and continue training from it
def load_best_model(work_space, model_name, x_val, y_val):
    """
    load the best model from registered models
    """
    model_obj = Model(work_space, model_name)
    # Get the list of registered models. Besides version, the properties attribute can also be used to pick a model
    model_list = model_obj.list(work_space, name=model_name)
    version = [i.version for i in model_list]
    version.sort(reverse=True)
    # Take the five most recently trained versions and evaluate each on the most recent data
    version = version[:5]
    val_loss = []
    for i in version:
        print(i)
        model_obj = Model(work_space, model_name, version=i)
        model_path = model_obj.download(exist_ok=True)
        model = load_model(model_path)
        val_loss.append(model.evaluate(x_val, y_val))
    # Pick the model with the smallest validation loss
    model_obj = Model(
        work_space, model_name, version=version[val_loss.index(min(val_loss))]
    )
    model_path = model_obj.download(exist_ok=True)
    model = load_model(model_path)
    return model, min(val_loss), version[val_loss.index(min(val_loss))]



def main():
    """
    Train an LSTM for currency prediction with Keras
    """
    args = parse_args()
    # Make sure the outputs folder exists; Azure ML uploads its contents automatically
    os.makedirs("outputs", exist_ok=True)
    # When running in the workspace, get the current run context
    run = Run.get_context()
    usd_twd = pd.read_csv(os.path.join(args.target_folder, "training_data.csv"))
    data = usd_twd.Close.values.reshape(-1, 1)
    with open(os.path.join(args.target_folder, "scaler.pickle"), "rb") as f_h:
        scaler = pickle.load(f_h)
    data = scaler.transform(data)
    data_len = 240
    x_train, y_train, x_val, y_val = data_generator(data, data_len)
    # When running as a standalone experiment, define the model architecture from scratch and log to TensorBoard
    loss_threshold = 1
    version = 0
    if args.experiment:
        model = Sequential()
        model.add(LSTM(16, input_shape=(data_len, 1)))
        model.add(Dropout(0.1))
        model.add(Dense(1))
        model.compile(loss="mse", optimizer="adam")
        # Tensorboard
        callback = TensorBoard(
            log_dir=args.log_folder,
            histogram_freq=0,
            write_graph=True,
            write_images=True,
            embeddings_freq=0,
            embeddings_layer_names=None,
            embeddings_metadata=None,
        )
    # When running in a pipeline, load the best existing model first
    else:
        # Get access to the workspace
        work_space = run.experiment.workspace
        model, loss_threshold, version = load_best_model(
            work_space, model_name="currency", x_val=x_val, y_val=y_val
        )
        origin_model = model
        print("Load Model")
        # Stop training early if val_loss improves too slowly
        callback = EarlyStopping(monitor="val_loss",
                                 mode="min",
                                 min_delta=1e-8,
                                 patience=50)
        
    # train the network
    history_callback = model.fit(
        x_train,
        y_train,
        epochs=1000,
        batch_size=240,
        verbose=1,
        validation_data=[x_val, y_val],
        callbacks=[callback],
    )
    print("Finished Training")
    # Evaluate the model on the validation data and keep only models that perform well
    metrics = history_callback.history
    # If the newly trained model beats the previous best, log the training details
    if metrics["val_loss"][-1] <= loss_threshold:
        run.log_list("train_loss", metrics["loss"][:10])
        run.log_list("val_loss", metrics["val_loss"][:10])
        run.log_list("start", [usd_twd.Date.values[0]])
        run.log_list("end", [usd_twd.Date.values[-1]])
        run.log_list("epoch", [len(history_callback.epoch)])
        run.log_list("last_version", [version])
        model.save("outputs/keras_lstm.h5")
        properties = {
            "train_loss": metrics["loss"][-1],
            "val_loss": metrics["val_loss"][-1],
            "data": "USD/TWD from {0} to {1}".format(
                usd_twd.Date.values[0], usd_twd.Date.values[-1]
            ),
            "epoch": len(history_callback.epoch),
            "last_version": version,
        }
    # Otherwise, log the previous val_loss and record which model version this run inherited from
    else:
        run.log_list("val_loss", [loss_threshold])
        run.log_list("last_version", [version])
        origin_model.save("outputs/keras_lstm.h5")
        properties = {"val_loss": loss_threshold, "last_version": version}
    if args.experiment:
        with open("outputs/scaler.pickle", "wb") as f_h:
            pickle.dump(scaler, f_h)
    else:
        # To automate the whole workflow, register the model in the workspace right after training
        model = Model.register(
            workspace=work_space,
            model_name="currency",
            tags={"model": "LSTM"},
            model_path="outputs/keras_lstm.h5",
            model_framework="keras",
            model_framework_version="2.2.4",
            properties=properties,
        )
        print("Registered Model")


if __name__ == "__main__":
    main()
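
For reference, train_lstm.py can also be submitted on its own as a plain experiment. A minimal sketch, assuming the train_lstm environment and cpu-cluster compute target set up earlier in this series; the experiment name and --target_folder value below are placeholders:

from azureml.core import Experiment, ScriptRunConfig, Workspace

work_space = Workspace.from_config()
config = ScriptRunConfig(
    source_directory=".",
    script="train_lstm.py",
    compute_target="cpu-cluster",
    # placeholder arguments: point --target_folder at a folder containing
    # training_data.csv and scaler.pickle
    arguments=["--target_folder", "currency", "--experiment", "True"],
    environment=work_space.environments["train_lstm"],
)
run = Experiment(work_space, "train_lstm").submit(config)
run.wait_for_completion(show_output=True)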

After training, the best model at that point is registered, so when deploying the service we can use the latest registered model directly.
The content of deploy_currency_prediction.py is unchanged, except that when it runs inside a pipeline, workspace access must likewise be obtained through Run.
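
Because train_lstm.py registers each model with properties such as val_loss, those properties can also be inspected when deciding which version to deploy. A minimal sketch:

from azureml.core.model import Model

# List the registered versions of "currency" together with the val_loss
# recorded in their properties at registration time
for m in Model.list(work_space, name="currency"):
    print(m.version, m.properties.get("val_loss"))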

deploy_currency_prediction.py


import os
from azureml.core import Model, Run, Workspace
from azureml.core.model import InferenceConfig
from azureml.core.webservice import AciWebservice
from azureml.core.authentication import InteractiveLoginAuthentication


def main():
    """
    Deploy model to your service
    """
    run = Run.get_context()
    # Inside a pipeline, the run context provides the workspace;
    # when running locally, fall back to interactive authentication
    try:
        work_space = run.experiment.workspace
    except AttributeError:
        interactive_auth = InteractiveLoginAuthentication(
            tenant_id=os.getenv("TENANT_ID")
        )
        work_space = Workspace.from_config(auth=interactive_auth)
    environment = work_space.environments["train_lstm"]
    model = Model(work_space, "currency")
    service_name = "currency-service"
    inference_config = InferenceConfig(
        entry_script="predict_currency.py", environment=environment
    )
    aci_config = AciWebservice.deploy_configuration(cpu_cores=1, memory_gb=1)
    scaler = Model(work_space, name="scaler", version=1)
    service = Model.deploy(
        workspace=work_space,
        name=service_name,
        models=[model, scaler],
        inference_config=inference_config,
        deployment_config=aci_config,
        overwrite=True,
    )
    service.wait_for_deployment(show_output=True)
    print(service.get_logs())
    print(service.scoring_uri)


if __name__ == "__main__":
    main()
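
Once deployment finishes, the endpoint can be smoke-tested. A hypothetical sketch; the actual payload schema depends on predict_currency.py, which is not shown in this article, so treat the request body as a placeholder:

import json
import requests
from azureml.core.webservice import Webservice

service = Webservice(work_space, "currency-service")
response = requests.post(
    service.scoring_uri,
    data=json.dumps({"data": "placeholder"}),  # placeholder body, schema unknown
    headers={"Content-Type": "application/json"},
)
print(response.status_code, response.text)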


Compared with run_pipeline_data.py from the previous article, two more steps are added to the pipeline here: one for model training and one for service deployment.

run_pipeline.py

import os
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import PythonScriptStep
from azureml.core.runconfig import RunConfiguration
from azureml.core import Workspace, Experiment, Dataset
from azureml.core.authentication import InteractiveLoginAuthentication
from azureml.pipeline.core import Pipeline


def main():
    """
    Build and run a pipeline that updates data, trains the model, and deploys the service
    """
    interactive_auth = InteractiveLoginAuthentication(tenant_id=os.getenv("TENANT_ID"))
    work_space = Workspace.from_config(auth=interactive_auth)
    datastore = work_space.get_default_datastore()
    input_folder = (
        Dataset.File.from_files(path=(datastore, "currency"))
        .as_named_input("input_folder")
        .as_mount()
    )
    dataset = OutputFileDatasetConfig(
        name="usd_twd", destination=(datastore, "currency")
    )
    aml_run_config = RunConfiguration()
    environment = work_space.environments["train_lstm"]
    aml_run_config.environment = environment
    # Step that updates the data
    get_currency = PythonScriptStep(
        source_directory=".",
        name="get_currency",
        script_name="get_currency.py",
        compute_target="cpu-cluster",
        runconfig=aml_run_config,
        arguments=[
            "--target_folder",
            dataset.as_upload(overwrite=True).register_on_complete(name="currency"),
            "--input",
            input_folder,
        ],
        allow_reuse=True,
    )
    # Step that trains the model
    training = PythonScriptStep(
        source_directory=".",
        name="train_lstm",
        script_name="train_lstm.py",
        compute_target="cpu-cluster",
        runconfig=aml_run_config,
        arguments=["--target_folder", dataset.as_input()],
        allow_reuse=True,
    )
    # Step that deploys the service
    deploy = PythonScriptStep(
        source_directory=".",
        name="deploy_currency_prediction",
        script_name="deploy_currency_prediction.py",
        compute_target="cpu-cluster",
        runconfig=aml_run_config,
        allow_reuse=True,
    )
    # The deploy step consumes no pipeline data, so make its ordering explicit
    deploy.run_after(training)
    experiment = Experiment(work_space, "pipeline_data_train_deploy")

    pipeline = Pipeline(workspace=work_space, steps=[get_currency, training, deploy])
    run = experiment.submit(pipeline)
    run.wait_for_completion(show_output=True)
    # The pipeline must be published before it can be scheduled later
    run.publish_pipeline(
        name="pipeline_data_train_deploy",
        description="data-->train-->deploy",
        version="1.0",
    )


if __name__ == "__main__":
    main()


After python3.7 run_pipeline.py completes, you can again inspect the results and the pipeline graph from the workspace.
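
To confirm the publishing step worked, one way is to list the published pipelines in the workspace. A minimal sketch:

from azureml.pipeline.core import PublishedPipeline

# Each published pipeline exposes a REST endpoint that a schedule can later trigger
for published in PublishedPipeline.list(work_space):
    print(published.name, published.id, published.endpoint)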


The introduction to Azure machine learning is coming to an end. Now that we have a pipeline that chains data collection, model training, and service deployment together and runs them in order, the last thing to do is put the pipeline on a schedule. The next article covers how to schedule it in the workspace.


Previous
Day 25 Azure machine learning: Pipeline for data - building a workflow to collect data
Next
Day 27 Azure machine learning: Schedule - let Azure run your tasks on a schedule
Series
I don't really know AI, but I know a bit of Python and Azure (30 articles)