昨天我們探討了如何使用長輪詢 (Long Polling) 來追蹤背景任務進度。長輪詢雖然改善了傳統輪詢的缺點,但它本質上仍然是基於「一問一答」的 HTTP 模型,每次通訊都免不了建立連線的開銷。
如果我們的 AI 應用需要非常頻繁、低延遲的雙向溝通(例如:即時的 Log 輸出、互動式 AI 聊天機器人),那麼長輪詢可能就不敷使用了。今天,我們要介紹一個為此而生的技術:WebSocket。
之前我也有寫過一篇 WebSocket 介紹文章,有興趣的話歡迎各位去看看 XD
WebSocket 是一種在單一 TCP 連線上進行全雙工 (Full-duplex) 通訊的協定。讓我們拆解一下這個定義:
這個特性使得 WebSocket 非常適合需要高即時性與互動性的場景。
FastAPI 對 WebSocket 提供了非常好的支援,讓實作變得非常簡單。讓我們來看一個極簡單的範例:
from fastapi import FastAPI, WebSocket
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Message text was: {data}")
@app.websocket(...)
而不是 @app.get(...)
來定義 WebSocket 端點。WebSocket
型別的參數,FastAPI 會自動注入這個物件,讓我們可以與客戶端互動。await websocket.accept()
是必須的第一步,用來完成與客戶端的交握。while True
建立一個無窮迴圈來持續處理訊息。receive_text()
用來接收客戶端發送的文字訊息。send_text()
用來向客戶端發送文字訊息。如果想要測試 WebSocket,可以使用 Postman。
首先,要選擇建立新的 WebSocket (預設應該都是 HTTP Request)
接著,在 url 的地方輸入 ws://127.0.0.1:8000/ws
,再點擊右邊的「Connect」按鈕就可以連線了 (連線後,按鈕變成「Disconnect」)。接著,可以試著在下方輸入訊息,並點擊「Send」按鈕送出,就可以更下方看到送出的訊息以及接收到的訊息。
接下來,來看一個稍微複雜一點的例子:有一個 API 可以啟動背景任務,而背景任務在執行時會透過 WebSocket 傳送任務進度。
為了能夠讓其他地方也可以使用 WebSocket 物件,我們需要一個物件來統一管理:
from fastapi import WebSocket
class ConnectionManager:
def __init__(self):
self.active_connections: list[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
try:
await connection.send_text(message)
except Exception:
pass
manager = ConnectionManager()
接下來就是設定 API 路由與背景任務:
import asyncio
import logging
from datetime import datetime
from fastapi import FastAPI, BackgroundTasks
# 設定 logger
logger = logging.getLogger("uvicorn")
app = FastAPI()
# 背景任務函數
async def run_background_task(task_id: str):
"""執行背景任務:每秒完成 10%,共 10 秒"""
for progress in range(10, 110, 10): # 10, 20, 30, ... 100
await asyncio.sleep(1) # 等待 1 秒
# 透過 WebSocket 廣播進度
message = f"任務 {task_id} 進度: {progress}%"
await manager.broadcast(message)
logger.info(message)
# 任務完成通知
completion_message = f"任務 {task_id} 已完成!"
await manager.broadcast(completion_message)
logger.info(completion_message)
@app.post("/start-task")
async def start_task(background_tasks: BackgroundTasks):
"""啟動背景任務並回傳任務 ID"""
task_id = "TASK-001" # 固定任務 ID
# 使用 FastAPI BackgroundTasks 啟動背景任務
background_tasks.add_task(run_background_task, task_id)
current_time = datetime.now().strftime("%H:%M:%S")
logger.info(f"[{current_time}] 啟動背景任務: {task_id}")
return {"task_id": task_id, "message": "背景任務已啟動"}
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""WebSocket 端點,用於接收任務進度更新"""
await manager.connect(websocket)
try:
await websocket.send_text("WebSocket 連線已建立,等待任務進度更新...")
# 保持連線
while True:
await websocket.receive_text()
except Exception:
manager.disconnect(websocket)
最後,執行起來會長這樣:
可以看到,隨著任務的進行,Postman 的 WebSocket 連線都有隨時收到任務進度更新~
WebSocket 提供了一個真正意義上的「持續性連線」,實現了低延遲、高效率的雙向通訊。它非常適合需要即時互動的 AI 應用,例如:
然而,WebSocket 也有一些需要考慮的地方。它的狀態管理比無狀態的 HTTP 複雜,且在某些較嚴格的網路環境(例如某些企業防火牆)下可能會被阻擋。
明天,我們將介紹最後一種技術:Server-Sent Events (SSE)。它在某些方面可以看作是 WebSocket 的輕量級替代方案。