Celery 提供任務鏈結的功能,字面上的意思,就是將任務一個一個串聯在一起,下面的敘述 or 範例如果有錯誤,歡迎留言討論!!
from celery import Celery
from datetime import datetime
app = Celery("task", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")
@app.task
def hello_world(name: str):
if name:
return f"hello {name}"
elif not name:
raise ValueError("請輸入名字")
@app.task
def show_time(name: str):
return f"{name} 您好,現在時間為 {datetime.now()}"
我們可以透過 apply_async 在送出任務的時候,使用 link 參數來進行任務的串聯
from task import hello_world, show_time
res = hello_world.apply_async(("nick",), link=show_time.s())
可以看到在 link 後面我們接了一個 show_time 的任務,其中 show_time 任務會需要一個名為 name 的參數,celery 會自動將 hello_world 這個任務執行完 return 的結果自動帶入到後面的任務中,下圖為執行結果
這邊要特別注意的是,只要前一個任務有回傳資料,後面一個任務若要再輸入參數的話,就必須在第二個任務當中多增加一個接收參數,否則會跳錯
我們可以看到下面這段程式,在 link 任務的時候有多輸入了一個參數,但此時我們還沒將 show_time 做更改,因此會跳出錯誤訊息告知你多了一個參數
from task import hello_world, show_time
res = hello_world.apply_async(("nick",), link=show_time.s("andy"))
我們簡單將 show_time 做一下更改,並重啟 worker,然後再利用剛剛那段程式送一次任務
注意: 任務有任何的修改,worker 都需要重新啟動,才會正常被讀取修改後的任務
@app.task
def show_time(name: str, name_2: str):
return f"{name} 以及 {name_2} 您好,現在時間為 {datetime.now()}"
link 除了接收單個 task 外,也接受 list 型態,可以將多個任務利用 list 串聯,我們先多建立一個任務,另外要注意的是,如果前一個任務有 return 值回 worker,celery 一定會將該 value 帶入下一個 link task 當中,因此若確定下一個任務沒有要接收任何參數,還是加上 *args or **kwargs
來接收,避免跳錯
@app.task
def show_message(*args, **kwargs):
return "非常歡迎您使用 Celery"
利用下面的程式送出任務
from task import hello_world, show_time, show_message
task_list = [show_time.s("andy"), show_message.s()]
res = hello_world.apply_async(("nick",), link=task_list)
下圖為測試結果
在 celery 當中也有提供 link_error 來做為當錯誤發生時,該進行的任務,該參數使用方式同 link,但筆者在進行測試的時候發現有問題,有時候會跳出來,有時候則不會,或是會產生一些奇怪的錯誤,上網搜尋了一下,發現好像是已知問題,若後續官方有將此 bug 修復或是有查到其他資料,會在上來補充,如果有知道的大神希望能留言幫小弟補充一下,會再將文章修改,非常感謝!!