我有一個基於asyncio的程式,程式中有三個task各自功能如下:
task1: 隨機時間將"task1"字串堆入queue1
task2: 隨機時間將"task2"字串堆入queue2
task3: 使用await queue.get()等待直到queue1或queue2中任一個有資料就取出並印出來
task1跟task2已經寫好了,task3不知道怎麼寫,如果用
while True:
msg = await queue1.get()
print(msg)
msg = await queue2.get()
print(msg)
則其中一個queue要是空的就會卡住不檢查另一個,請問這種像案例要怎麼寫呢?
謝謝
import asyncio
from random import randint
import datetime
class EventQueue(asyncio.Queue):
def __init__(self, nextQueue=None) -> None:
super().__init__()
self.nextQueue = nextQueue
async def put(self, item):
await super().put(item)
await self.onPut()
async def onPut(self):
item = await self.get()
if not self.nextQueue:
# 負責處理最終工作
print(item)
else:
await self.nextQueue.put(f'{item}')
class Worker:
def __init__(self, queue:EventQueue, name:str) -> None:
self.queue = queue
self._working = False
self.name = name
async def run(self):
self._working = True
while self._working:
processTime = randint(3, 9)
await asyncio.sleep(processTime)
print(self.name, processTime)
await self.queue.put(datetime.datetime.now())
def cancel(self):
self._working = False
async def main():
q3 = EventQueue()
q1 = EventQueue(q3)
q2 = EventQueue(q3)
w1 = Worker(q1, "w1")
w2 = Worker(q2, "w2")
group = asyncio.gather(w1.run(), w2.run())
await group
asyncio.run(main())
這樣?
不保證thread-safe,以及其實沒有q3也行。
可以使用 asyncio 的 gather 函數解決這個問題:
async def task3():
while True:
done, pending = await asyncio.wait([queue1.get(), queue2.get()], return_when=asyncio.FIRST_COMPLETED)
for future in done:
msg = future.result()
print(msg)
gather 函數可以同時等待多個 Future 實例的完成,
並返回已完成的 Future 實例的集合。
參數 return_when=asyncio.FIRST_COMPLETED 表示等待直到有任意一個 Future 實例完成即可。