iT邦幫忙

2025 iThome 鐵人賽

DAY 7
0
生成式 AI

打造 AI 微調平台:從系統設計到 AI 協作的 30 天實戰筆記系列 第 7

[Day 7] 任務排程基礎:Celery + Redis 的非同步訓練

  • 分享至 

  • xImage
  •  

完整程式碼可在 GitHub 專案中找到:Finetune-30-days-demo / day-7


在前幾天,我們的訓練流程仍然是直接跑 python train_lora.py。這樣做雖然簡單,但在實務上馬上遇到三個問題:

  1. 阻塞:訓練可能要跑數十分鐘甚至數小時,API 或 CLI 都會被卡住。
  2. 無法併發:如果兩個使用者同時啟動訓練,系統只能排隊處理。
  3. 缺乏狀態追蹤:使用者無法知道訓練是否還在進行、失敗或完成。

如果我們想讓專案真正往「平台化」邁進,必須先解決這三個問題。


為什麼選 Celery + Redis?

  • Celery:業界成熟的任務排程框架,內建非同步執行、狀態管理、錯誤重試等功能。
  • Redis:部署簡單、效能好,同時能充當 queue(任務佇列)與 backend(結果儲存)。
  • 擴展性:未來要整合 Kubernetes、監控系統(Prometheus/Grafana)、或增加 worker 都很容易。

換句話說,Celery + Redis 是最小成本、卻能帶來最大平台效益的組合。


我們做了哪些設計

1. 任務管理架構

新增 app/tasks/app/api.py

app/
├── tasks/
│   ├── __init__.py      # Celery 初始化與配置
│   └── training.py      # LoRA 訓練任務
└── api.py               # FastAPI API 介面

2. Celery 初始化

app/tasks/__init__.py 裡,設定 Celery 與 Redis:

celery_app = Celery(
    "finetune_tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)

後續又加上了 集中化配置celery_config.py)與 Redis 連線檢查,確保 worker 啟動前環境可用。

3. 把訓練封裝成任務

app/tasks/training.py 定義非同步任務,並加上狀態更新:

@celery_app.task(bind=True)
def train_lora_task(self, config: dict):
    self.update_state(state="STARTED", meta={"progress": "initializing"})
    result = run_training(config)
    return {"status": "completed", "result": result}

4. 提供 API

用 FastAPI 建立兩個端點:

  • POST /train:提交新訓練 → 回傳 task_id
  • GET /task/{id}:查詢任務狀態 → 回傳 PENDING/STARTED/SUCCESS/FAILURE

5. 執行流程

  1. 使用者提交 /train 請求 → 立即回傳 task_id
  2. Celery worker 在背景執行訓練
  3. 使用者查詢 /task/{id} → 得到即時狀態與結果

透過 Celery + Redis,我們讓訓練流程具備了非同步執行、同時處理多個實驗的能力,並且能即時追蹤任務狀態。這不僅解決了阻塞與併發的問題,也替後續功能(像是進度條、詳細訓練指標)建立了基礎。

更重要的是,Day 7 讓系統正式從「單機腳本」邁向「平台級服務」:使用者不再需要等待,系統則具備橫向擴展的能力,只要增加 worker 就能處理更多任務。這是任務平台化的第一步,也為未來的前端 UI 串接、任務追蹤與錯誤恢復等功能鋪好了路。


📎 AI 協作記錄:今日開發指令

請幫我在專案中新增「任務排程基礎」功能,目的是把 LoRA 訓練流程改為非同步執行。

需求:
1. 使用 Celery + Redis 實作任務排程
   - Celery worker 負責執行訓練
   - Redis 作為 broker 和 backend
2. 把訓練流程包成一個 Celery 任務 (train_lora_task)
   - 任務會回報狀態 (PENDING, STARTED, SUCCESS, FAILURE)
3. 建立 FastAPI API 端點
   - POST /train → 建立任務並回傳 task_id
   - GET /task/{id} → 查詢任務狀態與結果
4. 更新 Makefile,新增啟動 Redis、Celery Worker、FastAPI API 的指令
5. 確保程式結構清楚,新增 app/tasks/ 目錄來管理 Celery 相關檔案

請完成上述修改,讓我可以:
- 提交訓練任務
- 在背景執行訓練
- 查詢任務的執行情況

請幫我把 LoRA 訓練流程改成非同步執行,使用 Celery + Redis + FastAPI。目標是實作一個最小可行版本(MVP),只需要做到以下:

1. 建立 Celery 應用(app/tasks/__init__.py),使用 Redis 當 broker 與 backend。
2. 建立一個訓練任務(app/tasks/training.py),包裝 train_lora,並能回傳簡單結果(例如 val_loss、val_acc)。
3. 建立 FastAPI API(app/api.py),提供:
   - POST /train → 提交任務,回傳 task_id
   - GET /task/{id} → 查詢任務狀態(PENDING、STARTED、SUCCESS、FAILURE)
4. 更新 Makefile,新增 start-services、start-worker、start-api 指令。
5. 確保提交訓練後,API 不會被阻塞,可以查詢任務狀態。

請只做最小範例,不需要進度條、不需要資料庫紀錄。

上一篇
[Day 6] 日誌與結果管理:讓每次實驗有自己的家
系列文
打造 AI 微調平台:從系統設計到 AI 協作的 30 天實戰筆記7
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言