iT邦幫忙

2023 iThome 鐵人賽

DAY 27
0
Software Development

Python十翼:與未來的自己對話系列 第 27

[Day27] 末翼 - Term Projects:Project ECC - 建立EdgeDB Cloud Connection(1)

  • 分享至 

  • xImage
  •  

末翼大綱

末翼我們將實作兩個小project,來活用前面九翼的內容。

Project ECC

[Day27]與[Day28]為project ECC,目標為實作一個可連接EdgeDB CloudEdgeDB cloud connection,並完成一個streamlit app作為前端。

  • [Day27]EdgeDBCloudConn class實作及實際測試。
  • [Day28]建立一個streamlit app,並使用EdgeDBCloudConn連接EdgeDB Cloud

Project postman

  • [Day29]Project postman,目標為研究傳遞decorator factory參數的各種可能方法。

ECC源起

於七月底到八月初參加了strealit connections hackathon比賽(因為參加比賽就有送一件薄帽T...),使用EdgeDBBlocking API實作。後來於八月底排到了EdgeDB Cloud的使用權限,加上又新學了asyncio.TaskGroupException Groups,於是便想趁著鐵人賽,使用EdgeDBAsyncIO API來重新實作看看。

ECC目標

  • 建立一個EdgeDBCloudConnclass,並實作__aenter____aeit__,使該class可以作為async context manager使用。
  • 於進入__aenter____aexit__時,logging進入訊息。
  • 於離開__aexit__時,logging實際database呼叫次數及實際在context managet中所經歷的時間。
  • 可以使用async語法進行簡單query,並於每次query進行logging
  • Readquery需有快取機制。
  • 不需著墨於transactions功能。

EdgeDB

EdgeDB建基於Postgres之上,有著自己的EdgeQL語法與type system。其EdgeQL query會於底層compile為相對應的Postgres query,由於其不是一個ORM,所以理論上所有想對Postgres做的操作,應該都能使用更簡潔的EdgeQL語法達成。

Co-Founder兼CEO的Yury Selivanov是Python asyncio背後的主要推手,也是asyncio威力加強版uvloop的主要開發者。因此不難想像EdgeDB從一開始就以async思維開發,故其效率極高。下圖為官方的benchmark。

benchmark

EdgeDB各語言的library都在活躍開發中,目前已經支援的有:

  • Python
  • TypeScript/JavaScript
  • Go
  • Rust
  • .NET
  • Java
  • Elixir

其雲端服務EdgeDB Cloud也已進入了Beta版,目前沒有對外開放,但可以加入watchlist或是到discord申請快速通關。

EdgeDB-Python

EdgeDB-Python為官方提供的library。由於我們的是目標是建立async的connection,所以直接查找說明文件中AsyncIO API

經過一番查找之後,發現edgedb.create_async_client可以幫忙生成AsyncIOClientinstance,而AsyncIOClient提供六種不同情況的query function

  • AsyncIOClient.query()
  • AsyncIOClient.query_single()
  • AsyncIOClient.query_required_single()
  • AsyncIOClient.query_json()
  • AsyncIOClient.query_single_json()
  • AsyncIOClient.query_required_single_json()

這幾個功能將是我們建立EdgeDBCloudConn的好幫手。

ECC架構

ECC架構如下:

ECC
├── ecc
│   ├── __init__.py
│   ├── connection.py
│   ├── data_structures.py
│   ├── queries.py
│   └── utils.py
│── edgedbcloud.toml
└── tests
    ├── __init__.py
    ├── test_healthy.py
    ├── test_imqry.py
    ├── tests_mqry.py
    ├── tests_qry_by_args.py
    └── utils.py
  • ecc/connection.pyEdgeDBCloudConn class
  • ecc/data_structures.pyEnumNamedtuple等資料結構。
  • ecc/queries.py:提供寫好的EdgeQL
  • ecc/utils.py:小工具。

data_structures.py

內有兩個Enum及一個NamedTuple

RespJson(Enum)

