在我們之前的章節中,我們已經學會了如何使用 Background Task 來處理耗時的 AI 任務。但這引出了一個新的問題:前端的使用者該如何知道後端任務的執行進度?總不能讓使用者一直盯著一個不會動的載入圈圈,然後無止盡地等待吧。
從今天開始,我們將連續探討三種常見的解決方案,讓前端可以即時地追蹤後端的任務進度。今天要介紹的是最傳統、也最直觀的方法:長輪詢 (Long Polling)。
在介紹長輪詢之前,我們得先理解什麼是「輪詢」。
想像一下,你有一個朋友正在烤餅乾,你非常想知道餅乾烤好了沒。最簡單的方法就是,你每隔 30 秒就跑去問他:「餅乾好了嗎?」。這個「固定間隔重複詢問」的動作,就是輪詢。
轉換成 Web 開發的場景,就是客戶端 (Frontend) 每隔一個固定的時間(例如每 2 秒)就向伺服器 (Backend) 發送一個 HTTP Request,詢問「任務處理完了嗎?」。
這種做法雖然簡單,但缺點非常明顯:
為了解決傳統輪詢的缺點,長輪詢應運而生。
長輪詢的流程是這樣的:
這種做法的好處是,客戶端幾乎可以在任務完成的「當下」就收到通知,大大降低了延遲性。同時,因為沒有了那些無效的「還沒好」的空請求,也減少了網路流量的浪費。
讓我們來看一個非常簡單的 FastAPI 長輪詢概念範例。假設我們有一個全域變數來模擬任務的進度。
import asyncio
import time
from fastapi import FastAPI
app = FastAPI()
# 模擬一個任務狀態的資料庫或快取
tasks_status = {
"task_123": {"status": "processing", "progress": 20}
}
@app.get("/task/{task_id}/status")
async def get_task_status(task_id: str):
"""
使用長輪詢來獲取任務狀態
"""
# 檢查任務是否存在
if task_id not in tasks_status:
return {"error": "Task not found"}
# 設定長輪詢的超時時間(30秒)
start_time = time.time()
timeout = 30.0
# 實際應用中,這裡會查詢資料庫或 Redis
current_progress = tasks_status.get(task_id, {}).get("progress", 0)
# 如果進度還沒到 100,就等待
while current_progress < 100:
# 檢查是否超時
if time.time() - start_time >= timeout:
return {"status": "processing", "progress": current_progress}
# 讓出執行權,每 0.5 秒檢查一次狀態
# 這可以避免 CPU 空轉,並允許伺服器處理其他請求
await asyncio.sleep(0.5)
# 重新從某處獲取任務進度
# 在這個範例中,我們需要另一個 API 來更新進度
current_progress = tasks_status.get(task_id, {}).get("progress", 0)
return {"status": "completed", "progress": 100}
# 另外需要一個 API 來模擬更新任務進度
@app.put("/task/{task_id}/progress")
async def update_task_progress(task_id: str, progress: int):
if task_id in tasks_status:
tasks_status[task_id]["progress"] = progress
return {"message": f"Task {task_id} progress updated to {progress}%"}
return {"message": "Task not found"}
當客戶端呼叫 /task/{task_id}/status
時,首先檢查任務是否存在。
如果進度不到 100%,while
迴圈會開始檢查進度,並透過 asyncio.sleep(0.5)
讓這個請求「暫停」並等待,同時不會阻塞整個伺服器。這就是長輪詢中「扣住」請求的核心。
加入了超時機制:如果 30 秒內任務還沒完成,會回傳當前進度,避免請求無限期等待。
在實際應用中,背景任務會不斷更新資料庫或快取中的進度,而長輪詢的端點則是不斷去讀取那個進度。
除了上面的做法之外,也可以用 asyncio.Event()
來實作:
import asyncio
from fastapi import FastAPI
app = FastAPI()
# 模擬一個任務狀態的資料庫或快取
tasks_status = {
"task_123": {"event": asyncio.Event(), "progress": 20}
}
@app.get("/task/{task_id}/status")
async def get_task_status(task_id: str):
"""
使用長輪詢來獲取任務狀態
"""
# 檢查任務是否存在
if task_id not in tasks_status:
return {"error": "Task not found"}
try:
# 等待任務完成的事件,設定超時時間為 30 秒
await asyncio.wait_for(tasks_status[task_id]["event"].wait(), timeout=30.0)
except asyncio.TimeoutError:
# 實際應用中,這裡會查詢資料庫或 Redis
current_progress = tasks_status.get(task_id, {}).get("progress", 0)
# 超時後返回當前進度
return {"status": "processing", "progress": current_progress}
return {"status": "completed", "progress": 100}
# 另外需要一個 API 來模擬更新任務進度
@app.put("/task/{task_id}/progress")
async def update_task_progress(task_id: str, progress: int):
if task_id in tasks_status:
tasks_status[task_id]["progress"] = progress
if progress == 100:
tasks_status[task_id]["event"].set()
return {"message": f"Task {task_id} progress updated to {progress}%"}
return {"message": "Task not found"}
這個範例使用了 asyncio.Event()
來取代第一個範例中的 while
迴圈不斷檢查的做法:
Event 機制的運作原理:
asyncio.Event()
是一個 asyncio 提供的事件通知機制,它有兩個重要的方法:
.wait()
:等待事件被觸發.set()
:觸發事件,讓所有等待的協程繼續執行長輪詢端點的流程:
/task/{task_id}/status
時,首先檢查任務是否存在await asyncio.wait_for(tasks_status[task_id]["event"].wait(), timeout=30.0)
asyncio.TimeoutError
,然後回傳當前進度進度更新端點的流程:
/task/{task_id}/progress
被呼叫時,更新進度tasks_status[task_id]["event"].set()
來觸發事件相比第一個範例的優勢:
while
迴圈長輪詢是一種基於傳統 HTTP 「請求-回應」模型,透過伺服器端「延遲回應」來實現的「類即時」通訊技術。它的優點是實作相對簡單,且幾乎相容於所有的網路環境與代理伺服器。然而,它的缺點也很明顯:每次更新都需要建立一次完整的 HTTP 連線,開銷較大;同時,伺服器需要為每個等待的客戶端維持一個掛起的請求,當客戶端數量多時,會消耗較多資源。
明天,我們將會介紹一個更強大、更有效率的雙向通訊技術:WebSocket。