各位熱心的大神好,不知各位有沒有用過python的asyncio,下面這些我不確定有沒有弄懂它的意思:
import unittest
import asyncio
from multiprocessing import Process, Queue
def run_server(queue, modules):
async def main():
task_list = []
for server in servers:
# 每一個 server.start() 會 return 一個 list: [ event.wait() ], event 是一個asyncio.Event()
task_list += await server.start()
async def eventloop():
while True:
await asyncio.sleep(0.5)
try:
# message 的格式是 {"command":..., "server":..., "data":...}
message = queue.get(block=False)
except Empty:
pass
else:
if message["command"] == "stop":
return
elif message["command"] == "set_data":
servers[message["server"]].data = message["data"]
task_list.append(eventloop())
task_list = [
t if isinstance(t, asyncio.Task) else asyncio.create_task(t) for t in tasks
]
finished, unfinished = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
for task in finished:
e = task.exception()
if e:
raise e
asyncio.run(main())
然後這個run_server函數在unittest中是這樣執行的:
MODULES = ["test_module"]
class TestCase(unittest.IsolatedAsyncioTestCase):
async def asyncSetUp(self):
self.queue = Queue()
self.process = Process(target=run_server, args=(self.queue, MODULES))
self.process.start()
...
async def asyncTearDown(self):
self.queue.put({"type": "stop"})
self.process.join()
我的理解是這樣,在run_server()中我們將servers中的server放入task_list,另外也將event_loop()放入task_list中,每隔段0.5秒的時間就判斷queue裡面有沒有message。沒有message的話,則停留在eventloop()的while loop中。如果有message,則看看要根據message內容做什麼操作。
而run_server()在unittest的class中是以Process來執行,我們可以一面執行run_server,一面做別的事(比如在queue裡面放入新的message,跑其它testcase)。
最後等到unittest中所有的case都跑完,在asyncTearDown中,我們在self.queue中放入{"type":"stop"}來停止run_server中的eventloop(),並用self.process.join()停止所有process。
不知道我上述的理解正不正確。然後想請問為什麼不論是unittest或是run_server()中的的操作都需用到asyncio? 有什麼必要性嗎?
自己對python的asyncio一直不太能理解,有點像無頭蒼蠅在亂問一通了,感謝各位看完我的問題。
加上註解
import unittest
import asyncio
from multiprocessing import Process, Queue
# 啟動 server 並進行必要的操作
def run_server(queue, modules):
# 定義 main 函數,是真正負責啟動和管理 server 的協同程序
async def main():
task_list = [] # 存儲每一個 server.start() 返回的 task
# 依次啟動每一個 server
for server in servers:
# server.start() 會返回一個 list: [ event.wait() ], event 是一個 asyncio.Event()
task_list += await server.start()
# 定義 eventloop 函數,是一個不斷迴圈的協同程序,不斷從 queue 接收 message
async def eventloop():
while True:
# 每隔0.5秒執行一次
await asyncio.sleep(0.5)
try:
# 從 queue 中獲取 message
message = queue.get(block=False)
except Empty:
# 如果 queue 為空,就跳過
pass
else:
if message["command"] == "stop":
# 若 command 是 "stop",則結束整個程式
return
elif message["command"] == "set_data":
# 若 command 是 "set_data",則把 message 中的 data 設置到 servers 中相應的 serer
servers[message["server"]].data = message["data"]
# 將 eventloop 加入 task_list 中
task_list.append(eventloop())
# 將 task_list 中的每一個 task 包裝為 asyncio.Task
task_list = [
t if isinstance(t, asyncio.Task) else asyncio.create_task(t) for t in tasks
]
# 等待所有 task 完成
finished, unfinished = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
for task in finished:
e = task.exception()
if e:
# 如果 task 出現異常,則拋出異常
raise e
# 啟動asyncio 事件循環
asyncio.run(main())
unittest:
# 定義要運行的模組
MODULES = ["test_module"]
# 繼承 unittest.IsolatedAsyncioTestCase 並實現 asyncio 的測試類
class TestCase(unittest.IsolatedAsyncioTestCase):
# 在每次測試開始時運行
async def asyncSetUp(self):
# 建立一個 Queue,用來和運行在另一個 process 中的伺服器進行通訊
self.queue = Queue()
# 建立一個 Process,這個 process 會執行 run_server 函數
self.process = Process(target=run_server, args=(self.queue, MODULES))
# 啟動 process
self.process.start()
...
# 在每次測試結束時運行
async def asyncTearDown(self):
# 向 queue 中加入一個特殊的消息,通知運行在另一個 process 中的伺服器停止運行
self.queue.put({"type": "stop"})
# 等待 process 結束
self.process.join()
然後想請問為什麼不論是unittest或是run_server()中的的操作都需用到asyncio?
有什麼必要性嗎?
這是因為 asyncio 是一個非同步 I/O 框架,它能夠支援多個任務同時執行並有效利用 CPU 時間。而 unittest 以及 run_server 都涉及到長時間等待或需要大量資源的操作,使用 asyncio 可以確保程式在這些操作過程中不會被阻塞,仍能夠做其他事情。因此在這樣的環境中使用 asyncio 是有必要的。