iT邦幫忙

2025 iThome 鐵人賽

DAY 27
0
Software Development

用 FastAPI 打造你的 AI 服務系列 第 27

[Day 27] 實戰範例 (三):即時影像串流

  • 分享至 

  • xImage
  •  

今天我們要挑戰一個非常常見的應用:AI 即時影像串流。想像一下,你需要開發一個服務,它能接收 Webcam 的影像,透過 AI 模型進行即時分析(例如:物件追蹤、手勢識別、動作分析),然後將附帶分析結果的影像串流回傳給前端。

這個任務面臨兩個根本性的挑戰:

  1. 處理速度瓶頸:AI 模型的處理時間,往往遠大於影像來源的幀間隔時間
  2. 幀間相依性限制:許多 AI 應用需要前一幀的運算結果來處理當前幀(如物件追蹤、姿態分析),這使得我們無法簡單地透過平行處理來提升效能

核心問題分析

讓我們用具體的數字來理解這個困境:

一個標準的 Webcam 可能是 30 FPS (Frames Per Second),代表每幀的間隔時間約為 1 / 30 ≈ 33.3 毫秒。

一個中等複雜度的 AI 模型,處理一張圖片可能需要 100 毫秒。

這個數字差距帶來了根本性的挑戰:AI 處理速度永遠跟不上影像產生速度。無論我們採用什麼架構,都必須面對以下現實:

  1. 輸出 FPS 天花板:受限於 AI 處理時間,客戶端最多只能得到 1 / 0.1 = 10 FPS 的畫面
  2. 影像來源必然浪費:Webcam 產生 30 幀,但我們只能處理其中 10 幀,剩下 20 幀註定被捨棄

結合前面提到的幀間相依性限制,我們無法透過同時處理多幀來提升效能。

關鍵問題不是如何「解決」這些限制,而是如何在這些限制下做到最好:確保我們處理的那 10 幀,都是最接近當下時刻的新鮮畫面,而不是已經過時的舊畫面。

架構設計:解耦生產者與消費者

要在這個限制下做到最好,關鍵在於「解耦」(Decoupling)。我們需要將「影像的生產(從 Webcam 抓取)」與「影像的消費(AI 處理與串流)」分開。這種架構無法改變 FPS 天花板和影像浪費的現實,但能確保我們丟棄的是舊畫面,保留的是新畫面

  1. 影像擷取器 (Producer):我們會在 FastAPI 應用程式啟動時,透過 lifespan 啟動一個獨立的背景執行緒 (threading.Thread)。這個執行緒的唯一任務就是以最高效率不斷地從 Webcam 讀取最新的影像幀,並透過執行緒鎖 (threading.Lock) 安全地更新一個全域變數,這個變數只會保存最新的一幀畫面。舊的幀會被新幀覆蓋,確保影像的即時性。

    如果想要改用 API 控制,可以參考 Day 24 的範例

  2. FastAPI 處理與串流器 (Consumer):我們的 API endpoint 將會是一個串流回應 (StreamingResponse)。它會在一個非同步的迴圈中:

    • 透過執行緒鎖,安全地讀取全域變數中當下最新的一幀。
    • 執行耗時的 AI 模擬運算。
    • 將處理完的影像幀 yield 給前端。
    • 當應用程式關閉時,lifespan 會負責發送停止訊號,並確保背景執行緒優雅地退出。

    StreamingResponse 的介紹可以參考 Day 12

這種架構確保了影像擷取不會阻塞 FastAPI 的主事件迴圈,也讓資源的生命週期與應用程式本身綁定,更加安全可靠。

程式碼範例

以下是更新後的程式碼,使用 lifespanthreading 實現了背景影像擷取。

# main.py
import cv2
import time
import asyncio
import threading
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

# --- 共享資源 ---
camera = None
latest_frame = None
frame_lock = threading.Lock()
stop_event = threading.Event()
previous_frame_data = {"last_object_count": 0}
# -----------------

def capture_thread_func():
    """在背景執行緒中不斷擷取攝影機畫面的函數"""
    global latest_frame, camera
    while not stop_event.is_set():
        if camera and camera.isOpened():
            success, frame = camera.read()
            if success:
                with frame_lock:
                    latest_frame = frame.copy()
        # 稍微等待,避免 CPU 空轉
        time.sleep(1/60) # 嘗試以 60fps 擷取

@asynccontextmanager
async def lifespan(app: FastAPI):
    # --- 應用程式啟動 ---
    global camera
    print("應用程式啟動,正在開啟攝影機...")
    camera = cv2.VideoCapture(0)
    if not camera.isOpened():
        raise RuntimeError("無法開啟攝影機")
    
    # 啟動背景擷取執行緒
    capture_thread = threading.Thread(target=capture_thread_func)
    capture_thread.start()
    print("背景攝影機擷取執行緒已啟動")
    
    yield # 等待應用程式運行
    
    # --- 應用程式關閉 ---
    print("應用程式關閉,正在停止執行緒並釋放攝影機...")
    stop_event.set()
    capture_thread.join() # 等待執行緒完全停止
    if camera:
        camera.release()
    print("資源已釋放")

app = FastAPI(lifespan=lifespan)

