iT邦幫忙

2023 iThome 鐵人賽

DAY 26
0
Software Development

Python十翼:與未來的自己對話系列 第 26

[Day26] 九翼 - Exception Groups與except*:相關應用

  • 分享至 

  • xImage
  •  

今天我們分享三個Exception Groupsexcept*的相關應用。

  • 應用1:asyncio
  • 應用2:retry
  • 應用3:context manager

應用1:asyncio

asyncio是開發Exception Groupsexcept*的主要推手之一。在Python3.11前,處理asyncio相關問題最常使用的方法是asyncio.gatherasyncio.wait,而於Python3.11新添加了asyncio.TaskGroup。以下我們將分別使用三種方法來同時處理# 00內的四個coroutine function

# 00
async def coro1():
    raise ValueError('1')


async def coro2():
    raise TypeError('2')


async def coro3():
    raise TypeError('3')


async def coro4():
    return 'coro4 is good'

如果對asyncio有想深入了解的朋友,我們相當推薦Łukasz Langa註1)代表EdgeDB錄製的asyncio介紹系列

Coroutine function vs Coroutine object

在開始分享之前,在asyncio的世界裡,我們需要很清楚分別何謂coroutine

下面這段程式,是Python docs的例子。

import asyncio

async def nested():
    return 42

async def main():
    # Nothing happens if we just call "nested()".
    # A coroutine object is created but not awaited,
    # so it *won't run at all*.
    nested()

    # Let's do it differently now and await it:
    print(await nested())  # will print "42".

asyncio.run(main())

此外,其也很明白地定義coroutine functioncoroutine object

a coroutine function: an async def function.
a coroutine object: an object returned by calling a coroutine function.

簡單說,使用async def來定義的function稱為coroutine function;而呼叫coroutine function會返回一個coroutine object。雖然為了方便溝通,我們常常使用coroutine來同時代稱這兩種概念,但是作為優秀的Python開發者,我們一定要能清楚分辨兩者的不同。

asyncio.gather

asyncio.gathersignature如下:

awaitable asyncio.gather(*aws, return_exceptions=False)

其可接收多個awaitable,並有一個return_exceptions的flag。當return_exceptionsTrue時,會將例外與結果包在一個list中返回。如果為False的話,遇到第一個例外就會報錯,但是其它的aws並不會取消,而是會繼續執行。

return_exceptions為True時

# 01a
...

async def main():
    tasks = []
    coros = [coro1(), coro2(), coro3(), coro4()]
    for coro in coros:
        task = asyncio.create_task(coro)
        tasks.append(task)
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # results=[ValueError('1'), TypeError('2'), TypeError('3'), None]

    for result in results:
        match result:
            case ValueError() as msg:
                print(f'ValueError handling: {msg}')
            case TypeError() as msg:
                print(f'TypeError handling: {msg}')
            case _ as others:
                print(f'Rest: {others}')


if __name__ == '__main__':
    asyncio.run(main())
ValueError handling: 1
TypeError handling: 2
TypeError handling: 3
Rest: coro4 is good

results中我們可以得到所以的結果及例外,但需要利用多個if或是新的structural pattern matching來分出各個情況。

return_exceptions為False時

# 01b
...

async def main():
    tasks = []
    coros = [coro1(), coro2(), coro3(), coro4()]
    for coro in coros:
        task = asyncio.create_task(coro)
        tasks.append(task)

    # raise ValueError('1')
    await asyncio.gather(*tasks, return_exceptions=False)


if __name__ == '__main__':
    asyncio.run(main())
Traceback (most recent call last):
    ....
    raise ValueError('1')
ValueError: 1

# 01b中,當遇到第一個例外ValueError('1')即會返回,但是其它aws的工作並不會取消。

注意事項

在建立tasks,很多人喜歡使用list comprehensions來做。沒錯,大部份情況下list comprehensions是個好點子,但在這邊或許不是...。有興趣了解為什麼的朋友可以參考Will McGugan(註2)寫的解釋

