末翼我們將實作兩個小project,來活用前面九翼的內容。
[Day27]與[Day28]為project ECC,目標為實作一個可連接EdgeDB Cloud的EdgeDB cloud connection,並完成一個streamlit app作為前端。
Project postman,目標為研究傳遞decorator factory參數的各種可能方法。於七月底到八月初參加了strealit connections hackathon比賽(因為參加比賽就有送一件薄帽T...),使用EdgeDB的Blocking API實作。後來於八月底排到了EdgeDB Cloud的使用權限,加上又新學了asyncio.TaskGroup與Exception Groups,於是便想趁著鐵人賽,使用EdgeDB的AsyncIO API來重新實作看看。
EdgeDBCloudConn的class,並實作__aenter__與__aeit__,使該class可以作為async context manager使用。__aenter__與__aexit__時,logging進入訊息。__aexit__時,logging實際database呼叫次數及實際在context managet中所經歷的時間。query,並於每次query進行logging。Read的query需有快取機制。transactions功能。EdgeDB建基於Postgres之上,有著自己的EdgeQL語法與type system。其EdgeQL query會於底層compile為相對應的Postgres query,由於其不是一個ORM,所以理論上所有想對Postgres做的操作,應該都能使用更簡潔的EdgeQL語法達成。
Co-Founder兼CEO的Yury Selivanov是Python asyncio背後的主要推手,也是asyncio威力加強版uvloop的主要開發者。因此不難想像EdgeDB從一開始就以async思維開發,故其效率極高。下圖為官方的benchmark。
EdgeDB各語言的library都在活躍開發中,目前已經支援的有:
其雲端服務EdgeDB Cloud也已進入了Beta版,目前沒有對外開放,但可以加入watchlist或是到discord申請快速通關。
EdgeDB-Python為官方提供的library。由於我們的是目標是建立async的connection,所以直接查找說明文件中AsyncIO API。
經過一番查找之後,發現edgedb.create_async_client可以幫忙生成AsyncIOClient的instance,而AsyncIOClient提供六種不同情況的query function:
這幾個功能將是我們建立EdgeDBCloudConn的好幫手。
ECC架構如下:
ECC
├── ecc
│ ├── __init__.py
│ ├── connection.py
│ ├── data_structures.py
│ ├── queries.py
│ └── utils.py
│── edgedbcloud.toml
└── tests
├── __init__.py
├── test_healthy.py
├── test_imqry.py
├── tests_mqry.py
├── tests_qry_by_args.py
└── utils.py
ecc/connection.py:EdgeDBCloudConn class。ecc/data_structures.py:Enum及Namedtuple等資料結構。ecc/queries.py:提供寫好的EdgeQL。ecc/utils.py:小工具。內有兩個Enum及一個NamedTuple。
RespJson(Enum)內有NO及YES兩個member,並使用enum.auto為其自動賦值。其功用是用來區別query是否需要返回json格式,會於QueryRecord 中使用。
from enum import Enum, auto
class RespJson(Enum):
NO = auto()
YES = auto()
RespConstraint(Enum)內有FREE、NO_MORE_THAN_ONE及EXACTLY_ONE三個member,並使用enum.auto為其自動賦值。其功用是用來區別是否需要檢驗query返回結果的長度,會於QueryRecord 中使用。
...
class RespConstraint(Enum):
FREE = auto()
NO_MORE_THAN_ONE = auto()
EXACTLY_ONE = auto()
QueryRecord(NamedTuple)qry(str):EdgeQL語法的query str。extra_args(tuple):當需要Filter時使用。jsonify(RespJson):返回結果是否為json格式。required_single(RespConstraint):是否檢驗返回結果的長度。extra_kwargs(dict):當需要Filter時使用。task_name(str):asyncio task的task name。...
from typing import NamedTuple
class QueryRecord(NamedTuple):
qry: str
extra_args: tuple[Any, ...]
jsonify: RespJson
required_single: RespConstraint
extra_kwargs: dict[str, Any]
task_name: str
get_logger為一個輔助function來幫助我們取得logger instance。由於logging.getLogger是一個module-level function,只要名字不變的話,每次呼叫都可以取回同一個instance,所以不用顯性以參數在各個obj中傳遞。
def get_logger(logger_name: str = 'edgedb-cloud') -> logging.Logger:
return logging.getLogger(logger_name)
load_toml建立edgedbcloud.toml作為設定檔,並於其中定義一個edgedb-cloud table,輸入所需的參數。
host需要由EdgeDB cli登入EdgeDB cloud後,才能於command line或REPL中取得。EdgeDB預設使用port 5656。secret_key可以由指令取得,也可以由EdgeDB Cloud的UI取得。database為_example。其為EdgeDB提供練習用的database,可於EdgeDB Cloud中一鍵生成。ttl為immutable query的快取時間。[edgedb-cloud]
host = 'xxx.aws.edgedb.cloud'
port = 5656
secret_key = 'secret_key'
database = '_example'
ttl = 5
load_toml接受toml_name與table_name兩個參數,並回傳一個dict,其內是toml_name中table_name的所有參數。Python於3.11加入了tomllib模組,幫助我們讀取toml格式的檔案。請注意文件中特別指出tomllib.load需接受readable的binary object,所以於open的mode需指定為rb。
def load_toml(toml_name: str = 'edgedbcloud.toml',
table_name: str = 'edgedb-cloud') -> dict[str, Any]:
with open(toml_name, 'rb') as f:
data: dict[str, dict[str, Any]] = tomllib.load(f)
return data[table_name]
match_func_name幫助我們選擇AsyncIOClient query function的小工具。
AsyncIOClient的六種query function可以分成兩個大類:
query是否需要返回json格式,可依function名最後是否有_json來判斷。function名為query開頭。function名為query_single開頭。function名為query_required_single開頭。於是我們開始思考,如何用jsonify與required_single兩個參數來組合出這六個function呢?此外,又要用什麼來區別各種可能的值呢?神奇的12345?還是singleton的True、False、None等?
Python的Enum可能是一個不錯的解決方法,於是我們在data_structures.py建立了RespJson與RespConstraint兩個Enum。值得一提的是,因為我們只會比較Enum member的entity,而不會比較其值,所以其值是多少並不重要,這也是為什麼會使用enum.auto自動賦值的原因。
match_func_name依靠structural pattern matching的match enum的功能,來取得相對應的function名。我們於match_func_name的最後,有設定一個catch all的case _,並於其中使用Python3.11新增的assert_never。
def match_func_name(jsonify: RespJson, required_single: RespConstraint) -> str:
match (jsonify, required_single):
case (RespJson.NO, RespConstraint.FREE):
func_name = 'query'
case (RespJson.NO, RespConstraint.NO_MORE_THAN_ONE):
func_name = 'query_single'
case (RespJson.NO, RespConstraint.EXACTLY_ONE):
func_name = 'query_required_single'
case (RespJson.YES, RespConstraint.FREE):
func_name = 'query_json'
case (RespJson.YES, RespConstraint.NO_MORE_THAN_ONE):
func_name = 'query_single_json'
case (RespJson.YES, RespConstraint.EXACTLY_ONE):
func_name = 'query_required_single_json'
case _ as unreachable:
assert_never(unreachable)
return func_name
pack_imqry_records預先打包八種常用的immutable query為一list,作為測試之用。每個query型式都是一個QueryRecord的instance。
def pack_imqry_records() -> list[QueryRecord]:
qries = ['SELECT Movie {title};',
*['''SELECT assert_single(
(SELECT Movie {title, release_year}
FILTER .title=<str>$title and
.release_year=<int64>$release_year));''']*3,
'SELECT Account {username};',
*['''SELECT assert_single(
(SELECT Account {username}
FILTER .username=<str>$username))''']*3]
args_collector = [()]*8
jsons = [*[RespJson.NO]*4, *[RespJson.YES]*4]
required_singles = [RespConstraint.FREE,
*[RespConstraint.NO_MORE_THAN_ONE]*2,
RespConstraint.EXACTLY_ONE]*2
kwargs_collector = [{},
{'title': 'Ant-Man', 'release_year': 2015},
{'title': 'Ant-Man100', 'release_year': 2015},
{'title': 'Ant-Man', 'release_year': 2015},
{},
{'username': 'Alice'},
{'username': 'AliceCCC'},
{'username': 'Alice'}]
task_names = [*[f'QueryMovie{n}' for n in range(4)],
*[f'QueryAccount{n}' for n in range(4)]]
return [QueryRecord(*qr)
for qr in zip(qries,
args_collector,
jsons,
required_singles,
kwargs_collector,
task_names)]
pack_mqry_records與pack_imqry_records類似,只是包含的是兩種mutable query。
def pack_mqry_records() -> list[QueryRecord]:
qries = ['''WITH p := (INSERT Person {name:=<str>$name})
SELECT p {name};''',
'''WITH p:= (DELETE Person FILTER .name=<str>$name)
SELECT p {name};''']
args_collector = [()]*2
jsons = [RespJson.NO]*2
required_singles = [RespConstraint.FREE]*2
kwargs_collector = [{'name': 'Adam Gramham'}]*2
task_names = ['insert', 'delete']
return [QueryRecord(*qr)
for qr in zip(qries,
args_collector,
jsons, required_singles,
kwargs_collector,
task_names)]
pack_imqry_records_by_args為測試是否能順利使用像$0或$1的語法進行query, 目前僅包含一個immutable query。
def pack_imqry_records_by_args() -> list[QueryRecord]:
qries = ['''SELECT Movie {title, release_year}
FILTER .title=<str>$0 and .release_year=<int64>$1;''']
args_collector = [('Ant-Man', 2015)]
jsons = [RespJson.NO]
required_singles = [RespConstraint.FREE]
kwargs_collector: list[dict[str, Any]] = [{}]
task_names = ['QueryMovieTitleByArgs']
return [QueryRecord(*qr)
for qr in zip(qries,
args_collector,
jsons,
required_singles,
kwargs_collector,
task_names)]
其內只有EdgeDBCloudConn class。
當database在一定時間內,通常不會變動的情況下,可以設定一個快取時間ttl。在ttl內如果使用同樣的query與參數來讀取資料時,可以直接回傳快取結果,而不真正呼叫database。
但是當database頻繁變動的話,對這類query進行快取就有很多眉角要注意。究竟使用者是真的想要快速發出多次同樣的mutable query?還是可能因為網路問題或retry等邏輯沒寫好,不小心發送多次,而我們應該只呼叫一次database就好?
因此我們決定EdgeDBCloudConn預設ttl=0,即沒有快取。當ttl>0時,會使用alru_cache來將_imquery包上快取設定,而_mquery則一律執行。
AbstractAsyncContextManagercontextlib.AbstractAsyncContextManager是Python提供的abstract base class。由於EdgeDBCloudConn class將會實作__aenter__與__aexit__,所以在此繼承AbstractAsyncContextManager是個絕佳的應用。
class EdgeDBCloudConn(AbstractAsyncContextManager):
...
__init__共接收七個變數(註2)。
host、port、database與secret_key將由load_toml讀取edgedb.toml所得,為實際建立連接所需要的參數。ttl為設定的快取時間,預設為0,即不快取。logger為指定的logger instance。當沒有指定的時候,會呼叫get_logger來取得一個預設的logger。log_level為想要記錄的層級,當沒有指定的時候,設定為logging.INFO。因為logger及log_level,這兩個變數所要傳遞的值比較明確,所以我們使用了or的語法,而不顯性比較是否為None。
此外:
self._client將會是AsyncIOClient建立的client之變數名,先預設為None。self._start為計算進入__aenter__與離開__aexit__所用時間之用。self._dbcalls為計算於__aenter__與__aexit__中實際呼叫database的次數之用。self._total_dbcalls為計算實際呼叫database的總次數之用。最後一個if是用來設定immutable query的快取機制。這個手法相當微妙,這使得我們將會由self.__dict__中來存取self._imquery。現在的情況是:
self._imquery為一般function,只是non-data descriptor而不是data descriptor,所以當我們使用self._imquery = alru_cache(ttl=ttl)(self._imquery)的語法時,相當於在self.__dict__中,加入一個已經包過alru_cache(ttl=ttl)的_imquery。EdgeDBCloudConn.__dict__中,還是保有原來沒加上alru_cache的_imquery。 def __init__(self,
*,
host: str,
port: int,
database: str,
secret_key: str,
ttl: float = 0,
logger: logging.Logger | None = None,
log_level: int | None = None) -> None:
self._host = host
self._port = port
self._database = database
self._secret_key = secret_key
self._logger = logger or get_logger()
self._log_level = log_level or logging.INFO
self._logger.setLevel(self._log_level)
self._client: EdgeDBAsyncClient | None = None
self._start = 0.0
self._dbcalls = 0
self._total_dbcalls = 0
if ttl > 0:
self._imquery = alru_cache(ttl=ttl)(self._imquery)
client為一property,用來包住底層的self._client。由於我們希望只建立一個client,所以每當self.client被呼叫時,我們會先檢查self._client是否為None,如果是的話,表示我們還沒有建立client,此時會先呼叫edgedb.create_async_client並搭配由load_toml所提供的各個參數來建立async-client。由於在建立client後,就不需要用到self._secret_key,與其讓它待在instance內,我們選擇刪除它,最後回傳self._client。
class EdgeDBCloudConn(AbstractAsyncContextManager):
...
@property
def client(self) -> EdgeDBAsyncClient:
if self._client is None:
self._client = edgedb.create_async_client(host=self._host,
port=self._port,
database=self._database,
secret_key=self._secret_key)
del self._secret_key
return self._client
_get_client_qry_func幫助我們實際由self.client,取到名字為match_func_name回傳值的function。
class EdgeDBCloudConn(AbstractAsyncContextManager):
...
def _get_client_qry_func(self,
jsonify: RespJson,
required_single: RespConstraint) -> Callable[..., Any]:
return getattr(self.client, match_func_name(jsonify, required_single))
get_cur_timestamp為回傳timestamp的function,可以幫助計算快取時間。get_cur_timestamp雖與self或cls的狀態無關,卻是EdgeDBCloudConn class的好幫手,所以我們設計其為static method。
class EdgeDBCloudConn(AbstractAsyncContextManager):
...
@staticmethod
def get_cur_timestamp() -> float:
return datetime.now().timestamp()
_is_qry_immutable是用來判斷我們的query內是否含有可能會mutate database的關鍵字。我們定義當query內含有insert、update或 delete時,就將此query判定為會mutate database。由於_mutated_kws不會隨著不同instance而改變,所以我們將其設為class variable。
class EdgeDBCloudConn(AbstractAsyncContextManager):
_mutated_kws = ('insert', 'update', 'delete')
...
def _is_qry_immutable(self, qry: str) -> bool:
return all(mutated_kw not in qry.casefold()
for mutated_kw in self._mutated_kws)
query、_query、_imquery與_mquery這四個都是async function。query會依照_is_qry_immutable判斷該將query轉到_imquery或是_mquery。而_imquery與_mquery程式其實完全一樣,都是再將query轉到_query。這樣的用意是方便我們於__init__中於ttl>0時,可以動態加上alru_cache至_imquery。
class EdgeDBCloudConn(AbstractAsyncContextManager):
...
async def query(self,
qry: str,
*args: Any,
jsonify: RespJson = RespJson.NO,
required_single: RespConstraint = RespConstraint.FREE,
**kwargs: Any) -> QueryResult:
if self._is_qry_immutable(qry):
return await self._imquery(qry,
*args,
jsonify=jsonify,
required_single=required_single,
**kwargs)
return await self._mquery(qry,
*args,
jsonify=jsonify,
required_single=required_single,
**kwargs)
async def _imquery(self,
qry: str,
*args: Any,
jsonify: RespJson = RespJson.NO,
required_single: RespConstraint = RespConstraint.FREE,
**kwargs: Any) -> QueryResult:
return await self._query(qry,
*args,
jsonify=jsonify,
required_single=required_single,
**kwargs)
async def _mquery(self,
qry: str,
*args: Any,
jsonify: RespJson = RespJson.NO,
required_single: RespConstraint = RespConstraint.FREE,
**kwargs: Any) -> QueryResult:
return await self._query(qry,
*args,
jsonify=jsonify,
required_single=required_single,
**kwargs)
_query中:
logging.info記錄其實際被呼叫。self._dbcalls加上1。_get_client_qry_func取回的function,搭配qry、*args及**kwargs進行呼叫,並回傳計算結果。class EdgeDBCloudConn(AbstractAsyncContextManager):
...
async def _query(self,
qry: str,
*args: Any,
jsonify: RespJson = RespJson.NO,
required_single: RespConstraint = RespConstraint.FREE,
**kwargs: Any) -> QueryResult:
self._logger.info(self._fmt_query_log_msg(
qry, args, jsonify, required_single, kwargs))
self._dbcalls += 1
qry_func = self._get_client_qry_func(jsonify, required_single)
return await qry_func(qry, *args, **kwargs)
__aenter__logging.info記錄。self.get_cur_timestamp()將其值賦予self._start。class EdgeDBCloudConn(AbstractAsyncContextManager):
...
async def __aenter__(self) -> Self:
self._logger.info(self._fmt_enter_aenter_log_msg())
self._start = self.get_cur_timestamp()
return self
__aexit__await asyncio.sleep(1e-5)的原因是,這樣可以方便UI可以即時更新。logging.info,一次記錄已經進入__aexit__,另一次記錄實際呼叫database次數。exception發生的話,以logging.error記錄。self._dbcalls加總至self._total_dbcalls後,呼叫self._reset_db_calls重設self._dbcalls為0。__aexit__前,計算在with中的時間,並以logging.info記錄。__aexit__,呼叫self._reset_start重設self._start為0.0。class EdgeDBCloudConn(AbstractAsyncContextManager):
...
async def __aexit__(self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
exc_tb: TracebackType | None) -> None:
await asyncio.sleep(1e-5)
self._logger.info(self._fmt_enter_aexit_log_msg())
self._logger.info(self._fmt_db_calls_log_msg())
if exc_type:
self._logger.error(self._fmt_aexit_exception_log_msg(exc_value))
self._total_dbcalls += self._dbcalls
self._reset_db_calls()
elapsed = self.get_cur_timestamp() - self._start
self._logger.info(self._fmt_exit_aexit_log_msg(elapsed))
self._reset_start()
aclose可以顯式呼叫底層client的self.client.aclose。請注意我們在__aexit__內,並沒有顯式的呼叫await self.aclose(),原因是現在很多database都具有pooling的機制,包括EdgeDB。
async def aclose(self, timeout: float = 5) -> None:
print('aclose called')
await asyncio.wait_for(self.client.aclose(), timeout)
_healthy_check_url與is_healthy_healthy_check_url是EdgeDB內建能判斷database現在狀態的url(註3)。
is_healthy針對self._healthy_check_url發出GET request,並檢查返回status_code是否為200,來判斷database是否健康。如果遭遇到任何錯誤,設定回傳False。
class EdgeDBCloudConn(AbstractAsyncContextManager):
...
@property
def _healthy_check_url(self) -> str:
return f'https://{self._host}:{self._port}/server/status/alive'
@property
def is_healthy(self) -> bool:
"""https://www.edgedb.com/docs/guides/deployment/health_checks#health-checks"""
try:
return httpx.get(self._healthy_check_url,
follow_redirects=True,
verify=False,
timeout=30).status_code == 200
except httpx.HTTPError:
return False
_fmt_*開頭的多個function皆為返回str,供logger所用,在此不加贅述。
_reset_*開頭的多個function為重設時間或資料庫呼叫次數所用。
總共有四個TestClass,都是實際對database進行query,沒有mocking(註4)。
TestHealthy測試_healthy_check_url及is_healthy。TestImqryCachedConn測試在少量及大量query時都有進行快取。TestImqryNonCachedConn測試其沒有快取機制。TestMqryConn測試其沒有快取機制,且可正常insert及delete。註1:由於這個project的目標是建立與EdgeDB cloud連結的connection,而不是學習EdgeQL的語法。如果是對其語法有興趣的朋友:
Rust影片解說,此外他也是Learn Rust in a Month of Lunches的作者。註2:如果您想要的是一個可以連接local或是host在其它雲端的EdgeDB connection,需要考慮較複雜的建立方式,包括支援DSN。
註3:如果這是連接local或是ip的connection,此healthy_check_url網址可能使用http而非https。
註4:當在Windows上測試時,會出現ResourceWarning: Enable tracemalloc to get the object allocation traceback,需要再研究資源關閉的問題。