iT邦幫忙

2022 iThome 鐵人賽

DAY 23
0
Software Development

或躍在淵的CAE: 讓咱們用Python會一會ANSA + LS-DYNA系列 第 23

[Day23] - Streamlit WSL2 LS-DYNA Job Submitter - stem(2)

  • 分享至 

  • xImage
  •  

今天我們來講解stem核心的code。

UI概覽

General

stem-run

README

stem-readme

Sidebar

stem-sidebar

CSV log

stem-log-csv

RUN

UI

RUN為一個st.checkbox,可以將其視為開始/暫停的切換開關。

  • RUN是有打勾狀態時,Streamlit會不斷Auto-refresh interval
  • RUN不在打勾狀態時,我們可以執行Remove tasksMove task

UI Code

  • 建立一個名為RUNst.checkbox
  • RUN是有打勾狀態時,我們計算當前所有taskstatus有多少是TaskStatus.running
  • 如果算出來的數字少於MAX_CONCURRENT_LIMIT,代表我們還有資源可以執行最少一個task。此時從tasks取出一個statusTaskStatus.stagingtask執行run_task
#app.py
RUN = st.checkbox('RUN')
if RUN:
    len_run_statuses = sum(1 for status in get_task_statuses()
                           if status == TaskStatus.running)
    if len_run_statuses < MAX_CONCURRENT_LIMIT:
        for task in tasks:
            if task.status == TaskStatus.staging:
                run_task(task.id)
                break

Run Task

  • 透過parse_dyna_folder parse task.cmd取得dyna_folder
  • 使用 subprocess.Popen呼叫subprocess.PopenWSL2下執行求解指令。由於是使用subprocess ,我們必須先cdparse_dyna_folder。請留意,此處的creationflags=subprocess.CREATE_NEW_CONSOLE是指每個subprocess皆會彈出一個新的視窗。如果於求解過程中,關閉視窗的話,該task就會結束。
  • 最後我們將subprocess.Popen的回傳值指定給task.cp。此回傳值即為subprocess.Popen所產生的process,可以作為我們後續與此process溝通的管道。
#app.py
def run_task():
    dyna_folder = parse_dyna_folder(task.cmd)
    cp = subprocess.Popen(['wsl', '--cd', dyna_folder, '-e', *task.cmd.split()],
                          creationflags=subprocess.CREATE_NEW_CONSOLE)
    task.cp = cp

Sidebar

UI

st.sidebar是調整執行LS-DYNA command的面板,可以調整的有:

  • LS-DYNA solver version
  • LS-DYNA solver type(目前只完成smp部份,mpp及hybrid待開發)。
  • LS-DYNA solver precision
  • LS-DYNA core數。
  • LS-DYNA memory使用量。
  • LS-DYNA 其餘指令(目前僅有d=nodump可選擇)。
  • 是否尋找所選資料架下的所有資料夾內的k檔(我們假設每個資料夾內,最多只會有一個k檔)。

UI Code

  • 建立一個名為ADD_TASKSst.sidebar
  • ADD_TASKS中建立一個名為ADD_TASKS_FORMst.form
  • ADD_TASKS_FORM中透過各種widget收集UI中的各項資訊。
  • 檢查使用者是否有按下add_tasks_button
    • 如果有按下的話,會檢查RUN是不是已經unchecked
      • 如果已經uncheck,則執行add_tasks
      • 如果沒有uncheck,則會跳出一個warning通知使用者。
#app.py
ADD_TASKS = st.sidebar
with ADD_TASKS:
    ADD_TASKS_FORM = st.form("add-tasks-form")
    with ADD_TASKS_FORM:
        solver_version_ = st.selectbox(
            'LS-DYNA version', solver_version_pool)
        solver_type_ = solver_type_mapping.get(st.selectbox(
            'LS-DYNA solver type', solver_type_mapping.keys()))

        solver_precision_ = solver_precision_mapping.get(st.selectbox(
            'LS-DYNA solver precision', solver_precision_mapping.keys()))
        ncpu_ = st.slider('LS-DYNA cores', 1, 24, 2)
        memory_ = st.slider('Memory(Mb)', 200, 2000, 200, 100)
        consoles_ = st.multiselect('Consoles', ['d=nodump'], ['d=nodump'])
        solver_name = solver_type_ + '-dyna_' + solver_precision_

        solver = '/'.join([solver_dir,  solver_version_, solver_name])
        ncpu = f'ncpu={ncpu_}'
        memory = f'memory={memory_}m'
        consoles = ' '.join(consoles_) if consoles_ else ''
        glob_ckbox = st.checkbox('Get *.k recursively (same config)', True)
        add_tasks_button = st.form_submit_button('? Add task(s)')
        if add_tasks_button:
            if not RUN:
                add_tasks(solver, ncpu, memory, consoles, glob_ckbox)
            else:
                st.warning('Please uncheck RUN button first')