asyncio.wait

asyncio.waitsignature如下:

coroutine asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)

其接受一個awsiterableTimeoutreturn_when的flag。
return_when共有三個選擇,預設為最常用的ALL_COMPLETED,其餘兩個為FIRST_COMPLETEDFIRST_EXCEPTION。其會返回兩個set,第一個set包含已經完成的task,第二個set則是未完成的task。一般使用的pattern會像這樣:

done, pending = await asyncio.wait(aws)
for p in pending:
    p.cancel()
    
for d in done:
    ...

一般asyncio.wait之後,我們會針對pending打一個迴圈,來取消未完成的工作。然後再對done打一個迴圈,取出其結果。

return_when為ALL_COMPLETED

我們使用預設的return_when=ALL_COMPLETED來解決問題。

  • 處理pending時,Or Chen建議對每一個pending task都再await asyncio.wait一小段時間,確保它們都能順利被取消。
  • 處理done時,由於裡面同時有resultException,所以必須在try-except中分開處理。
# 02a
...
async def main():
    tasks = []
    coros = [coro1(), coro2(), coro3(), coro4()]
    for coro in coros:
        task = asyncio.create_task(coro)
        tasks.append(task)
    done, pending = await asyncio.wait(tasks)
    for pt in pending:
        pt.cancael()
        await asyncio.wait(pt, timeout=1)

    for task in done:
        try:
            if exc := task.exception():
                raise exc
            else:
                print(f'{task.result()}')
        except Exception as e:
            print(f'handling {e}')


if __name__ == '__main__':
    asyncio.run(main())
handling 3
handling 1
coro4 is good
handling 2

由於這樣的pattern非常常用,Or Chen建議我們可以將大部份邏輯抽取到wait coroutine function並搭配ExceptionGroup使用。

# 02b
...

async def wait(aws: Iterable[Awaitable]) -> set[asyncio.Future]:
    # create tasks for aws
    tasks = []
    for aw in aws:
        if isinstance(aw, asyncio.Future):
            task = aw
        elif asyncio.iscoroutine(aw):
            task = asyncio.create_task(aw)
        else:
            raise TypeError('aws must all be awaitables')
        tasks.append(task)

    # wait
    done, pending = await asyncio.wait(tasks)
    for task in pending:
        task.cancel()
        await asyncio.wait(task, timeout=1)  # gracefully wait again

    # raise ExceptionGroup or return done
    exceptions = [exc for t in done if (exc := t.exception())]
    if exceptions:
        raise ExceptionGroup('Erros in aws', exceptions)
    return done


async def main():
    coros = [coro1(), coro2(), coro3(), coro4()]
    try:
        done = await wait(coros)
        for task in done:
            print(f'{task.result()}')
    except* Exception as eg:
        for exc in eg.exceptions:
            print(f'handling {exc}')
handling 2
handling 3
handling 1

wait內,可以分為三段:

  • 第一段針對aws建立task
  • 第二段執行asyncio.wait,並取消所有pending task,並多await asyncio.wait一次。
  • 第三段為收集例外。如果有收例外的話,則生成一個EG返回;如果沒有的話就返回done

main中,我們就可以使用try-except*的語法來處理例外。

# 02a# 02b其實並不完全相等。

  • # 02a的寫法可以同時得到result「以及」處理例外。
  • # 02b的寫法只能得到result「或是」處理例外。

實際上要用哪個方法,得視應用情況而定。

asyncio.TaskGroup

asyncio.TaskGroup是一個class,我們實際要用的是它的create_task function,其signature如下:

create_task(coro, *, name=None, context=None)

create_task接受單個coroutine,並可接受namecontext兩個參數。

