今天我們要挑戰一個非常常見的應用:AI 即時影像串流。想像一下,你需要開發一個服務,它能接收 Webcam 的影像,透過 AI 模型進行即時分析(例如:物件追蹤、手勢識別、動作分析),然後將附帶分析結果的影像串流回傳給前端。
這個任務面臨兩個根本性的挑戰:
讓我們用具體的數字來理解這個困境:
一個標準的 Webcam 可能是 30 FPS (Frames Per Second),代表每幀的間隔時間約為 1 / 30 ≈ 33.3
毫秒。
一個中等複雜度的 AI 模型,處理一張圖片可能需要 100 毫秒。
這個數字差距帶來了根本性的挑戰:AI 處理速度永遠跟不上影像產生速度。無論我們採用什麼架構,都必須面對以下現實:
1 / 0.1 = 10
FPS 的畫面結合前面提到的幀間相依性限制,我們無法透過同時處理多幀來提升效能。
關鍵問題不是如何「解決」這些限制,而是如何在這些限制下做到最好:確保我們處理的那 10 幀,都是最接近當下時刻的新鮮畫面,而不是已經過時的舊畫面。
要在這個限制下做到最好,關鍵在於「解耦」(Decoupling)。我們需要將「影像的生產(從 Webcam 抓取)」與「影像的消費(AI 處理與串流)」分開。這種架構無法改變 FPS 天花板和影像浪費的現實,但能確保我們丟棄的是舊畫面,保留的是新畫面。
影像擷取器 (Producer):我們會在 FastAPI 應用程式啟動時,透過 lifespan
啟動一個獨立的背景執行緒 (threading.Thread
)。這個執行緒的唯一任務就是以最高效率不斷地從 Webcam 讀取最新的影像幀,並透過執行緒鎖 (threading.Lock
) 安全地更新一個全域變數,這個變數只會保存最新的一幀畫面。舊的幀會被新幀覆蓋,確保影像的即時性。
如果想要改用 API 控制,可以參考 Day 24 的範例
FastAPI 處理與串流器 (Consumer):我們的 API endpoint 將會是一個串流回應 (StreamingResponse
)。它會在一個非同步的迴圈中:
yield
給前端。lifespan
會負責發送停止訊號,並確保背景執行緒優雅地退出。
StreamingResponse
的介紹可以參考 Day 12
這種架構確保了影像擷取不會阻塞 FastAPI 的主事件迴圈,也讓資源的生命週期與應用程式本身綁定,更加安全可靠。
以下是更新後的程式碼,使用 lifespan
和 threading
實現了背景影像擷取。
# main.py
import cv2
import time
import asyncio
import threading
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
# --- 共享資源 ---
camera = None
latest_frame = None
frame_lock = threading.Lock()
stop_event = threading.Event()
previous_frame_data = {"last_object_count": 0}
# -----------------
def capture_thread_func():
"""在背景執行緒中不斷擷取攝影機畫面的函數"""
global latest_frame, camera
while not stop_event.is_set():
if camera and camera.isOpened():
success, frame = camera.read()
if success:
with frame_lock:
latest_frame = frame.copy()
# 稍微等待,避免 CPU 空轉
time.sleep(1/60) # 嘗試以 60fps 擷取
@asynccontextmanager
async def lifespan(app: FastAPI):
# --- 應用程式啟動 ---
global camera
print("應用程式啟動,正在開啟攝影機...")
camera = cv2.VideoCapture(0)
if not camera.isOpened():
raise RuntimeError("無法開啟攝影機")
# 啟動背景擷取執行緒
capture_thread = threading.Thread(target=capture_thread_func)
capture_thread.start()
print("背景攝影機擷取執行緒已啟動")
yield # 等待應用程式運行
# --- 應用程式關閉 ---
print("應用程式關閉,正在停止執行緒並釋放攝影機...")
stop_event.set()
capture_thread.join() # 等待執行緒完全停止
if camera:
camera.release()
print("資源已釋放")
app = FastAPI(lifespan=lifespan)
async def generate_ai_stream():
"""影像生成器,負責從共享變數讀取畫面、模擬 AI 處理並串流"""
global previous_frame_data
while True:
frame_to_process = None
with frame_lock:
if latest_frame is not None:
frame_to_process = latest_frame.copy()
if frame_to_process is None:
await asyncio.sleep(0.1) # 如果還沒有畫面,稍等一下
continue
# --- 模擬 AI 處理 ---
current_object_count = previous_frame_data["last_object_count"]
if int(time.time()) % 2 == 0:
current_object_count += 1
processing_time_ms = 100
cv2.putText(frame_to_process, f"Processing Time: {processing_time_ms}ms", (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
cv2.putText(frame_to_process, f"Object Count: {current_object_count}", (10, 70),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
await asyncio.sleep(processing_time_ms / 1000.0)
previous_frame_data["last_object_count"] = current_object_count
# --- AI 處理結束 ---
ret, buffer = cv2.imencode('.jpg', frame_to_process)
if not ret: continue
frame_bytes = buffer.tobytes()
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame_bytes + b'\r\n')
@app.get("/ai-video-stream")
async def video_feed():
"""提供 AI 處理後的影像串流"""
return StreamingResponse(generate_ai_stream(),
media_type='multipart/x-mixed-replace; boundary=frame')
如果想要確認當前 FPS 的話,也可以把它顯示在畫面上:
+ fps_start_time = 0
+ fps_frame_count = 0
+ fps = 0
async def generate_ai_stream():
"""影像生成器,負責從共享變數讀取畫面、模擬 AI 處理並串流"""
- global previous_frame_data
+ global previous_frame_data, fps_start_time, fps_frame_count, fps
while True:
frame_to_process = None
with frame_lock:
if latest_frame is not None:
frame_to_process = latest_frame.copy()
if frame_to_process is None:
await asyncio.sleep(0.1) # 如果還沒有畫面,稍等一下
continue
+ # FPS 計算
+ if fps_frame_count == 0:
+ fps_start_time = time.time()
+
+ fps_frame_count += 1
+ # 每 10 幀計算一次 FPS
+ if fps_frame_count >= 10:
+ elapsed_time = time.time() - fps_start_time
+ if elapsed_time > 0:
+ fps = fps_frame_count / elapsed_time
+ fps_frame_count = 0
# --- 模擬 AI 處理 ---
current_object_count = previous_frame_data["last_object_count"]
if int(time.time()) % 2 == 0:
current_object_count += 1
processing_time_ms = 100
cv2.putText(frame_to_process, f"Processing Time: {processing_time_ms}ms", (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
cv2.putText(frame_to_process, f"Object Count: {current_object_count}", (10, 70),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
+ # 顯示 FPS
+ cv2.putText(frame_to_process, f"FPS: {fps:.2f}", (10, 110),
+ cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
await asyncio.sleep(processing_time_ms / 1000.0)
previous_frame_data["last_object_count"] = current_object_count
# --- AI 處理結束 ---
ret, buffer = cv2.imencode('.jpg', frame_to_process)
if not ret:
continue
frame_bytes = buffer.tobytes()
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame_bytes + b'\r\n')
執行起來效果是這樣:
(這是照片)
可以看到 FPS 約為 9-10,這符合我們的預期(100ms 處理時間 ≈ 10 FPS),但重要的是畫面是即時的,沒有明顯的延遲累積。
我們目前的架構完美地處理了「幀之間有相依性」的場景。但如果你的 AI 應用中,每一幀的處理都是完全獨立的(例如:單純的物件偵測),那麼我們就可以解開封印,透過平行處理來大幅提升輸出 FPS。
這時架構可以演變成「生產者-任務佇列-消費者池」模型:
生產者 (Producer):背景執行緒依舊高速抓取畫面,但這次是將每一幀都放入一個任務佇列 (Queue
)。
消費者池 (Consumer Pool):根據 CPU 核心數,我們啟動一個由多個獨立 Worker Process 組成的池 (multiprocessing.Pool
)。每個 Worker 從任務佇列中取出幀,獨立進行 AI 運算。
串流器 (Collector):Worker 將處理好的結果放入一個「結果佇列」,FastAPI 則從結果佇列中取出成品,串流給前端。
由於 AI 運算是 CPU 密集型任務,使用 multiprocessing
可以繞過 Python 的 GIL (全局解釋器鎖),實現真正的平行計算。
透過這種方式,若單一 Worker 的處理能力是 10 FPS,只要你部署 3-4 個 Worker,就能實現 30 FPS 的即時 AI 影像分析,達到與來源影像同步的驚人效能。
在這個實戰範例中,我們面對了即時 AI 影像串流的現實挑戰:當 AI 處理時間(100ms)遠大於影像幀間隔(33ms)時,FPS 限制和影像浪費是無法避免的。我們的解決方案專注於在這個限制下做到最好:確保處理的總是最新畫面,避免處理過時影像造成的額外延遲感受。
關鍵技術包括:
threading.Lock
保護共享資源