Picking up from the previous post: now that we have figured out what a pipeline does, this post feels comparatively easy. With future maintenance in mind, we hand the entire model-training and service-deployment flow over to the pipeline and simply run it. Following the earlier outline, we prepare one .py file for model training, one for service deployment, and a final .py file that runs the pipeline.
train_lstm.py
This script was introduced previously. To run it as a pipeline step, and with later reuse in mind, it gets a small makeover here; the additions are explained below:
train_lstm.py
import argparse
import os
import pickle

import numpy as np
import pandas as pd
from azureml.core.model import Model
from azureml.core.run import Run
from keras.callbacks import EarlyStopping, TensorBoard
from keras.layers import LSTM, Dense, Dropout
from keras.models import Sequential, load_model
from keras.preprocessing.sequence import TimeseriesGenerator


def data_generator(data, data_len=240):
    """
    Generate data for training and validation
    """
    generator = TimeseriesGenerator(
        data=data, targets=range(data.shape[0]), length=data_len, batch_size=1, stride=1
    )
    x_all = []
    for i in generator:
        x_all.append(i[0][0])
    x_all = np.array(x_all)
    y_all = data[range(data_len, len(x_all) + data_len)]
    # Hold out the most recent 40% of the series for validation
    rate = 0.4
    x_train = x_all[: int(len(x_all) * (1 - rate))]
    y_train = y_all[: int(y_all.shape[0] * (1 - rate))]
    x_val = x_all[int(len(x_all) * (1 - rate)) :]
    y_val = y_all[int(y_all.shape[0] * (1 - rate)) :]
    return x_train, y_train, x_val, y_val


def parse_args():
    """
    Parse arguments
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--target_folder", type=str, help="Path to the training data")
    # A flag is more reliable than type=bool, which would treat any
    # non-empty string (including "False") as True
    parser.add_argument(
        "--experiment",
        action="store_true",
        help="Just run an experiment, there is no pipeline",
    )
    parser.add_argument(
        "--log_folder", type=str, help="Path to the log", default="./logs"
    )
    args = parser.parse_args()
    return args


# For subsequent training runs, load the best model registered so far
# and continue training on top of it
def load_best_model(work_space, model_name, x_val, y_val):
    """
    Load the best model from the registered models
    """
    model_obj = Model(work_space, model_name)
    # List the registered models and keep the five most recent versions.
    # Besides version, the properties attribute can also be used to pick a model.
    model_list = model_obj.list(work_space, name=model_name)
    version = [i.version for i in model_list]
    version.sort(reverse=True)
    # Evaluate the five most recently trained models on recent data
    version = version[:5]
    val_loss = []
    for i in version:
        print(i)
        model_obj = Model(work_space, model_name, version=i)
        model_path = model_obj.download(exist_ok=True)
        model = load_model(model_path)
        val_loss.append(model.evaluate(x_val, y_val))
    # Pick the model with the smallest validation loss
    model_obj = Model(
        work_space, model_name, version=version[val_loss.index(min(val_loss))]
    )
    model_path = model_obj.download(exist_ok=True)
    model = load_model(model_path)
    return model, min(val_loss), version[val_loss.index(min(val_loss))]


def main():
    """
    Training of an LSTM with Keras
    """
    args = parse_args()
    # When the script runs in the workspace, get the context of the current run
    run = Run.get_context()
    usd_twd = pd.read_csv(os.path.join(args.target_folder, "training_data.csv"))
    data = usd_twd.Close.values.reshape(-1, 1)
    with open(os.path.join(args.target_folder, "scaler.pickle"), "rb") as f_h:
        scaler = pickle.load(f_h)
    data = scaler.transform(data)
    data_len = 240
    x_train, y_train, x_val, y_val = data_generator(data, data_len)
    loss_threshold = 1
    version = 0
    origin_weights = None
    # When running as a standalone experiment, define the model architecture
    # from scratch and log to TensorBoard
    if args.experiment:
        model = Sequential()
        model.add(LSTM(16, input_shape=(data_len, 1)))
        model.add(Dropout(0.1))
        model.add(Dense(1))
        model.compile(loss="mse", optimizer="adam")
        # TensorBoard
        callback = TensorBoard(
            log_dir=args.log_folder,
            histogram_freq=0,
            write_graph=True,
            write_images=True,
            embeddings_freq=0,
            embeddings_layer_names=None,
            embeddings_metadata=None,
        )
    # When running inside the pipeline, start from the best model trained so far
    else:
        # Get access to the workspace
        work_space = run.experiment.workspace
        model, loss_threshold, version = load_best_model(
            work_space, model_name="currency", x_val=x_val, y_val=y_val
        )
        # model.fit updates the weights in place, so snapshot the loaded
        # weights in case this run does not improve on the previous best
        origin_weights = model.get_weights()
        print("Load Model")
        # Stop training early once val_loss improves too slowly
        callback = EarlyStopping(
            monitor="val_loss", mode="min", min_delta=1e-8, patience=50
        )
    # Train the network
    history_callback = model.fit(
        x_train,
        y_train,
        epochs=1000,
        batch_size=240,
        verbose=1,
        validation_data=(x_val, y_val),
        callbacks=[callback],
    )
    print("Finished Training")
    # Check the model against the validation data and keep it only if it is better
    metrics = history_callback.history
    # If the freshly trained model beats the previous best, log the training details
    if metrics["val_loss"][-1] <= loss_threshold:
        run.log_list("train_loss", metrics["loss"][:10])
        run.log_list("val_loss", metrics["val_loss"][:10])
        run.log_list("start", [usd_twd.Date.values[0]])
        run.log_list("end", [usd_twd.Date.values[-1]])
        run.log_list("epoch", [len(history_callback.epoch)])
        run.log_list("last_version", [version])
        model.save("outputs/keras_lstm.h5")
        properties = {
            "train_loss": metrics["loss"][-1],
            "val_loss": metrics["val_loss"][-1],
            "data": "USD/TWD from {0} to {1}".format(
                usd_twd.Date.values[0], usd_twd.Date.values[-1]
            ),
            "epoch": len(history_callback.epoch),
            "last_version": version,
        }
    # Otherwise, log the previous val_loss and record which model version
    # this run inherited from
    else:
        run.log_list("val_loss", [loss_threshold])
        run.log_list("last_version", [version])
        if origin_weights is not None:
            # Restore the pre-training weights so the previous best model
            # is the one that gets saved and registered
            model.set_weights(origin_weights)
        model.save("outputs/keras_lstm.h5")
        properties = {"val_loss": loss_threshold, "last_version": version}
    if args.experiment:
        with open("outputs/scaler.pickle", "wb") as f_h:
            pickle.dump(scaler, f_h)
    else:
        # To keep the whole flow automated, register the model in the
        # workspace right after training
        model = Model.register(
            workspace=work_space,
            model_name="currency",
            tags={"model": "LSTM"},
            model_path="outputs/keras_lstm.h5",
            model_framework="keras",
            model_framework_version="2.2.4",
            properties=properties,
        )
        print("Registered Model")


if __name__ == "__main__":
    main()
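Before wiring the script into the pipeline, it can also be smoke-tested on its own in experiment mode. A hypothetical local invocation, assuming training_data.csv and scaler.pickle already sit in a local folder named currency:

python3.7 train_lstm.py --target_folder currency --experiment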
After training, the best model available at that point is registered, so the deployment step can simply deploy the latest registered version. The content of deploy_currency_prediction.py is unchanged, except that when it runs inside the pipeline, workspace access must likewise be obtained through Run.
deploy_currency_prediction.py
import os

from azureml.core import Run, Workspace
from azureml.core.authentication import InteractiveLoginAuthentication
from azureml.core.model import InferenceConfig, Model
from azureml.core.webservice import AciWebservice


def main():
    """
    Deploy the model as a web service
    """
    run = Run.get_context()
    # Inside a pipeline step, the workspace comes from the run context;
    # when executed locally, fall back to interactive authentication
    try:
        work_space = run.experiment.workspace
    except AttributeError:
        interactive_auth = InteractiveLoginAuthentication(
            tenant_id=os.getenv("TENANT_ID")
        )
        work_space = Workspace.from_config(auth=interactive_auth)
    environment = work_space.environments["train_lstm"]
    model = Model(work_space, "currency")
    service_name = "currency-service"
    inference_config = InferenceConfig(
        entry_script="predict_currency.py", environment=environment
    )
    aci_config = AciWebservice.deploy_configuration(cpu_cores=1, memory_gb=1)
    scaler = Model(work_space, name="scaler", version=1)
    service = Model.deploy(
        workspace=work_space,
        name=service_name,
        models=[model, scaler],
        inference_config=inference_config,
        deployment_config=aci_config,
        overwrite=True,
    )
    service.wait_for_deployment(show_output=True)
    print(service.get_logs())
    print(service.scoring_uri)


if __name__ == "__main__":
    main()
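Once the deployment succeeds, the scoring URI printed above can be exercised directly. A minimal sketch using requests; the endpoint address and the payload schema below are placeholders, since the actual input format is defined by predict_currency.py, which is not shown here:

import json

import requests

# Hypothetical values: take the real address from service.scoring_uri and
# match the payload to whatever predict_currency.py expects
scoring_uri = "http://<your-aci-endpoint>/score"
payload = json.dumps({"data": "2021-09-01"})
response = requests.post(
    scoring_uri, data=payload, headers={"Content-Type": "application/json"}
)
print(response.text)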
Compared with run_pipeline_data.py from the previous post, two more steps are added to the pipeline here: one for model training and one for service deployment.
run_pipeline.py
import os

from azureml.core import Dataset, Experiment, Workspace
from azureml.core.authentication import InteractiveLoginAuthentication
from azureml.core.runconfig import RunConfiguration
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.core import Pipeline
from azureml.pipeline.steps import PythonScriptStep


def main():
    """
    Build and run a pipeline: update data, train, then deploy
    """
    interactive_auth = InteractiveLoginAuthentication(tenant_id=os.getenv("TENANT_ID"))
    work_space = Workspace.from_config(auth=interactive_auth)
    datastore = work_space.get_default_datastore()
    input_folder = (
        Dataset.File.from_files(path=(datastore, "currency"))
        .as_named_input("input_folder")
        .as_mount()
    )
    dataset = OutputFileDatasetConfig(
        name="usd_twd", destination=(datastore, "currency")
    )
    aml_run_config = RunConfiguration()
    environment = work_space.environments["train_lstm"]
    aml_run_config.environment = environment
    # Step that updates the data
    get_currency = PythonScriptStep(
        source_directory=".",
        name="get_currency",
        script_name="get_currency.py",
        compute_target="cpu-cluster",
        runconfig=aml_run_config,
        arguments=[
            "--target_folder",
            dataset.as_upload(overwrite=True).register_on_complete(name="currency"),
            "--input",
            input_folder,
        ],
        allow_reuse=True,
    )
    # Step that trains the model
    training = PythonScriptStep(
        source_directory=".",
        name="train_lstm",
        script_name="train_lstm.py",
        compute_target="cpu-cluster",
        runconfig=aml_run_config,
        arguments=["--target_folder", dataset.as_input()],
        allow_reuse=True,
    )
    # Step that deploys the service
    deploy = PythonScriptStep(
        source_directory=".",
        name="deploy_currency_prediction",
        script_name="deploy_currency_prediction.py",
        compute_target="cpu-cluster",
        runconfig=aml_run_config,
        allow_reuse=True,
    )
    experiment = Experiment(work_space, "pipeline_data_train_deploy")
    pipeline = Pipeline(workspace=work_space, steps=[get_currency, training, deploy])
    run = experiment.submit(pipeline)
    run.wait_for_completion(show_output=True)
    # The pipeline must be published before it can be scheduled later
    run.publish_pipeline(
        name="pipeline_data_train_deploy",
        description="data-->train-->deploy",
        version="1.0",
    )


if __name__ == "__main__":
    main()
After python3.7 run_pipeline.py finishes, the results and the pipeline graph can once again be inspected from the workspace.
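The published pipeline's id will be needed when setting up the schedule, so it is worth confirming the publication from the workspace as well. A minimal sketch, assuming the same authentication as in run_pipeline.py:

import os

from azureml.core import Workspace
from azureml.core.authentication import InteractiveLoginAuthentication
from azureml.pipeline.core import PublishedPipeline

interactive_auth = InteractiveLoginAuthentication(tenant_id=os.getenv("TENANT_ID"))
work_space = Workspace.from_config(auth=interactive_auth)
# List the active published pipelines and find the one published above
for published in PublishedPipeline.list(work_space):
    if published.name == "pipeline_data_train_deploy":
        print(published.id, published.status)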
The introduction to Azure Machine Learning is coming to an end. Now that a pipeline can chain data collection, model training, and service deployment together and run them in order, the one thing left is to put the pipeline on a schedule. The next post covers how to set up that schedule in the workspace.