其使用pattern會像:

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(some_coro(...))
        task2 = tg.create_task(another_coro(...))
    print("Both tasks have completed now.")
  • async with會自動await,直到tg.create_task所生成的task完成為止。
  • 於等待期間,新的task還是可以加入(例如可以將tg可以傳到另一個coroutine內,再次使用tg.create_task)。
  • 一旦所有task完成,離開async with的範圍後,就無法添加新task
  • 於執行task時,當遇到第一個非asyncio.CancelledError的例外時,所有剩下的task都會被取消,也不能加入新的task。此時,若程式還在async with的範圍內,則直接在async with內的task也會被取消。至於最後的asyncio.CancelledError只會形成await的情況,不會傳遞出async with
  • 當所有task完成後,若有例外發生的話,會集合成ExceptionGroupBaseExceptionGroup(例外為KeyboardInterruptSystemExit時)。
  • 程式若於離開async with出錯時(__aexit__被一個exception set呼叫時),將會被視為任何一個task發生例外一樣,取消所有task,最後將無法取消的task集合成EGreraise。傳入__aexit__的例外除非是asyncio.CancelledError,否則也會加入EG中(KeyboardInterruptSystemExit一樣是例外)。

以tg.create_task代替asyncio.create_task

# 03a寫法相比於前面兩種精簡了不少,task不需顯性的await,當任一task遇到錯誤時也會自動取消。

# 03a
async def main():
    coros = [coro1(), coro2(), coro3(), coro4()]
    tasks = []
    try:
        async with asyncio.TaskGroup() as tg:
            for coro in coros:
                task = tg.create_task(coro)
                tasks.append(task)
    except* Exception as eg:
        for exc in eg.exceptions:
            print(f'handling {exc}')
    else:
        for task in tasks:
            print(task.result())

if __name__ == '__main__':
    asyncio.run(main())
handling 1
handling 2
handling 3

由於tg.create_task要嘛是全部成功返回結果,要嘛是raise EG,所以我們使用try-except*-else的語法,於except* Exception as eg處理例外,於else中從tasks拿結果。

如果task不在乎回傳值的話,語法可以更簡潔。我們可以像# 2b一樣,將真正的操作獨立出去,只要處理except* Exception as eg就好,如# 03b

# 03b
...

async def do_some_stuff():
    coros = [coro1(), coro2(), coro3(), coro4()]
    tasks = []
    async with asyncio.TaskGroup() as tg:
        for coro in coros:
            task = tg.create_task(coro)
            tasks.append(task)


async def main():
    try:
        await do_some_stuff()
    except* Exception as eg:
        for exc in eg.exceptions:
            print(f'handling {exc}')


if __name__ == '__main__':
    asyncio.run(main())
handling 1
handling 2
handling 3

另外,由於我們的四個coroutine function同時執行,當遇到第一個例外時,其它task其實也都執行完了,所以看不出來asyncio.TaskGroup可以幫忙取消task

# 03中,我們將coro1coro2coro3加上時間不等的asyncio.sleep,這樣就可以看出,於coro1被發現有例外時,coro2coro3都被取消了,而coro4已經執行完成。所以最後回傳的EG只有coro1中的 ValueError('1')

# 03c
import asyncio


async def coro1():
    await asyncio.sleep(1)
    raise ValueError('1')


async def coro2():
    await asyncio.sleep(2)
    raise TypeError('2')


async def coro3():
    await asyncio.sleep(3)
    raise TypeError('3')


async def coro4():
    return 'coro4 is good'


async def do_some_stuff():
    coros = [coro1(), coro2(), coro3(), coro4()]
    tasks = []
    async with asyncio.TaskGroup() as tg:
        for coro in coros:
            task = tg.create_task(coro)
            tasks.append(task)


async def main():
    try:
        await do_some_stuff()
    except* Exception as eg:
        for exc in eg.exceptions:
            print(f'handling {exc}')


if __name__ == '__main__':
    asyncio.run(main())
handling 1

asyncio.TaskGroup實例

於結束應用1前,我們來舉一個使用asyncio.TaskGroup的實例。

我們的目標是以asyncio.TaskGroup建立三個task,同時對三個網址發出request,取回各自response後,以json格式儲存成三個json檔。當其中任一task報錯,asyncio.TaskGroup將會自動取消其它task

