iT邦幫忙

2025 iThome 鐵人賽

DAY 21
0

在我們之前的章節中,我們已經學會了如何使用 Background Task 來處理耗時的 AI 任務。但這引出了一個新的問題:前端的使用者該如何知道後端任務的執行進度?總不能讓使用者一直盯著一個不會動的載入圈圈,然後無止盡地等待吧。

從今天開始,我們將連續探討三種常見的解決方案,讓前端可以即時地追蹤後端的任務進度。今天要介紹的是最傳統、也最直觀的方法:長輪詢 (Long Polling)。

什麼是輪詢 (Polling)?

在介紹長輪詢之前,我們得先理解什麼是「輪詢」。

想像一下,你有一個朋友正在烤餅乾,你非常想知道餅乾烤好了沒。最簡單的方法就是,你每隔 30 秒就跑去問他:「餅乾好了嗎?」。這個「固定間隔重複詢問」的動作,就是輪詢。

轉換成 Web 開發的場景,就是客戶端 (Frontend) 每隔一個固定的時間(例如每 2 秒)就向伺服器 (Backend) 發送一個 HTTP Request,詢問「任務處理完了嗎?」。

這種做法雖然簡單,但缺點非常明顯:

  1. 延遲性:如果任務在兩次輪詢之間就完成了,客戶端還是要等到下一次輪詢才會知道。
  2. 資源浪費:如果任務需要很長時間,絕大多數的輪詢請求都是無效的,因為伺服器只會回覆「還沒好」。這會產生大量的 HTTP 請求,浪費網路頻寬與伺服器資源。

長輪詢 (Long Polling) 如何改進?

為了解決傳統輪詢的缺點,長輪詢應運而生。

長輪詢的流程是這樣的:

  1. 客戶端發送一個請求給伺服器,詢問任務狀態。
  2. 伺服器如果發現任務還沒完成,它不會立刻回覆,而是會「扣住」這個請求不放。
  3. 伺服器會持續檢查任務狀態,直到任務完成(或是有進度更新),才會把最新的狀態回傳給客戶端。
  4. 客戶端收到回應後,立刻再發送一個新的長輪詢請求,重複這個循環。
  5. 如果請求在一定時間內(例如 30 秒)都沒有等到更新,伺服器會回覆一個 timeout 的訊息,客戶端收到後再重新發起請求。

這種做法的好處是,客戶端幾乎可以在任務完成的「當下」就收到通知,大大降低了延遲性。同時,因為沒有了那些無效的「還沒好」的空請求,也減少了網路流量的浪費。

FastAPI 基礎範例

讓我們來看一個非常簡單的 FastAPI 長輪詢概念範例。假設我們有一個全域變數來模擬任務的進度。

import asyncio
import time
from fastapi import FastAPI

app = FastAPI()

# 模擬一個任務狀態的資料庫或快取
tasks_status = {
    "task_123": {"status": "processing", "progress": 20}
}

@app.get("/task/{task_id}/status")
async def get_task_status(task_id: str):
    """
    使用長輪詢來獲取任務狀態
    """
    # 檢查任務是否存在
    if task_id not in tasks_status:
        return {"error": "Task not found"}
    
    # 設定長輪詢的超時時間(30秒)
    start_time = time.time()
    timeout = 30.0
    
    # 實際應用中,這裡會查詢資料庫或 Redis
    current_progress = tasks_status.get(task_id, {}).get("progress", 0)

    # 如果進度還沒到 100,就等待
    while current_progress < 100:
        # 檢查是否超時
        if time.time() - start_time >= timeout:
            return {"status": "processing", "progress": current_progress}
        
        # 讓出執行權,每 0.5 秒檢查一次狀態
        # 這可以避免 CPU 空轉,並允許伺服器處理其他請求
        await asyncio.sleep(0.5) 
        
        # 重新從某處獲取任務進度
        # 在這個範例中,我們需要另一個 API 來更新進度
        current_progress = tasks_status.get(task_id, {}).get("progress", 0)

    return {"status": "completed", "progress": 100}

# 另外需要一個 API 來模擬更新任務進度
@app.put("/task/{task_id}/progress")
async def update_task_progress(task_id: str, progress: int):
    if task_id in tasks_status:
        tasks_status[task_id]["progress"] = progress
        return {"message": f"Task {task_id} progress updated to {progress}%"}
    return {"message": "Task not found"}