內有NOYES兩個member,並使用enum.auto為其自動賦值。其功用是用來區別query是否需要返回json格式,會於QueryRecord 中使用。

from enum import Enum, auto


class RespJson(Enum):
    NO = auto()
    YES = auto()

RespConstraint(Enum)

內有FREENO_MORE_THAN_ONEEXACTLY_ONE三個member,並使用enum.auto為其自動賦值。其功用是用來區別是否需要檢驗query返回結果的長度,會於QueryRecord 中使用。

...

class RespConstraint(Enum):
    FREE = auto()
    NO_MORE_THAN_ONE = auto()
    EXACTLY_ONE = auto()

QueryRecord(NamedTuple)

  • qrystr):EdgeQL語法的query str
  • extra_argstuple):當需要Filter時使用。
  • jsonifyRespJson):返回結果是否為json格式。
  • required_singleRespConstraint):是否檢驗返回結果的長度。
  • extra_kwargsdict):當需要Filter時使用。
  • task_namestr):asyncio tasktask name
...
from typing import NamedTuple


class QueryRecord(NamedTuple):
    qry: str
    extra_args: tuple[Any, ...]
    jsonify: RespJson
    required_single: RespConstraint
    extra_kwargs: dict[str, Any]
    task_name: str

utils.py

get_logger

為一個輔助function來幫助我們取得logger instance。由於logging.getLogger是一個module-level function,只要名字不變的話,每次呼叫都可以取回同一個instance,所以不用顯性以參數在各個obj中傳遞。

def get_logger(logger_name: str = 'edgedb-cloud') -> logging.Logger:
    return logging.getLogger(logger_name)

load_toml

建立edgedbcloud.toml作為設定檔,並於其中定義一個edgedb-cloud table,輸入所需的參數。

  • host需要由EdgeDB cli登入EdgeDB cloud後,才能於command lineREPL中取得。
  • EdgeDB預設使用port 5656
  • secret_key可以由指令取得,也可以由EdgeDB Cloud的UI取得。
  • database_example。其為EdgeDB提供練習用的database,可於EdgeDB Cloud中一鍵生成。
  • ttlimmutable query的快取時間。
[edgedb-cloud]
host = 'xxx.aws.edgedb.cloud'
port = 5656
secret_key = 'secret_key'
database = '_example'
ttl = 5

load_toml接受toml_nametable_name兩個參數,並回傳一個dict,其內是toml_nametable_name的所有參數。Python於3.11加入了tomllib模組,幫助我們讀取toml格式的檔案。請注意文件中特別指出tomllib.load需接受readablebinary object,所以於openmode需指定為rb

def load_toml(toml_name: str = 'edgedbcloud.toml',
              table_name: str = 'edgedb-cloud') -> dict[str, Any]:
    with open(toml_name, 'rb') as f:
        data: dict[str, dict[str, Any]] = tomllib.load(f)
    return data[table_name]

match_func_name

幫助我們選擇AsyncIOClient query function的小工具。

AsyncIOClient的六種query function可以分成兩個大類:

  • 一類是這個query是否需要返回json格式,可依function名最後是否有_json來判斷。
  • 一類是返回結果的長度,是否符合預期,總共又分成三類:
    • 不設限制,function名為query開頭。
    • 回傳長度不能超過一,function名為query_single開頭。
    • 回傳長度必須恰恰是一,function名為query_required_single開頭。

於是我們開始思考,如何用jsonifyrequired_single兩個參數來組合出這六個function呢?此外,又要用什麼來區別各種可能的值呢?神奇的12345?還是singletonTrueFalseNone等?

Python的Enum可能是一個不錯的解決方法,於是我們在data_structures.py建立了RespJsonRespConstraint兩個Enum。值得一提的是,因為我們只會比較Enum memberentity,而不會比較其值,所以其值是多少並不重要,這也是為什麼會使用enum.auto自動賦值的原因。

match_func_name依靠structural pattern matchingmatch enum的功能,來取得相對應的function名。我們於match_func_name的最後,有設定一個catch allcase _,並於其中使用Python3.11新增的assert_never

