在實際的 AI 應用場景中,我們經常需要整合多種不同的模型來完成複雜任務。例如,一個智慧監控系統可能需要同時進行物件偵測、動作偵測和場景分析。然而,並非所有模型都能同時處理所有任務,或者單一模型的性能可能無法滿足需求。
在這種情況下,我們需要設計一個系統來協調多個 AI 模型的運行。這些模型可能有不同的特性:有些運算速度快且可以並行執行,有些則需要依序處理以保持狀態的一致性,還有些運算極其耗時需要在背景執行。今天,我們將探討如何在 FastAPI 中優雅地處理這些複雜的多模型協調需求。
礙於篇幅關係 (後面的文章越寫越長...),LLM 應用只能被捨棄了QQ
如果大家真的很想看範例的話,在 Day 12 的第 2 個範例,就是一個簡單的 LLM 應用 (使用 Gemini API),請各位將就一下XD
我們的目標是建立一個 FastAPI 應用程式,處理來自 Webcam 的即時影像串流,並對每個畫面進行三種不同特性的 AI 處理:
為了妥善處理這三種不同特性的任務,我們需要精心設計系統架構:
lifespan
)我們使用 lifespan
這個非同步上下文管理器在應用啟動時初始化所需資源:
ThreadPoolExecutor
:供任務一的並行處理使用queue.Queue
:包含任務佇列與結果佇列,搭配專屬執行緒來處理任務二shared_state
字典:儲存最新的影像幀,供任務三讀取asyncio.Queue
:供 SSE 推播功能使用應用程式關閉時會優雅地釋放所有資源。
主串流生成器透過 loop.run_in_executor
將每個幀的處理任務提交到 ThreadPoolExecutor
,並等待結果返回。這確保了 CPU 密集型計算不會阻塞 FastAPI 的事件迴圈。
採用生產者-消費者模式:
(frame, state)
放入任務佇列(result, new_state)
放入結果佇列loop.run_in_executor
非阻塞地從結果佇列取回處理結果shared_state
asyncio.create_task
啟動)定期從 shared_state
讀取最新幀進行分析asyncio.Queue
,等待 SSE 連線取用以下程式碼展示了如何在 FastAPI 中優雅地協調多個具有不同特性的 AI 處理任務:
import asyncio
import queue
import time
import threading
from concurrent.futures import ThreadPoolExecutor
from contextlib import asynccontextmanager
import cv2
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
# --- 共享資源 ---
# 使用字典來管理共享資源,方便在 lifespan 中初始化
shared_state = {
"executor": None,
"sequential_worker_thread": None,
"task_queue": queue.Queue(),
"result_queue": queue.Queue(),
"sse_queue": None, # 將在 lifespan 中初始化
"latest_frame": None,
"sse_task": None, # 追蹤 SSE 背景任務
"shutdown_event": threading.Event() # 用於優雅關閉
}
# --- 模擬 AI 處理 ---
def fast_object_detection(frame):
time.sleep(0.02)
return "Fast Detection Result"
def slow_pose_tracking(frame, previous_state):
time.sleep(0.1)
new_state = {"timestamp": time.time(), "last_frame_shape": frame.shape}
return "Slow Tracking Result", new_state
def very_slow_scene_analysis(frame):
time.sleep(1.0)
h, w, _ = frame.shape
return f"Scene Analysis: {w}x{h} @ {int(time.time())}"
# --- 循序任務的 Worker ---
def sequential_worker():
"""
這是一個獨立線程執行的函數,它是一個消費者。
它確保了 slow_pose_tracking 任務是按順序執行的。
"""
while not shared_state["shutdown_event"].is_set():
try:
frame, prev_state = shared_state["task_queue"].get(timeout=1.0)
if frame is None: # 收到結束信號
break
result, new_state = slow_pose_tracking(frame, prev_state)
shared_state["result_queue"].put((result, new_state))
shared_state["task_queue"].task_done()
except queue.Empty:
continue # 超時後繼續檢查 shutdown_event
# --- SSE 背景任務 ---
async def sse_background_task():
"""
獨立的背景協程,每秒讀取一次最新 frame 進行分析
"""
try:
while not shared_state["shutdown_event"].is_set():
await asyncio.sleep(1)
if shared_state["latest_frame"] is not None:
# 使用 run_in_executor 執行 blocking 函數 (相容 Python 3.7+)
loop = asyncio.get_running_loop()
analysis_result = await loop.run_in_executor(
None, very_slow_scene_analysis, shared_state["latest_frame"]
)
await shared_state["sse_queue"].put(analysis_result)
except asyncio.CancelledError:
print("SSE background task cancelled")
raise
# --- Lifespan 管理資源 ---
@asynccontextmanager
async def lifespan(app: FastAPI):
# --- 應用啟動時 ---
print("Application starting up...")
# 1. 初始化 asyncio.Queue (必須在事件迴圈中初始化)
shared_state["sse_queue"] = asyncio.Queue()
# 2. 初始化線程池
shared_state["executor"] = ThreadPoolExecutor(max_workers=4)
# 3. 啟動循序任務的 Worker Thread
shared_state["sequential_worker_thread"] = threading.Thread(target=sequential_worker, daemon=True)
shared_state["sequential_worker_thread"].start()
# 4. 啟動 SSE 背景協程
shared_state["sse_task"] = asyncio.create_task(sse_background_task())
yield # 應用程式運行
# --- 應用關閉時 ---
print("Application shutting down...")
# 1. 設置關閉事件
shared_state["shutdown_event"].set()
# 2. 取消 SSE 背景任務
if shared_state["sse_task"]:
shared_state["sse_task"].cancel()
try:
await shared_state["sse_task"]
except asyncio.CancelledError:
pass
# 3. 關閉線程池
shared_state["executor"].shutdown(wait=True)
# 4. 發送信號讓循序 Worker 結束
shared_state["task_queue"].put((None, None))
if shared_state["sequential_worker_thread"].is_alive():
shared_state["sequential_worker_thread"].join(timeout=5.0)
app = FastAPI(lifespan=lifespan)
# --- 主影像串流生成器 ---
async def video_generator():
cap = cv2.VideoCapture(0)
if not cap.isOpened():
print("Cannot open camera")
return
try:
loop = asyncio.get_running_loop()
previous_tracking_state = None
while True:
success, frame = cap.read()
if not success:
break
# 將最新 frame (的複本) 存起來供 SSE 任務使用
shared_state["latest_frame"] = frame.copy()
# 提交並行任務到線程池
future1 = loop.run_in_executor(shared_state["executor"], fast_object_detection, frame)
# 提交循序任務到佇列
shared_state["task_queue"].put((frame, previous_tracking_state))
# 等待結果
# 從佇列中 get 是 blocking 操作,也需要放入 executor
future2 = loop.run_in_executor(None, shared_state["result_queue"].get)
detect_result = await future1
track_result, new_state = await future2
previous_tracking_state = new_state # 更新狀態
# 將結果繪製到畫面上
cv2.putText(frame, detect_result, (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
cv2.putText(frame, track_result, (50, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
ret, buffer = cv2.imencode('.jpg', frame)
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')
finally:
# 確保攝影機資源被釋放
cap.release()
# --- SSE 串流生成器 ---
async def sse_generator():
try:
while True:
message = await shared_state["sse_queue"].get()
yield f"data: {message}\n\n"
except asyncio.CancelledError:
print("SSE client disconnected.")
# --- API Endpoints ---
@app.get("/video_feed")
async def video_feed():
return StreamingResponse(video_generator(), media_type="multipart/x-mixed-replace; boundary=frame")
@app.get("/sse_feed")
async def sse_feed():
return StreamingResponse(sse_generator(), media_type="text/event-stream")
這個實作中有幾個重要的技術細節:
asyncio.Queue
初始化:必須在事件迴圈中初始化,因此放在 lifespan
函數內部threading.Event
和 asyncio.CancelledError
確保所有背景任務能正確終止lifespan
關閉階段和 video_generator
的 finally
區塊中確保資源被釋放loop.run_in_executor
而非 asyncio.to_thread
,確保相容 Python 3.7+在這個多模型協調架構中,資源生命週期管理是基礎核心。透過 lifespan
機制,我們確保所有資源在應用啟動時正確初始化,並在關閉時優雅釋放,有效避免資源洩漏問題。這種集中式的資源管理方式讓系統更加穩定可靠。
非同步協調機制則是整個架構的精髓所在。我們針對不同特性的任務採用了最適合的處理方式:快速的並行任務利用 ThreadPoolExecutor
充分發揮多核心優勢;需要維持狀態一致性的循序任務透過佇列機制確保處理順序;而耗時的背景任務則透過獨立協程執行,完全不影響主流程的響應性。
這種架構設計帶來了顯著的效能優勢。主串流永遠不會被任何耗時操作阻塞,因為所有可能造成延遲的操作都透過適當的非同步機制處理。同時,不同特性的任務都能使用最適合的執行方式,達到資源的有效利用。更重要的是,這種設計具有良好的擴展性,我們可以輕易調整各種任務的執行緒數量或處理頻率,以適應不同的效能需求。
以智慧監控系統為例,這種多模型協調架構能夠完美展現其實用價值。在一個典型的智慧監控場景中,系統需要同時處理物件偵測、動作偵測和場景分析等多項任務。物件偵測通常運算速度較快且可以並行處理多個目標,非常適合使用 ThreadPoolExecutor
來處理;動作偵測則需要追蹤物體在時間序列上的動作變化,必須按照時間順序處理每一幀的資料以維持追蹤狀態的連續性;而場景分析往往需要分析較長時間的行為模式,運算極其耗時,最適合放在背景持續運行,並透過 SSE 將檢測結果即時推送給監控人員。
這樣的架構設計讓監控系統能夠在保持即時性的同時,充分發揮每個 AI 模型的優勢,既不會因為某個模型的處理延遲而影響整體系統的響應速度,也能確保每個模型都在最適合的環境中運行,達到最佳的處理效果。
今天我們深入探討了如何在 FastAPI 中協調多個具有不同特性的 AI 模型。透過合理的架構設計,我們能夠:
這種設計模式不僅適用於 AI 應用,也可以應用到其他需要協調多種不同特性任務的場景中~