程式碼說明:

  • 當客戶端呼叫 /task/{task_id}/status 時,首先檢查任務是否存在。

  • 如果進度不到 100%,while 迴圈會開始檢查進度,並透過 asyncio.sleep(0.5) 讓這個請求「暫停」並等待,同時不會阻塞整個伺服器。這就是長輪詢中「扣住」請求的核心。

  • 加入了超時機制:如果 30 秒內任務還沒完成,會回傳當前進度,避免請求無限期等待。

  • 在實際應用中,背景任務會不斷更新資料庫或快取中的進度,而長輪詢的端點則是不斷去讀取那個進度。

另一個範例

除了上面的做法之外,也可以用 asyncio.Event() 來實作:

import asyncio
from fastapi import FastAPI

app = FastAPI()

# 模擬一個任務狀態的資料庫或快取
tasks_status = {
    "task_123": {"event": asyncio.Event(), "progress": 20}
}

@app.get("/task/{task_id}/status")
async def get_task_status(task_id: str):
    """
    使用長輪詢來獲取任務狀態
    """
    # 檢查任務是否存在
    if task_id not in tasks_status:
        return {"error": "Task not found"}
    
    try:
        # 等待任務完成的事件,設定超時時間為 30 秒
        await asyncio.wait_for(tasks_status[task_id]["event"].wait(), timeout=30.0)
    except asyncio.TimeoutError:
        # 實際應用中,這裡會查詢資料庫或 Redis
        current_progress = tasks_status.get(task_id, {}).get("progress", 0)

        # 超時後返回當前進度
        return {"status": "processing", "progress": current_progress}

    return {"status": "completed", "progress": 100}

# 另外需要一個 API 來模擬更新任務進度
@app.put("/task/{task_id}/progress")
async def update_task_progress(task_id: str, progress: int):
    if task_id in tasks_status:
        tasks_status[task_id]["progress"] = progress
        if progress == 100:
            tasks_status[task_id]["event"].set()
        return {"message": f"Task {task_id} progress updated to {progress}%"}
    return {"message": "Task not found"}

程式碼說明:

這個範例使用了 asyncio.Event() 來取代第一個範例中的 while 迴圈不斷檢查的做法:

Event 機制的運作原理:

  • asyncio.Event() 是一個 asyncio 提供的事件通知機制,它有兩個重要的方法:
    • .wait():等待事件被觸發
    • .set():觸發事件,讓所有等待的協程繼續執行

長輪詢端點的流程:

  1. 當客戶端呼叫 /task/{task_id}/status 時,首先檢查任務是否存在
  2. 然後執行 await asyncio.wait_for(tasks_status[task_id]["event"].wait(), timeout=30.0)
  3. 這行程式會讓請求「掛在那裡」等待事件觸發,最長等待 30 秒
  4. 如果在 30 秒內事件被觸發(任務完成),就會回傳完成狀態
  5. 如果 30 秒過了事件還沒被觸發,會拋出 asyncio.TimeoutError,然後回傳當前進度

進度更新端點的流程:

  1. /task/{task_id}/progress 被呼叫時,更新進度
  2. 如果進度達到 100%,就呼叫 tasks_status[task_id]["event"].set() 來觸發事件
  3. 這會立刻喚醒所有正在等待這個事件的長輪詢請求

相比第一個範例的優勢:

  • 更有效率:不需要每 0.5 秒檢查一次狀態,而是「一次到位」
  • 更即時:任務完成的瞬間就能立刻通知等待中的客戶端
  • 更節省 CPU:不會有空轉的 while 迴圈

小結

長輪詢是一種基於傳統 HTTP 「請求-回應」模型,透過伺服器端「延遲回應」來實現的「類即時」通訊技術。它的優點是實作相對簡單,且幾乎相容於所有的網路環境與代理伺服器。然而,它的缺點也很明顯:每次更新都需要建立一次完整的 HTTP 連線,開銷較大;同時,伺服器需要為每個等待的客戶端維持一個掛起的請求,當客戶端數量多時,會消耗較多資源。

明天,我們將會介紹一個更強大、更有效率的雙向通訊技術:WebSocket。


上一篇
[Day 20] 錯誤處理 (二):進階
下一篇
[Day 22] 進度追蹤 (二):WebSocket
系列文
用 FastAPI 打造你的 AI 服務22
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言