iT邦幫忙

2025 iThome 鐵人賽

DAY 24
0
Software Development

用 FastAPI 打造你的 AI 服務系列 第 24

[Day 24] 實戰範例 (一):API 控制長時間任務

  • 分享至 

  • xImage
  •  

經過了前面理論與基礎知識的鋪墊,從今天開始,我們將進入一系列的實戰範例。我們的目標是將先前學到的概念,如非同步、任務管理、生命週期等,應用於更貼近真實世界的場景中。

今天的主題是「API 控制長時間任務」。在 AI 應用中,這是一個極為常見的情境。例如,啟動一個需要數分鐘甚至數小時的模型訓練、執行一段長時間的數據批次處理,或如我們今天要實作的——控制一段影片的錄製。

情境說明

想像一個情境:我們需要建立一個遠端監控服務,可以透過 API 指令來「開始錄影」和「結束錄影」。錄影本身是一個持續性的 I/O 操作,會長時間佔用資源。

這個服務必須滿足一個關鍵需求:在錄影期間,FastAPI 服務必須保持回應。我們不能因為正在錄影,就導致其他 API (例如查詢系統狀態) 被卡住無法回應。這正是考驗我們對長時間任務處理能力的時候。

程式碼架構

要解決這個問題,核心思想就是不能讓長時間執行的錄影任務阻塞 FastAPI 的主事件循環 (Event Loop)。

一個直觀但錯誤的做法是,直接在 API 處理函式中寫一個 while 迴圈來讀取並儲存影像幀。這會導致整個服務在迴圈結束前,無法處理任何其他請求。

正確的作法是將這個長時間任務丟到背景的另一個執行緒 (Thread) 中。這樣,API 函式的職責就變得很單純:

  1. /start_recording:建立並啟動一個錄影執行緒,然後立即回傳「已開始錄影」的訊息。

  2. /stop_recording:修改一個共享的狀態變數 (例如一個布林值 is_recording),通知背景執行緒該停止了,然後等待執行緒確實結束後,回傳「已停止錄影」。

  3. /:一個簡單的 API,用於在錄影期間測試服務是否仍然可以正常回應。

我們需要一個全域變數來管理錄影執行緒的實例和錄影狀態,以便不同的 API 之間可以共享和控制。

範例程式碼

import cv2
import threading
from fastapi import FastAPI, HTTPException

app = FastAPI()

# --- 狀態管理 ---
# 使用一個字典來存放共享狀態,未來更容易擴充
recording_status = {
    "is_recording": False,
    "thread": None
}

# --- 長時間執行的任務 ---
def record_webcam():
    """
    執行 webcam 錄影的函式,這是一個長時間的阻塞任務。
    """
    global recording_status
    
    # 嘗試打開預設的 webcam
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("Cannot open camera")
        # 在實際應用中,應有更完善的錯誤通知機制
        recording_status["is_recording"] = False
        return

    # 取得影像的寬高
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    
    # 定義影片編碼器與建立 VideoWriter 物件
    fourcc = cv2.VideoWriter_fourcc(*'mp4v') # 使用 MP4 編碼
    out = cv2.VideoWriter('output.mp4', fourcc, 20.0, (width, height))

    print("Recording started...")
    # 只要 is_recording 為 True,就持續錄影
    while recording_status["is_recording"]:
        ret, frame = cap.read()
        if not ret:
            print("Can't receive frame (stream end?). Exiting ...")
            break
        out.write(frame)

    # 釋放資源
    cap.release()
    out.release()
    cv2.destroyAllWindows()
    print("Recording stopped and resources released.")


# --- API Endpoints ---
@app.get("/")
async def root():
    """ 用於測試服務是否仍在回應 """
    return {"message": "Hello World"}

@app.post("/start")
async def start_recording():
    """ 開始錄影 """
    global recording_status
    if recording_status["is_recording"]:
        raise HTTPException(status_code=400, detail="Already recording.")

    recording_status["is_recording"] = True
    # 建立一個新的執行緒來執行 record_webcam 函式
    recording_status["thread"] = threading.Thread(target=record_webcam)
    recording_status["thread"].start()
    
    return {"message": "Started recording."}

@app.post("/stop")
async def stop_recording():
    """ 停止錄影 """
    global recording_status
    if not recording_status["is_recording"]:
        raise HTTPException(status_code=400, detail="Not currently recording.")

    # 設定旗標為 False,通知執行緒停止
    recording_status["is_recording"] = False
    # 等待執行緒完全結束
    if recording_status["thread"]:
        recording_status["thread"].join()
        recording_status["thread"] = None
        
    return {"message": "Stopped recording."}

記得確認是否有 webcam,以及需要安裝 OpenCV (pip install opencv-python)

程式碼說明

  1. 狀態管理 (recording_status):我們用一個全域字典來追蹤錄影狀態 (is_recording) 和執行緒物件 (thread)。這使得不同 API 呼叫之間可以共享資訊。

  2. 錄影函式 (record_webcam):這是一個標準的 Python 函式,不是 async def。因為它內部充滿了 cv2 的同步阻塞操作。它的行為完全由 while recording_status["is_recording"]: 這個迴圈控制。一旦外部將 is_recording 設為 False,迴圈就會結束,並執行後續的資源釋放。

  3. /start API:當收到請求時,它會先檢查是否已在錄影中。若否,則將 is_recording 設為 True,並建立一個 threading.Thread 物件,將 record_webcam 函式作為目標。thread.start() 會立即在背景啟動這個執行緒,而 API 本身則可以馬上回傳訊息,不會被卡住。

  4. /stop API:它將 is_recording 設為 False,這會讓背景執行緒中的 while 迴圈停止。thread.join() 是個關鍵步驟,它會阻塞 /stop 這個 API 直到錄影執行緒真正執行完畢(釋放攝影機、關閉檔案),確保資源被妥善清理。

啟動服務後,可以先呼叫 /start,然後在錄影期間嘗試呼叫 /。你會發現 / 依然能夠迅速回傳 "Hello World",證明我們的服務沒有被阻塞。最後再呼叫 /stop 來結束錄影。

小結

透過這個簡單的範例,我們成功地使用 threading 模組,將一個長時間的阻塞任務與 FastAPI 的非同步核心分離,打造了一個具備基本控制能力且能保持回應的應用服務。這是未來處理複雜 AI 任務時,一個非常重要的基礎模式。


上一篇
[Day 23] 進度追蹤 (三):Server-Sent Event (SSE)
系列文
用 FastAPI 打造你的 AI 服務24
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言