本篇內容主要在討論,該如何讓 celery 在指定時間執行任務
過程中如果有錯誤,歡迎留言討論喔 ~
from celery import Celery
from setting import broker_url, backend_url
from datetime import datetime
app = Celery("celery_start", broker=broker_url, backend=backend_url)
@app.task
def get_time(name: str):
now = datetime.now()
return f"Hello {name} 現在時間為 {now.strftime('%Y-%m-%d, %H:%M:%S')}"
countdown 參數單位為秒,使用這個參數可以讓 celery worker 在收到任務後
過了指定單位的秒數再執行
get_time.apply_async(("nick",), countdown=20)
下圖中我們可以看到 celery 在收到任務後,過了大約 20 秒才執行任務
eta 參數接收的資料型態須為 datetime 物件,但要特別注意,celery 內部所接收的時間
是 UTC 標準時間,雖然說 celery 本身有提供一些 config 可以讓使用者作設定,但是對
於時區的部分似乎是沒有效果,因此如果要利用 eta 的話我們必須使用 pytz 這個套件對
時間做轉換,下方為 pytz 的範例
from celery_schedule import get_time
from datetime import datetime, timedelta
import pytz
# 先設定時區
local_timezone = pytz.timezone("Asia/Taipei")
# 產生一筆時間物件,並計算想要甚麼時候執行任務
# 也可以直接利用 datetime 直接指定一個日期
now = datetime.now()
exec_time = now + timedelta(seconds=10)
# 利用 pytz 進行轉換
exec_time = local_timezone.localize(exec_time)
# 送出任務
get_time.apply_async(("nick",), eta=exec_time)
如下圖所示,可以看到大約在 10 秒鐘後任務就會被 celry worker 執行
expries 參數可以理解程,任務的到期時間,也就是這個任務最晚甚麼時候要被執行,
前面有提到,celery 可以自動將任務進行排隊,由於 worker 的 process 的數量有限
如果一次送太多任務,任務就必須進行排隊,因此可以透過設定這個參數來告訴 celery
,該任務最晚必須於甚麼時候執行
附註: 若 celery 收到 expries 已經過期的任務時,則 celery 會將任務狀態標記為 revoke,代表不會去執行這個任務,任務狀態相關的部分會於下一篇文章進行解說
附註: expries 可以接收的資料型態為 int (用於指定秒數) 以及 datetime (用於指定日期)
from celery_schedule import get_time
from datetime import datetime, timedelta
import pytz
# 先設定時區
local_timezone = pytz.timezone("Asia/Taipei")
# 產生一筆時間物件,並計算想要甚麼時候執行任務
# 也可以直接利用 datetime 直接指定一個日期
now = datetime.now()
exec_time = now + timedelta(seconds=10)
# 利用 pytz 進行轉換
exec_time = local_timezone.localize(exec_time)
# 送出任務
get_time.apply_async(("nick",), expries=exec_time)
如下圖所示,可以看到 celery 在收到任務後馬上執行,因為版主只有送了一個任務進入列隊
redis 對於長時間 eta 的處理不太優,常常會產生非預期的問題,例如重複派送任務等,可以看看這篇文章
若遇上此問題,可以考慮將 broker 換成 rabbitmq 或是利用 celery beat 重複檢查看看是否有任務到期
若到期再用 delay 送進任務列隊即可