各位熱心的大神好,不知各位有沒有用過python的asyncio,下面這些我不確定有沒有弄懂它的意思:
import unittest
import asyncio
from multiprocessing import Process, Queue
def run_server(queue, modules):
    async def main():
        task_list = []
        for server in servers:
            # 每一個 server.start() 會 return 一個 list: [ event.wait() ], event 是一個asyncio.Event()
            task_list += await server.start()
        async def eventloop():
            while True:
                await asyncio.sleep(0.5)
                try:
                # message 的格式是 {"command":..., "server":..., "data":...}
                    message = queue.get(block=False)
                except Empty:
                    pass
                else:
                    if message["command"] == "stop":
                        return
                    elif message["command"] == "set_data":
                        servers[message["server"]].data = message["data"]
        task_list.append(eventloop())
        task_list = [
            t if isinstance(t, asyncio.Task) else asyncio.create_task(t) for t in tasks
        ]
        finished, unfinished = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for task in finished:
            e = task.exception()
            if e:
                raise e
    asyncio.run(main())
然後這個run_server函數在unittest中是這樣執行的:
MODULES = ["test_module"]
class TestCase(unittest.IsolatedAsyncioTestCase):
    async def asyncSetUp(self):
        self.queue = Queue()
        self.process = Process(target=run_server, args=(self.queue, MODULES))
        self.process.start()
        
    ...
        
    async def asyncTearDown(self):
        self.queue.put({"type": "stop"})
        self.process.join()
我的理解是這樣,在run_server()中我們將servers中的server放入task_list,另外也將event_loop()放入task_list中,每隔段0.5秒的時間就判斷queue裡面有沒有message。沒有message的話,則停留在eventloop()的while loop中。如果有message,則看看要根據message內容做什麼操作。
而run_server()在unittest的class中是以Process來執行,我們可以一面執行run_server,一面做別的事(比如在queue裡面放入新的message,跑其它testcase)。
最後等到unittest中所有的case都跑完,在asyncTearDown中,我們在self.queue中放入{"type":"stop"}來停止run_server中的eventloop(),並用self.process.join()停止所有process。
不知道我上述的理解正不正確。然後想請問為什麼不論是unittest或是run_server()中的的操作都需用到asyncio? 有什麼必要性嗎?
自己對python的asyncio一直不太能理解,有點像無頭蒼蠅在亂問一通了,感謝各位看完我的問題。
加上註解
import unittest
import asyncio
from multiprocessing import Process, Queue
# 啟動 server 並進行必要的操作
def run_server(queue, modules):
    # 定義 main 函數,是真正負責啟動和管理 server 的協同程序
    async def main():
        task_list = []  # 存儲每一個 server.start() 返回的 task
        
        # 依次啟動每一個 server
        for server in servers:
            # server.start() 會返回一個 list: [ event.wait() ], event 是一個 asyncio.Event()
            task_list += await server.start()
        # 定義 eventloop 函數,是一個不斷迴圈的協同程序,不斷從 queue 接收 message
        async def eventloop():
            while True:
                # 每隔0.5秒執行一次
                await asyncio.sleep(0.5)
                try:
                    # 從 queue 中獲取 message
                    message = queue.get(block=False)
                except Empty:
                    # 如果 queue 為空,就跳過
                    pass
                else:
                    if message["command"] == "stop":
                        # 若 command 是 "stop",則結束整個程式
                        return
                    elif message["command"] == "set_data":
                            # 若 command 是 "set_data",則把 message 中的 data 設置到 servers 中相應的 serer
                        servers[message["server"]].data = message["data"]
        # 將 eventloop 加入 task_list 中
        task_list.append(eventloop())
        # 將 task_list 中的每一個 task 包裝為 asyncio.Task
        task_list = [
            t if isinstance(t, asyncio.Task) else asyncio.create_task(t) for t in tasks
        ]
        # 等待所有 task 完成
        finished, unfinished = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for task in finished:
            e = task.exception()
            if e:
                # 如果 task 出現異常,則拋出異常
                raise e
    # 啟動asyncio 事件循環
    asyncio.run(main())
unittest:
# 定義要運行的模組
MODULES = ["test_module"]
# 繼承 unittest.IsolatedAsyncioTestCase 並實現 asyncio 的測試類
class TestCase(unittest.IsolatedAsyncioTestCase):
    # 在每次測試開始時運行
    async def asyncSetUp(self):
        # 建立一個 Queue,用來和運行在另一個 process 中的伺服器進行通訊
        self.queue = Queue()
        # 建立一個 Process,這個 process 會執行 run_server 函數
        self.process = Process(target=run_server, args=(self.queue, MODULES))
        # 啟動 process
        self.process.start()
        
    ...
        
    # 在每次測試結束時運行
    async def asyncTearDown(self):
        # 向 queue 中加入一個特殊的消息,通知運行在另一個 process 中的伺服器停止運行
        self.queue.put({"type": "stop"})
        # 等待 process 結束
        self.process.join()
然後想請問為什麼不論是unittest或是run_server()中的的操作都需用到asyncio?
有什麼必要性嗎?
這是因為 asyncio 是一個非同步 I/O 框架,它能夠支援多個任務同時執行並有效利用 CPU 時間。而 unittest 以及 run_server 都涉及到長時間等待或需要大量資源的操作,使用 asyncio 可以確保程式在這些操作過程中不會被阻塞,仍能夠做其他事情。因此在這樣的環境中使用 asyncio 是有必要的。