我們將使用JSONPlaceholder的免費API,感謝他們。

Happy path

我們先假設在不需要處理例外的情況下,如何解決這個問題。
我們將此問題拆成四個functionmaindownload_manydownloaddump_json

main

  • asyncio的入口。
  • 生成一個task_info,格式為list內含三個tuple。每個tuple的第一個元素為task的名字,第二個元素則為想下載的網址。
  • 使用httpxlibrary來發出request
  • 使用built-incontextvars.ContextVar功能來取得及設定httpx.AsyncClient,如此可以免去顯性傳遞client
  • await實際工作的download_many
  • 由於不需要處理例外,所以我們可以直接印出每個task
# 03d
import asyncio
import contextvars
import json
from pathlib import Path

import httpx

async_client_contextvar = contextvars.ContextVar('async_client')
...

async def main():
    taks_info = [('Get user_1_todos',
                  'https://jsonplaceholder.typicode.com/users/1/todos'),
                 ('Get user_1_posts',
                  'https://jsonplaceholder.typicode.com/users/1/posts'),
                 ('Get user_1_comments',
                  'https://jsonplaceholder.typicode.com/posts/1/comments')]

    async with httpx.AsyncClient() as client:
        async_client_contextvar.set(client)
        tasks = await download_many(taks_info)
        for task in tasks:
            print(f'{task=}')


if __name__ == '__main__':
    asyncio.run(main())
task=<Task finished name='Get user_1_todos' coro=<download() done, defined at xxx.py:17> result=None>
task=<Task finished name='Get user_1_posts' coro=<download() done, defined at xxx.py.py:17> result=None>
task=<Task finished name='Get user_1_comments' coro=<download() done, defined at xxx.py:17> result=None>
download_many

download_many則是一個非常制式的tg.create_task模式,功用為包住download,建立task

# 03d
...

async def download_many(taks_info):
    tasks = []
    async with asyncio.TaskGroup() as tg:
        for task_name, url in taks_info:
            task = tg.create_task(download(url), name=task_name)
            tasks.append(task)
        return tasks
download

download中我們針對給定網址發送GET request,並將response轉為json格式後,呼叫dump_json存為json檔案。

# 03d
...

async def download(url):
    file = Path('_'.join(url.split('/')[-3:])).with_suffix('.json')
    async_client = async_client_contextvar.get()
    resp = await async_client.get(url)
    content = resp.json()
    dump_json(file, content)
dump_json

呼叫json.dump寫入json檔案。

# 03d
...

def dump_json(file, content):
    with open(file, mode='w', encoding='utf-8') as f:
        json.dump(content, f, indent=4)

例外處理

由於Happy path的情況實在太樂觀了,正所謂不出意外的話,就要出意外了。

我們在download過程中,不可避免的會遭遇到各種意外,例如NetworkErrorTimeoutException

我們在download內加上一些程式碼,使得每次執行download時,可能會:

  • 正常執行。
  • raise httpx.NetworkError
  • raise httpx.TimeoutException
  • 發生預期外的例外。

await asyncio.sleep(0.1)是為了防止各task執行太快,當其中一個有報錯時,已經執行完畢,看不出asyncio.TaskGroup的取消效果。

# 03e
import random

def random_bool():
    return random.choice((True, False))


async def download(url):
    file = Path('_'.join(url.split('/')[-3:])).with_suffix('.json')

    # emulate exceptions happened
    filename = str(file)
    if '_todos' in filename and random_bool():
        raise httpx.NetworkError(f'Can not connect to {url=}')
    elif '_posts' in filename and random_bool():
        raise httpx.TimeoutException(f'Wait too long for check {url=}')
    elif '_comments' in filename and random_bool():
        if random_bool():
            raise httpx.NetworkError(f'Can not connect to {url=}')
        else:
            raise httpx.TimeoutException(f'Wait too long for check {url=}')
    await asyncio.sleep(0.1)

    async_client = async_client_contextvar.get()
    resp = await async_client.get(url)
    content = resp.json()
    dump_json(file, content)

