iT邦幫忙

2025 iThome 鐵人賽

DAY 22
0

昨天我們探討了如何使用長輪詢 (Long Polling) 來追蹤背景任務進度。長輪詢雖然改善了傳統輪詢的缺點,但它本質上仍然是基於「一問一答」的 HTTP 模型,每次通訊都免不了建立連線的開銷。

如果我們的 AI 應用需要非常頻繁、低延遲的雙向溝通(例如:即時的 Log 輸出、互動式 AI 聊天機器人),那麼長輪詢可能就不敷使用了。今天,我們要介紹一個為此而生的技術:WebSocket。

之前我也有寫過一篇 WebSocket 介紹文章,有興趣的話歡迎各位去看看 XD

什麼是 WebSocket?

WebSocket 是一種在單一 TCP 連線上進行全雙工 (Full-duplex) 通訊的協定。讓我們拆解一下這個定義:

  • 單一 TCP 連線:客戶端與伺服器之間只需要建立一次連線,之後就可以一直使用這個連線來傳輸資料,直到其中一方關閉為止。這大大減少了 HTTP 那樣每次請求都要重新交握的延遲與開銷。
  • 全雙工通訊:這代表客戶端和伺服器可以同時互相發送訊息,沒有誰先誰後的問題。這就像打電話一樣,雙方可以同時說話;而 HTTP 則像寫信,必須等待對方回信後才能再寄下一封。

這個特性使得 WebSocket 非常適合需要高即時性與互動性的場景。

WebSocket 的運作流程

  1. 交握 (Handshake):WebSocket 的連線是從一個看起來像普通 HTTP GET 請求的「升級 (Upgrade)」請求開始的。客戶端會發送一個帶有特殊標頭 (Header) 的請求,告訴伺服器:「嗨,我想把這個 HTTP 連線升級成 WebSocket 連線」。
  2. 建立連線:如果伺服器支援並同意,它會回覆一個特定的狀態碼 (101 Switching Protocols),表示「好的,我們現在開始用 WebSocket 協定通訊」。
  3. 數據傳輸:一旦連線建立,這個 TCP 通道就打開了。之後,雙方就可以自由地、非同步地向對方發送「訊息幀 (Frame)」,直到連線被關閉。

FastAPI 中的 WebSocket 基礎

FastAPI 對 WebSocket 提供了非常好的支援,讓實作變得非常簡單。讓我們來看一個極簡單的範例:

from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    while True:
        data = await websocket.receive_text()
        await websocket.send_text(f"Message text was: {data}")

程式碼說明:

  • 我們使用 @app.websocket(...) 而不是 @app.get(...) 來定義 WebSocket 端點。
  • 函式參數中,我們宣告了一個 WebSocket 型別的參數,FastAPI 會自動注入這個物件,讓我們可以與客戶端互動。
  • await websocket.accept() 是必須的第一步,用來完成與客戶端的交握。
  • while True 建立一個無窮迴圈來持續處理訊息。
  • receive_text() 用來接收客戶端發送的文字訊息。
  • send_text() 用來向客戶端發送文字訊息。

測試

如果想要測試 WebSocket,可以使用 Postman。

首先,要選擇建立新的 WebSocket (預設應該都是 HTTP Request)

接著,在 url 的地方輸入 ws://127.0.0.1:8000/ws,再點擊右邊的「Connect」按鈕就可以連線了 (連線後,按鈕變成「Disconnect」)。接著,可以試著在下方輸入訊息,並點擊「Send」按鈕送出,就可以更下方看到送出的訊息以及接收到的訊息。

用 WebSocket 通知任務進度

接下來,來看一個稍微複雜一點的例子:有一個 API 可以啟動背景任務,而背景任務在執行時會透過 WebSocket 傳送任務進度。

WebSocket 連線管理

為了能夠讓其他地方也可以使用 WebSocket 物件,我們需要一個物件來統一管理:

from fastapi import WebSocket

class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            try:
                await connection.send_text(message)
            except Exception:
                pass

manager = ConnectionManager()

設定 API 路由與背景任務

接下來就是設定 API 路由與背景任務:

import asyncio
import logging
from datetime import datetime
from fastapi import FastAPI, BackgroundTasks

# 設定 logger
logger = logging.getLogger("uvicorn")

app = FastAPI()

# 背景任務函數
async def run_background_task(task_id: str):
    """執行背景任務:每秒完成 10%,共 10 秒"""
    for progress in range(10, 110, 10):  # 10, 20, 30, ... 100
        await asyncio.sleep(1)  # 等待 1 秒
        
        # 透過 WebSocket 廣播進度
        message = f"任務 {task_id} 進度: {progress}%"
        await manager.broadcast(message)
        
        logger.info(message)
    
    # 任務完成通知
    completion_message = f"任務 {task_id} 已完成!"
    await manager.broadcast(completion_message)
    logger.info(completion_message)


@app.post("/start-task")
async def start_task(background_tasks: BackgroundTasks):
    """啟動背景任務並回傳任務 ID"""
    task_id = "TASK-001"  # 固定任務 ID
    
    # 使用 FastAPI BackgroundTasks 啟動背景任務
    background_tasks.add_task(run_background_task, task_id)
    
    current_time = datetime.now().strftime("%H:%M:%S")
    logger.info(f"[{current_time}] 啟動背景任務: {task_id}")
    
    return {"task_id": task_id, "message": "背景任務已啟動"}


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket 端點,用於接收任務進度更新"""
    await manager.connect(websocket)
    try:
        await websocket.send_text("WebSocket 連線已建立,等待任務進度更新...")
        
        # 保持連線
        while True:
            await websocket.receive_text()
    except Exception:
        manager.disconnect(websocket)

最後,執行起來會長這樣:

可以看到,隨著任務的進行,Postman 的 WebSocket 連線都有隨時收到任務進度更新~

小結

WebSocket 提供了一個真正意義上的「持續性連線」,實現了低延遲、高效率的雙向通訊。它非常適合需要即時互動的 AI 應用,例如:

  • 即時顯示 AI 模型推理的 Log。
  • 串流式地回傳大型語言模型 (LLM) 生成的文字。
  • 建立一個可以與後端 AI Agent 即時對話的聊天室。

然而,WebSocket 也有一些需要考慮的地方。它的狀態管理比無狀態的 HTTP 複雜,且在某些較嚴格的網路環境(例如某些企業防火牆)下可能會被阻擋。

明天,我們將介紹最後一種技術:Server-Sent Events (SSE)。它在某些方面可以看作是 WebSocket 的輕量級替代方案。


上一篇
[Day 21] 進度追蹤 (一):Long Polling
系列文
用 FastAPI 打造你的 AI 服務22
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言