iT邦幫忙

2025 iThome 鐵人賽

DAY 21
0

一句話版本:把慢又不穩的事情踢出請求路徑,用可觀測、可恢復、可替換的背景作業把系統撐住。

為什麼今天要談這個

在 Day 20 我們把 API 層做了 Ports/Adapters 抽象,服務邏輯和外部世界分隔乾淨。接著最常「拖慢回應」的,其實是寄信、產報表、對第三方 API 回補、批次同步、特徵抽取這類 I/O 工作。把它們丟到背景作業,就能讓同步請求只負責接單與回覆 202,餘下交給 worker 處理。

本文聚焦三條路線與落地做法:

  • Celery:生態完整、工作流能力強,偏企業級。
  • Dramatiq:輕量、直覺、預設合理,維運心智成本低。
  • asyncio 任務:零外部 broker 的極簡派,適合單節點或邊車任務。

同時銜接前幾天的主題:設定/祕密、結構化日誌、重試與降級、快取策略、測試與一鍵化工作流,讓背景系統不再是黑箱。


你應該把哪些工作「丟背景」

判斷清單:

  1. 慢 I/O 或高不確定性:第三方 API、外郵件、檔案上傳轉碼、Webhook 回填。
  2. 可重試且可延後:失敗不影響當下使用者體驗,可透過重試或補償達成一致。
  3. CPU 吃重且離線可接受:報表產出、壓縮轉檔、ML 特徵抽取。
  4. 需要排程/節流/聚合:每 5 分鐘同步一次、夜間批次、Rate limit 保護。

同步路徑負責:驗證輸入 → 簡短商業檢查 → 立刻回 202/accepted。其餘交給任務系統與告警。


選型地圖:Celery vs Dramatiq vs asyncio

面向 Celery Dramatiq asyncio 任務
Broker Redis、RabbitMQ… Redis、RabbitMQ 無需外部 broker(自管佇列或記憶體/DB)
工作流 group/chord/map 等 canvas 豐富 輕量,藉由 middleware/程式邏輯組裝 以 TaskGroup/自寫 DAG
重試/延遲 內建策略完整 直覺、裝飾器即可 自行用 tenacity/策略
排程 celery beat 或外掛 dramatiq-cron APScheduler、系統 cron
運維 生態完整,配置面多 簡單、腳本好維護 最輕,但可靠性要自己補
典型場景 複雜工作流、大團隊、路由與節流 中小團隊、80% 常見需求 邊車任務、單機服務、PoC

小結:要工作流與治理,選 Celery;想輕快省腦,選 Dramatiq;最小依賴與單節點,選 asyncio。


專案骨架(延續 Ports/Adapters)

my_project/
├─ src/my_project/
│  ├─ domain/                # 用例/介面(Ports)
│  ├─ services/              # 應用服務
│  ├─ adapters/
│  │  ├─ tasks_celery.py     # Celery 實作(Adapter)
│  │  ├─ tasks_dramatiq.py   # Dramatiq 實作(Adapter)
│  │  └─ tasks_asyncio.py    # asyncio 任務實作(Adapter)
│  ├─ settings.py            # 設定/祕密
│  └─ logging_config.py      # 結構化日誌
└─ tests/

設計重點:服務層只表達「意圖」,不依賴特定任務框架。 Adapter 可自由替換。


Celery:成熟工兵的最短路徑

相依與一鍵化

pyproject.toml

[project.optional-dependencies]
worker = ["celery>=5.4", "redis>=5", "orjson>=3"]

[tool.hatch.envs.worker]
features = ["worker", "dev"]

[tool.hatch.envs.worker.scripts]
worker = "celery -A my_project.adapters.tasks_celery:app worker -Q default -l info"
beat   = "celery -A my_project.adapters.tasks_celery:app beat -l info"

Adapter(src/my_project/adapters/tasks_celery.py

from celery import Celery
import structlog
from my_project.settings import AppSettings

log = structlog.get_logger()
s = AppSettings()

app = Celery("myproj", broker=s.redis_url, backend=s.redis_url)
app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    task_default_queue="default",
    task_acks_late=True,
    worker_prefetch_multiplier=1,
)

@app.task(bind=True, autoretry_for=(Exception,), retry_backoff=True,
          retry_kwargs={"max_retries": 5})
def send_email_task(self, to: str, subject: str, body: str) -> None:
    log.info("task_start", task="send_email", to=to, retry=self.request.retries)
    do_send_email(to, subject, body)
    log.info("task_done", task="send_email", to=to)

def do_send_email(to, subject, body): ...

在服務層觸發

# src/my_project/services/mail_service.py
from my_project.adapters.tasks_celery import send_email_task

def enqueue_welcome_mail(email: str) -> None:
    send_email_task.delay(email, "Welcome", "Thanks for joining!")

排程(Beat)

# app.conf.beat_schedule 可在模組底部或獨立檔
app.conf.beat_schedule = {
    "rebuild-cached-metrics": {
        "task": "my_project.adapters.tasks_celery:rebuild_metrics",
        "schedule": 300,  # 每 5 分鐘
    }
}

什麼時候首選 Celery:你需要 chord/group、路由、Rate limit、路線複雜度高,或團隊已經有既有運維手冊。