add_tasks

  • 建立一個名為deckslist,用來收集檔案路徑。
  • 判斷glob_ckboxchecked還是unchecked
    • 如果是unchecked的話,代表我們僅需要submit單個task,這裡使用 filedialog.askopenfilename來獲取檔案路徑。如若使用者有選取檔案的話,透過tk_2_wsl2 將路徑轉換後,指定給decks
    • 如果是checked的話,代表我們需要submit一個資料夾下各資料夾內的k檔,這裡使用 filedialog.askdirectory來獲取資料夾路徑。如若使用者有選取資料夾的話,透過tk_2_wsl2 將各k檔路徑轉換後,置入decks
  • 最後依序將decks內的deck傳入add_task
#app.py
def add_tasks(solver, ncpu, memory, consoles, glob_ckbox):
    decks = []
    if not glob_ckbox:
        kfile = filedialog.askopenfilename(
            master=root,
            initialdir=initial_dir,
            filetypes=[('LS-DYNA file', '.key .k .i .dyn dynain .key.gz .k.gz .i.gz .dyn.gz')])
        if kfile:
            deck = f'i={tk_2_wsl2(kfile)}'
            decks = [deck]
    elif glob_ckbox:
        kfolder = filedialog.askdirectory(
            master=root, initialdir=initial_dir)
        if kfolder:
            for kfile in Path(kfolder).glob('**/*k'):
                deck = f'i={tk_2_wsl2(kfile.as_posix())}'
                decks.append(deck)
    for deck in decks:
        add_task(solver, deck, ncpu, memory, consoles)

add_task

  • 透過create_task_id建立獨特的識別碼,命名為task_id
  • 將要執行的commnad收集起來,命名為task_cmd
  • 透過get_sentinel取得獨特的sentinel,命名為task_cp
  • 收集task_idtask_cmdtask_cp
#app.py
def add_task(solver, deck, ncpu, memory, consoles):
    task_id = create_task_id()
    task_cmd = ' '.join(
        [solver, deck, ncpu, memory, consoles])
    task_cp = get_sentinel()
    task_data = {'id': task_id,
                 'cmd': task_cmd,
                 'cp': task_cp}
    task = Task(**task_data)

    tasks = get_tasks()
    tasks.append(task)
    return task

README

UI

README是一個可以收合的st.expander,裡面的資訊有:

  • Auto-refresh interval
  • Max concurrent limit
  • LSTC_LICENSELSTC_LICENSE_SERVERWSLENV三個所需環境變數的設定指令。

UI Code

  • 建立一個名為READMEst.expander
  • 使用st.metric來突顯Auto-refresh intervalMax concurrent limit
  • 透過st.code顯示三個需設定的環境變數。
#app.py
README = st.expander('README')
with README:
    col_st_auto_fresh_interval, col_max_concurrent_limit, * \
        _ = st.columns((1, 1, 4))
    col_st_auto_fresh_interval.metric(
        'Auto-refresh interval', f'{ST_AUTO_REFRESH_INTERVAL} ms')
    col_max_concurrent_limit.metric(
        'Max concurrent limit', f'{MAX_CONCURRENT_LIMIT} tasks')
    st.markdown(
        'open powershell terminal and set up 3 environment variables.')
    st.code('''setx LSTC_LICENSE "network"''')
    st.code('''setx LSTC_LICENSE_SERVER "192.168.0.5"''')
    st.code('''setx WSLENV "LSTC_LICENSE/u:LSTC_LICENSE_SERVER/u"''')
    st.markdown(
        f'Uncheck **RUN** before add, remove, move tasks. If **RUN** is checked, stem will continuously wait for tasks.')

