iT邦幫忙

2025 iThome 鐵人賽

DAY 16
0

在上一篇文章中,我們學習了如何用 BackgroundTasks 處理「即發即棄」的任務。今天,我們要面對另一個在非同步世界中常見的難題:如何與「同步阻塞 (Synchronous Blocking)」的程式碼共存?

同步與非同步的共存挑戰

async/await 的世界裡,最忌諱的就是執行一個會長時間佔用 CPU 或等待 I/O 的同步函式,例如傳統的資料庫查詢、圖片處理、或是呼叫一個老舊的第三方 SDK。這樣的函式會霸佔整個事件循環 (Event Loop),導致伺服器無法處理任何其他連線請求,應用程式看起來就像「卡住」了一樣。

對此,FastAPI 對此有很好的解決方案。

def 與 async def

FastAPI 聰明地意識到開發者不可能將所有程式碼都改為非同步。因此,它做了一個巧妙的設計:

  • 當你用 async def 定義路由時,它會直接在主事件循環中執行。
  • 當你用 def 定義路由時,FastAPI 會自動將這個同步函式丟到一個外部的執行緒池 (Thread Pool) 中去執行。

這意味著,即使你的 def 路由函式內部有阻塞操作,它也只會阻塞那個背景執行緒,而不會影響主事件循環,FastAPI 伺服器依然能順暢地處理其他請求。

run_in_threadpool

如果我們想在一個 async def 的路由中,呼叫某個特定的阻塞函式呢?Starlette (FastAPI 的底層框架) 提供了一個非常有用的工具:run_in_threadpool

這個函式可以將任何普通的同步函式,包裝成一個可等待 (awaitable) 的物件,並將其提交到執行緒池中執行。

基礎程式碼範例

假設我們有一個無法修改的、會阻塞的函式庫。

from fastapi import FastAPI
from starlette.concurrency import run_in_threadpool
import time
import logging
from datetime import datetime

# 設定 logger
logger = logging.getLogger("uvicorn")

app = FastAPI()

def sync_blocking_call(name: str):
    """一個模擬 CPU 密集或阻塞 I/O 的同步函式"""
    current_time = datetime.now().strftime("%H:%M:%S")
    logger.info(f"[{current_time}] 開始執行阻塞任務 {name}...")
    time.sleep(5) # 模擬 blocking
    result = f"任務 {name} 執行完畢!"
    current_time = datetime.now().strftime("%H:%M:%S")
    logger.info(f"[{current_time}] {result}")
    return result

@app.get("/block")
async def run_blocking_task():
    current_time = datetime.now().strftime("%H:%M:%S")
    logger.info(f"[{current_time}] 接收到請求")

    # 直接呼叫 sync_blocking_call("範例任務") 會阻塞整個伺服器!
    # result = sync_blocking_call("範例任務")

    # 正確的方式是將它交給 thread pool
    result = await run_in_threadpool(sync_blocking_call, name="範例任務")
    # 主事件循環在等待的 5 秒內是自由的,可以接收其他請求
    return {"message": "阻塞任務已在背景執行緒池完成", "result": result}

當你呼叫這個 API 時,雖然需要等待 5 秒才能收到回應,但在這 5 秒期間,你的 FastAPI 應用是完全可以回應其他請求的(例如你可以開另一個分頁訪問其他 API)。

替代方案:使用原生 threading

除了 run_in_threadpool,我們也可以使用 Python 原生的 threading 模組來手動管理執行緒。這種方法提供更多的控制權,但需要自己處理執行緒的生命週期:

import threading
import asyncio
from concurrent.futures import ThreadPoolExecutor

# ...

@app.get("/block-with-threading")
async def run_blocking_task_with_threading():
    current_time = datetime.now().strftime("%H:%M:%S")
    logger.info(f"[{current_time}] 接收到請求")

    # 方法一:使用 ThreadPoolExecutor
    with ThreadPoolExecutor() as executor:
        loop = asyncio.get_event_loop()
        result = await loop.run_in_executor(executor, sync_blocking_call, "執行緒池任務")
    
    return {"message": "使用 ThreadPoolExecutor 完成", "result": result}

@app.get("/block-fire-and-forget")
async def run_blocking_task_fire_and_forget():
    current_time = datetime.now().strftime("%H:%M:%S")
    logger.info(f"[{current_time}] 接收到請求")

    # 方法二:使用 threading.Thread(火力全開模式)
    def run_task():
        sync_blocking_call("背景執行緒任務")
    
    thread = threading.Thread(target=run_task)
    thread.daemon = True  # 主程式結束時,背景執行緒也會跟著結束
    thread.start()
    
    return {"message": "任務已在背景執行緒啟動,不等待結果"}

使用時機比較:

  • run_in_threadpool:最簡單,適用於大多數情況
  • ThreadPoolExecutor:需要更精細的執行緒池控制時
  • threading.Thread:需要完全自定義執行緒行為,或是「即發即棄」的背景任務

小結

使用 def 路由或 run_in_threadpool 是在非同步框架中整合傳統同步程式碼的最佳實踐。它解決了「阻塞」的問題。

然而,它和 BackgroundTasks 一樣,任務的執行依然受限於當前的伺服器。如果任務量過大,會耗盡執行緒池;如果伺服器重啟,正在執行的任務也會中斷。

如果我們希望任務的執行不僅不阻塞,還要具備高可靠性與擴展性,該怎麼辦?這就需要引入更專業的架構了,我們會在下一篇文章進行探討。


上一篇
[Day 15] 任務管理 (一):Background Task
下一篇
[Day 17] 任務管理 (三):任務佇列 (Task Queue)
系列文
用 FastAPI 打造你的 AI 服務22
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言