前面利用pipeline
管線把好幾個不同的工作項目串接起來了,接下來就可以利用schedule
的功能,讓整個管線定期執行。這就好比用crontab
在本地端讓自己的程式碼定期執行一樣,而 Azure 則是在雲端用schedule
實驗排程的功能,這樣一來,所有功能都能在雲端執行,自己也不用準備 server 隨時待命,Azure machine learning 執行完也會把紀錄留下,有需要再到workspace
檢查即可。接下來就相當單純了,只要選定之前已經發佈的管線,設定好時間與頻率,使其定期發動。
import os
from azureml.core import Workspace
from azureml.core.authentication import InteractiveLoginAuthentication
from azureml.pipeline.core import PublishedPipeline
from azureml.pipeline.core.schedule import ScheduleRecurrence, Schedule, TimeZone
def main():
interactive_auth = InteractiveLoginAuthentication(tenant_id=os.getenv("TENANT_ID"))
work_space = Workspace.from_config(auth=interactive_auth)
# 只有已發布的 pipeline 才能進行排程
pipelines = PublishedPipeline.list(work_space)
pipeline_id = next(
p_l.id for p_l in pipelines if p_l.name == "pipeline_data_train_deploy"
)
# 排程的時候,要注意時區,才能確保在正確的時間執行
recurrence = ScheduleRecurrence(
frequency="Week", # 觸發排程頻率的時間單位,可以是 "Minute"、"Hour"、"Day"、"Week" 或 "Month"。
interval=1, # 間隔多少時間單位觸發
start_time="2021-07-21T07:00:00",
time_zone=TimeZone.TaipeiStandardTime,
week_days=["Sunday"], # 如果每週執行的話,可以選擇某一天執行
time_of_day="6:00",
)
Schedule.create(
work_space,
name="pipeline_data_train_deploy",
description="Get data, train model and deploy service at 6:00 every Sunday",
pipeline_id=pipeline_id,
experiment_name="pipeline_data_train_deploy",
recurrence=recurrence,
)
if __name__ == "__main__":
main()
執行python3.7 create_schedule.py
之後,其實無法從workspace
看到排程的相關資訊,不過可以透過以下做法得知目前所有排程
import os
from azureml.core import Workspace
from azureml.core.authentication import InteractiveLoginAuthentication
from azureml.pipeline.core import PublishedPipeline
from azureml.pipeline.core.schedule import Schedule
interactive_auth = InteractiveLoginAuthentication(tenant_id=os.getenv("TENANT_ID"))
work_space = Workspace.from_config(auth=interactive_auth)
sche_list = Schedule.list(work_space)
print(sche_list)
這樣就能看到排程的資訊,其中也包含了詳細的執行頻率與時間,例如:每週日早上六點執行一次。
另外,如果管線已安排排程,那就必須把schedule
刪掉,才能刪除pipeline
。以上述情況為例:
sche = next(s for s in sche_list if s.id == "18ff1269-d837-42b6-85f1-972171ef6216")
sche.disable()
pipe_list = PublishedPipeline.list(work_space)
pipe = next(p_l.id for p_l in pipe_list if p_l.name == "pipeline_data_train_deploy")
pipe.disable()
一開始比較不熟悉,應該會很常刪掉管線和排程,重複實驗,所以提供大家參考。
經過幾篇文章的折騰,現在我們有了一個可以預測匯率並且定期更新模型的服務,實作了MLOps。接下來,我們可以把它整合進 Line chatbot,提供預測匯率的私人服務了。(謎之聲:下一篇就是最後一個中頭目了~~)