在上一篇文章中,我們快速搭建了一個影片處理服務的原型。儘管它能夠正常運作,但無論是基於記憶體的即時處理,還是採用 BackgroundTasks 的磁碟方案,都暴露出明顯的架構瓶頸:所有任務都被限制在單一服務實例中執行,不僅併發處理能力受限,更缺乏企業級的任務管理與容錯機制。
今天,我們將進行一次架構層面的重大升級,引入業界成熟的分散式任務佇列系統——Celery,徹底重構我們的服務架構。本次升級的核心目標包括:
在處理大型影片檔案時,我們面臨著一個關鍵的架構決策:應該將上傳的檔案暫存在記憶體中,還是直接寫入磁碟?
記憶體暫存方案的最大優勢在於其無與倫比的存取速度。記憶體的讀寫效能遠超磁碟 I/O,這使得小檔案的處理極為高效。同時,這種方案在實作上相對簡潔,開發者無需處理檔案系統權限、磁碟空間管理等複雜問題,且檔案不會在磁碟上留下痕跡,特別適合處理敏感資料的場景。
然而,記憶體暫存的限制在大檔案處理場景中會被放大。大型影片檔案(動輒數 GB)會對系統記憶體造成巨大壓力,當多個用戶同時上傳檔案時,很容易觸發記憶體不足錯誤(OOM),導致整個服務崩潰。更根本的問題是記憶體資源的有限性,這使得此方案難以應對高併發或超大檔案的商業場景。
相較之下,磁碟暫存提供了更為穩健的解決方案。磁碟空間通常遠大於可用記憶體,能夠從容處理任意大小的檔案。更重要的是,通過串流方式讀寫檔案,系統的記憶體佔用量能始終保持在可控的低水平,即使處理數十 GB 的影片檔案也不會對系統穩定性造成威脅。
磁碟暫存的另一個顯著優勢是其優秀的併發特性。多個任務可以同時處理各自的檔案而不會互相影響,這為水平擴展奠定了基礎。此外,磁碟暫存具備良好的容錯性,即使系統意外重啟,磁碟上的檔案仍然可以繼續處理,不會造成用戶資料的遺失。
當然,磁碟暫存也有其考量之處。磁碟 I/O 的速度確實不如記憶體存取,但對於影片處理這類 CPU 密集型任務而言,這種差異的影響相當有限。另一個需要注意的是暫存檔案的生命週期管理,必須建立完善的清理機制,避免磁碟空間的洩漏。
綜合考量穩定性、擴展性與實際部署需求,我們選擇磁碟暫存方案。這個決策基於以下理由:
雖然磁碟 I/O 會帶來些微的效能開銷,但相較於系統穩定性和可擴展性的收益,這個取捨是非常值得的。
Celery 是一套成熟的分散式任務佇列系統,基於訊息傳遞架構設計。它的核心組件協同運作,形成一個強大的非同步處理生態系統:
影片處理的邏輯現在被定義為一個 Celery Task。bind=True
允許我們在任務內部存取 self
,從而可以更新狀態和檢查是否被撤銷。
# tasks.py
import os
import time
from celery import Celery
from celery.exceptions import Ignore
# 使用 Redis 作為 Broker 和 Backend
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
@app.task(bind=True)
def process_video_task(self, temp_file_path: str):
"""Celery Worker 執行的影片處理任務"""
try:
total_frames = 100
for i in range(total_frames):
# 檢查是否接收到撤銷指令
result = AsyncResult(self.request.id, app=app)
if result.state == 'REVOKED':
# 清理資源後,拋出 Ignore 使任務狀態變為 REVOKED
if os.path.exists(temp_file_path):
os.remove(temp_file_path)
raise Ignore()
# ... 模擬影片處理 ...
time.sleep(0.5)
# 使用 update_state 更新自定義的進度元數據
self.update_state(state="PROGRESS", meta={"progress": (i + 1)})
# 處理完成
return {"status": "Completed", "result_path": "..."}
finally:
# 確保無論如何暫存檔都會被刪除
if os.path.exists(temp_file_path):
os.remove(temp_file_path)
FastAPI 現在只負責接收檔案、儲存並發布任務,然後就可以立即回應。
# main.py
import os
import uuid
import shutil
from fastapi import FastAPI, UploadFile, File
from celery.result import AsyncResult
from tasks import process_video_task
app = FastAPI()
TEMP_DIR = "." # 或其他適合的暫存目錄
@app.post("/upload/celery")
async def upload_video_celery(file: UploadFile = File(...)):
task_id = str(uuid.uuid4())
temp_file_path = os.path.join(TEMP_DIR, f"{task_id}_{file.filename}")
# 以串流方式將檔案寫入本地
with open(temp_file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
# 異步發布任務到 Celery,並立即獲得 task_id
task = process_video_task.delay(temp_file_path)
return {"task_id": task.id}
@app.post("/tasks/{task_id}/cancel")
async def cancel_task(task_id: str):
# 發送撤銷指令,需要指定 Celery app 實例
result = AsyncResult(task_id, app=celery_app)
result.revoke(terminate=True)
return {"message": "已請求取消任務"}
# main.py
import asyncio
import json
from starlette.responses import StreamingResponse
from celery.result import AsyncResult
from tasks import process_video_task, app as celery_app # 導入 celery app 實例
async def progress_generator(task_id: str):
"""
一個非同步生成器,用於從 Celery Result Backend 查詢進度並推播。
"""
# 透過 task_id 取得 AsyncResult 物件,app=celery_app 確保使用正確的 backend 配置
result = AsyncResult(task_id, app=celery_app)
while not result.ready():
# result.ready() 會在任務完成、失敗或被撤銷時返回 True
state = result.state
progress = 0
# 我們在 Celery Task 中使用 'PROGRESS' 作為自定義的進行中狀態
if state == 'PROGRESS':
# result.info 是我們在 update_state 中存入的 meta 字典
progress = result.info.get('progress', 0)
elif state == 'PENDING':
# 任務還在佇列中等待被 Worker 領取
state = 'PENDING'
progress = 0
elif state == 'STARTED':
# 任務已被 Worker 領取,但還未回報進度
state = 'STARTED'
progress = 0
# 建立 JSON 格式的回應資料
response_data = {
"state": state,
"progress": progress
}
# 遵循 SSE 格式: "data: <json_string>\n\n"
yield f"data: {json.dumps(response_data)}\n\n"
# 每秒查詢一次狀態,避免過於頻繁地請求 Redis
await asyncio.sleep(1)
# 當迴圈結束,代表任務已完成,發送最後一次的最終狀態
final_state = {
"state": result.state,
"progress": 100,
}
# 如果任務成功,可以附帶結果
if result.successful():
final_state["result"] = result.get()
elif result.failed():
final_state["error"] = str(result.info) if result.info else "任務執行失敗"
yield f"data: {json.dumps(final_state)}\n\n"
# SSE 端點需要修改為從 Celery Result Backend 查詢狀態
@app.get("/progress/{task_id}")
async def get_progress(task_id: str):
"""
提供 SSE 端點,讓前端可以即時追蹤 Celery 任務的進度。
"""
# StreamingResponse 會保持連線,並持續調用 progress_generator 來發送資料
return StreamingResponse(progress_generator(task_id), media_type="text/event-stream")
引入 Celery 後,我們得以思考更複雜、更高效的處理架構。
一種有趣的設計是讓前端(瀏覽器)負責將影片拆分成 Frame,後端只接收並處理單張圖片。
一個更專業、更強大的架構是在後端實現平行處理。利用 Celery 的工作流(Canvas)功能,我們可以這樣設計:
ffmpeg
將影片拆分成數千個 Frame。這種架構能最大限度地利用運算資源,極大地縮短大型任務的處理時間。
上述的平行處理架構有一個重要前提:每個 Frame 的處理是獨立的,例如套用濾鏡、單幀物件偵測等。
然而,在許多高階 AI 應用中,任務需要上下文資訊 (Context),無法這樣完全平行化。例如:
在這些情境下,雖然仍可做一定程度的平行化(例如,將影片切成有重疊的片段處理),但無法像簡單濾鏡那樣將每一幀完全獨立對待。
透過引入 Celery 分散式任務佇列,我們成功將影片處理服務從單體原型升級為企業級分散式系統。我們不僅實現了可靠的任務管理機制,更探討了磁碟暫存策略與平行處理架構的實務考量,為構建高可靠 AI 服務奠定了重要的技術基礎。