今天我們來講解stem
核心的code。
RUN
為一個st.checkbox
,可以將其視為開始/暫停
的切換開關。
RUN
是有打勾狀態時,Streamlit
會不斷Auto-refresh interval
。RUN
不在打勾狀態時,我們可以執行Remove tasks
或Move task
。RUN
的st.checkbox
。RUN
是有打勾狀態時,我們計算當前所有task
的status
有多少是TaskStatus.running
。MAX_CONCURRENT_LIMIT
,代表我們還有資源可以執行最少一個task
。此時從tasks
取出一個status
為TaskStatus.staging
的task
執行run_task
。#app.py
RUN = st.checkbox('RUN')
if RUN:
len_run_statuses = sum(1 for status in get_task_statuses()
if status == TaskStatus.running)
if len_run_statuses < MAX_CONCURRENT_LIMIT:
for task in tasks:
if task.status == TaskStatus.staging:
run_task(task.id)
break
parse_dyna_folder
parse task.cmd
取得dyna_folder
。subprocess.Popen
呼叫subprocess.Popen
於WSL2
下執行求解指令。由於是使用subprocess
,我們必須先cd
到parse_dyna_folder
。請留意,此處的creationflags=subprocess.CREATE_NEW_CONSOLE
是指每個subprocess
皆會彈出一個新的視窗。如果於求解過程中,關閉視窗的話,該task
就會結束。subprocess.Popen
的回傳值指定給task.cp
。此回傳值即為subprocess.Popen
所產生的process
,可以作為我們後續與此process
溝通的管道。#app.py
def run_task():
dyna_folder = parse_dyna_folder(task.cmd)
cp = subprocess.Popen(['wsl', '--cd', dyna_folder, '-e', *task.cmd.split()],
creationflags=subprocess.CREATE_NEW_CONSOLE)
task.cp = cp
st.sidebar
是調整執行LS-DYNA command的面板,可以調整的有:
solver version
。solver type
(目前只完成smp部份,mpp及hybrid待開發)。solver precision
。core
數。memory
使用量。d=nodump
可選擇)。ADD_TASKS
的st.sidebar
。ADD_TASKS
中建立一個名為ADD_TASKS_FORM
的st.form
。ADD_TASKS_FORM
中透過各種widget
收集UI
中的各項資訊。add_tasks_button
。
RUN
是不是已經unchecked
。
uncheck
,則執行add_tasks
。uncheck
,則會跳出一個warning
通知使用者。#app.py
ADD_TASKS = st.sidebar
with ADD_TASKS:
ADD_TASKS_FORM = st.form("add-tasks-form")
with ADD_TASKS_FORM:
solver_version_ = st.selectbox(
'LS-DYNA version', solver_version_pool)
solver_type_ = solver_type_mapping.get(st.selectbox(
'LS-DYNA solver type', solver_type_mapping.keys()))
solver_precision_ = solver_precision_mapping.get(st.selectbox(
'LS-DYNA solver precision', solver_precision_mapping.keys()))
ncpu_ = st.slider('LS-DYNA cores', 1, 24, 2)
memory_ = st.slider('Memory(Mb)', 200, 2000, 200, 100)
consoles_ = st.multiselect('Consoles', ['d=nodump'], ['d=nodump'])
solver_name = solver_type_ + '-dyna_' + solver_precision_
solver = '/'.join([solver_dir, solver_version_, solver_name])
ncpu = f'ncpu={ncpu_}'
memory = f'memory={memory_}m'
consoles = ' '.join(consoles_) if consoles_ else ''
glob_ckbox = st.checkbox('Get *.k recursively (same config)', True)
add_tasks_button = st.form_submit_button('? Add task(s)')
if add_tasks_button:
if not RUN:
add_tasks(solver, ncpu, memory, consoles, glob_ckbox)
else:
st.warning('Please uncheck RUN button first')
decks
的list
,用來收集檔案路徑。glob_ckbox
是checked
還是unchecked
。
unchecked
的話,代表我們僅需要submit單個task
,這裡使用 filedialog.askopenfilename
來獲取檔案路徑。如若使用者有選取檔案的話,透過tk_2_wsl2
將路徑轉換後,指定給decks
。checked
的話,代表我們需要submit一個資料夾下各資料夾內的k檔,這裡使用 filedialog.askdirectory
來獲取資料夾路徑。如若使用者有選取資料夾的話,透過tk_2_wsl2
將各k檔路徑轉換後,置入decks
。decks
內的deck
傳入add_task
。#app.py
def add_tasks(solver, ncpu, memory, consoles, glob_ckbox):
decks = []
if not glob_ckbox:
kfile = filedialog.askopenfilename(
master=root,
initialdir=initial_dir,
filetypes=[('LS-DYNA file', '.key .k .i .dyn dynain .key.gz .k.gz .i.gz .dyn.gz')])
if kfile:
deck = f'i={tk_2_wsl2(kfile)}'
decks = [deck]
elif glob_ckbox:
kfolder = filedialog.askdirectory(
master=root, initialdir=initial_dir)
if kfolder:
for kfile in Path(kfolder).glob('**/*k'):
deck = f'i={tk_2_wsl2(kfile.as_posix())}'
decks.append(deck)
for deck in decks:
add_task(solver, deck, ncpu, memory, consoles)
create_task_id
建立獨特的識別碼,命名為task_id
。task_cmd
。get_sentinel
取得獨特的sentinel
,命名為task_cp
。task_id
、task_cmd
及task_cp
#app.py
def add_task(solver, deck, ncpu, memory, consoles):
task_id = create_task_id()
task_cmd = ' '.join(
[solver, deck, ncpu, memory, consoles])
task_cp = get_sentinel()
task_data = {'id': task_id,
'cmd': task_cmd,
'cp': task_cp}
task = Task(**task_data)
tasks = get_tasks()
tasks.append(task)
return task
README
是一個可以收合的st.expander
,裡面的資訊有:
Auto-refresh interval
。Max concurrent limit
。LSTC_LICENSE
、LSTC_LICENSE_SERVER
及WSLENV
三個所需環境變數的設定指令。README
的st.expander
。st.metric
來突顯Auto-refresh interval
及Max concurrent limit
。st.code
顯示三個需設定的環境變數。#app.py
README = st.expander('README')
with README:
col_st_auto_fresh_interval, col_max_concurrent_limit, * \
_ = st.columns((1, 1, 4))
col_st_auto_fresh_interval.metric(
'Auto-refresh interval', f'{ST_AUTO_REFRESH_INTERVAL} ms')
col_max_concurrent_limit.metric(
'Max concurrent limit', f'{MAX_CONCURRENT_LIMIT} tasks')
st.markdown(
'open powershell terminal and set up 3 environment variables.')
st.code('''setx LSTC_LICENSE "network"''')
st.code('''setx LSTC_LICENSE_SERVER "192.168.0.5"''')
st.code('''setx WSLENV "LSTC_LICENSE/u:LSTC_LICENSE_SERVER/u"''')
st.markdown(
f'Uncheck **RUN** before add, remove, move tasks. If **RUN** is checked, stem will continuously wait for tasks.')
Refresh
是一個st.container
。
REFRESH
的st.container
。refresh_button
的st.button
。refresh_button
。如果有按下的話,則執行st_sync
。#app.py
REFRESH = st.container()
with REFRESH:
refresh_button = st.button('Refresh')
if refresh_button:
st_sync()
st_sync
包含sync_task_cp_status
及sync_insertable_idx
兩個function
。sync_task_cp_status
更新當前所有task
的status
,而sync_insertable_idx
則取得當前task
可供插入的第一個index
。get_tasks
取得tasks
及get_sentinel
取得sentinel
。tasks
打一個迴圈,判斷每個task
的cp
是否被指定為除了sentinel
以外的值。如果是的話,透過cp.poll
判斷其回傳值是否為None
。
None
的話,代表該process
還在跑,將task.status
指定為TaskStatus.running
。None
的話,透過cp.returncode
判斷其值是否為0
。
0
的話,代表該process
已經結束,將task.status
指定為TaskStatus.finished
。0
的話,代表該process
可能有不正常情況發生,將task.status
指定為TaskStatus.notOK
。get_tasks
取得tasks
。tasks
打一個迴圈,直到遇到第一個task
的status
為TaskStatus.staging
時跳出迴圈。該將task
的index
指定給st.session_state['insertable_idx']
。#app.py
def sync_task_cp_status():
'''
cp.poll() is None means alive
returncode=0 means successful for subprocess
'''
tasks = get_tasks()
sentinel = get_sentinel()
for task in tasks:
cp = task.cp
if cp != sentinel:
if cp.poll() is None:
task.status = TaskStatus.running
else:
if cp.returncode == 0:
task.status = TaskStatus.finished
else:
task.status = TaskStatus.notOK
def sync_insertable_idx():
tasks = get_tasks()
idx = 0
for task in tasks:
if task.status == TaskStatus.staging:
break
idx += 1
st.session_state['insertable_idx'] = idx
def st_sync():
sync_task_cp_status()
sync_insertable_idx()
PERFORM_TASKS
是一個st.columns
。
PERFORM_TASKS
的st.columns
。
PERFORM_TASKS
中建立一個名為REMOVE_TASKS
的st.container
及一個名為REMOVE_TASKS_FORM
的st.form
。
REMOVE_TASKS_FORM
中透過st.multiselect
收集想要刪除的task id
,這些task
的status
皆需為TaskStatus.staging
。removed_task_button
。如果有按的話:
RUN
是否為打勾狀態:
task
。如果成功取得則呼叫remove_task
移除,失敗的話則顯示warning
。warning
。PERFORM_TASKS
中建立一個名為MOVE_TASK
的st.container
及一個名為MOVE_TASK_FORM
的st.form
。
MOVE_TASK_FORM
中透過st.selectbox
取得想移動task
的id
及透過st.number_input
取得想插入位置的index
。move_task_button
。如果有按的話:
RUN
是否為打勾狀態:
move_id
及target_idx
的話,則呼叫move_task
,失敗的話則顯示warning
。warning
。#app.py
PERFORM_TASKS, DASHBOARD = st.columns((1, 3))
with PERFORM_TASKS:
REMOVE_TASKS, REMOVE_TASKS_FORM = st.container(), st.form('remove-tasks-form')
with REMOVE_TASKS:
with REMOVE_TASKS_FORM:
s_removed_task_ids = st.multiselect('Select tasks to be removed',
get_staging_task_ids())
removed_task_button = st.form_submit_button('Remove tasks')
if removed_task_button:
if not RUN:
s_removed_tasks = [get_task_by_id(s_remove_task_id)
for s_remove_task_id in s_removed_task_ids]
if s_removed_tasks:
for s_removed_task in s_removed_tasks:
remove_task(s_removed_task)
else:
st.warning(f'Invalid operation.Nothing happened!')
else:
st.warning('Please uncheck RUN button first')
MOVE_TASK, MOVE_TASK_FORM = st.container(), st.form('move-task-form')
with MOVE_TASK:
with MOVE_TASK_FORM:
move_id = st.selectbox('Move the selected task :',
get_staging_task_ids())
target_idx = st.number_input('To position :',
1,
len(get_tasks()))
move_task_button = st.form_submit_button('Move task')
if move_task_button:
if not RUN:
if move_id and target_idx:
move_task(move_id, target_idx)
else:
st.warning('Invalid input')
else:
st.warning('Please uncheck RUN button first')
get_tasks
取得st.session.tasks
這個list
,接著利用list.remove
f移除st.session.tasks
中的task
。此處用try-catch
是因為remove_task
其實也可以使用在非GUI輸入時。#app.py
def remove_task(task):
tasks = get_tasks()
try:
tasks.remove(task)
except ValueError:
pass
get_task_by_id
取得move_task
。
move_task
不為None
的話:
inserted_idx >= get_insertable_idx()
且move_id != be_inserted_task.id
,則透過list.remove
移除move_task
且透過list.insert
插入move_task
至inserted_idx
(註1
)。warning
。None
的話則顯示warning
。#app.py
def move_task(move_id, target_idx):
move_task = get_task_by_id(move_id)
if move_task is not None:
inserted_idx = target_idx-1
be_inserted_task = tasks[inserted_idx]
conds = (inserted_idx >= get_insertable_idx(),
move_id != be_inserted_task.id)
if all(conds):
tasks.remove(move_task)
tasks.insert(inserted_idx, move_task)
else:
st.warning(f'Invalid operation.Nothing happened!')
else:
st.warning('No tasks can be moved now')
Dashboard
是一個st.columns
,可以顯示:
JobID
。solver type
與precision
。deck
。core
數。memory
。status
(task
執行狀態)。emogi
(以顏色表示task
執行狀態)。DASHBOARD
的st.columns
(實際建立的code在Perform_tasks
中)。get_ds
取得準備顯示於dashboard
中的list of dicts
,命名為ds
。ds
不為空的話:
st.empty
中,使用pd.DataFrame
將ds
轉換為dataframe
,並使用st.dataframe
呈現。st.empty
的功用是可於每次進行手動Refresh
或Auto-refresh
時,更新顯示的dataframe
。dataframe
寫出為csv
檔。#app.py
with DASHBOARD:
ds = get_ds(tasks)
if ds:
with st.empty():
df = pd.DataFrame(ds, index=range(1, len(ds)+1))
st.dataframe(df)
st.download_button(label="Export to CSV",
data=convert_df(df),
file_name=get_csv_filename(),
mime='text/csv')
於terminal
中確認虛擬環境已啟動後,輸入streamlit run app.py
即可開啟這個App
。
恭喜您有勇氣看到現在,今天的內容也是相當硬呀XD
註1:會要做這兩個檢查是因為,我們沒辦法讓move_id
及target_idx
的widget
做即時的顯示更新,所以改於程式中進行檢查。