在昨天的文章中,我們探討了如何在 FastAPI 的請求-回應週期 (Request-Response Cycle) 中處理錯誤。透過 HTTPException
和 Middleware,我們可以有效地捕捉和紀錄發生在 API 路由內的錯誤。
然而,現代 Web 應用程式的複雜度遠不止於此。我們經常需要處理一些耗時的任務,例如寄送電子郵件、處理影片轉檔、或是呼叫第三方服務。這些任務通常會被放在背景執行,以免阻塞主流程。但這也帶來了一個新的挑戰:當錯誤發生在這些「看不見」的背景任務中時,該怎麼辦?
想像一下,你透過 API 觸發一個在背景執行緒 (Thread) 中執行的複雜資料庫操作。如果這個操作因為某些原因失敗並拋出例外,會發生什麼事?
try...except
監控範圍內。thread.join()
)。這會導致主程式永遠等待下去,即使你按下 Ctrl+C
,也因為主執行緒被阻塞而無法正常關閉,造成所謂的「殭屍程序」。這就是為什麼處理背景任務的錯誤至關重要的原因。
BackgroundTasks
雖然簡單易用,但它有一個特點:錯誤會被靜默忽略。
這個「特點」到底是好事還是壞事是取決於任務本身,以下就假設大家希望至少知道任務是否有發生錯誤,並記錄。
from fastapi import FastAPI, BackgroundTasks
import logging
logger = logging.getLogger("uvicorn")
app = FastAPI()
def dangerous_task():
"""這個任務會拋出錯誤,但不會被捕獲"""
raise ValueError("背景任務發生錯誤!")
# 這行永遠不會執行
logger.info("任務完成")
@app.post("/dangerous-background")
async def run_dangerous_background(background_tasks: BackgroundTasks):
background_tasks.add_task(dangerous_task)
return {"message": "任務已提交"} # API 會正常回應,但背景任務會失敗
問題:使用者會收到「任務已提交」的成功回應,但背景任務實際上失敗了,而且沒有任何錯誤日誌。
解決方案:包裝背景任務,捕獲並記錄所有例外。
import traceback
def safe_background_wrapper(func, *args, **kwargs):
"""安全的背景任務包裝器"""
try:
return func(*args, **kwargs)
except Exception as e:
logger.error(f"背景任務失敗: {func.__name__}")
logger.error(f"錯誤訊息: {str(e)}")
logger.error(f"完整錯誤: {traceback.format_exc()}")
@app.post("/safe-background")
async def run_safe_background(background_tasks: BackgroundTasks):
background_tasks.add_task(safe_background_wrapper, dangerous_task)
return {"message": "任務已提交(含錯誤處理)"}
當我們使用執行緒池來執行背景任務時,未處理的例外會導致執行緒無預警終止。
from fastapi import FastAPI
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict
app = FastAPI()
# 模擬會失敗的資料處理任務
def process_data_item(item_id: int):
time.sleep(1) # 模擬耗時的資料處理
if item_id % 3 == 0: # 每第三個項目會失敗
raise RuntimeError(f"處理項目 {item_id} 時發生錯誤")
return {"id": item_id, "status": "processed", "result": f"data_{item_id}_processed"}
# 危險的做法:沒有錯誤處理
@app.post("/process-batch-dangerous")
def process_batch_dangerous(item_count: int = 5):
"""危險的批次處理 - 可能導致應用程式崩潰"""
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(process_data_item, i) for i in range(item_count)]
results = []
for future in as_completed(futures):
# 這裡會拋出例外,導致 API 回應 500 錯誤
result = future.result() # 危險!未處理的例外會被重新拋出
results.append(result)
return {"message": "批次處理完成", "results": results}
問題:當 future.result()
被呼叫時,如果任務拋出了例外,這個例外會被重新拋出到主執行緒,導致 API 回應 500 內部伺服器錯誤。
解決方案:使用 future.exception()
來檢查是否有例外發生,並優雅地處理失敗的任務。
@app.post("/process-batch-safe")
def process_batch_safe(item_count: int = 5):
"""安全的批次處理 - 妥善處理執行緒錯誤"""
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(process_data_item, i) for i in range(item_count)]
results = []
errors = []
for future in as_completed(futures):
exception = future.exception()
if exception:
error_info = {
"error": str(exception),
"type": type(exception).__name__
}
errors.append(error_info)
logger.error(f"執行緒任務失敗: {exception}")
else:
result = future.result()
results.append(result)
logger.info(f"任務完成: {result}")
return {
"message": "批次處理完成",
"successful_results": results,
"failed_count": len(errors),
"errors": errors
}
最嚴重的情況是主程式因為等待已死亡的執行緒而被卡死。
from threading import Thread
import time
def problematic_worker():
"""有問題的 worker - 無窮迴圈"""
while True: # 危險!無法停止的無窮迴圈
time.sleep(1)
logger.info("執行中...")
# 這個迴圈永遠不會結束
@app.post("/start-dangerous-task")
def start_dangerous_task():
"""危險的 API:會導致應用程式卡死"""
# 啟動背景執行緒
worker_thread = Thread(target=problematic_worker)
worker_thread.start()
# 等待執行緒完成
worker_thread.join() # 危險!會永遠等待下去
return {"message": "任務完成"} # 這行永遠不會執行到
問題:當呼叫這個 API 時,程式會卡在 worker_thread.join()
這一行,因為 worker 執行緒陷入無窮迴圈永遠不會結束。API 請求會一直 pending,應用程式也無法正常關閉。
解決方案:設定 timeout 參數
@app.post("/start-safe-task")
def start_safe_task():
"""安全的 API:避免卡死,但執行緒仍會繼續運行"""
worker_thread = Thread(target=problematic_worker, daemon=True)
worker_thread.start()
# 關鍵:設定 timeout,最多等 3 秒
worker_thread.join(timeout=3)
if worker_thread.is_alive():
logger.warning("Worker 未能在時限內結束,但仍在背景執行")
return {"message": "任務啟動,執行時間超過預期", "status": "timeout"}
return {"message": "任務完成", "status": "success"}
注意:上面的解決方案只是避免了 API 卡死,但 problematic_worker
執行緒仍會一直在背景執行。如果要真正停止執行緒,需要修改 worker 函式本身:
from threading import Thread, Event
def better_worker(stop_event: Event):
"""改良的 worker - 可以被停止"""
while not stop_event.is_set(): # 檢查停止信號
time.sleep(1)
logger.info("執行中...")
logger.info("Worker 收到停止信號,正在退出...")
@app.post("/start-better-task")
def start_better_task():
"""更好的解決方案:可控制的執行緒"""
# 為每個任務建立獨立的停止事件
task_stop_event = Event()
worker_thread = Thread(target=better_worker, args=(task_stop_event,), daemon=True)
worker_thread.start()
# 等待 3 秒,如果還在執行就發送停止信號
worker_thread.join(timeout=3)
if worker_thread.is_alive():
logger.info("任務執行超時,發送停止信號...")
task_stop_event.set() # 發送停止信號給這個特定的任務
worker_thread.join(timeout=1) # 再等 1 秒讓執行緒優雅退出
if worker_thread.is_alive():
logger.warning("執行緒未能在時限內停止,但已設為 daemon 執行緒")
return {"message": "任務已發送停止信號,但可能仍在背景執行", "status": "force_stopped"}
return {"message": "任務處理完成", "status": "success"}
核心重點:
join()
= 應用程式卡死join(timeout=3)
可以避免 API 卡死,但執行緒仍會繼續運行Event
)處理背景任務的錯誤需要主動建立監控機制,不能依賴框架自動處理:
future.exception()
檢查任務狀態透過這兩天的學習,我們從基礎 HTTPException 到複雜背景任務錯誤管理,建立了完整的錯誤處理體系。希望能幫助大家面對更複雜的實際環境~