def match_func_name(jsonify: RespJson, required_single: RespConstraint) -> str:
    match (jsonify, required_single):
        case (RespJson.NO, RespConstraint.FREE):
            func_name = 'query'
        case (RespJson.NO, RespConstraint.NO_MORE_THAN_ONE):
            func_name = 'query_single'
        case (RespJson.NO, RespConstraint.EXACTLY_ONE):
            func_name = 'query_required_single'
        case (RespJson.YES, RespConstraint.FREE):
            func_name = 'query_json'
        case (RespJson.YES, RespConstraint.NO_MORE_THAN_ONE):
            func_name = 'query_single_json'
        case (RespJson.YES, RespConstraint.EXACTLY_ONE):
            func_name = 'query_required_single_json'
        case _ as unreachable:
            assert_never(unreachable)
    return func_name

queries.py

pack_imqry_records

預先打包八種常用的immutable query為一list,作為測試之用。每個query型式都是一個QueryRecordinstance

def pack_imqry_records() -> list[QueryRecord]:
    qries = ['SELECT Movie {title};',
             *['''SELECT assert_single(
                         (SELECT Movie {title, release_year} 
                          FILTER .title=<str>$title and 
                                 .release_year=<int64>$release_year));''']*3,
             'SELECT Account {username};',
             *['''SELECT assert_single(
                         (SELECT Account {username} 
                          FILTER .username=<str>$username))''']*3]
    args_collector = [()]*8
    jsons = [*[RespJson.NO]*4, *[RespJson.YES]*4]
    required_singles = [RespConstraint.FREE,
                        *[RespConstraint.NO_MORE_THAN_ONE]*2,
                        RespConstraint.EXACTLY_ONE]*2
    kwargs_collector = [{},
                        {'title': 'Ant-Man', 'release_year': 2015},
                        {'title': 'Ant-Man100', 'release_year': 2015},
                        {'title': 'Ant-Man', 'release_year': 2015},
                        {},
                        {'username': 'Alice'},
                        {'username': 'AliceCCC'},
                        {'username': 'Alice'}]
    task_names = [*[f'QueryMovie{n}' for n in range(4)],
                  *[f'QueryAccount{n}' for n in range(4)]]

    return [QueryRecord(*qr)
            for qr in zip(qries,
                          args_collector,
                          jsons,
                          required_singles,
                          kwargs_collector,
                          task_names)]

pack_mqry_records

pack_imqry_records類似,只是包含的是兩種mutable query

def pack_mqry_records() -> list[QueryRecord]:
    qries = ['''WITH p := (INSERT Person {name:=<str>$name}) 
           SELECT p {name};''',
             '''WITH p:= (DELETE Person FILTER .name=<str>$name) 
           SELECT p {name};''']
    args_collector = [()]*2
    jsons = [RespJson.NO]*2
    required_singles = [RespConstraint.FREE]*2
    kwargs_collector = [{'name': 'Adam Gramham'}]*2
    task_names = ['insert', 'delete']

    return [QueryRecord(*qr)
            for qr in zip(qries,
                          args_collector,
                          jsons, required_singles,
                          kwargs_collector,
                          task_names)]

pack_imqry_records_by_args

為測試是否能順利使用像$0$1的語法進行query, 目前僅包含一個immutable query

def pack_imqry_records_by_args() -> list[QueryRecord]:
    qries = ['''SELECT Movie {title, release_year} 
                FILTER .title=<str>$0 and .release_year=<int64>$1;''']
    args_collector = [('Ant-Man', 2015)]
    jsons = [RespJson.NO]
    required_singles = [RespConstraint.FREE]
    kwargs_collector: list[dict[str, Any]] = [{}]
    task_names = ['QueryMovieTitleByArgs']

    return [QueryRecord(*qr)
            for qr in zip(qries,
                          args_collector,
                          jsons,
                          required_singles,
                          kwargs_collector,
                          task_names)]

connection.py

其內只有EdgeDBCloudConn class