main中,我們可以使用except*語法來捕捉所有發生的例外。

# 03e
...

async def main():
    ...
    async with httpx.AsyncClient() as client:
        async_client_contextvar.set(client)
        try:
            tasks = await download_many(taks_info)
        except* httpx.NetworkError as ne_group:
            print(ne_group.exceptions)
        except* httpx.TimeoutException as te_group:
            print(te_group.exceptions)
        except* Exception as other_group:
            print(other_group.exceptions)
        else:
            for task in tasks:
                print(f'{task=}')

下面列出一些可能的情況,供參考。

(NetworkError("Can not connect to url='https://jsonplaceholder.typicode.com/users/1/todos'"), NetworkError("Can not connect to url='https://jsonplaceholder.typicode.com/posts/1/comments'"))
(TimeoutException("Wait too long for check url='https://jsonplaceholder.typicode.com/users/1/posts'"),)
(NetworkError("Can not connect to url='https://jsonplaceholder.typicode.com/users/1/todos'"),)
(TimeoutException("Wait too long for check url='https://jsonplaceholder.typicode.com/posts/1/comments'"),)
(NetworkError("Can not connect to url='https://jsonplaceholder.typicode.com/posts/1/comments'"),)
task=<Task finished name='Get user_1_todos' coro=<download() done, defined at xxx.py> result=None>
task=<Task finished name='Get user_1_posts' coro=<download() done, defined at xxx.py> result=None>
task=<Task finished name='Get user_1_comments' coro=<download() done, defined at xxx.py> result=None>

應用2:retry

這個小節我們試著使用decorator並搭配EG

我們的目標是建立一個retrydecorator function

  • 其可利用@retry(max_retries=n)的語法,來對被裝飾的function,進行n次的retry。
  • 或是利用@retry的語法,來對被裝飾的function,執行預設次數(預設1次)的retry。
  • 若retry結束,被裝飾的function仍然無法成功完成的話,會收集所有retry過程中的例外,生成一個EG返回。

# 04中的my_func為被retry所裝飾的function,其可能會raise TypeError('1')raise ValueError('2')或成功返回ok

# 04
...

@retry
def my_func():
    lot = random.choice([TypeError('1'), ValueError('2'),  'ok'])
    match lot:
        case TypeError():
            raise lot
        case ValueError():
            raise lot
        case _:
            return lot

接著實作retry

  • 於一開始做一個max_retries的檢查,若無法通過的話,raise EG
  • 接下來的dec是真正接收my_funcdecorator function,其會返回內部真正執行計算的wrapper
  • wrapper內最多需執行max_retries+1次(1是指my_func本身要先執行一次,如果有不成功的情況,才會進行max_retries次的retry)。當成功得到結果後,立即返回,若有例外的話,就累積到exceptions中。在經過max_retries+1次重新呼叫my_func仍然沒有返回的話,代表有例外,於最後生成一個EG來包住exceptions並返回。
  • 至於最後return dec或是return dec(func)這段,是方便我們可以同時使用@retry@retry()兩種語法(可參考[Day05]的內容)。
# 04
import random
from functools import wraps

