iT邦幫忙

2025 iThome 鐵人賽

DAY 20
0

在昨天的文章中,我們探討了如何在 FastAPI 的請求-回應週期 (Request-Response Cycle) 中處理錯誤。透過 HTTPException 和 Middleware,我們可以有效地捕捉和紀錄發生在 API 路由內的錯誤。

然而,現代 Web 應用程式的複雜度遠不止於此。我們經常需要處理一些耗時的任務,例如寄送電子郵件、處理影片轉檔、或是呼叫第三方服務。這些任務通常會被放在背景執行,以免阻塞主流程。但這也帶來了一個新的挑戰:當錯誤發生在這些「看不見」的背景任務中時,該怎麼辦?

背景任務錯誤的棘手之處

想像一下,你透過 API 觸發一個在背景執行緒 (Thread) 中執行的複雜資料庫操作。如果這個操作因為某些原因失敗並拋出例外,會發生什麼事?

  1. 錯誤是沉默的:這個例外發生在執行緒內部,FastAPI 的 Middleware 完全感知不到它的存在,因為它不在主應用程式的 try...except 監控範圍內。
  2. 執行緒死亡:未被處理的例外會導致該執行緒終止。如果這是一個長期運行的 worker thread,它的突然死亡可能會導致後續的任務無人處理。
  3. 應用程式卡死:最嚴重的問題是,主程式可能還在等待這個已經「死亡」的執行緒回傳結果 (例如透過 thread.join())。這會導致主程式永遠等待下去,即使你按下 Ctrl+C,也因為主執行緒被阻塞而無法正常關閉,造成所謂的「殭屍程序」。

這就是為什麼處理背景任務的錯誤至關重要的原因。

常見的耗時任務錯誤類型

1. BackgroundTasks 的錯誤陷阱

BackgroundTasks 雖然簡單易用,但它有一個特點:錯誤會被靜默忽略

這個「特點」到底是好事還是壞事是取決於任務本身,以下就假設大家希望至少知道任務是否有發生錯誤,並記錄。

from fastapi import FastAPI, BackgroundTasks
import logging

logger = logging.getLogger("uvicorn")
app = FastAPI()

def dangerous_task():
    """這個任務會拋出錯誤,但不會被捕獲"""
    raise ValueError("背景任務發生錯誤!")
    # 這行永遠不會執行
    logger.info("任務完成")

@app.post("/dangerous-background")
async def run_dangerous_background(background_tasks: BackgroundTasks):
    background_tasks.add_task(dangerous_task)
    return {"message": "任務已提交"}  # API 會正常回應,但背景任務會失敗

問題:使用者會收到「任務已提交」的成功回應,但背景任務實際上失敗了,而且沒有任何錯誤日誌。

解決方案:包裝背景任務,捕獲並記錄所有例外。

import traceback

def safe_background_wrapper(func, *args, **kwargs):
    """安全的背景任務包裝器"""
    try:
        return func(*args, **kwargs)
    except Exception as e:
        logger.error(f"背景任務失敗: {func.__name__}")
        logger.error(f"錯誤訊息: {str(e)}")
        logger.error(f"完整錯誤: {traceback.format_exc()}")

@app.post("/safe-background")
async def run_safe_background(background_tasks: BackgroundTasks):
    background_tasks.add_task(safe_background_wrapper, dangerous_task)
    return {"message": "任務已提交(含錯誤處理)"}

2. Thread Pool 的執行緒死亡問題

當我們使用執行緒池來執行背景任務時,未處理的例外會導致執行緒無預警終止。

from fastapi import FastAPI
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict

app = FastAPI()

# 模擬會失敗的資料處理任務
def process_data_item(item_id: int):
    time.sleep(1)  # 模擬耗時的資料處理
    if item_id % 3 == 0:  # 每第三個項目會失敗
        raise RuntimeError(f"處理項目 {item_id} 時發生錯誤")
    return {"id": item_id, "status": "processed", "result": f"data_{item_id}_processed"}

# 危險的做法:沒有錯誤處理
@app.post("/process-batch-dangerous")
def process_batch_dangerous(item_count: int = 5):
    """危險的批次處理 - 可能導致應用程式崩潰"""
    
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(process_data_item, i) for i in range(item_count)]
        
        results = []
        for future in as_completed(futures):
            # 這裡會拋出例外,導致 API 回應 500 錯誤
            result = future.result()  # 危險!未處理的例外會被重新拋出
            results.append(result)
        
        return {"message": "批次處理完成", "results": results}

問題:當 future.result() 被呼叫時,如果任務拋出了例外,這個例外會被重新拋出到主執行緒,導致 API 回應 500 內部伺服器錯誤。

解決方案:使用 future.exception() 來檢查是否有例外發生,並優雅地處理失敗的任務。