database在一定時間內,通常不會變動的情況下,可以設定一個快取時間ttl。在ttl內如果使用同樣的query與參數來讀取資料時,可以直接回傳快取結果,而不真正呼叫database

但是當database頻繁變動的話,對這類query進行快取就有很多眉角要注意。究竟使用者是真的想要快速發出多次同樣的mutable query?還是可能因為網路問題或retry等邏輯沒寫好,不小心發送多次,而我們應該只呼叫一次database就好?

因此我們決定EdgeDBCloudConn預設ttl=0,即沒有快取。當ttl>0時,會使用alru_cache來將_imquery包上快取設定,而_mquery則一律執行。

繼承AbstractAsyncContextManager

contextlib.AbstractAsyncContextManager是Python提供的abstract base class。由於EdgeDBCloudConn class將會實作__aenter____aexit__,所以在此繼承AbstractAsyncContextManager是個絕佳的應用。

class EdgeDBCloudConn(AbstractAsyncContextManager):
    ...

__init__

共接收七個變數(註2)。

  • hostportdatabasesecret_key將由load_toml讀取edgedb.toml所得,為實際建立連接所需要的參數。
  • ttl為設定的快取時間,預設為0,即不快取。
  • logger為指定的logger instance。當沒有指定的時候,會呼叫get_logger來取得一個預設的logger
  • log_level為想要記錄的層級,當沒有指定的時候,設定為logging.INFO

因為loggerlog_level,這兩個變數所要傳遞的值比較明確,所以我們使用了or的語法,而不顯性比較是否為None

此外:

  • self._client將會是AsyncIOClient建立的client之變數名,先預設為None
  • self._start為計算進入__aenter__與離開__aexit__所用時間之用。
  • self._dbcalls為計算於__aenter____aexit__中實際呼叫database的次數之用。
  • self._total_dbcalls為計算實際呼叫database的總次數之用。

最後一個if是用來設定immutable query的快取機制。這個手法相當微妙,這使得我們將會由self.__dict__中來存取self._imquery。現在的情況是:

  • 由於self._imquery為一般function,只是non-data descriptor而不是data descriptor,所以當我們使用self._imquery = alru_cache(ttl=ttl)(self._imquery)的語法時,相當於在self.__dict__中,加入一個已經包過alru_cache(ttl=ttl)_imquery
  • EdgeDBCloudConn.__dict__中,還是保有原來沒加上alru_cache_imquery
    def __init__(self,
                 *,
                 host: str,
                 port: int,
                 database: str,
                 secret_key: str,
                 ttl: float = 0,
                 logger: logging.Logger | None = None,
                 log_level: int | None = None) -> None:
        self._host = host
        self._port = port
        self._database = database
        self._secret_key = secret_key
        self._logger = logger or get_logger()
        self._log_level = log_level or logging.INFO
        self._logger.setLevel(self._log_level)

        self._client: EdgeDBAsyncClient | None = None
        self._start = 0.0
        self._dbcalls = 0
        self._total_dbcalls = 0

        if ttl > 0:
            self._imquery = alru_cache(ttl=ttl)(self._imquery)

client

為一property,用來包住底層的self._client。由於我們希望只建立一個client,所以每當self.client被呼叫時,我們會先檢查self._client是否為None,如果是的話,表示我們還沒有建立client,此時會先呼叫edgedb.create_async_client並搭配由load_toml所提供的各個參數來建立async-client。由於在建立client後,就不需要用到self._secret_key,與其讓它待在instance內,我們選擇刪除它,最後回傳self._client

class EdgeDBCloudConn(AbstractAsyncContextManager):
    ...
    
    @property
    def client(self) -> EdgeDBAsyncClient:
        if self._client is None:
            self._client = edgedb.create_async_client(host=self._host,
                                                      port=self._port,
                                                      database=self._database,
                                                      secret_key=self._secret_key)
            del self._secret_key
        return self._client

_get_client_qry_func

幫助我們實際由self.client,取到名字為match_func_name回傳值的function

