iT邦幫忙

2025 iThome 鐵人賽

DAY 28
0
Software Development

用 FastAPI 打造你的 AI 服務系列 第 28

[Day 28] 實戰範例 (四):多模型並行處理

  • 分享至 

  • xImage
  •  

在實際的 AI 應用場景中,我們經常需要整合多種不同的模型來完成複雜任務。例如,一個智慧監控系統可能需要同時進行物件偵測、動作偵測和場景分析。然而,並非所有模型都能同時處理所有任務,或者單一模型的性能可能無法滿足需求。

在這種情況下,我們需要設計一個系統來協調多個 AI 模型的運行。這些模型可能有不同的特性:有些運算速度快且可以並行執行,有些則需要依序處理以保持狀態的一致性,還有些運算極其耗時需要在背景執行。今天,我們將探討如何在 FastAPI 中優雅地處理這些複雜的多模型協調需求。

礙於篇幅關係 (後面的文章越寫越長...),LLM 應用只能被捨棄了QQ

如果大家真的很想看範例的話,在 Day 12 的第 2 個範例,就是一個簡單的 LLM 應用 (使用 Gemini API),請各位將就一下XD

需求

我們的目標是建立一個 FastAPI 應用程式,處理來自 Webcam 的即時影像串流,並對每個畫面進行三種不同特性的 AI 處理:

  1. 任務一:快速處理、可並行執行(例如:物件偵測)
  2. 任務二:中等速度、需要循序處理(例如:姿態追蹤,需要維持前後幀的狀態)
  3. 任務三:處理時間極長、適合背景執行(例如:場景分析),結果透過 SSE 即時推播

架構設計與非同步協調

為了妥善處理這三種不同特性的任務,我們需要精心設計系統架構:

1. 資源管理 (lifespan)

我們使用 lifespan 這個非同步上下文管理器在應用啟動時初始化所需資源:

  • ThreadPoolExecutor:供任務一的並行處理使用
  • queue.Queue:包含任務佇列與結果佇列,搭配專屬執行緒來處理任務二
  • shared_state 字典:儲存最新的影像幀,供任務三讀取
  • asyncio.Queue:供 SSE 推播功能使用

應用程式關閉時會優雅地釋放所有資源。

2. 任務一:並行處理

主串流生成器透過 loop.run_in_executor 將每個幀的處理任務提交到 ThreadPoolExecutor,並等待結果返回。這確保了 CPU 密集型計算不會阻塞 FastAPI 的事件迴圈。

3. 任務二:循序處理

採用生產者-消費者模式:

  • 生產者:主串流生成器將 (frame, state) 放入任務佇列
  • 消費者:獨立的背景執行緒持續從任務佇列取出工作,執行循序運算後將 (result, new_state) 放入結果佇列
  • 結果回收:主串流生成器使用 loop.run_in_executor 非阻塞地從結果佇列取回處理結果

4. 任務三:背景 SSE 推播

  • 主串流生成器在獲得新幀時,將其副本儲存至 shared_state
  • 獨立的背景協程(透過 asyncio.create_task 啟動)定期從 shared_state 讀取最新幀進行分析
  • 分析結果放入 asyncio.Queue,等待 SSE 連線取用

程式碼範例

以下程式碼展示了如何在 FastAPI 中優雅地協調多個具有不同特性的 AI 處理任務:

import asyncio
import queue
import time
import threading
from concurrent.futures import ThreadPoolExecutor
from contextlib import asynccontextmanager

import cv2
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

# --- 共享資源 ---
# 使用字典來管理共享資源,方便在 lifespan 中初始化
shared_state = {
    "executor": None,
    "sequential_worker_thread": None,
    "task_queue": queue.Queue(),
    "result_queue": queue.Queue(),
    "sse_queue": None,  # 將在 lifespan 中初始化
    "latest_frame": None,
    "sse_task": None,  # 追蹤 SSE 背景任務
    "shutdown_event": threading.Event()  # 用於優雅關閉
}

# --- 模擬 AI 處理 ---
def fast_object_detection(frame):
    time.sleep(0.02)
    return "Fast Detection Result"