def retry(func=None, /, max_retries=1):
    if not isinstance(max_retries, int) or max_retries < 0:
        raise ExceptionGroup('invalid max_retries',
                             (ValueError('max_retries must be an integer and >=0'),))

    def dec(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            exceptions = []
            runs = max_retries+1  # add the first invocation
            for i, _ in enumerate(range(runs), 1):
                print(f'{func.__name__} is running ({i}/{runs})')
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    exceptions.append(e)
            raise ExceptionGroup(
                f'Retry {max_retries} times but still failed', exceptions)
        return wrapper

    if func is None:
        return dec
    return dec(func)
...

最後執行程式,可能會有多種結果。而透過except*這個新語法,我們可以捕捉到所有發生過的例外。

# 04
...

if __name__ == '__main__':
    try:
        print(f'{my_func()=}')  # 'ok'
    except* Exception as eg:
        print(eg.exceptions) 
        # 4 possibilities
        # (TypeError('1'), TypeError('1'))
        # (TypeError('1'), ValueError('2'))
        # (ValueError('2'), (TypeError('1'))
        # (ValueError('2'), ValueError('2'))

一些可能的結果供參考。

my_func is running (1/2)
my_func()='ok'
my_func is running (1/2)
my_func is running (2/2)
my_func()='ok'
my_func is running (1/2)
my_func is running (2/2)
(TypeError('1'), ValueError('2'))
my_func is running (1/2)
my_func is running (2/2)
(ValueError('2'), TypeError('1'))

應用3:context manager

這個小節我們準備建立一個實作有context mnager protocolclass,並觀察其於__exit__報錯時,於外層捕捉例外的行為。

try-except

我們先觀察傳統try-except的行為。

# 05a
from contextlib import AbstractContextManager


class DBCloseError(Exception):
    ...


class HTTPError(Exception):
    ...


class DBClient:
    def close(self):
        raise DBCloseError('Error occurred while closing db...')


class Connection(AbstractContextManager):
    def __init__(self):
        self._client = DBClient()

    def __exit__(self, exc_type, exc_value, exc_tb):
        self._client.close()

    def do_something(self):
        return 'done'

    def send_report(self):
        raise HTTPError('Report is not sent.')


if __name__ == '__main__':
    try:
        with Connection() as conn:
            conn.do_something()
            conn.send_report()
    except HTTPError:
        print('handling HTTPError...')
    except DBCloseError:
        print('handling DBCloseError...')
handling DBCloseError...
  • conn.do_something() 正常執行,沒有例外。
  • conn.send_report()raise HTTPError
  • 於離開with Connection() as conn時,__exit__中的self._client.close()raise DBCloseError
  • 於最外層我們試著捕捉HTTPErrorDBCloseError,結果只會抓到DBCloseError

我們真正想做的操作是conn.do_something()(無例外)及conn.send_report()(有例外),但因為離開context manager時也有例外,導致我們於外層只能捕捉到context manager的例外,而無法捕捉到真正操作時,所發生的例外。

try-except*

except*語法可以改變這種行為。

# 05b中。我們改在__exit__中先使用try-except捕捉例外。如果有例外的話,我們將此例外與我們顯性raisee,一起用一個EG包起來後回傳。

# 05b
...
class Connection(AbstractContextManager):
    ...
    def __exit__(self, exc_type, exc_value, exc_tb):
        try:
            self._client.close()
        except Exception as e:
            raise ExceptionGroup(
                'Got Exception while closing connection', [e, exc_value])
 
            
if __name__ == '__main__':
    try:
        with Connection() as conn:
            conn.do_something()
            conn.send_report()
    except* HTTPError:
        print('handling HTTPError...')
    except* DBCloseError:
        print('handling DBCloseError...')

這麼一來,我們在外層就可以同時捕捉到兩種例外。

handling HTTPError...
handling DBCloseError...

參考資料

備註

註1:Łukasz Langa是Python基金會雇請的第一位CPython Developer in Residence,並進入第三年任期。此外,他也常在各地的PyCon演講,錄影大多可以在YouTube上找到。我們覺得他的講解十分清楚,在他身上學了很多,非常感謝他的分享。

註2:Will McGuganRichTextual兩個超酷library的創始人。如果您沒聽過他,又剛好有terminal方面的Python應用,絕對不要錯過這兩個library。

Code

本日程式碼傳送門


上一篇
[Day25] 九翼 - Exception Groups與except*:導讀PEP654
下一篇
[Day27] 末翼 - Term Projects:Project ECC - 建立EdgeDB Cloud Connection(1)
系列文
Python十翼:與未來的自己對話30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言