Refresh

UI

Refresh是一個st.container

UI Code

  • 建立一個名為REFRESHst.container
  • 建立一個名為refresh_buttonst.button
  • 檢查使用者是否有按下refresh_button。如果有按下的話,則執行st_sync
#app.py
REFRESH = st.container()
with REFRESH:
    refresh_button = st.button('Refresh')
    if refresh_button:
        st_sync()

st_sync

  • st_sync包含sync_task_cp_statussync_insertable_idx兩個functionsync_task_cp_status更新當前所有taskstatus,而sync_insertable_idx則取得當前task可供插入的第一個index

sync_task_cp_status

  • 透過get_tasks取得tasksget_sentinel取得sentinel
  • tasks打一個迴圈,判斷每個taskcp是否被指定為除了sentinel以外的值。如果是的話,透過cp.poll判斷其回傳值是否為None
    • 如果是None的話,代表該process還在跑,將task.status指定為TaskStatus.running
    • 如果不是None的話,透過cp.returncode判斷其值是否為0
      • 如果為0的話,代表該process已經結束,將task.status指定為TaskStatus.finished
      • 如果不為0的話,代表該process可能有不正常情況發生,將task.status指定為TaskStatus.notOK

sync_insertable_idx

  • 透過get_tasks取得tasks
  • tasks打一個迴圈,直到遇到第一個taskstatusTaskStatus.staging時跳出迴圈。該將taskindex指定給st.session_state['insertable_idx']
#app.py
def sync_task_cp_status():
    '''
    cp.poll() is None means alive
    returncode=0 means successful for subprocess
    '''
    tasks = get_tasks()
    sentinel = get_sentinel()
    for task in tasks:
        cp = task.cp
        if cp != sentinel:
            if cp.poll() is None:
                task.status = TaskStatus.running
            else:
                if cp.returncode == 0:
                    task.status = TaskStatus.finished
                else:
                    task.status = TaskStatus.notOK


def sync_insertable_idx():
    tasks = get_tasks()
    idx = 0
    for task in tasks:
        if task.status == TaskStatus.staging:
            break
        idx += 1
    st.session_state['insertable_idx'] = idx


def st_sync():
    sync_task_cp_status()
    sync_insertable_idx()

Perform_tasks

UI

PERFORM_TASKS是一個st.columns

UI Code

  • 建立一個名為PERFORM_TASKSst.columns
    • PERFORM_TASKS中建立一個名為REMOVE_TASKSst.container及一個名為REMOVE_TASKS_FORMst.form
      • REMOVE_TASKS_FORM中透過st.multiselect收集想要刪除的task id,這些taskstatus皆需為TaskStatus.staging
      • 檢查使用者是否有按下removed_task_button。如果有按的話:
        • 檢查當下RUN是否為打勾狀態:
          • 如果不是打勾狀態,嘗試取得所有準備被移除的task。如果成功取得則呼叫remove_task移除,失敗的話則顯示warning
          • 如果是打勾狀態的話則顯示warning
    • PERFORM_TASKS中建立一個名為MOVE_TASKst.container及一個名為MOVE_TASK_FORMst.form
      • MOVE_TASK_FORM中透過st.selectbox取得想移動taskid及透過st.number_input取得想插入位置的index
      • 檢查使用者是否有按下move_task_button。如果有按的話:
        • 檢查當下RUN是否為打勾狀態:
          • 如果不是打勾狀態且有取得move_idtarget_idx的話,則呼叫move_task,失敗的話則顯示warning
          • 如果是打勾狀態的話則顯示warning
