今天我們分享三個Exception Groups
與except*
的相關應用。
asyncio
是開發Exception Groups
與except*
的主要推手之一。在Python3.11前,處理asyncio
相關問題最常使用的方法是asyncio.gather
與asyncio.wait
,而於Python3.11新添加了asyncio.TaskGroup
。以下我們將分別使用三種方法來同時處理# 00
內的四個coroutine function
。
# 00
async def coro1():
raise ValueError('1')
async def coro2():
raise TypeError('2')
async def coro3():
raise TypeError('3')
async def coro4():
return 'coro4 is good'
如果對asyncio
有想深入了解的朋友,我們相當推薦Łukasz Langa
(註1
)代表EdgeDB
錄製的asyncio介紹系列。
在開始分享之前,在asyncio
的世界裡,我們需要很清楚分別何謂coroutine
。
下面這段程式,是Python docs的例子。
import asyncio
async def nested():
return 42
async def main():
# Nothing happens if we just call "nested()".
# A coroutine object is created but not awaited,
# so it *won't run at all*.
nested()
# Let's do it differently now and await it:
print(await nested()) # will print "42".
asyncio.run(main())
此外,其也很明白地定義了coroutine function
與coroutine object
。
a coroutine function: an async def function.
a coroutine object: an object returned by calling a coroutine function.
簡單說,使用async def
來定義的function
稱為coroutine function
;而呼叫coroutine function
會返回一個coroutine object
。雖然為了方便溝通,我們常常使用coroutine
來同時代稱這兩種概念,但是作為優秀的Python開發者,我們一定要能清楚分辨兩者的不同。
asyncio.gather的signature
如下:
awaitable asyncio.gather(*aws, return_exceptions=False)
其可接收多個awaitable,並有一個return_exceptions
的flag。當return_exceptions
為True
時,會將例外與結果包在一個list
中返回。如果為False
的話,遇到第一個例外就會報錯,但是其它的aws
並不會取消,而是會繼續執行。
# 01a
...
async def main():
tasks = []
coros = [coro1(), coro2(), coro3(), coro4()]
for coro in coros:
task = asyncio.create_task(coro)
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
# results=[ValueError('1'), TypeError('2'), TypeError('3'), None]
for result in results:
match result:
case ValueError() as msg:
print(f'ValueError handling: {msg}')
case TypeError() as msg:
print(f'TypeError handling: {msg}')
case _ as others:
print(f'Rest: {others}')
if __name__ == '__main__':
asyncio.run(main())
ValueError handling: 1
TypeError handling: 2
TypeError handling: 3
Rest: coro4 is good
從results
中我們可以得到所以的結果及例外,但需要利用多個if
或是新的structural pattern matching
來分出各個情況。
# 01b
...
async def main():
tasks = []
coros = [coro1(), coro2(), coro3(), coro4()]
for coro in coros:
task = asyncio.create_task(coro)
tasks.append(task)
# raise ValueError('1')
await asyncio.gather(*tasks, return_exceptions=False)
if __name__ == '__main__':
asyncio.run(main())
Traceback (most recent call last):
....
raise ValueError('1')
ValueError: 1
# 01b
中,當遇到第一個例外ValueError('1')
即會返回,但是其它aws
的工作並不會取消。
在建立tasks
,很多人喜歡使用list comprehensions
來做。沒錯,大部份情況下list comprehensions
是個好點子,但在這邊或許不是...。有興趣了解為什麼的朋友可以參考Will McGugan
(註2
)寫的解釋。
asyncio.wait的signature
如下:
coroutine asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)
其接受一個aws
的iterable
,Timeout
及return_when
的flag。return_when
共有三個選擇,預設為最常用的ALL_COMPLETED
,其餘兩個為FIRST_COMPLETED
與FIRST_EXCEPTION
。其會返回兩個set
,第一個set
包含已經完成的task
,第二個set
則是未完成的task
。一般使用的pattern會像這樣:
done, pending = await asyncio.wait(aws)
for p in pending:
p.cancel()
for d in done:
...
一般asyncio.wait
之後,我們會針對pending
打一個迴圈,來取消未完成的工作。然後再對done
打一個迴圈,取出其結果。
我們使用預設的return_when=ALL_COMPLETED
來解決問題。
pending
時,Or Chen
建議對每一個pending
task
都再await asyncio.wait
一小段時間,確保它們都能順利被取消。done
時,由於裡面同時有result
及Exception
,所以必須在try-except
中分開處理。# 02a
...
async def main():
tasks = []
coros = [coro1(), coro2(), coro3(), coro4()]
for coro in coros:
task = asyncio.create_task(coro)
tasks.append(task)
done, pending = await asyncio.wait(tasks)
for pt in pending:
pt.cancael()
await asyncio.wait(pt, timeout=1)
for task in done:
try:
if exc := task.exception():
raise exc
else:
print(f'{task.result()}')
except Exception as e:
print(f'handling {e}')
if __name__ == '__main__':
asyncio.run(main())
handling 3
handling 1
coro4 is good
handling 2
由於這樣的pattern非常常用,Or Chen
建議我們可以將大部份邏輯抽取到wait
coroutine function
並搭配ExceptionGroup
使用。
# 02b
...
async def wait(aws: Iterable[Awaitable]) -> set[asyncio.Future]:
# create tasks for aws
tasks = []
for aw in aws:
if isinstance(aw, asyncio.Future):
task = aw
elif asyncio.iscoroutine(aw):
task = asyncio.create_task(aw)
else:
raise TypeError('aws must all be awaitables')
tasks.append(task)
# wait
done, pending = await asyncio.wait(tasks)
for task in pending:
task.cancel()
await asyncio.wait(task, timeout=1) # gracefully wait again
# raise ExceptionGroup or return done
exceptions = [exc for t in done if (exc := t.exception())]
if exceptions:
raise ExceptionGroup('Erros in aws', exceptions)
return done
async def main():
coros = [coro1(), coro2(), coro3(), coro4()]
try:
done = await wait(coros)
for task in done:
print(f'{task.result()}')
except* Exception as eg:
for exc in eg.exceptions:
print(f'handling {exc}')
handling 2
handling 3
handling 1
wait
內,可以分為三段:
aws
建立task
。asyncio.wait
,並取消所有pending
task
,並多await asyncio.wait
一次。EG
返回;如果沒有的話就返回done
。於main
中,我們就可以使用try-except*
的語法來處理例外。
# 02a
與# 02b
其實並不完全相等。
# 02a
的寫法可以同時得到result
「以及」處理例外。# 02b
的寫法只能得到result
「或是」處理例外。實際上要用哪個方法,得視應用情況而定。
asyncio.TaskGroup
是一個class
,我們實際要用的是它的create_task
function
,其signature
如下:
create_task(coro, *, name=None, context=None)
create_task
接受單個coroutine
,並可接受name
及context
兩個參數。
其使用pattern會像:
async def main():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(some_coro(...))
task2 = tg.create_task(another_coro(...))
print("Both tasks have completed now.")
async with
會自動await
,直到tg.create_task
所生成的task
完成為止。task
還是可以加入(例如可以將tg
可以傳到另一個coroutine
內,再次使用tg.create_task
)。task
完成,離開async with
的範圍後,就無法添加新task
。task
時,當遇到第一個非asyncio.CancelledError
的例外時,所有剩下的task
都會被取消,也不能加入新的task
。此時,若程式還在async with
的範圍內,則直接在async with
內的task
也會被取消。至於最後的asyncio.CancelledError
只會形成await
的情況,不會傳遞出async with
。task
完成後,若有例外發生的話,會集合成ExceptionGroup
或BaseExceptionGroup
(例外為KeyboardInterrupt
及SystemExit
時)。async with
出錯時(__aexit__
被一個exception set
呼叫時),將會被視為任何一個task
發生例外一樣,取消所有task
,最後將無法取消的task
集合成EG
再reraise
。傳入__aexit__
的例外除非是asyncio.CancelledError
,否則也會加入EG
中(KeyboardInterrupt
及SystemExit
一樣是例外)。# 03a
寫法相比於前面兩種精簡了不少,task
不需顯性的await
,當任一task
遇到錯誤時也會自動取消。
# 03a
async def main():
coros = [coro1(), coro2(), coro3(), coro4()]
tasks = []
try:
async with asyncio.TaskGroup() as tg:
for coro in coros:
task = tg.create_task(coro)
tasks.append(task)
except* Exception as eg:
for exc in eg.exceptions:
print(f'handling {exc}')
else:
for task in tasks:
print(task.result())
if __name__ == '__main__':
asyncio.run(main())
handling 1
handling 2
handling 3
由於tg.create_task
要嘛是全部成功返回結果,要嘛是raise EG
,所以我們使用try-except*-else
的語法,於except* Exception as eg
處理例外,於else
中從tasks
拿結果。
如果task
不在乎回傳值的話,語法可以更簡潔。我們可以像# 2b
一樣,將真正的操作獨立出去,只要處理except* Exception as eg
就好,如# 03b
。
# 03b
...
async def do_some_stuff():
coros = [coro1(), coro2(), coro3(), coro4()]
tasks = []
async with asyncio.TaskGroup() as tg:
for coro in coros:
task = tg.create_task(coro)
tasks.append(task)
async def main():
try:
await do_some_stuff()
except* Exception as eg:
for exc in eg.exceptions:
print(f'handling {exc}')
if __name__ == '__main__':
asyncio.run(main())
handling 1
handling 2
handling 3
另外,由於我們的四個coroutine
function
同時執行,當遇到第一個例外時,其它task
其實也都執行完了,所以看不出來asyncio.TaskGroup
可以幫忙取消task
。
# 03
中,我們將coro1
、coro2
及coro3
加上時間不等的asyncio.sleep
,這樣就可以看出,於coro1
被發現有例外時,coro2
及coro3
都被取消了,而coro4
已經執行完成。所以最後回傳的EG
只有coro1
中的 ValueError('1')
。
# 03c
import asyncio
async def coro1():
await asyncio.sleep(1)
raise ValueError('1')
async def coro2():
await asyncio.sleep(2)
raise TypeError('2')
async def coro3():
await asyncio.sleep(3)
raise TypeError('3')
async def coro4():
return 'coro4 is good'
async def do_some_stuff():
coros = [coro1(), coro2(), coro3(), coro4()]
tasks = []
async with asyncio.TaskGroup() as tg:
for coro in coros:
task = tg.create_task(coro)
tasks.append(task)
async def main():
try:
await do_some_stuff()
except* Exception as eg:
for exc in eg.exceptions:
print(f'handling {exc}')
if __name__ == '__main__':
asyncio.run(main())
handling 1
於結束應用1
前,我們來舉一個使用asyncio.TaskGroup
的實例。
我們的目標是以asyncio.TaskGroup
建立三個task
,同時對三個網址發出request
,取回各自response
後,以json
格式儲存成三個json
檔。當其中任一task
報錯,asyncio.TaskGroup
將會自動取消其它task
。
我們將使用JSONPlaceholder的免費API,感謝他們。
我們先假設在不需要處理例外的情況下,如何解決這個問題。
我們將此問題拆成四個function
、main
、download_many
、download
及dump_json
asyncio
的入口。task_info
,格式為list
內含三個tuple
。每個tuple
的第一個元素為task
的名字,第二個元素則為想下載的網址。httpx
library來發出request
。built-in
的contextvars.ContextVar
功能來取得及設定httpx.AsyncClient
,如此可以免去顯性傳遞client
。await
實際工作的download_many
。task
。# 03d
import asyncio
import contextvars
import json
from pathlib import Path
import httpx
async_client_contextvar = contextvars.ContextVar('async_client')
...
async def main():
taks_info = [('Get user_1_todos',
'https://jsonplaceholder.typicode.com/users/1/todos'),
('Get user_1_posts',
'https://jsonplaceholder.typicode.com/users/1/posts'),
('Get user_1_comments',
'https://jsonplaceholder.typicode.com/posts/1/comments')]
async with httpx.AsyncClient() as client:
async_client_contextvar.set(client)
tasks = await download_many(taks_info)
for task in tasks:
print(f'{task=}')
if __name__ == '__main__':
asyncio.run(main())
task=<Task finished name='Get user_1_todos' coro=<download() done, defined at xxx.py:17> result=None>
task=<Task finished name='Get user_1_posts' coro=<download() done, defined at xxx.py.py:17> result=None>
task=<Task finished name='Get user_1_comments' coro=<download() done, defined at xxx.py:17> result=None>
download_many
則是一個非常制式的tg.create_task
模式,功用為包住download
,建立task
。
# 03d
...
async def download_many(taks_info):
tasks = []
async with asyncio.TaskGroup() as tg:
for task_name, url in taks_info:
task = tg.create_task(download(url), name=task_name)
tasks.append(task)
return tasks
於download
中我們針對給定網址發送GET request
,並將response
轉為json
格式後,呼叫dump_json
存為json
檔案。
# 03d
...
async def download(url):
file = Path('_'.join(url.split('/')[-3:])).with_suffix('.json')
async_client = async_client_contextvar.get()
resp = await async_client.get(url)
content = resp.json()
dump_json(file, content)
呼叫json.dump
寫入json
檔案。
# 03d
...
def dump_json(file, content):
with open(file, mode='w', encoding='utf-8') as f:
json.dump(content, f, indent=4)
由於Happy path
的情況實在太樂觀了,正所謂不出意外的話,就要出意外了。
我們在download
過程中,不可避免的會遭遇到各種意外,例如NetworkError
或TimeoutException
。
我們在download
內加上一些程式碼,使得每次執行download
時,可能會:
raise httpx.NetworkError
。raise httpx.TimeoutException
。await asyncio.sleep(0.1)
是為了防止各task
執行太快,當其中一個有報錯時,已經執行完畢,看不出asyncio.TaskGroup
的取消效果。
# 03e
import random
def random_bool():
return random.choice((True, False))
async def download(url):
file = Path('_'.join(url.split('/')[-3:])).with_suffix('.json')
# emulate exceptions happened
filename = str(file)
if '_todos' in filename and random_bool():
raise httpx.NetworkError(f'Can not connect to {url=}')
elif '_posts' in filename and random_bool():
raise httpx.TimeoutException(f'Wait too long for check {url=}')
elif '_comments' in filename and random_bool():
if random_bool():
raise httpx.NetworkError(f'Can not connect to {url=}')
else:
raise httpx.TimeoutException(f'Wait too long for check {url=}')
await asyncio.sleep(0.1)
async_client = async_client_contextvar.get()
resp = await async_client.get(url)
content = resp.json()
dump_json(file, content)
於main
中,我們可以使用except*
語法來捕捉所有發生的例外。
# 03e
...
async def main():
...
async with httpx.AsyncClient() as client:
async_client_contextvar.set(client)
try:
tasks = await download_many(taks_info)
except* httpx.NetworkError as ne_group:
print(ne_group.exceptions)
except* httpx.TimeoutException as te_group:
print(te_group.exceptions)
except* Exception as other_group:
print(other_group.exceptions)
else:
for task in tasks:
print(f'{task=}')
下面列出一些可能的情況,供參考。
(NetworkError("Can not connect to url='https://jsonplaceholder.typicode.com/users/1/todos'"), NetworkError("Can not connect to url='https://jsonplaceholder.typicode.com/posts/1/comments'"))
(TimeoutException("Wait too long for check url='https://jsonplaceholder.typicode.com/users/1/posts'"),)
(NetworkError("Can not connect to url='https://jsonplaceholder.typicode.com/users/1/todos'"),)
(TimeoutException("Wait too long for check url='https://jsonplaceholder.typicode.com/posts/1/comments'"),)
(NetworkError("Can not connect to url='https://jsonplaceholder.typicode.com/posts/1/comments'"),)
task=<Task finished name='Get user_1_todos' coro=<download() done, defined at xxx.py> result=None>
task=<Task finished name='Get user_1_posts' coro=<download() done, defined at xxx.py> result=None>
task=<Task finished name='Get user_1_comments' coro=<download() done, defined at xxx.py> result=None>
這個小節我們試著使用decorator
並搭配EG
。
我們的目標是建立一個retry
的decorator function
:
@retry(max_retries=n)
的語法,來對被裝飾的function
,進行n
次的retry。@retry
的語法,來對被裝飾的function
,執行預設次數(預設1次)的retry。function
仍然無法成功完成的話,會收集所有retry過程中的例外,生成一個EG
返回。# 04
中的my_func
為被retry
所裝飾的function
,其可能會raise TypeError('1')
、raise ValueError('2')
或成功返回ok
。
# 04
...
@retry
def my_func():
lot = random.choice([TypeError('1'), ValueError('2'), 'ok'])
match lot:
case TypeError():
raise lot
case ValueError():
raise lot
case _:
return lot
接著實作retry
:
max_retries
的檢查,若無法通過的話,raise EG
。dec
是真正接收my_func
的decorator
function
,其會返回內部真正執行計算的wrapper
。wrapper
內最多需執行max_retries+1
次(1
是指my_func
本身要先執行一次,如果有不成功的情況,才會進行max_retries
次的retry)。當成功得到結果後,立即返回,若有例外的話,就累積到exceptions
中。在經過max_retries+1
次重新呼叫my_func
仍然沒有返回的話,代表有例外,於最後生成一個EG
來包住exceptions
並返回。return dec
或是return dec(func)
這段,是方便我們可以同時使用@retry
及@retry()
兩種語法(可參考[Day05]的內容)。# 04
import random
from functools import wraps
def retry(func=None, /, max_retries=1):
if not isinstance(max_retries, int) or max_retries < 0:
raise ExceptionGroup('invalid max_retries',
(ValueError('max_retries must be an integer and >=0'),))
def dec(func):
@wraps(func)
def wrapper(*args, **kwargs):
exceptions = []
runs = max_retries+1 # add the first invocation
for i, _ in enumerate(range(runs), 1):
print(f'{func.__name__} is running ({i}/{runs})')
try:
return func(*args, **kwargs)
except Exception as e:
exceptions.append(e)
raise ExceptionGroup(
f'Retry {max_retries} times but still failed', exceptions)
return wrapper
if func is None:
return dec
return dec(func)
...
最後執行程式,可能會有多種結果。而透過except*
這個新語法,我們可以捕捉到所有發生過的例外。
# 04
...
if __name__ == '__main__':
try:
print(f'{my_func()=}') # 'ok'
except* Exception as eg:
print(eg.exceptions)
# 4 possibilities
# (TypeError('1'), TypeError('1'))
# (TypeError('1'), ValueError('2'))
# (ValueError('2'), (TypeError('1'))
# (ValueError('2'), ValueError('2'))
一些可能的結果供參考。
my_func is running (1/2)
my_func()='ok'
my_func is running (1/2)
my_func is running (2/2)
my_func()='ok'
my_func is running (1/2)
my_func is running (2/2)
(TypeError('1'), ValueError('2'))
my_func is running (1/2)
my_func is running (2/2)
(ValueError('2'), TypeError('1'))
這個小節我們準備建立一個實作有context mnager protocol
的class
,並觀察其於__exit__
報錯時,於外層捕捉例外的行為。
我們先觀察傳統try-except
的行為。
# 05a
from contextlib import AbstractContextManager
class DBCloseError(Exception):
...
class HTTPError(Exception):
...
class DBClient:
def close(self):
raise DBCloseError('Error occurred while closing db...')
class Connection(AbstractContextManager):
def __init__(self):
self._client = DBClient()
def __exit__(self, exc_type, exc_value, exc_tb):
self._client.close()
def do_something(self):
return 'done'
def send_report(self):
raise HTTPError('Report is not sent.')
if __name__ == '__main__':
try:
with Connection() as conn:
conn.do_something()
conn.send_report()
except HTTPError:
print('handling HTTPError...')
except DBCloseError:
print('handling DBCloseError...')
handling DBCloseError...
conn.do_something()
正常執行,沒有例外。conn.send_report()
會raise
HTTPError
。with Connection() as conn
時,__exit__
中的self._client.close()
會raise DBCloseError
。HTTPError
與DBCloseError
,結果只會抓到DBCloseError
。我們真正想做的操作是conn.do_something()
(無例外)及conn.send_report()
(有例外),但因為離開context manager
時也有例外,導致我們於外層只能捕捉到context manager
的例外,而無法捕捉到真正操作時,所發生的例外。
except*
語法可以改變這種行為。
# 05b
中。我們改在__exit__
中先使用try-except
捕捉例外。如果有例外的話,我們將此例外與我們顯性raise
的e
,一起用一個EG
包起來後回傳。
# 05b
...
class Connection(AbstractContextManager):
...
def __exit__(self, exc_type, exc_value, exc_tb):
try:
self._client.close()
except Exception as e:
raise ExceptionGroup(
'Got Exception while closing connection', [e, exc_value])
if __name__ == '__main__':
try:
with Connection() as conn:
conn.do_something()
conn.send_report()
except* HTTPError:
print('handling HTTPError...')
except* DBCloseError:
print('handling DBCloseError...')
這麼一來,我們在外層就可以同時捕捉到兩種例外。
handling HTTPError...
handling DBCloseError...
註1:Łukasz Langa是Python基金會雇請的第一位CPython Developer in Residence
,並進入第三年任期。此外,他也常在各地的PyCon演講,錄影大多可以在YouTube上找到。我們覺得他的講解十分清楚,在他身上學了很多,非常感謝他的分享。
註2:Will McGugan
是Rich和Textual兩個超酷library的創始人。如果您沒聽過他,又剛好有terminal
方面的Python應用,絕對不要錯過這兩個library。