Dramatiq:輕量好懂的日常解

相依與一鍵化

[project.optional-dependencies]
dq = ["dramatiq[redis,binaries]>=1.15", "orjson>=3"]

[tool.hatch.envs.dq]
features = ["dq", "dev"]

[tool.hatch.envs.dq.scripts]
worker = "dramatiq my_project.adapters.tasks_dramatiq --processes 2 --threads 4"

Adapter(src/my_project/adapters/tasks_dramatiq.py

import dramatiq
from dramatiq.brokers.redis import RedisBroker
import structlog
from my_project.settings import AppSettings

log = structlog.get_logger()
s = AppSettings()
dramatiq.set_broker(RedisBroker(url=s.redis_url))

@dramatiq.actor(max_retries=5, retry_when=lambda e: isinstance(e, Exception))
def send_email_actor(to: str, subject: str, body: str):
    log.info("task_start", task="send_email", to=to)
    do_send_email(to, subject, body)
    log.info("task_done", task="send_email", to=to)

服務層觸發

from my_project.adapters.tasks_dramatiq import send_email_actor

def enqueue_welcome_mail(email: str) -> None:
    send_email_actor.send(email, "Welcome", "Thanks for joining!")

什麼時候首選 Dramatiq:你要的是 80% 場景的順手工具,不想在設定與教學成本上花太多力氣。


asyncio 任務:零外部依賴的邊車方案

適用場景

  • 單節點服務或內部工具,任務失敗可容忍,量級中小。
  • 已在專案廣泛使用 asyncio/anyio,想走最少依賴。

任務匯流排(src/my_project/adapters/tasks_asyncio.py

import anyio, structlog
log = structlog.get_logger()

class AsyncTaskBus:
    def __init__(self):
        self._tg = None

    async def start(self):
        self._tg = anyio.create_task_group()
        await self._tg.__aenter__()

    async def stop(self):
        if self._tg:
            await self._tg.__aexit__(None, None, None)

    async def enqueue_send_email(self, to: str, subject: str, body: str):
        async def _job():
            log.info("task_start", task="send_email", to=to)
            await do_send_email_async(to, subject, body)
            log.info("task_done", task="send_email", to=to)
        self._tg.start_soon(_job)

將 TaskBus 綁進 FastAPI lifespan 啟停。重試用 tenacity 包一層;排程交給 APScheduler 或系統 cron。注意:沒有獨立進程,不抗崩,需搭配監控與告警。


可觀測與可靠性:不要再養黑箱

結構化日誌

輸出 JSON log,至少包含:

  • event/task/task_id
  • args_hash(避免洩敏)
  • retry_count/queue/latency_ms
  • resulterror(含錯誤類型與堆疊摘要)

建議在 Adapter 放一層 decorator,把上述欄位統一注入。

重試與降級策略

  • 指數回退 + 抖動,給最大重試上限與死信佇列。
  • 失敗後降級:回舊快取、排進人工清單、或發補償事件。
  • 不可重試錯誤直接打標,避免風暴(例如參數驗證錯、永久 4xx)。

快取與壓力管理

  • 重建型任務使用「版本化 key」,避免 stampede。
  • 高頻任務加 Rate limit 或批次聚合。
  • 大任務拆小塊、以 idempotency key 確保重試安全。

設定與祕密

  • broker URL、Queue 名稱、重試上限、告警閾值全部走設定層。
  • .env.example 提供模板,正式值進秘密管控服務。

與 API 的整合:接單就走

新增一條提交任務的端點,立刻回 202:

from fastapi import APIRouter, status
from pydantic import BaseModel, EmailStr
from my_project.services.mail_service import enqueue_welcome_mail

router = APIRouter()

class WelcomeIn(BaseModel):
    email: EmailStr

@router.post("/jobs/welcome", status_code=status.HTTP_202_ACCEPTED)
def enqueue_welcome(payload: WelcomeIn):
    enqueue_welcome_mail(payload.email)
    return {"accepted": True}

搭配請求日誌寫入 request_idtask_id,形成端到端的追蹤鏈。


測試策略與一鍵化工作流

  • 單元測試:服務層注入假的任務 Adapter(fake/spy),驗證「意圖有被送出」,不在 unit 測框架本身。
  • 整合測試:在 CI 起 Redis,啟一個 worker,打 API 後觀察副作用(例如信件 mock、資料庫旗標)。
  • 一鍵化
    • hatch run api:serve 啟 API
    • hatch run worker:workerhatch run dq:worker 啟用對應 worker
    • hatch run worker:beat 啟排程


結論

背景作業不是把髒事掃到地毯底下,而是以工程化的方式,讓慢 I/O 與不確定性有地方安放。當使用者拿到立即的 202,而你在側翼穩定處理重試、降級、告警和觀測,整個系統的「心理安全感」才算補齊。今天選好你的隊列,明天就能開始睡得比較好。


上一篇
Day 20 -API 層:用 FastAPI 落實 Ports/Adapters(六邊形味道)
下一篇
Day 22 -發佈與版本:Hatch build wheel、SemVer、Changelog
系列文
30 天 Python 專案工坊:環境、結構、測試到部署全打通23
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言