async def generate_ai_stream():
    """影像生成器,負責從共享變數讀取畫面、模擬 AI 處理並串流"""
    global previous_frame_data
    
    while True:
        frame_to_process = None
        with frame_lock:
            if latest_frame is not None:
                frame_to_process = latest_frame.copy()

        if frame_to_process is None:
            await asyncio.sleep(0.1) # 如果還沒有畫面,稍等一下
            continue
        
        # --- 模擬 AI 處理 ---
        current_object_count = previous_frame_data["last_object_count"]
        if int(time.time()) % 2 == 0:
            current_object_count += 1
        
        processing_time_ms = 100
        cv2.putText(frame_to_process, f"Processing Time: {processing_time_ms}ms", (10, 30), 
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.putText(frame_to_process, f"Object Count: {current_object_count}", (10, 70), 
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

        await asyncio.sleep(processing_time_ms / 1000.0)

        previous_frame_data["last_object_count"] = current_object_count
        # --- AI 處理結束 ---

        ret, buffer = cv2.imencode('.jpg', frame_to_process)
        if not ret: continue
        frame_bytes = buffer.tobytes()

        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + frame_bytes + b'\r\n')

@app.get("/ai-video-stream")
async def video_feed():
    """提供 AI 處理後的影像串流"""
    return StreamingResponse(generate_ai_stream(), 
                             media_type='multipart/x-mixed-replace; boundary=frame')

如果想要確認當前 FPS 的話,也可以把它顯示在畫面上:

+ fps_start_time = 0
+ fps_frame_count = 0
+ fps = 0

async def generate_ai_stream():
    """影像生成器,負責從共享變數讀取畫面、模擬 AI 處理並串流"""
-   global previous_frame_data
+   global previous_frame_data, fps_start_time, fps_frame_count, fps
    
    while True:
        frame_to_process = None
        with frame_lock:
            if latest_frame is not None:
                frame_to_process = latest_frame.copy()

        if frame_to_process is None:
            await asyncio.sleep(0.1) # 如果還沒有畫面,稍等一下
            continue
        
+       # FPS 計算
+       if fps_frame_count == 0:
+           fps_start_time = time.time()
+       
+       fps_frame_count += 1
+       # 每 10 幀計算一次 FPS
+       if fps_frame_count >= 10:
+           elapsed_time = time.time() - fps_start_time
+           if elapsed_time > 0:
+               fps = fps_frame_count / elapsed_time
+               fps_frame_count = 0
        
        # --- 模擬 AI 處理 ---
        current_object_count = previous_frame_data["last_object_count"]
        if int(time.time()) % 2 == 0:
            current_object_count += 1
        
        processing_time_ms = 100
        cv2.putText(frame_to_process, f"Processing Time: {processing_time_ms}ms", (10, 30), 
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.putText(frame_to_process, f"Object Count: {current_object_count}", (10, 70), 
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
+       # 顯示 FPS
+       cv2.putText(frame_to_process, f"FPS: {fps:.2f}", (10, 110), 
+                   cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)

        await asyncio.sleep(processing_time_ms / 1000.0)

        previous_frame_data["last_object_count"] = current_object_count
        # --- AI 處理結束 ---

        ret, buffer = cv2.imencode('.jpg', frame_to_process)
        if not ret:
            continue
        frame_bytes = buffer.tobytes()

        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + frame_bytes + b'\r\n')

執行起來效果是這樣:


(這是照片)

可以看到 FPS 約為 9-10,這符合我們的預期(100ms 處理時間 ≈ 10 FPS),但重要的是畫面是即時的,沒有明顯的延遲累積。

進階優化:當幀處理無需前後依賴時

我們目前的架構完美地處理了「幀之間有相依性」的場景。但如果你的 AI 應用中,每一幀的處理都是完全獨立的(例如:單純的物件偵測),那麼我們就可以解開封印,透過平行處理來大幅提升輸出 FPS。

這時架構可以演變成「生產者-任務佇列-消費者池」模型:

  1. 生產者 (Producer):背景執行緒依舊高速抓取畫面,但這次是將每一幀都放入一個任務佇列 (Queue)。

  2. 消費者池 (Consumer Pool):根據 CPU 核心數,我們啟動一個由多個獨立 Worker Process 組成的池 (multiprocessing.Pool)。每個 Worker 從任務佇列中取出幀,獨立進行 AI 運算。

  3. 串流器 (Collector):Worker 將處理好的結果放入一個「結果佇列」,FastAPI 則從結果佇列中取出成品,串流給前端。

由於 AI 運算是 CPU 密集型任務,使用 multiprocessing 可以繞過 Python 的 GIL (全局解釋器鎖),實現真正的平行計算。

透過這種方式,若單一 Worker 的處理能力是 10 FPS,只要你部署 3-4 個 Worker,就能實現 30 FPS 的即時 AI 影像分析,達到與來源影像同步的驚人效能。

小結

在這個實戰範例中,我們面對了即時 AI 影像串流的現實挑戰:當 AI 處理時間(100ms)遠大於影像幀間隔(33ms)時,FPS 限制和影像浪費是無法避免的。我們的解決方案專注於在這個限制下做到最好:確保處理的總是最新畫面,避免處理過時影像造成的額外延遲感受。

關鍵技術包括:

  • 解耦設計:透過生產者-消費者模型,讓影像擷取持續進行,不被 AI 處理阻塞
  • 即時性優化:始終處理最新幀,自動捨棄過時的舊幀
  • FastAPI lifespan:管理背景執行緒的生命週期
  • 執行緒安全:使用 threading.Lock 保護共享資源
  • 擴展可能:當幀間無依賴時,可透過多進程池突破 FPS 限制

上一篇
[Day 26] 實戰範例 (二):影片處理 SaaS (下)
系列文
用 FastAPI 打造你的 AI 服務27
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言