一句話版本:把慢又不穩的事情踢出請求路徑,用可觀測、可恢復、可替換的背景作業把系統撐住。
在 Day 20 我們把 API 層做了 Ports/Adapters 抽象,服務邏輯和外部世界分隔乾淨。接著最常「拖慢回應」的,其實是寄信、產報表、對第三方 API 回補、批次同步、特徵抽取這類 I/O 工作。把它們丟到背景作業,就能讓同步請求只負責接單與回覆 202,餘下交給 worker 處理。
本文聚焦三條路線與落地做法:
同時銜接前幾天的主題:設定/祕密、結構化日誌、重試與降級、快取策略、測試與一鍵化工作流,讓背景系統不再是黑箱。
判斷清單:
同步路徑負責:驗證輸入 → 簡短商業檢查 → 立刻回 202/accepted。其餘交給任務系統與告警。
面向 | Celery | Dramatiq | asyncio 任務 |
---|---|---|---|
Broker | Redis、RabbitMQ… | Redis、RabbitMQ | 無需外部 broker(自管佇列或記憶體/DB) |
工作流 | group/chord/map 等 canvas 豐富 | 輕量,藉由 middleware/程式邏輯組裝 | 以 TaskGroup/自寫 DAG |
重試/延遲 | 內建策略完整 | 直覺、裝飾器即可 | 自行用 tenacity/策略 |
排程 | celery beat 或外掛 | dramatiq-cron | APScheduler、系統 cron |
運維 | 生態完整,配置面多 | 簡單、腳本好維護 | 最輕,但可靠性要自己補 |
典型場景 | 複雜工作流、大團隊、路由與節流 | 中小團隊、80% 常見需求 | 邊車任務、單機服務、PoC |
小結:要工作流與治理,選 Celery;想輕快省腦,選 Dramatiq;最小依賴與單節點,選 asyncio。
my_project/
├─ src/my_project/
│ ├─ domain/ # 用例/介面(Ports)
│ ├─ services/ # 應用服務
│ ├─ adapters/
│ │ ├─ tasks_celery.py # Celery 實作(Adapter)
│ │ ├─ tasks_dramatiq.py # Dramatiq 實作(Adapter)
│ │ └─ tasks_asyncio.py # asyncio 任務實作(Adapter)
│ ├─ settings.py # 設定/祕密
│ └─ logging_config.py # 結構化日誌
└─ tests/
設計重點:服務層只表達「意圖」,不依賴特定任務框架。 Adapter 可自由替換。
pyproject.toml
[project.optional-dependencies]
worker = ["celery>=5.4", "redis>=5", "orjson>=3"]
[tool.hatch.envs.worker]
features = ["worker", "dev"]
[tool.hatch.envs.worker.scripts]
worker = "celery -A my_project.adapters.tasks_celery:app worker -Q default -l info"
beat = "celery -A my_project.adapters.tasks_celery:app beat -l info"
src/my_project/adapters/tasks_celery.py
)from celery import Celery
import structlog
from my_project.settings import AppSettings
log = structlog.get_logger()
s = AppSettings()
app = Celery("myproj", broker=s.redis_url, backend=s.redis_url)
app.conf.update(
task_serializer="json",
result_serializer="json",
accept_content=["json"],
task_default_queue="default",
task_acks_late=True,
worker_prefetch_multiplier=1,
)
@app.task(bind=True, autoretry_for=(Exception,), retry_backoff=True,
retry_kwargs={"max_retries": 5})
def send_email_task(self, to: str, subject: str, body: str) -> None:
log.info("task_start", task="send_email", to=to, retry=self.request.retries)
do_send_email(to, subject, body)
log.info("task_done", task="send_email", to=to)
def do_send_email(to, subject, body): ...
# src/my_project/services/mail_service.py
from my_project.adapters.tasks_celery import send_email_task
def enqueue_welcome_mail(email: str) -> None:
send_email_task.delay(email, "Welcome", "Thanks for joining!")
# app.conf.beat_schedule 可在模組底部或獨立檔
app.conf.beat_schedule = {
"rebuild-cached-metrics": {
"task": "my_project.adapters.tasks_celery:rebuild_metrics",
"schedule": 300, # 每 5 分鐘
}
}
什麼時候首選 Celery:你需要 chord/group、路由、Rate limit、路線複雜度高,或團隊已經有既有運維手冊。
[project.optional-dependencies]
dq = ["dramatiq[redis,binaries]>=1.15", "orjson>=3"]
[tool.hatch.envs.dq]
features = ["dq", "dev"]
[tool.hatch.envs.dq.scripts]
worker = "dramatiq my_project.adapters.tasks_dramatiq --processes 2 --threads 4"
src/my_project/adapters/tasks_dramatiq.py
)import dramatiq
from dramatiq.brokers.redis import RedisBroker
import structlog
from my_project.settings import AppSettings
log = structlog.get_logger()
s = AppSettings()
dramatiq.set_broker(RedisBroker(url=s.redis_url))
@dramatiq.actor(max_retries=5, retry_when=lambda e: isinstance(e, Exception))
def send_email_actor(to: str, subject: str, body: str):
log.info("task_start", task="send_email", to=to)
do_send_email(to, subject, body)
log.info("task_done", task="send_email", to=to)
from my_project.adapters.tasks_dramatiq import send_email_actor
def enqueue_welcome_mail(email: str) -> None:
send_email_actor.send(email, "Welcome", "Thanks for joining!")
什麼時候首選 Dramatiq:你要的是 80% 場景的順手工具,不想在設定與教學成本上花太多力氣。
src/my_project/adapters/tasks_asyncio.py
)import anyio, structlog
log = structlog.get_logger()
class AsyncTaskBus:
def __init__(self):
self._tg = None
async def start(self):
self._tg = anyio.create_task_group()
await self._tg.__aenter__()
async def stop(self):
if self._tg:
await self._tg.__aexit__(None, None, None)
async def enqueue_send_email(self, to: str, subject: str, body: str):
async def _job():
log.info("task_start", task="send_email", to=to)
await do_send_email_async(to, subject, body)
log.info("task_done", task="send_email", to=to)
self._tg.start_soon(_job)
將 TaskBus 綁進 FastAPI lifespan 啟停。重試用 tenacity
包一層;排程交給 APScheduler 或系統 cron。注意:沒有獨立進程,不抗崩,需搭配監控與告警。
輸出 JSON log,至少包含:
event
/task
/task_id
args_hash
(避免洩敏)retry_count
/queue
/latency_ms
result
或 error
(含錯誤類型與堆疊摘要)建議在 Adapter 放一層 decorator,把上述欄位統一注入。
.env.example
提供模板,正式值進秘密管控服務。新增一條提交任務的端點,立刻回 202:
from fastapi import APIRouter, status
from pydantic import BaseModel, EmailStr
from my_project.services.mail_service import enqueue_welcome_mail
router = APIRouter()
class WelcomeIn(BaseModel):
email: EmailStr
@router.post("/jobs/welcome", status_code=status.HTTP_202_ACCEPTED)
def enqueue_welcome(payload: WelcomeIn):
enqueue_welcome_mail(payload.email)
return {"accepted": True}
搭配請求日誌寫入 request_id
與 task_id
,形成端到端的追蹤鏈。
hatch run api:serve
啟 APIhatch run worker:worker
或 hatch run dq:worker
啟用對應 workerhatch run worker:beat
啟排程背景作業不是把髒事掃到地毯底下,而是以工程化的方式,讓慢 I/O 與不確定性有地方安放。當使用者拿到立即的 202,而你在側翼穩定處理重試、降級、告警和觀測,整個系統的「心理安全感」才算補齊。今天選好你的隊列,明天就能開始睡得比較好。