iT邦幫忙

0

幾段python的code看不懂 (使用asyncio和queue)

  • 分享至 

  • xImage

各位熱心的大神好,不知各位有沒有用過python的asyncio,下面這些我不確定有沒有弄懂它的意思:

import unittest
import asyncio
from multiprocessing import Process, Queue

def run_server(queue, modules):
    async def main():
        task_list = []
        for server in servers:
            # 每一個 server.start() 會 return 一個 list: [ event.wait() ], event 是一個asyncio.Event()
            task_list += await server.start()

        async def eventloop():
            while True:
                await asyncio.sleep(0.5)
                try:
                # message 的格式是 {"command":..., "server":..., "data":...}
                    message = queue.get(block=False)
                except Empty:
                    pass
                else:
                    if message["command"] == "stop":
                        return
                    elif message["command"] == "set_data":
                        servers[message["server"]].data = message["data"]

        task_list.append(eventloop())
        task_list = [
            t if isinstance(t, asyncio.Task) else asyncio.create_task(t) for t in tasks
        ]

        finished, unfinished = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for task in finished:
            e = task.exception()
            if e:
                raise e

    asyncio.run(main())

然後這個run_server函數在unittest中是這樣執行的:

MODULES = ["test_module"]

class TestCase(unittest.IsolatedAsyncioTestCase):
    async def asyncSetUp(self):
        self.queue = Queue()
        self.process = Process(target=run_server, args=(self.queue, MODULES))
        self.process.start()
        
    ...
        
    async def asyncTearDown(self):
        self.queue.put({"type": "stop"})
        self.process.join()

我的理解是這樣,在run_server()中我們將servers中的server放入task_list,另外也將event_loop()放入task_list中,每隔段0.5秒的時間就判斷queue裡面有沒有message。沒有message的話,則停留在eventloop()的while loop中。如果有message,則看看要根據message內容做什麼操作。

而run_server()在unittest的class中是以Process來執行,我們可以一面執行run_server,一面做別的事(比如在queue裡面放入新的message,跑其它testcase)。

最後等到unittest中所有的case都跑完,在asyncTearDown中,我們在self.queue中放入{"type":"stop"}來停止run_server中的eventloop(),並用self.process.join()停止所有process。

不知道我上述的理解正不正確。然後想請問為什麼不論是unittest或是run_server()中的的操作都需用到asyncio? 有什麼必要性嗎?

自己對python的asyncio一直不太能理解,有點像無頭蒼蠅在亂問一通了,感謝各位看完我的問題。

froce iT邦大師 1 級 ‧ 2023-02-05 11:29:16 檢舉
> 不知道我上述的理解正不正確

應該大致正確

asyncio/coroutine 是為了效能,網路存取算是慢的io bound,coroutine 會在函式中有await的地方,在等待io bound時先將cpu交還其他程式碼做切換,來提高cpu利用率。
但coroutine 基本上是在單一process上運行的,加上測試本身發送的各種命令不需要互相交互,為了能利用你電腦硬體的完整能力,所以會用process搭配asyncio。

https://stackoverflow.com/questions/27435284/multiprocessing-vs-multithreading-vs-asyncio
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

1 個回答

0
JamesDoge
iT邦高手 1 級 ‧ 2023-02-06 08:22:27
最佳解答

加上註解

import unittest
import asyncio
from multiprocessing import Process, Queue

# 啟動 server 並進行必要的操作
def run_server(queue, modules):
    # 定義 main 函數,是真正負責啟動和管理 server 的協同程序
    async def main():
        task_list = []  # 存儲每一個 server.start() 返回的 task
        
        # 依次啟動每一個 server
        for server in servers:
            # server.start() 會返回一個 list: [ event.wait() ], event 是一個 asyncio.Event()
            task_list += await server.start()

        # 定義 eventloop 函數,是一個不斷迴圈的協同程序,不斷從 queue 接收 message
        async def eventloop():
            while True:
                # 每隔0.5秒執行一次
                await asyncio.sleep(0.5)
                try:
                    # 從 queue 中獲取 message
                    message = queue.get(block=False)
                except Empty:
                    # 如果 queue 為空,就跳過
                    pass
                else:
                    if message["command"] == "stop":
                        # 若 command 是 "stop",則結束整個程式
                        return
                    elif message["command"] == "set_data":
                            # 若 command 是 "set_data",則把 message 中的 data 設置到 servers 中相應的 serer
                        servers[message["server"]].data = message["data"]

        # 將 eventloop 加入 task_list 中
        task_list.append(eventloop())
        # 將 task_list 中的每一個 task 包裝為 asyncio.Task
        task_list = [
            t if isinstance(t, asyncio.Task) else asyncio.create_task(t) for t in tasks
        ]

        # 等待所有 task 完成
        finished, unfinished = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for task in finished:
            e = task.exception()
            if e:
                # 如果 task 出現異常,則拋出異常
                raise e

    # 啟動asyncio 事件循環
    asyncio.run(main())

unittest:

# 定義要運行的模組
MODULES = ["test_module"]

# 繼承 unittest.IsolatedAsyncioTestCase 並實現 asyncio 的測試類
class TestCase(unittest.IsolatedAsyncioTestCase):
    # 在每次測試開始時運行
    async def asyncSetUp(self):
        # 建立一個 Queue,用來和運行在另一個 process 中的伺服器進行通訊
        self.queue = Queue()
        # 建立一個 Process,這個 process 會執行 run_server 函數
        self.process = Process(target=run_server, args=(self.queue, MODULES))
        # 啟動 process
        self.process.start()
        
    ...
        
    # 在每次測試結束時運行
    async def asyncTearDown(self):
        # 向 queue 中加入一個特殊的消息,通知運行在另一個 process 中的伺服器停止運行
        self.queue.put({"type": "stop"})
        # 等待 process 結束
        self.process.join()

然後想請問為什麼不論是unittest或是run_server()中的的操作都需用到asyncio?
有什麼必要性嗎?

這是因為 asyncio 是一個非同步 I/O 框架,它能夠支援多個任務同時執行並有效利用 CPU 時間。而 unittest 以及 run_server 都涉及到長時間等待或需要大量資源的操作,使用 asyncio 可以確保程式在這些操作過程中不會被阻塞,仍能夠做其他事情。因此在這樣的環境中使用 asyncio 是有必要的。

ffaanngg iT邦新手 5 級 ‧ 2023-03-15 16:30:34 檢舉

謝謝,抱歉忘了回覆你

我要發表回答

立即登入回答