def slow_pose_tracking(frame, previous_state):
    time.sleep(0.1)
    new_state = {"timestamp": time.time(), "last_frame_shape": frame.shape}
    return "Slow Tracking Result", new_state

def very_slow_scene_analysis(frame):
    time.sleep(1.0)
    h, w, _ = frame.shape
    return f"Scene Analysis: {w}x{h} @ {int(time.time())}"

# --- 循序任務的 Worker ---
def sequential_worker():
    """
    這是一個獨立線程執行的函數,它是一個消費者。
    它確保了 slow_pose_tracking 任務是按順序執行的。
    """
    while not shared_state["shutdown_event"].is_set():
        try:
            frame, prev_state = shared_state["task_queue"].get(timeout=1.0)
            if frame is None: # 收到結束信號
                break
            result, new_state = slow_pose_tracking(frame, prev_state)
            shared_state["result_queue"].put((result, new_state))
            shared_state["task_queue"].task_done()
        except queue.Empty:
            continue  # 超時後繼續檢查 shutdown_event

# --- SSE 背景任務 ---
async def sse_background_task():
    """
    獨立的背景協程,每秒讀取一次最新 frame 進行分析
    """
    try:
        while not shared_state["shutdown_event"].is_set():
            await asyncio.sleep(1)
            if shared_state["latest_frame"] is not None:
                # 使用 run_in_executor 執行 blocking 函數 (相容 Python 3.7+)
                loop = asyncio.get_running_loop()
                analysis_result = await loop.run_in_executor(
                    None, very_slow_scene_analysis, shared_state["latest_frame"]
                )
                await shared_state["sse_queue"].put(analysis_result)
    except asyncio.CancelledError:
        print("SSE background task cancelled")
        raise

# --- Lifespan 管理資源 ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    # --- 應用啟動時 ---
    print("Application starting up...")
    # 1. 初始化 asyncio.Queue (必須在事件迴圈中初始化)
    shared_state["sse_queue"] = asyncio.Queue()
    
    # 2. 初始化線程池
    shared_state["executor"] = ThreadPoolExecutor(max_workers=4)
    
    # 3. 啟動循序任務的 Worker Thread
    shared_state["sequential_worker_thread"] = threading.Thread(target=sequential_worker, daemon=True)
    shared_state["sequential_worker_thread"].start()

    # 4. 啟動 SSE 背景協程
    shared_state["sse_task"] = asyncio.create_task(sse_background_task())

    yield # 應用程式運行

    # --- 應用關閉時 ---
    print("Application shutting down...")
    # 1. 設置關閉事件
    shared_state["shutdown_event"].set()
    
    # 2. 取消 SSE 背景任務
    if shared_state["sse_task"]:
        shared_state["sse_task"].cancel()
        try:
            await shared_state["sse_task"]
        except asyncio.CancelledError:
            pass
    
    # 3. 關閉線程池
    shared_state["executor"].shutdown(wait=True)
    
    # 4. 發送信號讓循序 Worker 結束
    shared_state["task_queue"].put((None, None))
    if shared_state["sequential_worker_thread"].is_alive():
        shared_state["sequential_worker_thread"].join(timeout=5.0)

app = FastAPI(lifespan=lifespan)

