今天我們來講解stem核心的code。




RUN為一個st.checkbox,可以將其視為開始/暫停的切換開關。
RUN是有打勾狀態時,Streamlit會不斷Auto-refresh interval。RUN不在打勾狀態時,我們可以執行Remove tasks或Move task。RUN的st.checkbox。RUN是有打勾狀態時,我們計算當前所有task的status有多少是TaskStatus.running。MAX_CONCURRENT_LIMIT,代表我們還有資源可以執行最少一個task。此時從tasks取出一個status為TaskStatus.staging的task執行run_task。#app.py
RUN = st.checkbox('RUN')
if RUN:
len_run_statuses = sum(1 for status in get_task_statuses()
if status == TaskStatus.running)
if len_run_statuses < MAX_CONCURRENT_LIMIT:
for task in tasks:
if task.status == TaskStatus.staging:
run_task(task.id)
break
parse_dyna_folder parse task.cmd取得dyna_folder。subprocess.Popen呼叫subprocess.Popen於WSL2下執行求解指令。由於是使用subprocess ,我們必須先cd到parse_dyna_folder。請留意,此處的creationflags=subprocess.CREATE_NEW_CONSOLE是指每個subprocess皆會彈出一個新的視窗。如果於求解過程中,關閉視窗的話,該task就會結束。subprocess.Popen的回傳值指定給task.cp。此回傳值即為subprocess.Popen所產生的process,可以作為我們後續與此process溝通的管道。#app.py
def run_task():
dyna_folder = parse_dyna_folder(task.cmd)
cp = subprocess.Popen(['wsl', '--cd', dyna_folder, '-e', *task.cmd.split()],
creationflags=subprocess.CREATE_NEW_CONSOLE)
task.cp = cp
st.sidebar是調整執行LS-DYNA command的面板,可以調整的有:
solver version。solver type(目前只完成smp部份,mpp及hybrid待開發)。solver precision。core數。memory使用量。d=nodump可選擇)。ADD_TASKS的st.sidebar。ADD_TASKS中建立一個名為ADD_TASKS_FORM的st.form。ADD_TASKS_FORM中透過各種widget收集UI中的各項資訊。add_tasks_button。
RUN是不是已經unchecked。
uncheck,則執行add_tasks。uncheck,則會跳出一個warning通知使用者。#app.py
ADD_TASKS = st.sidebar
with ADD_TASKS:
ADD_TASKS_FORM = st.form("add-tasks-form")
with ADD_TASKS_FORM:
solver_version_ = st.selectbox(
'LS-DYNA version', solver_version_pool)
solver_type_ = solver_type_mapping.get(st.selectbox(
'LS-DYNA solver type', solver_type_mapping.keys()))
solver_precision_ = solver_precision_mapping.get(st.selectbox(
'LS-DYNA solver precision', solver_precision_mapping.keys()))
ncpu_ = st.slider('LS-DYNA cores', 1, 24, 2)
memory_ = st.slider('Memory(Mb)', 200, 2000, 200, 100)
consoles_ = st.multiselect('Consoles', ['d=nodump'], ['d=nodump'])
solver_name = solver_type_ + '-dyna_' + solver_precision_
solver = '/'.join([solver_dir, solver_version_, solver_name])
ncpu = f'ncpu={ncpu_}'
memory = f'memory={memory_}m'
consoles = ' '.join(consoles_) if consoles_ else ''
glob_ckbox = st.checkbox('Get *.k recursively (same config)', True)
add_tasks_button = st.form_submit_button('? Add task(s)')
if add_tasks_button:
if not RUN:
add_tasks(solver, ncpu, memory, consoles, glob_ckbox)
else:
st.warning('Please uncheck RUN button first')
decks的list,用來收集檔案路徑。glob_ckbox是checked還是unchecked。
unchecked的話,代表我們僅需要submit單個task,這裡使用 filedialog.askopenfilename來獲取檔案路徑。如若使用者有選取檔案的話,透過tk_2_wsl2 將路徑轉換後,指定給decks。checked的話,代表我們需要submit一個資料夾下各資料夾內的k檔,這裡使用 filedialog.askdirectory來獲取資料夾路徑。如若使用者有選取資料夾的話,透過tk_2_wsl2 將各k檔路徑轉換後,置入decks。decks內的deck傳入add_task。#app.py
def add_tasks(solver, ncpu, memory, consoles, glob_ckbox):
decks = []
if not glob_ckbox:
kfile = filedialog.askopenfilename(
master=root,
initialdir=initial_dir,
filetypes=[('LS-DYNA file', '.key .k .i .dyn dynain .key.gz .k.gz .i.gz .dyn.gz')])
if kfile:
deck = f'i={tk_2_wsl2(kfile)}'
decks = [deck]
elif glob_ckbox:
kfolder = filedialog.askdirectory(
master=root, initialdir=initial_dir)
if kfolder:
for kfile in Path(kfolder).glob('**/*k'):
deck = f'i={tk_2_wsl2(kfile.as_posix())}'
decks.append(deck)
for deck in decks:
add_task(solver, deck, ncpu, memory, consoles)
create_task_id建立獨特的識別碼,命名為task_id。task_cmd。get_sentinel取得獨特的sentinel,命名為task_cp。task_id、task_cmd及task_cp
#app.py
def add_task(solver, deck, ncpu, memory, consoles):
task_id = create_task_id()
task_cmd = ' '.join(
[solver, deck, ncpu, memory, consoles])
task_cp = get_sentinel()
task_data = {'id': task_id,
'cmd': task_cmd,
'cp': task_cp}
task = Task(**task_data)
tasks = get_tasks()
tasks.append(task)
return task
README是一個可以收合的st.expander,裡面的資訊有:
Auto-refresh interval。Max concurrent limit。LSTC_LICENSE、LSTC_LICENSE_SERVER及WSLENV三個所需環境變數的設定指令。README的st.expander。st.metric來突顯Auto-refresh interval及Max concurrent limit。st.code顯示三個需設定的環境變數。#app.py
README = st.expander('README')
with README:
col_st_auto_fresh_interval, col_max_concurrent_limit, * \
_ = st.columns((1, 1, 4))
col_st_auto_fresh_interval.metric(
'Auto-refresh interval', f'{ST_AUTO_REFRESH_INTERVAL} ms')
col_max_concurrent_limit.metric(
'Max concurrent limit', f'{MAX_CONCURRENT_LIMIT} tasks')
st.markdown(
'open powershell terminal and set up 3 environment variables.')
st.code('''setx LSTC_LICENSE "network"''')
st.code('''setx LSTC_LICENSE_SERVER "192.168.0.5"''')
st.code('''setx WSLENV "LSTC_LICENSE/u:LSTC_LICENSE_SERVER/u"''')
st.markdown(
f'Uncheck **RUN** before add, remove, move tasks. If **RUN** is checked, stem will continuously wait for tasks.')
Refresh是一個st.container。
REFRESH的st.container。refresh_button的st.button。refresh_button。如果有按下的話,則執行st_sync。#app.py
REFRESH = st.container()
with REFRESH:
refresh_button = st.button('Refresh')
if refresh_button:
st_sync()
st_sync包含sync_task_cp_status及sync_insertable_idx兩個function。sync_task_cp_status更新當前所有task的status,而sync_insertable_idx則取得當前task可供插入的第一個index。get_tasks取得tasks及get_sentinel取得sentinel。tasks打一個迴圈,判斷每個task的cp是否被指定為除了sentinel以外的值。如果是的話,透過cp.poll判斷其回傳值是否為None。
None的話,代表該process還在跑,將task.status指定為TaskStatus.running。None的話,透過cp.returncode判斷其值是否為0 。
0的話,代表該process已經結束,將task.status指定為TaskStatus.finished。0的話,代表該process可能有不正常情況發生,將task.status指定為TaskStatus.notOK。get_tasks取得tasks。tasks打一個迴圈,直到遇到第一個task的status為TaskStatus.staging時跳出迴圈。該將task的index指定給st.session_state['insertable_idx']。#app.py
def sync_task_cp_status():
'''
cp.poll() is None means alive
returncode=0 means successful for subprocess
'''
tasks = get_tasks()
sentinel = get_sentinel()
for task in tasks:
cp = task.cp
if cp != sentinel:
if cp.poll() is None:
task.status = TaskStatus.running
else:
if cp.returncode == 0:
task.status = TaskStatus.finished
else:
task.status = TaskStatus.notOK
def sync_insertable_idx():
tasks = get_tasks()
idx = 0
for task in tasks:
if task.status == TaskStatus.staging:
break
idx += 1
st.session_state['insertable_idx'] = idx
def st_sync():
sync_task_cp_status()
sync_insertable_idx()
PERFORM_TASKS是一個st.columns。
PERFORM_TASKS的st.columns。
PERFORM_TASKS中建立一個名為REMOVE_TASKS的st.container及一個名為REMOVE_TASKS_FORM的st.form。
REMOVE_TASKS_FORM中透過st.multiselect收集想要刪除的task id,這些task的status皆需為TaskStatus.staging。removed_task_button。如果有按的話:
RUN是否為打勾狀態:
task。如果成功取得則呼叫remove_task移除,失敗的話則顯示warning。warning。PERFORM_TASKS中建立一個名為MOVE_TASK的st.container及一個名為MOVE_TASK_FORM的st.form。
MOVE_TASK_FORM中透過st.selectbox取得想移動task的id及透過st.number_input取得想插入位置的index。move_task_button。如果有按的話:
RUN是否為打勾狀態:
move_id及target_idx的話,則呼叫move_task,失敗的話則顯示warning。warning。#app.py
PERFORM_TASKS, DASHBOARD = st.columns((1, 3))
with PERFORM_TASKS:
REMOVE_TASKS, REMOVE_TASKS_FORM = st.container(), st.form('remove-tasks-form')
with REMOVE_TASKS:
with REMOVE_TASKS_FORM:
s_removed_task_ids = st.multiselect('Select tasks to be removed',
get_staging_task_ids())
removed_task_button = st.form_submit_button('Remove tasks')
if removed_task_button:
if not RUN:
s_removed_tasks = [get_task_by_id(s_remove_task_id)
for s_remove_task_id in s_removed_task_ids]
if s_removed_tasks:
for s_removed_task in s_removed_tasks:
remove_task(s_removed_task)
else:
st.warning(f'Invalid operation.Nothing happened!')
else:
st.warning('Please uncheck RUN button first')
MOVE_TASK, MOVE_TASK_FORM = st.container(), st.form('move-task-form')
with MOVE_TASK:
with MOVE_TASK_FORM:
move_id = st.selectbox('Move the selected task :',
get_staging_task_ids())
target_idx = st.number_input('To position :',
1,
len(get_tasks()))
move_task_button = st.form_submit_button('Move task')
if move_task_button:
if not RUN:
if move_id and target_idx:
move_task(move_id, target_idx)
else:
st.warning('Invalid input')
else:
st.warning('Please uncheck RUN button first')
get_tasks取得st.session.tasks這個list,接著利用list.removef移除st.session.tasks中的task。此處用try-catch是因為remove_task其實也可以使用在非GUI輸入時。#app.py
def remove_task(task):
tasks = get_tasks()
try:
tasks.remove(task)
except ValueError:
pass
get_task_by_id取得move_task。
move_task不為None的話:
inserted_idx >= get_insertable_idx()且move_id != be_inserted_task.id,則透過list.remove移除move_task且透過list.insert插入move_task至inserted_idx(註1)。warning。None的話則顯示warning。#app.py
def move_task(move_id, target_idx):
move_task = get_task_by_id(move_id)
if move_task is not None:
inserted_idx = target_idx-1
be_inserted_task = tasks[inserted_idx]
conds = (inserted_idx >= get_insertable_idx(),
move_id != be_inserted_task.id)
if all(conds):
tasks.remove(move_task)
tasks.insert(inserted_idx, move_task)
else:
st.warning(f'Invalid operation.Nothing happened!')
else:
st.warning('No tasks can be moved now')
Dashboard是一個st.columns,可以顯示:
JobID。solver type與precision。deck。core數。memory。status(task執行狀態)。emogi(以顏色表示task執行狀態)。DASHBOARD的st.columns(實際建立的code在Perform_tasks中)。get_ds取得準備顯示於dashboard中的list of dicts,命名為ds。ds 不為空的話:
st.empty中,使用pd.DataFrame將ds轉換為dataframe,並使用st.dataframe呈現。st.empty的功用是可於每次進行手動Refresh或Auto-refresh時,更新顯示的dataframe。dataframe寫出為csv檔。#app.py
with DASHBOARD:
ds = get_ds(tasks)
if ds:
with st.empty():
df = pd.DataFrame(ds, index=range(1, len(ds)+1))
st.dataframe(df)
st.download_button(label="Export to CSV",
data=convert_df(df),
file_name=get_csv_filename(),
mime='text/csv')
於terminal中確認虛擬環境已啟動後,輸入streamlit run app.py即可開啟這個App。
恭喜您有勇氣看到現在,今天的內容也是相當硬呀XD
註1:會要做這兩個檢查是因為,我們沒辦法讓move_id及target_idx的widget做即時的顯示更新,所以改於程式中進行檢查。