iT邦幫忙

2021 iThome 鐵人賽

DAY 23
0
AI & Data

我不太懂 AI,可是我會一點 Python 和 Azure系列 第 23

Day 23 Azure machine learning: training experiment and register model- 以 LSTM 模型為例

Azure machine learning: training experiment and register model- 以 LSTM 模型為例

這篇終於要訓練模型了,要做的事情就是,在workspace使用 Long-Short Term Memory (LSTM) 訓練出可以預測匯率趨勢的模型,以過去240天的資料預測隔天的收盤價。

依照之前的介紹,在workspace執行實驗訓練模型時,也需要兩個Python script:

  1. 一個要在workspace利用計算叢集執行的程式碼:train_lstm.py,其主要任務為訓練模型,應該考慮的步驟如下:
    • 取得資料
    • 整理資料
    • 建構模型
    • 訓練模型
    • 輸出模型與訓練結果
  2. 另一個 script run_experiment_training.py 在本機執行,把train_lstm.py上傳,並且通知workspace開始執行train_lstm.py。除了呼叫train_lstm.py來訓練模型之外,最後也會註冊訓練完的模型。

另外,這邊額外介紹一個功能,Azure machine learning 也可以將訓練過程中的各項觀察數值(例如:loss),利用tensorboard觀察。

安裝Python套件

請在本地端安裝
pip3.7 install azureml-tensorboard

workspace的環境設定

請參考Azure machine learning: set environment- 準備一個大家都能用的環境,如果環境沒有設定好,一定會遇到一連串的錯誤訊息,在此提醒各位。

示範程式

train_lstm.py

import argparse
import os
import pickle
import numpy as np
from azureml.core.run import Run
from azureml.core.model import Model
import pandas as pd
from keras.models import Sequential, load_model
from keras.layers import Dense, LSTM, Dropout
from keras.preprocessing.sequence import TimeseriesGenerator
from keras.callbacks import TensorBoard


# 產生 training data 和 validation data
def data_generator(data, data_len=240):
    """
    generate data for training and validation
    """
    generator = TimeseriesGenerator(
        data=data, targets=range(data.shape[0]), length=data_len, batch_size=1, stride=1
    )
    x_all = []
    for i in generator:
        x_all.append(i[0][0])
    x_all = np.array(x_all)
    y_all = data[range(data_len, len(x_all) + data_len)]
    # 資料的前面六成作為訓練之用,後面時間較新的四成資料作為驗證之用
    rate = 0.4
    x_train = x_all[: int(len(x_all) * (1 - rate))]
    y_train = y_all[: int(y_all.shape[0] * (1 - rate))]
    x_val = x_all[int(len(x_all) * (1 - rate)) :]
    y_val = y_all[int(y_all.shape[0] * (1 - rate)) :]
    return x_train, y_train, x_val, y_val