# --- 主影像串流生成器 ---
async def video_generator():
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("Cannot open camera")
        return
        
    try:
        loop = asyncio.get_running_loop()
        previous_tracking_state = None

        while True:
            success, frame = cap.read()
            if not success:
                break
            
            # 將最新 frame (的複本) 存起來供 SSE 任務使用
            shared_state["latest_frame"] = frame.copy()

            # 提交並行任務到線程池
            future1 = loop.run_in_executor(shared_state["executor"], fast_object_detection, frame)
            
            # 提交循序任務到佇列
            shared_state["task_queue"].put((frame, previous_tracking_state))
            
            # 等待結果
            # 從佇列中 get 是 blocking 操作,也需要放入 executor
            future2 = loop.run_in_executor(None, shared_state["result_queue"].get)

            detect_result = await future1
            track_result, new_state = await future2
            
            previous_tracking_state = new_state # 更新狀態

            # 將結果繪製到畫面上
            cv2.putText(frame, detect_result, (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            cv2.putText(frame, track_result, (50, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
            
            ret, buffer = cv2.imencode('.jpg', frame)
            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')
    
    finally:
        # 確保攝影機資源被釋放
        cap.release()

# --- SSE 串流生成器 ---
async def sse_generator():
    try:
        while True:
            message = await shared_state["sse_queue"].get()
            yield f"data: {message}\n\n"
    except asyncio.CancelledError:
        print("SSE client disconnected.")

# --- API Endpoints ---
@app.get("/video_feed")
async def video_feed():
    return StreamingResponse(video_generator(), media_type="multipart/x-mixed-replace; boundary=frame")

@app.get("/sse_feed")
async def sse_feed():
    return StreamingResponse(sse_generator(), media_type="text/event-stream")

程式碼重點說明

這個實作中有幾個重要的技術細節:

  1. asyncio.Queue 初始化:必須在事件迴圈中初始化,因此放在 lifespan 函數內部
  2. 優雅關閉機制:使用 threading.Eventasyncio.CancelledError 確保所有背景任務能正確終止
  3. 資源清理:在 lifespan 關閉階段和 video_generatorfinally 區塊中確保資源被釋放
  4. 版本相容性:使用 loop.run_in_executor 而非 asyncio.to_thread,確保相容 Python 3.7+

關鍵設計要點

在這個多模型協調架構中,資源生命週期管理是基礎核心。透過 lifespan 機制,我們確保所有資源在應用啟動時正確初始化,並在關閉時優雅釋放,有效避免資源洩漏問題。這種集中式的資源管理方式讓系統更加穩定可靠。

非同步協調機制則是整個架構的精髓所在。我們針對不同特性的任務採用了最適合的處理方式:快速的並行任務利用 ThreadPoolExecutor 充分發揮多核心優勢;需要維持狀態一致性的循序任務透過佇列機制確保處理順序;而耗時的背景任務則透過獨立協程執行,完全不影響主流程的響應性。

這種架構設計帶來了顯著的效能優勢。主串流永遠不會被任何耗時操作阻塞,因為所有可能造成延遲的操作都透過適當的非同步機制處理。同時,不同特性的任務都能使用最適合的執行方式,達到資源的有效利用。更重要的是,這種設計具有良好的擴展性,我們可以輕易調整各種任務的執行緒數量或處理頻率,以適應不同的效能需求。

實際應用場景

以智慧監控系統為例,這種多模型協調架構能夠完美展現其實用價值。在一個典型的智慧監控場景中,系統需要同時處理物件偵測、動作偵測和場景分析等多項任務。物件偵測通常運算速度較快且可以並行處理多個目標,非常適合使用 ThreadPoolExecutor 來處理;動作偵測則需要追蹤物體在時間序列上的動作變化,必須按照時間順序處理每一幀的資料以維持追蹤狀態的連續性;而場景分析往往需要分析較長時間的行為模式,運算極其耗時,最適合放在背景持續運行,並透過 SSE 將檢測結果即時推送給監控人員。

這樣的架構設計讓監控系統能夠在保持即時性的同時,充分發揮每個 AI 模型的優勢,既不會因為某個模型的處理延遲而影響整體系統的響應速度,也能確保每個模型都在最適合的環境中運行,達到最佳的處理效果。

小結

今天我們深入探討了如何在 FastAPI 中協調多個具有不同特性的 AI 模型。透過合理的架構設計,我們能夠:

  1. 確保系統響應性:快速任務不被慢速任務拖累
  2. 維持狀態一致性:需要順序處理的任務得到妥善管理
  3. 提供即時反饋:透過 SSE 將背景處理結果即時推送給用戶
  4. 優雅管理資源:避免記憶體洩漏和資源競爭問題

這種設計模式不僅適用於 AI 應用,也可以應用到其他需要協調多種不同特性任務的場景中~


上一篇
[Day 27] 實戰範例 (三):即時影像串流
系列文
用 FastAPI 打造你的 AI 服務28
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言