class EdgeDBCloudConn(AbstractAsyncContextManager):
    ...

    def _get_client_qry_func(self,
                             jsonify: RespJson,
                             required_single: RespConstraint) -> Callable[..., Any]:
        return getattr(self.client, match_func_name(jsonify, required_single))

get_cur_timestamp

為回傳timestampfunction,可以幫助計算快取時間。get_cur_timestamp雖與selfcls的狀態無關,卻是EdgeDBCloudConn class的好幫手,所以我們設計其為static method。

class EdgeDBCloudConn(AbstractAsyncContextManager):
    ...
    
    @staticmethod
    def get_cur_timestamp() -> float:
        return datetime.now().timestamp()

_is_qry_immutable

是用來判斷我們的query內是否含有可能會mutate database的關鍵字。我們定義當query內含有insertupdatedelete時,就將此query判定為會mutate database。由於_mutated_kws不會隨著不同instance而改變,所以我們將其設為class variable

class EdgeDBCloudConn(AbstractAsyncContextManager):
    _mutated_kws = ('insert', 'update', 'delete')
    
    ...
    
    def _is_qry_immutable(self, qry: str) -> bool:
        return all(mutated_kw not in qry.casefold()
                   for mutated_kw in self._mutated_kws)

query_query_imquery_mquery

這四個都是async functionquery會依照_is_qry_immutable判斷該將query轉到_imquery或是_mquery。而_imquery_mquery程式其實完全一樣,都是再將query轉到_query。這樣的用意是方便我們於__init__中於ttl>0時,可以動態加上alru_cache_imquery

class EdgeDBCloudConn(AbstractAsyncContextManager):
    ...
    
    async def query(self,
                    qry: str,
                    *args: Any,
                    jsonify: RespJson = RespJson.NO,
                    required_single: RespConstraint = RespConstraint.FREE,
                    **kwargs: Any) -> QueryResult:
        if self._is_qry_immutable(qry):
            return await self._imquery(qry,
                                       *args,
                                       jsonify=jsonify,
                                       required_single=required_single,
                                       **kwargs)
        return await self._mquery(qry,
                                  *args,
                                  jsonify=jsonify,
                                  required_single=required_single,
                                  **kwargs)

    async def _imquery(self,
                       qry: str,
                       *args: Any,
                       jsonify: RespJson = RespJson.NO,
                       required_single: RespConstraint = RespConstraint.FREE,
                       **kwargs: Any) -> QueryResult:
        return await self._query(qry,
                                 *args,
                                 jsonify=jsonify,
                                 required_single=required_single,
                                 **kwargs)

    async def _mquery(self,
                      qry: str,
                      *args: Any,
                      jsonify: RespJson = RespJson.NO,
                      required_single: RespConstraint = RespConstraint.FREE,
                      **kwargs: Any) -> QueryResult:
        return await self._query(qry,
                                 *args,
                                 jsonify=jsonify,
                                 required_single=required_single,
                                 **kwargs)

_query中:

  • logging.info記錄其實際被呼叫。
  • self._dbcalls加上1
  • 使用_get_client_qry_func取回的function,搭配qry*args**kwargs進行呼叫,並回傳計算結果。
class EdgeDBCloudConn(AbstractAsyncContextManager):
    ...
    
    async def _query(self,
                     qry: str,
                     *args: Any,
                     jsonify: RespJson = RespJson.NO,
                     required_single: RespConstraint = RespConstraint.FREE,
                     **kwargs: Any) -> QueryResult:
        self._logger.info(self._fmt_query_log_msg(
            qry, args, jsonify, required_single, kwargs))
        self._dbcalls += 1
        qry_func = self._get_client_qry_func(jsonify, required_single)
        return await qry_func(qry, *args, **kwargs)

__aenter__

  • 於進入時,以logging.info記錄。
  • 呼叫self.get_cur_timestamp()將其值賦予self._start
class EdgeDBCloudConn(AbstractAsyncContextManager):
    ...
    
    async def __aenter__(self) -> Self:
        self._logger.info(self._fmt_enter_aenter_log_msg())
        self._start = self.get_cur_timestamp()
        return self