#app.py
PERFORM_TASKS, DASHBOARD = st.columns((1, 3))
with PERFORM_TASKS:
    REMOVE_TASKS, REMOVE_TASKS_FORM = st.container(), st.form('remove-tasks-form')
    with REMOVE_TASKS:
        with REMOVE_TASKS_FORM:
            s_removed_task_ids = st.multiselect('Select tasks to be removed',
                                                get_staging_task_ids())
            removed_task_button = st.form_submit_button('Remove tasks')
            if removed_task_button:
                if not RUN:
                    s_removed_tasks = [get_task_by_id(s_remove_task_id)
                                       for s_remove_task_id in s_removed_task_ids]
                    if s_removed_tasks:
                        for s_removed_task in s_removed_tasks:
                            remove_task(s_removed_task)
                    else:
                        st.warning(f'Invalid operation.Nothing happened!')
                else:
                    st.warning('Please uncheck RUN button first')

    MOVE_TASK, MOVE_TASK_FORM = st.container(), st.form('move-task-form')
    with MOVE_TASK:
        with MOVE_TASK_FORM:
            move_id = st.selectbox('Move the selected task :',
                                   get_staging_task_ids())

            target_idx = st.number_input('To position :',
                                         1,
                                         len(get_tasks()))

            move_task_button = st.form_submit_button('Move task')
            if move_task_button:
                if not RUN:
                    if move_id and target_idx:
                        move_task(move_id, target_idx)
                    else:
                        st.warning('Invalid input')
                else:
                    st.warning('Please uncheck RUN button first')

remove_task

  • 透過get_tasks取得st.session.tasks這個list,接著利用list.removef移除st.session.tasks中的task。此處用try-catch是因為remove_task其實也可以使用在非GUI輸入時。
#app.py
def remove_task(task):
    tasks = get_tasks()
    try:
        tasks.remove(task)
    except ValueError:
        pass

move_task

  • 透過get_task_by_id取得move_task
    • 如果move_task不為None的話:
      • 如果inserted_idx >= get_insertable_idx()move_id != be_inserted_task.id,則透過list.remove移除move_task且透過list.insert插入move_taskinserted_idx(註1)。
      • 如果不是的話則顯示warning
    • 如果為None的話則顯示warning
#app.py
def move_task(move_id, target_idx):
    move_task = get_task_by_id(move_id)
    if move_task is not None:
        inserted_idx = target_idx-1
        be_inserted_task = tasks[inserted_idx]
        conds = (inserted_idx >= get_insertable_idx(),
                 move_id != be_inserted_task.id)
        if all(conds):
            tasks.remove(move_task)
            tasks.insert(inserted_idx, move_task)
        else:
            st.warning(f'Invalid operation.Nothing happened!')
    else:
        st.warning('No tasks can be moved now')

Dashboard

UI

Dashboard是一個st.columns,可以顯示:

  • JobID
  • solver typeprecision
  • deck
  • 使用core數。
  • 使用memory
  • 其餘補充指令。
  • status(task執行狀態)。
  • emogi(以顏色表示task執行狀態)。

UI Code

  • 建立一個名為DASHBOARDst.columns(實際建立的code在Perform_tasks中)。
  • 透過get_ds取得準備顯示於dashboard中的list of dicts,命名為ds
  • 如果ds 不為空的話:
    • st.empty中,使用pd.DataFrameds轉換為dataframe,並使用st.dataframe呈現。st.empty的功用是可於每次進行手動RefreshAuto-refresh時,更新顯示的dataframe
    • 透過st.download_button製造出一個下載按鈕,將當前dataframe寫出為csv檔。
#app.py
with DASHBOARD:
    ds = get_ds(tasks)
    if ds:
        with st.empty():
            df = pd.DataFrame(ds, index=range(1, len(ds)+1))
            st.dataframe(df)
        st.download_button(label="Export to CSV",
                           data=convert_df(df),
                           file_name=get_csv_filename(),
                           mime='text/csv')

啟動

terminal中確認虛擬環境已啟動後,輸入streamlit run app.py即可開啟這個App
恭喜您有勇氣看到現在,今天的內容也是相當硬呀XD

備註

註1:會要做這兩個檢查是因為,我們沒辦法讓move_idtarget_idxwidget做即時的顯示更新,所以改於程式中進行檢查。

Code

本日程式碼傳送門(與Day22相同)


上一篇
[Day22] - Streamlit WSL2 LS-DYNA Job Submitter - stem(1)
下一篇
[Day24] - Batch Mesh介紹
系列文
或躍在淵的CAE: 讓咱們用Python會一會ANSA + LS-DYNA30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言