這篇文章主要是在記錄,celery 的任務狀態以及該如何刪除在任務佇列中的任務
有問題歡迎留言討論喔!!
Celery 內部對於任務的狀態有 PENDING、STARTED、SUCCESS、FAILURE、RETRY、REVOKED,然而,筆者這邊不推薦直接讀取 Celery Backend 的狀態,因為不見得符合每個任務的需求,最好的方式是針對每個任務,撰寫一套任務的模組、狀態以及相對應的 CRUD,會更加體現不同任務的狀態,下面簡單針對各個狀態做一些定義
我們可以利用 celery 提供的控制模組來對任務進行控制,舉凡觀察、停止等,下面簡單介紹該如何將任務進行取消
import datetime
from celery import Celery
from setting import broker_url, backend_url
app = Celery("celery_start", broker=broker_url, backend=backend_url)
@app.task
def get_time(name: str):
return f"Hello {name}, today is {datetime.datetime.now().isoformat()}"
from task import get_time, app
# 送出任務
test_task = get_time.apply_async(("nick",), countdown=60)
import time
from task import get_time, app
from celery.app.control import Control
# 送出任務
test_task = get_time.apply_async(("nick",), countdown=60)
time.sleep(5)
control_obj = Control(app) # 記得要將 celery 物件帶入
control_obj.revoke(task_id=test_task.id)
可以看到 celery 在收到 revoke 指令後,會先將任務標記為 revoke,在 countdown or eta 到期後,才會正式將該任務刪除
若想直接刪除任務可以加上 terminate=True
參數,這個指令也可以強制關閉正在執行的任務,然而這個指令似乎只會在 linux 系統上產生作用,於 windows 上並沒有任何作用,若不幸的任務已經開始執行,只能等待任務執行完畢或手動強制關閉,才會停止
control_obj.revoke(task_id=test_task.id, terminate=True)