__aexit__

  • 開頭使用await asyncio.sleep(1e-5)的原因是,這樣可以方便UI可以即時更新。
  • 進行兩次logging.info,一次記錄已經進入__aexit__,另一次記錄實際呼叫database次數。
  • 如果有exception發生的話,以logging.error記錄。
  • self._dbcalls加總至self._total_dbcalls後,呼叫self._reset_db_calls重設self._dbcalls0
  • 離開__aexit__前,計算在with中的時間,並以logging.info記錄。
  • 最後於離開__aexit__,呼叫self._reset_start重設self._start0.0
class EdgeDBCloudConn(AbstractAsyncContextManager):
    ...
    
    async def __aexit__(self,
                        exc_type: type[BaseException] | None,
                        exc_value: BaseException | None,
                        exc_tb: TracebackType | None) -> None:
        await asyncio.sleep(1e-5)
        self._logger.info(self._fmt_enter_aexit_log_msg())
        self._logger.info(self._fmt_db_calls_log_msg())
        if exc_type:
            self._logger.error(self._fmt_aexit_exception_log_msg(exc_value))
        self._total_dbcalls += self._dbcalls
        self._reset_db_calls()
        elapsed = self.get_cur_timestamp() - self._start
        self._logger.info(self._fmt_exit_aexit_log_msg(elapsed))
        self._reset_start()

aclose

可以顯式呼叫底層clientself.client.aclose。請注意我們在__aexit__內,並沒有顯式的呼叫await self.aclose(),原因是現在很多database都具有pooling的機制,包括EdgeDB

    async def aclose(self, timeout: float = 5) -> None:
        print('aclose called')
        await asyncio.wait_for(self.client.aclose(), timeout) 

_healthy_check_urlis_healthy

_healthy_check_urlEdgeDB內建能判斷database現在狀態的url註3)。

is_healthy針對self._healthy_check_url發出GET request,並檢查返回status_code是否為200,來判斷database是否健康。如果遭遇到任何錯誤,設定回傳False

class EdgeDBCloudConn(AbstractAsyncContextManager):
    ...
    
    @property
    def _healthy_check_url(self) -> str:
        return f'https://{self._host}:{self._port}/server/status/alive'

    @property
    def is_healthy(self) -> bool:
        """https://www.edgedb.com/docs/guides/deployment/health_checks#health-checks"""
        try:
            return httpx.get(self._healthy_check_url,
                             follow_redirects=True,
                             verify=False,
                             timeout=30).status_code == 200
        except httpx.HTTPError:
            return False

_fmt_*

開頭的多個function皆為返回str,供logger所用,在此不加贅述。

_reset_*

開頭的多個function為重設時間或資料庫呼叫次數所用。

tests.py

總共有四個TestClass,都是實際對database進行query,沒有mocking註4)。

  • TestHealthy測試_healthy_check_urlis_healthy
  • TestImqryCachedConn測試在少量及大量query時都有進行快取。
  • TestImqryNonCachedConn測試其沒有快取機制。
  • TestMqryConn測試其沒有快取機制,且可正常insertdelete

備註

註1:由於這個project的目標是建立與EdgeDB cloud連結的connection,而不是學習EdgeQL的語法。如果是對其語法有興趣的朋友:

註2:如果您想要的是一個可以連接local或是host在其它雲端的EdgeDB connection,需要考慮較複雜的建立方式,包括支援DSN

註3:如果這是連接local或是ipconnection,此healthy_check_url網址可能使用http而非https

註4:當在Windows上測試時,會出現ResourceWarning: Enable tracemalloc to get the object allocation traceback,需要再研究資源關閉的問題。

Code

本日程式碼傳送門


上一篇
[Day26] 九翼 - Exception Groups與except*:相關應用
下一篇
[Day28] 末翼 - Term Projects:Project ECC - 建立EdgeDB Cloud Connection(2)
系列文
Python十翼:與未來的自己對話30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言