傳統的SaaS稱為Software as a Service,或許我們今天要build的可以更狹義地稱為Solver as a Service,其特色為:
稍等一下,根據[Day08]的精進大綱,沒有今天這個主題呀。
沒錯!其實我們猶豫了一陣子該不該講這個主題呢?不是想藏私,是真正production的環境裡,我們有自建的前端、後端、資料庫、ANSA的前處理節點及LS-DYNA的求解節點來搭配Prefect Cloud。該如何有個簡單的範例,來講解如何搭配Prefect Cloud,著實讓我們失眠了幾晚。
幾經思量,我們訂出目標是使用[Day20]部屬於Streamlit Cloud的App做為前端,於前端上傳input_data.json後,透過Prefect CLoud自動下載input_data.json及產生box_drop.k,並自動呼叫LS-DYNA進行求解。
今天的code是由我們自建的Solver as a Service抽取出來,做了大量的簡化,目標是做最基本的概念分享,離實務有點距離。
實務上需要考慮各種可能的情況,諸如:
config上傳到Linode Object Storage。config下載至ANSA前處理節點(ex:網路偶而不順時,需設定retry次數)。Prefect可以視為一種自動化處理資料的工具。只需要對原有的code進行少許修改,就可以快速地串接為自動化的流程。我們也曾經用過Apache Airflow,除了需要寫較多的code外,處理DAGs有時也會遇到滿棘手的問題。Prefect 2.0後改進及添加許多功能,而且有了全新的UI,所以我們決定試試換到Prefect,現在看來成效也還不錯。
Prefect Cloud是一種orchestration as a service,需要獲取API KEY才能進使用其服務。getting-started文件可以找到相關的資訊。
下面簡單說明今天會用到的Prefect功能:
flow是Prefect用來區別不同工作的decorator。當一個function被@flow後,prefect會知道這是此工作的入口,而flow包在flow內作為subflow及@task可以幫助我們將工作做更細的規劃。ConcurrentTaskRunner,但如果我們想要讓工作是有序的執行,則需選擇SequentialTaskRunner。因為我們的範例只預設一個concurrent job,所以選哪個應該都可以。package,現在好像只有一個名為shell_run_command的function。此function已經@task可以用來執行shell command。此處我們hardcode四個變數名:
jfile為下載的json config本機端檔案路徑。kfile為ANSA寫出的box_drop.k本機端檔案路徑。call_ansa_py為box_drop.py本機端檔案路徑。s3_url為透過Streamlit上傳的遠端config檔案路徑。由於box drop project有用到numpy及scipy,所以這邊需要做點處理。如果是於ANSA的GUI環境下,我們可以直接import numpy及scipy使用,但是此處我們是準備透過Prefect藉由command line來呼叫ANSA,此時當前的Python環境為conda或自建的venv等虛擬環境,在ANSA被呼叫之後,是沒有辦法自動引入其附的五個third-party package。我們的辦法是透過先import sys module,然後透過sys.path.append將numpy及scipy的所在路徑加入搜尋路徑中。
改寫box_drop.py中的main function:
jfile參數 ,並利用json讀取此檔案,作為config。run_dyna移出此main function,示範如何建立task。if __name__ == '__main__':,原因有二:
main flow中下載config。subprocess直接呼叫box_drop的main function。#box_drop.py
import json
import sys
from pathlib import Path
sys.path.append(f'{Path.home()}/BETA_CAE_Systems/shared_v23.0.0/python/linux64/lib/python3.8/site-packages/numpy-1.21.3-py3.8-linux-x86_64.egg')
sys.path.append(f'{Path.home()}/BETA_CAE_Systems/shared_v23.0.0/python/linux64/lib/python3.8/site-packages/scipy-1.7.1-py3.8-linux-x86_64.egg')
def main(jfile):
with open(jfile) as f:
config = json.load(f)
......
#remove run_dyna(file_str)
#remove lines after if __name__ == '__main__':
pull_to_trigger並以@flow(task_runner=SequentialTaskRunner())裝飾,作為Prefect的入口。
urllib.request下載遠端的config檔。jfile是否已經存在本機,如果存在就比較remote及local config檔的MD5 hash值是否相同,更新is_triggered。jfile不存在本機或is_triggered為真的話,就使用遠端的config覆蓋寫入jfile。接著呼叫call_ansa,並於呼叫完畢後,透過檢查kfile是否存在,來判斷是否呼叫run_dyna。#pfct.py
@flow(task_runner=SequentialTaskRunner())
def pull_to_trigger():
ssl._create_default_https_context = ssl._create_unverified_context
is_triggered, is_file = False, Path(jfile).is_file()
with urllib.request.urlopen(s3_url) as resp:
remote_config = resp.read()
if is_file:
with open(jfile, 'rb') as frb:
remote_config_md5 = hashlib.md5(remote_config).hexdigest()
local_config_md5 = hashlib.md5(frb.read()).hexdigest()
is_triggered = remote_config_md5 != local_config_md5
if not is_file or is_triggered:
new_config = json.loads(remote_config.decode("utf-8"))
with open(jfile, 'w') as fw:
json.dump(new_config, fw)
call_ansa(jfile)
if Path(kfile).is_file():
run_dyna()
call_ansa主要參考Interacting with ANSA的Running Scripts without GUI。
下面這段code的白話意思是,在不打開ANSA GUI的情況下,呼叫ANSA執行call_ansa_py這個檔案中的main function,並把jfile作為參數傳給main。
可能您會問,為什麼不使用shell_run_command呢?很遺憾,我們試過很多種方式,總是沒辦法呼叫成功,所以只能使用原生的subprocess。
最後請留意,我們使用了@task來宣告這是一個task。
#pfct.py
@task
def call_ansa(jfile):
command = ['ansa',
'-exec',
f"load_script: '{call_ansa_py}'",
'-exec',
f"main('{jfile}')",
'-nogui']
return subprocess.run(command)
run_dyna使用shell_run_command呼叫求解的指令。需要注意的是,因為我們是透過prefect來呼叫LS-DYNA,所以需要處理路徑問題,要先cd到當前目錄。
因為prefect認為@task內不能再有@task,所以我們不能將run_dyna加上@task,原因是shell_run_command內已經有@task了。
但@flow內可以再有@flow,所以我們可以給run_dyna加上@flow作為的subflow。
#pfct.py
@flow
def run_dyna():
run_kfile = Path(kfile)
dyna_dir_str = (run_kfile.parent).as_posix()
solver = f'{Path.home()}/LS-DYNA/13.0/smp-dyna_s'
i = run_kfile.as_posix()
command = f'cd {dyna_dir_str};\
{solver} i={i} ncpu=8 memory=1024m d=nodump'
return shell_run_command(command=command, return_all=True)
deploy_pfct.sh為我們透過指令部署code至Prefect cloud的script,詳細的說明可以參考官方文件。
prefect work-queue create day21 -l 1為建立一個concurrent_limit為1的work-queue。prefect deployment build pfct.py:pull_to_trigger -n pull_to_trigger -t pull_to_trigger -q day21為設定使用day21這個work-queue來搭配pfct.pull_to_trigger這個flow。並且命名此deployment為pull_to_trigger而tag亦為pull_to_trigger。prefect deployment apply pull_to_trigger-deployment.yaml為真正使用pull_to_trigger-deployment.yaml檔案進行部署。prefect agent start -q 'day21'為真正啟動work-queue,準備迎接傳入的flow。#deploy_pfct.sh
#!/usr/bin/bash
prefect work-queue create day21 -l 1
prefect deployment build pfct.py:pull_to_trigger -n pull_to_trigger -t pull_to_trigger -q day21
prefect deployment apply pull_to_trigger-deployment.yaml
prefect agent start -q 'day21'
接著透過sh deploy_pfct.sh即可完成部署。
我們可以看到兩個flow都成功地執行。
pull-to-trigger flow的詳細log。
run-dyna flow的詳細log。
於Prefect Cloud的Deployments面板裡,有調整scheduling的選項。這樣一來,您可以設定一個合理的檢查間距,自動去確認是否有flow需要執行。
請注意因為我們concurrent_limit是1,所以當您的檢查間距過密時,flow可能還沒執行完畢,最後的結果會是不斷延遲並不斷累積flow。
解決的方法之一是不要有hardcode的jfile、kfile甚至s3_url,至於詳細怎麼做...請留一點給我們賺吧XD
或許明年鐵人賽我們再繼續深入討論?
box drop project至此告一段落。希望諸位收獲滿滿,也看出我們是很認真地在設計分享的內容。如果覺得這些文章對您有幫助的話,可以幫我們點個讚或留言支持我們,感恩!
明天讓我們暫時離開ANSA,利用Streamlit來建立一個job submitter project,使其可在Windows WSL2下搭配LS-DYNA small system運行。