def parse_args():
    """
    Parse arguments
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--target_folder", type=str, help="Path to the training data")
    parser.add_argument(
        "--experiment",
        type=bool,
        default=False,
        help="Just run an experiment, there is no pipeline",
    )
    # 此處的 log folder 是為了使用 tensorboard ,在 workspace 之中訓練時的相對路徑,儲存訓練過程中的觀察數值
    parser.add_argument(
        "--log_folder", type=str, help="Path to the log", default="./logs"
    )
    args = parser.parse_args()
    return args



def main():
    """
    Training of LeNet with keras
    """
    args = parse_args()
    run = Run.get_context()
    # 從 datastore 讀取資料,並且加以整理
    usd_twd = pd.read_csv(os.path.join(args.target_folder, "training_data.csv"))
    data = usd_twd.Close.values.reshape(-1, 1)
    with open(os.path.join(args.target_folder, "scaler.pickle"), "rb") as f_h:
        scaler = pickle.load(f_h)
    f_h.close()
    data = scaler.transform(data)
    data_len = 240
    x_train, y_train, x_val, y_val = data_generator(data, data_len)
    # 這裡留一個伏筆,之後還需要考慮到 pipeline 的情況,在使用 pipeline 的時候部分步驟會省略
    if args.experiment:
    # 模型很簡單,LSTM 後,就接 dropout,最後再加一層 full connected network 就直接輸出了
        model = Sequential()
        model.add(LSTM(16, input_shape=(data_len, 1)))
        model.add(Dropout(0.1))
        model.add(Dense(1))
        model.compile(loss="mse", optimizer="adam")
        # Tensorboard
        callback = TensorBoard(
            log_dir=args.log_folder,
            histogram_freq=0,
            write_graph=True,
            write_images=True,
            embeddings_freq=0,
            embeddings_layer_names=None,
            embeddings_metadata=None,
        )
    # 訓練模型
    history_callback = model.fit(
        x_train,
        y_train,
        epochs=1000,
        batch_size=240,
        verbose=1,
        validation_data=[x_val, y_val],
        callbacks=[callback],
    )

    # 訓練過程中產生的數值,都可以輸出到 workspace ,可以在 workspace 的網頁上看到
    # 可以輸出的資料有上限,資料長度上限是 250,所以不要把所有 loss 都塞進去
    # 另外該注意的是,所有數值必須以 list 的格式輸出
    metrics = history_callback.history
    run.log_list("train_loss", metrics["loss"][:10])
    run.log_list("val_loss", metrics["val_loss"][:10])
    run.log_list("start", [usd_twd.Date.values[0]])
    run.log_list("end", [usd_twd.Date.values[-1]])
    run.log_list("epoch", [len(history_callback.epoch)])

    print("Finished Training")
    # 這邊要非常注意!!!!只能將模型存在 outputs 這個資料夾之下,後續才能註冊模型
    model.save("outputs/keras_lstm.h5")
    print("Saved Model")
    # 順便將 scaler 存檔,以便註冊
    if args.experiment:
        with open("outputs/scaler.pickle", "wb") as f_h:
            pickle.dump(scaler, f_h)
        f_h.close()
    


if __name__ == "__main__":
    main()

run_experiment_training.py


import os
import argparse
from azureml.core import ScriptRunConfig, Dataset, Workspace, Experiment
from azureml.tensorboard import Tensorboard
from azureml.core.authentication import InteractiveLoginAuthentication


def parse_args():
    """
    Parse arguments
    """
    parser = argparse.ArgumentParser()
    # 提供 py 檔
    parser.add_argument("-f", "--file", help="python script", type=str)
    # target_folder 則是需要輸入當初上傳到 Azure 資料夾路徑
    parser.add_argument(
        "-t", "--target_folder", help="file folder in datastore", type=str
    )
    args = parser.parse_args()
    return args


def main():
    """
    Run the experiment for training
    """
    args = parse_args()
    interactive_auth = InteractiveLoginAuthentication(tenant_id=os.getenv("TENANT_ID"))
    work_space = Workspace.from_config(auth=interactive_auth)

    # 從 datastore 取得資料
    datastore = work_space.get_default_datastore()
    dataset = Dataset.File.from_files(path=(datastore, args.target_folder))

    # 設定實驗,名稱可以隨意,這邊是直接以輸入的 py 檔為名
    experiment = Experiment(workspace=work_space, name=args.file.replace(".py", ""))
    # 設定要在 workspace 執行的 py 檔的檔名與路徑,選定運算集群,並且將 py 檔會用到的參數放在 arguments
    config = ScriptRunConfig(
        source_directory=".",
        script=args.file,
        compute_target="cpu-cluster",
        arguments=[
            "--target_folder",
            dataset.as_named_input("input").as_mount(), # 輸入資料集在 datastore 的路徑
            "--experiment",
            True,
            "--log_folder",
            "./logs",
        ],
    )

    # 選擇已經註冊的環境,之前的環境也是以 py 檔的檔名命名
    environment = work_space.environments[args.file.replace(".py", "")]
    config.run_config.environment = environment

    # 開始進行實驗,訓練模型
    run = experiment.submit(config)
    # 取得 URL,透過瀏覽器觀察實驗過程
    aml_url = run.get_portal_url()
    print(
        "Submitted to an Azure Machine Learning compute cluster. Click on the link below"
    )
    print("")
    print(aml_url)


    # 開啟 tensorboard
    tboard = Tensorboard([run])
    # 自動開啟瀏覽器
    tboard.start(start_browser=True)
    run.wait_for_completion(show_output=True)
    # 這邊設定一個緩衝,實驗執行完後,在 terminal 按下 enter ,才會結束 tensorboard
    print("Press enter to stop")
    input()
    tboard.stop()

    # 最後註冊模型,所有模型都必須放到 outputs/ 的路徑之下
    # properties 可以用來記錄跟此模型有關的所有數值
    metrics = run.get_metrics()
    run.register_model(
        model_name=args.target_folder,
        tags={"model": "LSTM"},
        model_path="outputs/keras_lstm.h5",
        model_framework="keras",
        model_framework_version="2.2.4",
        properties={
            "train_loss": metrics["train_loss"][-1],
            "val_loss": metrics["val_loss"][-1],
            "data": "USD/TWD from {0} to {1}".format(metrics["start"], metrics["end"]),
            "epoch": metrics["epoch"],
        },
    )

    run.register_model(
        model_name="scaler",
        tags={"data": "USD/TWD from 1983-10-04", "model": "MinMaxScaler"},
        model_path="outputs/scaler.pickle",
        model_framework="sklearn",
    )


if __name__ == "__main__":
    main()

執行實驗

python3.7 run_experiment_training.py --file train_lstm.py --target_folder currency

執行上述指令後,整個流程大約需要 15 分鐘,一開始可以考慮把 epochs 設定成 100 或更少,確認整個流程沒有問題,再把 epochs 放大。實驗執行沒多久,就會從瀏覽器開啟 tensorboard ,但不會馬上就有圖表,因為還需要設定環境,需要等一下。Tensorboard 出現的圖表,即是訓練過程中的loss隨著訓練的變化。另外,也可以看到訓練模型的結構。

workspace的網頁也可以看到,實驗執行完之後的各種數據視覺化。

這邊有一些實驗的限制,可供參考

終於完成一次模型訓練了,這一步因為kerastensorflow的版本相容性問題,讓我踩了幾次雷(謎之音:因為太久沒更新了,不知道外面的世界長怎樣......)。在實驗過程中,會出現的問題,多半都是跟上傳上去的程式碼有關,最常出現的錯誤,大概有三種:套件、路徑和寫錯字。出錯之後,到workspace上觀察一下紀錄(70_drive_log.txt),多半都能看出端倪。

workspace上也可以看到註冊過的模型。點選進去每個模型,也能下載下來使用。

提供以下程式碼,視覺化預測結果。比較一下 2020 ~ 2021 年的真實匯率與預測結果。

from plotly.offline import iplot, plot, init_notebook_mode
import plotly.graph_objects as go
init_notebook_mode(connected=True)

USD_TWD = investpy.get_currency_cross_historical_data('USD/TWD', from_date='01/01/1900', to_date='01/09/2021')
USD_TWD.reset_index(inplace=True)
data = USD_TWD.Close.values.reshape(-1, 1)
data = scaler.transform(data)
data_len = 240
generator = TimeseriesGenerator(data=data, targets=range(data.shape[0]), length=data_len, batch_size=1, stride=1)
X = []
for i in generator:
    X.append(i[0][0])
X = np.array(X)
Y = data[range(data_len, len(X) + data_len)]
prediction = model.predict(X)
prediction = scaler.inverse_transform(prediction)
Y = scaler.inverse_transform(Y)
date = USD_TWD.Date.values[range(data_len, len(X) + data_len)]
date = [str(i)[:10] for i in date]
result = pd.DataFrame(dict(date=date, real=Y.flatten(), prediction=prediction.flatten()))
result = result[result.date >= '2020']

fig = [
    go.Scatter(x=result.date, y=result.prediction, name='Real'),
    go.Scatter(x=result.date, y=result.real, name='Prediction'),]
iplot(fig, filename='test')

看起來大致上趨勢相同,整體上亦步亦趨。


這一關算是卡關比較多次的關卡,過了這一關就比較舒服了。有了模型,就可以部署服務了,下一篇介紹怎麼部署服務。


上一篇
Day 22 Azure machine learning: set environment- 準備一個大家都能用的環境
下一篇
Day 24 Azure machine learning: deploy service and inference- 模型訓練完就是要拿來用啊
系列文
我不太懂 AI,可是我會一點 Python 和 Azure30

尚未有邦友留言

立即登入留言