iT邦幫忙

0

[Python] 等待多個asyncio.qurue

  • 分享至 

  • xImage

我有一個基於asyncio的程式,程式中有三個task各自功能如下:

task1: 隨機時間將"task1"字串堆入queue1
task2: 隨機時間將"task2"字串堆入queue2
task3: 使用await queue.get()等待直到queue1queue2中任一個有資料就取出並印出來

task1跟task2已經寫好了,task3不知道怎麼寫,如果用

while True:
    msg = await queue1.get()
    print(msg)
    msg = await queue2.get()
    print(msg)

則其中一個queue要是空的就會卡住不檢查另一個,請問這種像案例要怎麼寫呢?

謝謝

johncoc iT邦新手 3 級 ‧ 2023-02-01 17:06:45 檢舉
這應該要用多執行序吧
akitect iT邦新手 5 級 ‧ 2023-02-01 21:29:11 檢舉
參考看看
https://stackoverflow.com/questions/19130986/python-equivalent-of-golangs-select-on-channels
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

2 個回答

不明
【**此則訊息已被站方移除**】
0
froce
iT邦大師 1 級 ‧ 2023-02-02 12:07:35
import asyncio
from random import randint
import datetime

class EventQueue(asyncio.Queue):
    def __init__(self, nextQueue=None) -> None:
        super().__init__()
        self.nextQueue = nextQueue
        
    async def put(self, item):
        await super().put(item)
        await self.onPut()
        
    async def onPut(self):
        item = await self.get()
        if not self.nextQueue:
            # 負責處理最終工作
            print(item)
        else:
            await self.nextQueue.put(f'{item}')
            
            
class Worker:
    def __init__(self, queue:EventQueue, name:str) -> None:
        self.queue = queue
        self._working = False
        self.name = name
        
    async def run(self):
        self._working = True
        while self._working:
            processTime = randint(3, 9)
            await asyncio.sleep(processTime)
            print(self.name, processTime)
            await self.queue.put(datetime.datetime.now())
            
    def cancel(self):
        self._working = False
            
    

async def main():
    q3 = EventQueue()
    q1 = EventQueue(q3)
    q2 = EventQueue(q3)

    w1 = Worker(q1, "w1")
    w2 = Worker(q2, "w2")
    group = asyncio.gather(w1.run(), w2.run())
    await group

asyncio.run(main())

這樣?
不保證thread-safe,以及其實沒有q3也行。

1
JamesDoge
iT邦高手 1 級 ‧ 2023-02-05 07:56:13

可以使用 asyncio 的 gather 函數解決這個問題:

async def task3():
    while True:
        done, pending = await asyncio.wait([queue1.get(), queue2.get()], return_when=asyncio.FIRST_COMPLETED)
        for future in done:
            msg = future.result()
            print(msg)

gather 函數可以同時等待多個 Future 實例的完成,
並返回已完成的 Future 實例的集合。
參數 return_when=asyncio.FIRST_COMPLETED 表示等待直到有任意一個 Future 實例完成即可。

我要發表回答

立即登入回答