在上一篇觀念文章中,我們理解了為何需要一個專業的任務佇列系統來保證背景任務的可靠性與擴展性。今天,我們就將理論付諸實踐,動手打造一個由 FastAPI、Celery 和 Redis 組成的背景任務處理系統。
想特別說明一下,這篇文章將以簡單易懂的範例為主,重點在於快速上手和理解基本架構。Celery 和 Redis 實際上擁有豐富的進階功能,包括任務路由、失敗重試機制、任務優先級設定、集群部署、監控與管理等,這些進階主題不在本篇討論範圍內。
我們的目標是建立這樣一個流程:
首先,你需要安裝必要的套件,並確保你的機器上已經安裝並啟動了 Redis 服務。
# 安裝 celery,並指定使用 redis 作為 broker
pip install "celery[redis]"
pip install eventlet
eventlet
的部分後面會進行說明。
這邊就直接用 Docker 建立 Redis 服務就好:
docker run -d -p 6379:6379 redis
接著,我們規劃一個簡單的專案結構:
/myproject
├── main.py # FastAPI 應用
├── tasks.py # Celery 設定與任務定義
tasks.py
)這個檔案是 Celery 的核心,我們在這裡定義 Celery 實例和所有背景任務。
# tasks.py
import time
from celery import Celery
app = Celery(
"tasks",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/0"
)
@app.task
def add(x, y):
print("Starting a long-running task...")
time.sleep(10) # 模擬耗時操作(同步阻塞,但只會阻塞這個 worker)
print("Task completed.")
return x + y
main.py
)FastAPI 應用現在只需要負責呼叫任務即可。
# main.py
from fastapi import FastAPI
from tasks import add, app as celery_app
app = FastAPI()
@app.post("/task/{x}/{y}")
def create_task(x: int, y: int):
task = add.delay(x, y)
return {"task_id": task.id}
@app.get("/task-status/{task_id}")
def get_task_status(task_id: str):
task = celery_app.AsyncResult(task_id)
if task.state == "PENDING":
return {"status": "pending"}
elif task.state == "SUCCESS":
return {"status": "completed", "result": task.result}
else:
return {"status": "failed", "error": task.info}
我知道這 2 個路由設計得有點醜,但我一時之間沒有想到什麼簡單好用的例子,還請大家先將就一下XD
現在,最關鍵的一步來了。我們需要分別啟動三個元件:
請確保你的 Redis 伺服器正在運行
在專案根目錄下,打開一個新的終端機,執行以下指令:
# -A 指定 Celery 應用實例的位置 (模組名稱)
# worker 表示啟動工作者模式
# --loglevel=info 顯示詳細日誌
celery -A tasks worker --pool=eventlet --loglevel=info
你會看到 Celery Worker 啟動成功並顯示 Ready
狀態。
Windows 系統注意事項:
在 Windows 系統中,如果只執行
celery -A tasks worker --loglevel=info
,可能會遇到PermissionError
或其他 Windows 特定的錯誤。解決方案:
--pool=solo
:單一行程模式,適合開發測試,但無法並行處理多個任務--pool=eventlet
或--pool=gevent
:使用協程池,可以並行處理多個 I/O 密集型任務- 對於 CPU 密集型任務,建議在 Linux 環境下使用預設的
prefork
池這裡使用
eventlet
是因為它在 Windows 上相容性較好,且適合大部分的 Web 應用場景。
再打開一個終端機,用我們熟悉的方式啟動 FastAPI:
fastapi dev main.py
現在你可以測試整個系統了:
發起任務:發送 POST
請求到 http://127.0.0.1:8000/task/1/2
查詢任務狀態:使用回傳的 task_id 發送 GET
請求到 http://127.0.0.1:8000/task-status/{task_id}
在 Celery Worker 的終端機視窗中,你會看到它接收到任務並在 10 秒後完成的詳細日誌。
這邊附上一張 10 秒內發出 3 個不同任務的截圖:
根據 log 時間可以看到這三個任務是並行處理的~
今天我們搭建了一個簡單的背景任務處理系統範例。雖然初始設定比 BackgroundTasks
複雜許多,但換來的是更高的可靠性、擴展性與系統彈性,這對於任何嚴肅的 Web 應用都是不可或缺的一環。
當然,這只是一個簡單範例,實際上要考慮的東西還有很多 (e.g. 任務失敗處理),Celery 和 Redis 的功能也遠比我們今天展示的豐富,有興趣的讀者可以進一步探索官方文檔中的進階配置。