使用streamlit建立一個可以輸入EdgeQL、query_args及query_kwargs的form,並於submit之後,傳送query至EdgeDB Cloud執行(註1)。
ECC
├── ...
├── st_comps.py
├── st_data_structures.py
├── st_utils.py
└── streamlit_app.py
為擺放各種streamlit的widget及element的檔案。
只有一個FormContent的NamedTuple,於收集form內容時使用。
內有著許多小工具,我們挑選幾個比較重要的function來說明。
get_loop_dict與get_conn_dict皆使用@st.cache_resource裝飾。這麼一來,當任何session一進來,都可以取得同一個loop_dict與conn_dict,可以幫助我們使用現在的timestamp與存在其中的timestamp進行對比,進而執行想要的操作。
@st.cache_resource
def get_loop_dict() -> dict[str, Any]:
return {}
@st.cache_resource
def get_conn_dict() -> dict[str, Any]:
return {}
_routine_clean會取得現在的timestamp,以此計算get_loop_dict與get_conn_dict中是否有超過threshold的loop或conn。相當於每次呼叫streamlit_app.py時,會定時清除閒置過久的資源。
def _routine_clean(excluded_tokens: list[str],
threshold: float = 300) -> None:
cur_ts = get_cur_ts()
ld = get_loop_dict()
to_del_loop_tokens = {t
for t, (_, loop_ts) in ld.items()
if cur_ts - loop_ts > threshold}
for _token in excluded_tokens:
to_del_loop_tokens.discard(_token)
for k in to_del_loop_tokens:
try:
del ld[k]
except Exception as ex:
st.toast(f'{ex=} happened in del loops', icon="🚨")
cd = get_conn_dict()
to_del_conn_tokens = {t
for t, (_, conn_ts) in cd.items()
if cur_ts - conn_ts > threshold}
for _token in excluded_tokens:
to_del_conn_tokens.discard(_token)
for k in to_del_conn_tokens:
try:
del cd[k]
except Exception as ex:
st.toast(f'{ex=} happened in del conns', icon="🚨")
_populate_qry_args將接收到的str以;分隔,接著檢查各個arg_str是否為支援的型態。如果是的話,嘗試使用eval(arg_str)取得轉換後的型態,再append到qry_args,最後返回return tuple(qry_args)。
def _populate_qry_args(qry_args_str: str) -> tuple[Any, ...]:
qry_args: list[Any] = []
for arg_str in qry_args_str.split(';'):
if arg_str.strip() and \
isinstance(arg_str, (str, datetime.date, datetime.datetime)):
try:
eval_arg = eval(arg_str)
except SyntaxError as e:
st.warning(
'Can not parse the positional query arguments!')
raise e
else:
qry_args.append(eval_arg)
return tuple(qry_args)
_populate_qry_kwargs將接收到的str以;分隔,接著嘗試使用exec(kwarg_str.strip(), globals(), qry_kwargs),將kwarg_strpopulate到qry_kwargs中,並於最後返回qry_kwargs。
def _populate_qry_kwargs(qry_kwargs_str: str) -> dict[str, Any]:
qry_kwargs: dict[str, Any] = {}
for kwarg_str in qry_kwargs_str.split(';'):
try:
exec(kwarg_str.strip(), globals(), qry_kwargs)
except SyntaxError as e:
st.warning(
'Can not parse the named query arguments!')
raise e
return qry_kwargs
_convert_form_to_record將form接收的內容轉換為QueryRecord格式。
我們曾經想在_receive_required_single中的st.radio使用Enum,但streamlit常會報錯,所以只好接收str型態再使用convert_str_to_required_single轉為Enum。
def _convert_form_to_record(form: FormContent) -> QueryRecord:
qry = form.qry
extra_args = _populate_qry_args(form.qry_args_str)
jsonify = convert_bool_to_jsonify(form.jsonify)
required_single = convert_str_to_required_single(form.required_single)
extra_kwargs = _populate_qry_kwargs(form.qry_kwargs_str)
task_name = uuid.uuid4().hex[:6]
return QueryRecord(qry,
extra_args,
jsonify,
required_single,
extra_kwargs,
task_name)
_create_task_from_form使用傳入的tg來新增task。
async def _create_task_from_form(tg: asyncio.TaskGroup,
conn: EdgeDBCloudConn,
form: FormContent,
tasks: set[asyncio.Task[Any]]) -> None:
record = _convert_form_to_record(form)
async with conn:
task = tg.create_task(conn.query(record.qry,
*record.extra_args,
jsonify=record.jsonify,
required_single=record.required_single,
**record.extra_kwargs),
name=record.task_name)
tasks.add(task)
loop我們希望多個session能同時獨立操作,所以需要針對每個session建立loop及conn,無法簡單的呼叫asyncio.run就好。由於loop及conn都需要在每個session一開始就確定下來,所以如果將_prepare_loop或_prepare_conn移至其它檔案會出現問題。
...
import nest_asyncio
nest_asyncio.apply()
if 'token' not in st.session_state:
token = generate_token()
logging.info(f'Generating token: {token}')
st.session_state['token'] = token
if __name__ == '__main__':
cur_ts = get_cur_ts()
token = st.session_state.token
excluded_tokens = [token]
loop = _prepare_loop(cur_ts, token)
conn = _prepare_conn(cur_ts, token)
_display_res(token, loop, conn, excluded_tokens)
_routine_clean(excluded_tokens)
asyncio.set_event_loop(loop)
loop.run_until_complete(run(main, conn, token))
session一開始時,呼叫generate_token產生獨特的token,並儲存於st.session_state中。_prepare_loop準備loop。_prepare_conn準備conn。_display_res於最上方顯示resource,並生成refresh與try free resource兩個button。_routine_clean定時清理loop與conn。asyncio.set_event_loop設定loop。loop.run_until_complete執行run,啟動event loop。_prepare_loopget_loop_dict取得loop_dict。token是否在loop_dict中。如果不在的話,呼叫asyncio.new_event_loop建立一個新loop;如果在的話,從中取出loop。loop_dict[token] = (loop, cur_ts)更新timestamp。loop。def _prepare_loop(cur_ts: int, token: str) -> asyncio.AbstractEventLoop:
loop_dict = get_loop_dict()
if token not in loop_dict:
loop = asyncio.new_event_loop()
else:
loop, _ = loop_dict[token]
loop_dict[token] = (loop, cur_ts)
return loop
_prepare_connget_conn_dict取得conn_dict。token是否在conn_dict中。如果不在的話,呼叫asyncio.new_event_loop建立一個新conn;如果在的話,從中取出conn。conn_dict[token] = (conn, cur_ts)更新timestamp。conn。def _prepare_conn(cur_ts: int, token: str) -> EdgeDBCloudConn:
conn_dict = get_conn_dict()
if token not in conn_dict:
conn = EdgeDBCloudConn(**load_st_toml())
else:
conn, _ = conn_dict[token]
conn_dict[token] = (conn, cur_ts)
return conn
run為asyncio所執行的coroutine,其內為一個try-except*-else結構。
try中,使用asyncio.TaskGroup將app整體布局的algo(即main function)加入到task。請注意,這邊我們用到[Day26]新學到的技巧,來將tg往下傳給algo。這麼一來,除了當前這個task外,algo內也可以新增其它的task。except* Exception as ex中,經過render後,印出各Exception的錯誤資訊。else中,呈現query的結果。...
tasks = set()
async def run(algo, conn: EdgeDBCloudConn, token: str) -> None:
top_name = 'top'
try:
async with asyncio.TaskGroup() as tg:
task = tg.create_task(algo(tg, conn, token), name=top_name)
tasks.add(task)
except* Exception as ex:
for exc in ex.exceptions:
st.warning(f'Exception: {type(exc).__name__}')
_render_exception(exc)
else:
for task in tasks:
if (task_name := task.get_name()) != top_name:
st.write(f'task_name: {task_name}')
_render_result(task.result())
main為整個app的layout。
_display_sidebar,建立sidebar element。_get_query_form,建立form element。form被submit時,呼叫_create_tasks_for_form搜集form中各項資料並整理後,建立query task。_display_big_red_btn_and_db_calls,建立一個清除conn的小工具。...
async def main(tg: asyncio.TaskGroup, conn: EdgeDBCloudConn, token: str) -> None:
_display_sidebar()
form = _get_query_form()
if form.submitted:
await _create_tasks_for_form(tg, conn, form)
_display_big_red_btn_and_db_calls(conn, token)
streamlit cloud可以部署無限制的public app及一個private app。
您可以使用private app或是使用public app加上authenticator來部署。
部署的過程很簡單,只需完成下列各個選項。所有的credentials可以置於Advanced settings中,並於app內使用st.secrets存取。這相當於local開發時的.streamlit/secrets.toml。

這個project還有非常多可以改進的地方,例如:
get_loop_dict及get_conn_dict這樣的方法,在連線數較少時可以使用,但是當連線數較多時,記憶體使用量也會增加不少。或許我們可以將其轉為其它格式,例如pickle,然後使用另一個背景程式來定時改動及讀取pickle。Try Free Res是可以清除掉所有threshold大於3的loops及conns。_populate_qry_args與_populate_qry_kwargs需要使用比eval與exec更安全的處理方式。mocking引入tests。type annotation還有很大的進步空間。app是設計以asyncio長期等待task,所以db connection不需關閉。但當遇到shutdown時,如何有效關閉所有的connections,還需要好好想想。儘管如此,還是學習到了很多,例如:
asyncio.TaskGroup與ExceptionGroup來處理asyncio問題。streamlit_app.py。註1:EdgeDB Cloud已有很好的UI操控介面可以使用,本日內容純屬自我練習之用。
註2:需使用nest_asyncio.apply來防止更新loop或conn時,容易出現的RuntimeError: Task <Task pending name='Task-xxx' ...> attached to a different loop。