iT邦幫忙

2025 iThome 鐵人賽

DAY 25
0
Software Development

用 FastAPI 打造你的 AI 服務系列 第 25

[Day 25] 實戰範例 (二):影片處理 SaaS (上)

  • 分享至 

  • xImage
  •  

今天,我們將迎來一個綜合性的實戰範例,目標是打造一個迷你的影片處理 SaaS 平台。

現代許多服務都需要處理這類需求:從 YouTube 的影片轉檔、到企業的監控錄影系統,甚至是線上會議的錄影功能。這些應用場景都有一個共同特點:上傳檔案 → 長時間背景處理 → 即時進度回饋。舉例來說,當使用者上傳一段 500MB 的 4K 影片到平台時,系統需要在背景進行格式轉換、壓縮、浮水印添加等處理,同時讓使用者透過網頁即時看到「轉檔進度 65%」這樣的更新狀態。

在這個系列的上篇中,我們將先建立一個「原型版本」。這個版本會實現所有核心功能:使用者上傳影片,伺服器在背景進行處理,並透過 SSE 即時回報進度。

架構設計與核心思路

我們的「原型」服務需要滿足以下處理流程:

  • 上傳階段:使用者上傳影片檔案,API 接收檔案後將其完整讀入記憶體,成為一個 bytes 物件。
  • 啟動背景任務:API 立即回傳一個任務 ID,並將 bytes 物件傳遞給背景任務函式進行處理。
  • 處理階段:背景任務模擬影片處理流程,逐步處理(想像成對 bytes 資料進行運算),並不斷更新全域字典中該任務的進度。
  • 進度追蹤:前端透過 SSE 連接特定端點,伺服器從全域字典讀取進度,並持續推播給前端。

這種架構的核心優勢在於利用記憶體作為中介,省去了檔案讀寫的步驟,使初期開發變得非常迅速。

此外,這個服務還必須滿足一個關鍵需求:在影片處理期間,FastAPI 服務必須保持回應能力。我們不能因為正在處理影片,就導致其他 API(例如查詢系統狀態)被阻塞而無法回應。

需要特別注意的是:這個做法雖然簡單,但有其嚴重的局限性,只適用於內部工具或預期流量極小、檔案極小的特定場景。在下篇文章中,我們將對其進行徹底的優化改進。

核心程式碼範例

1. 任務管理與記憶體處理邏輯

我們定義一個全域字典 tasks 來追蹤狀態,以及一個接收 bytes 物件的處理函式。

# 警告:此為簡易範例,生產環境應使用 Redis 等更穩健的方案
tasks = {}

def process_video_in_memory(task_id: str, video_bytes: bytes):
    """
    模擬影片處理函式,直接操作記憶體中的 bytes。
    """
    total_steps = 100
    
    for i in range(total_steps):
        # 模擬 CPU bound 工作: 進行一些無意義但耗時的計算
        _ = [x**2 for x in range(1_000_000)] 
        tasks[task_id]['progress'] = (i + 1) / total_steps * 100

    tasks[task_id]['status'] = 'completed'
    tasks[task_id]['result'] = b"processed video data..." # 處理結果

2. API 端點:上傳並讀入記憶體

這是此架構的關鍵,我們使用 await file.read() 將檔案一次性讀完。

import uuid
from fastapi import BackgroundTasks, FastAPI, UploadFile, File
from starlette.responses import JSONResponse

app = FastAPI()

@app.post("/upload")
async def upload_video_memory(background_tasks: BackgroundTasks, file: UploadFile = File(...)):
    # 將檔案完整讀入記憶體,這一步是記憶體消耗的關鍵點!
    video_contents = await file.read()

    task_id = str(uuid.uuid4())
    tasks[task_id] = {"status": "processing", "progress": 0}

    # 將 bytes 物件傳給背景任務
    background_tasks.add_task(process_video_in_memory, task_id, video_contents)
    
    return JSONResponse(status_code=202, content={"task_id": task_id})

3. API 端點:SSE 進度追蹤

此部分與之前的設計相同,它只關心 tasks 字典中的狀態,不關心任務是如何執行的。

import asyncio
from starlette.responses import StreamingResponse

async def progress_generator(task_id: str):
    while task_id in tasks and tasks[task_id]['status'] != 'completed':
        progress = tasks[task_id].get('progress', 0)
        yield f"data: {{ \"progress\": {progress:.2f} }}\n\n"
        await asyncio.sleep(1)
    yield "data: {{ \"progress\": 100.00, \"status\": \"completed\" }}\n\n"

@app.get("/progress/{task_id}")
async def get_progress(task_id: str):
    return StreamingResponse(progress_generator(task_id), media_type="text/event-stream")

侷限

記憶體使用問題

將整個影片檔案完整載入記憶體的做法存在重大問題:

  • 記憶體成本昂貴:伺服器的記憶體資源遠比硬碟空間珍貴且有限。
  • 擴展性極差:假設 10 個使用者同時上傳 100MB 的影片,伺服器記憶體會瞬間暴增 1GB,極易導致服務因記憶體耗盡(OOM)而崩潰。
  • 檔案大小限制:現實中的影片檔案動輒數 GB,這種做法完全無法應對。

BackgroundTasks 的架構限制

雖然 BackgroundTasks 對於單一服務內的非同步任務處理非常便利,但它也有顯著的限制。由於它依賴於執行緒池(預設最多 40 個執行緒),這帶來了以下問題:

  1. 併發數量受限:同時運行的任務數受限於執行緒池大小,無法應對高併發需求。
  2. 無法跨機器擴展:所有任務都綁定在同一個服務實例中運行,如果服務崩潰,進行中的任務也會遺失。
  3. 功能相對簡陋:不支援任務排程、自動重試、複雜的任務鏈、優先級管理等進階功能。

對於需要大規模、高可靠性的非同步任務處理系統,業界的標準解決方案是使用專業的任務佇列(例如 Celery),搭配 Redis 或 RabbitMQ 作為訊息代理。這套架構能將任務分發到獨立的 Worker 叢集中執行,實現真正的水平擴展與故障容錯。

小結

今天,我們快速搭建了一個功能完整的影片處理服務原型。透過將檔案直接載入記憶體,我們大幅簡化了程式碼邏輯,實現了快速開發與概念驗證。

然而,這種方法存在許多嚴重缺陷:

記憶體濫用問題:將整個檔案載入記憶體不僅浪費寶貴的系統資源,更可能在高併發或大檔案情況下導致服務崩潰。在真實環境中,我們應該使用串流處理(streaming)的方式,分塊讀取和處理檔案,避免記憶體爆炸。

任務管理過於簡陋:使用全域字典儲存任務狀態缺乏持久性和可靠性,服務重啟會導致所有任務資訊遺失。BackgroundTasks 雖然方便,但無法提供任務重試、優先級管理、分散式處理等企業級功能。

缺乏錯誤處理機制:當前版本沒有完善的錯誤處理邏輯,無法應對檔案損壞、處理超時、系統資源不足等異常狀況。

安全性考量不足:沒有檔案類型檢查、大小限制、病毒掃描等安全機制,存在潛在的安全風險。

儘管如此,這個原型成功驗證了核心概念的可行性。在下一篇文章中,我們將正視這些問題,對其進行「生產級」改造,導入專業的檔案處理、任務佇列系統,並加強錯誤處理與安全機制,讓它能真正應對真實世界的挑戰。


上一篇
[Day 24] 實戰範例 (一):API 控制長時間任務
系列文
用 FastAPI 打造你的 AI 服務25
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言