iT邦幫忙

2025 iThome 鐵人賽

DAY 18
0
Software Development

用 FastAPI 打造你的 AI 服務系列 第 18

[Day 18] 任務管理 (四):任務佇列範例

  • 分享至 

  • xImage
  •  

在上一篇觀念文章中,我們理解了為何需要一個專業的任務佇列系統來保證背景任務的可靠性與擴展性。今天,我們就將理論付諸實踐,動手打造一個由 FastAPI、Celery 和 Redis 組成的背景任務處理系統。

想特別說明一下,這篇文章將以簡單易懂的範例為主,重點在於快速上手和理解基本架構。Celery 和 Redis 實際上擁有豐富的進階功能,包括任務路由、失敗重試機制、任務優先級設定、集群部署、監控與管理等,這些進階主題不在本篇討論範圍內。

系統架構回顧

我們的目標是建立這樣一個流程:

  1. FastAPI 接收到 HTTP 請求,作為 Producer 將一個耗時任務發布出去。
  2. Redis 作為 Broker,接收並儲存這個任務訊息。
  3. Celery Worker 作為 Consumer,從 Redis 獲取任務並在背景執行。

環境設定

安裝套件

首先,你需要安裝必要的套件,並確保你的機器上已經安裝並啟動了 Redis 服務。

# 安裝 celery,並指定使用 redis 作為 broker
pip install "celery[redis]"

pip install eventlet

eventlet 的部分後面會進行說明。

啟用 Redis

這邊就直接用 Docker 建立 Redis 服務就好:

docker run -d -p 6379:6379 redis

接著,我們規劃一個簡單的專案結構:

/myproject
├── main.py       # FastAPI 應用
├── tasks.py      # Celery 設定與任務定義

撰寫程式碼

1. 設定 Celery (tasks.py)

這個檔案是 Celery 的核心,我們在這裡定義 Celery 實例和所有背景任務。

# tasks.py

import time
from celery import Celery

app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)

@app.task
def add(x, y):
    print("Starting a long-running task...")
    time.sleep(10)  # 模擬耗時操作(同步阻塞,但只會阻塞這個 worker)
    print("Task completed.")
    return x + y

2. 在 FastAPI 中發布任務 (main.py)

FastAPI 應用現在只需要負責呼叫任務即可。

# main.py

from fastapi import FastAPI
from tasks import add, app as celery_app

app = FastAPI()

@app.post("/task/{x}/{y}")
def create_task(x: int, y: int):
    task = add.delay(x, y)
    return {"task_id": task.id}


@app.get("/task-status/{task_id}")
def get_task_status(task_id: str):
    task = celery_app.AsyncResult(task_id)

    if task.state == "PENDING":
        return {"status": "pending"}
    elif task.state == "SUCCESS":
        return {"status": "completed", "result": task.result}
    else:
        return {"status": "failed", "error": task.info}

我知道這 2 個路由設計得有點醜,但我一時之間沒有想到什麼簡單好用的例子,還請大家先將就一下XD

啟動整個系統

現在,最關鍵的一步來了。我們需要分別啟動三個元件:

1. 啟動 Redis (Broker)

請確保你的 Redis 伺服器正在運行

2. 啟動 Celery Worker (Consumer)

在專案根目錄下,打開一個新的終端機,執行以下指令:

# -A 指定 Celery 應用實例的位置 (模組名稱)
# worker 表示啟動工作者模式
# --loglevel=info 顯示詳細日誌
celery -A tasks worker --pool=eventlet --loglevel=info

你會看到 Celery Worker 啟動成功並顯示 Ready 狀態。

Windows 系統注意事項

在 Windows 系統中,如果只執行 celery -A tasks worker --loglevel=info,可能會遇到 PermissionError 或其他 Windows 特定的錯誤。

解決方案

  • --pool=solo:單一行程模式,適合開發測試,但無法並行處理多個任務
  • --pool=eventlet--pool=gevent:使用協程池,可以並行處理多個 I/O 密集型任務
  • 對於 CPU 密集型任務,建議在 Linux 環境下使用預設的 prefork

這裡使用 eventlet 是因為它在 Windows 上相容性較好,且適合大部分的 Web 應用場景。

3. 啟動 FastAPI (Producer)

再打開一個終端機,用我們熟悉的方式啟動 FastAPI:

fastapi dev main.py

測試系統

現在你可以測試整個系統了:

  1. 發起任務:發送 POST 請求到 http://127.0.0.1:8000/task/1/2

    • 系統會立刻回應任務ID和狀態
    • Celery Worker 會在背景開始處理任務
  2. 查詢任務狀態:使用回傳的 task_id 發送 GET 請求到 http://127.0.0.1:8000/task-status/{task_id}

    • 可以即時查看任務執行狀態
    • 任務完成後可以獲取結果

在 Celery Worker 的終端機視窗中,你會看到它接收到任務並在 10 秒後完成的詳細日誌。

這邊附上一張 10 秒內發出 3 個不同任務的截圖:

根據 log 時間可以看到這三個任務是並行處理的~

小結

今天我們搭建了一個簡單的背景任務處理系統範例。雖然初始設定比 BackgroundTasks 複雜許多,但換來的是更高的可靠性、擴展性與系統彈性,這對於任何嚴肅的 Web 應用都是不可或缺的一環。

當然,這只是一個簡單範例,實際上要考慮的東西還有很多 (e.g. 任務失敗處理),Celery 和 Redis 的功能也遠比我們今天展示的豐富,有興趣的讀者可以進一步探索官方文檔中的進階配置。


上一篇
[Day 17] 任務管理 (三):任務佇列 (Task Queue)
下一篇
[Day 19] 錯誤處理 (一):基礎
系列文
用 FastAPI 打造你的 AI 服務22
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言