@app.post("/process-batch-safe")
def process_batch_safe(item_count: int = 5):
    """安全的批次處理 - 妥善處理執行緒錯誤"""
    
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(process_data_item, i) for i in range(item_count)]
        
        results = []
        errors = []
        
        for future in as_completed(futures):
            exception = future.exception()
            if exception:
                error_info = {
                    "error": str(exception),
                    "type": type(exception).__name__
                }
                errors.append(error_info)
                logger.error(f"執行緒任務失敗: {exception}")
            else:
                result = future.result()
                results.append(result)
                logger.info(f"任務完成: {result}")
        
        return {
            "message": "批次處理完成",
            "successful_results": results,
            "failed_count": len(errors),
            "errors": errors
        }

3. 應用程式卡死的殭屍程序

最嚴重的情況是主程式因為等待已死亡的執行緒而被卡死。

from threading import Thread
import time

def problematic_worker():
    """有問題的 worker - 無窮迴圈"""
    while True:  # 危險!無法停止的無窮迴圈
        time.sleep(1)
        logger.info("執行中...")
        # 這個迴圈永遠不會結束

@app.post("/start-dangerous-task")
def start_dangerous_task():
    """危險的 API:會導致應用程式卡死"""
    
    # 啟動背景執行緒
    worker_thread = Thread(target=problematic_worker)
    worker_thread.start()
    
    # 等待執行緒完成
    worker_thread.join()  # 危險!會永遠等待下去
    
    return {"message": "任務完成"}  # 這行永遠不會執行到

問題:當呼叫這個 API 時,程式會卡在 worker_thread.join() 這一行,因為 worker 執行緒陷入無窮迴圈永遠不會結束。API 請求會一直 pending,應用程式也無法正常關閉。

解決方案:設定 timeout 參數

@app.post("/start-safe-task")
def start_safe_task():
    """安全的 API:避免卡死,但執行緒仍會繼續運行"""
    
    worker_thread = Thread(target=problematic_worker, daemon=True)
    worker_thread.start()
    
    # 關鍵:設定 timeout,最多等 3 秒
    worker_thread.join(timeout=3)
    
    if worker_thread.is_alive():
        logger.warning("Worker 未能在時限內結束,但仍在背景執行")
        return {"message": "任務啟動,執行時間超過預期", "status": "timeout"}
    
    return {"message": "任務完成", "status": "success"}

注意:上面的解決方案只是避免了 API 卡死,但 problematic_worker 執行緒仍會一直在背景執行。如果要真正停止執行緒,需要修改 worker 函式本身:

from threading import Thread, Event

def better_worker(stop_event: Event):
    """改良的 worker - 可以被停止"""
    while not stop_event.is_set():  # 檢查停止信號
        time.sleep(1)
        logger.info("執行中...")
    logger.info("Worker 收到停止信號,正在退出...")

@app.post("/start-better-task")
def start_better_task():
    """更好的解決方案:可控制的執行緒"""
    
    # 為每個任務建立獨立的停止事件
    task_stop_event = Event()
    
    worker_thread = Thread(target=better_worker, args=(task_stop_event,), daemon=True)
    worker_thread.start()
    
    # 等待 3 秒,如果還在執行就發送停止信號
    worker_thread.join(timeout=3)
    
    if worker_thread.is_alive():
        logger.info("任務執行超時,發送停止信號...")
        task_stop_event.set()  # 發送停止信號給這個特定的任務
        worker_thread.join(timeout=1)  # 再等 1 秒讓執行緒優雅退出
        
        if worker_thread.is_alive():
            logger.warning("執行緒未能在時限內停止,但已設為 daemon 執行緒")
            return {"message": "任務已發送停止信號,但可能仍在背景執行", "status": "force_stopped"}
        
    return {"message": "任務處理完成", "status": "success"}

核心重點

  • 無窮迴圈的執行緒 + 沒有 timeout 的 join() = 應用程式卡死
  • 加上 join(timeout=3) 可以避免 API 卡死,但執行緒仍會繼續運行
  • 要真正停止執行緒,需要在 worker 函式中加入停止機制(如 Event

小結

處理背景任務的錯誤需要主動建立監控機制,不能依賴框架自動處理:

  • BackgroundTasks:使用包裝函式捕捉靜默失敗
  • ThreadPool:用 future.exception() 檢查任務狀態
  • 執行緒管理:設定 timeout 避免卡死,為每個任務建立獨立的停止機制

透過這兩天的學習,我們從基礎 HTTPException 到複雜背景任務錯誤管理,建立了完整的錯誤處理體系。希望能幫助大家面對更複雜的實際環境~


上一篇
[Day 19] 錯誤處理 (一):基礎
下一篇
[Day 21] 進度追蹤 (一):Long Polling
系列文
用 FastAPI 打造你的 AI 服務22
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言