🎯 目標
在 Flyte 中建立 模型訓練工作流 (ML Training Workflow)
將簡單的 ML/DL 模型(例如 Logistic Regression 或 Tiny MLP)訓練 → 匯出為 ONNX
與前一天的 ETL 工作流 串接(數據 → 模型)
為後續 Spring Boot AI API 提供模型檔案(models/recommender.onnx)
🏗️ 工作流設計
Tasks
t_prepare_dataset()
從 MySQL 抓資料(或直接用 Pandas 讀 staging/evidence 表)
建立特徵矩陣 X、標籤 y
輸出為 train.parquet + test.parquet
t_train_model()
載入 train.parquet
用 Scikit-learn(Logistic Regression / XGBoost)或 PyTorch(Tiny MLP)訓練
儲存 model.pkl
t_export_onnx()
將 model.pkl 匯出為 model.onnx
位置:data/models/recommender.onnx
t_evaluate_model()
在 test.parquet 上做驗證
回傳 Accuracy / F1 / ROC-AUC(選一即可)
wf_train_pipeline()
串起整個流程:prepare → train → export → evaluate
🔧 技術要點
Scikit-learn → ONNX
用 skl2onnx 套件匯出
範例:
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType
initial_type = [('float_input', FloatTensorType([None, n_features]))]
onnx_model = convert_sklearn(clf, initial_types=initial_type)
with open("data/models/recommender.onnx", "wb") as f:
f.write(onnx_model.SerializeToString())
PyTorch → ONNX
用 torch.onnx.export
適合之後 Day14 的 AI reranker
資源設定
每個 task 可設定:@task(requests=Resources(cpu="2", mem="2Gi"))
MacBook M4 Air CPU-only,確保模型小(<100MB)
🧪 測試計畫
Local run:
pyflyte run workflows/train_day17.py wf_train_pipeline --effect-id E002
應在 data/models/ 下看到 recommender.onnx
檢查 ONNX 模型:
import onnx
model = onnx.load("data/models/recommender.onnx")
onnx.checker.check_model(model)
Spring Boot 端點 /api/recommendations/ai 能載入該模型(Day14 的 Service 已預留)
✅ 驗收標準
wf_train_pipeline 執行成功
models/recommender.onnx 產生
t_evaluate_model 回傳一個評估數字(例如 Accuracy > 0.6)
Spring Boot /api/recommendations/ai 成功跑推論(用同一個 ONNX)
📦 今日交付
workflows/train_day17.py(Flyte workflow for training)
requirements.txt 新增:
scikit-learn
skl2onnx
onnx
(或 PyTorch + onnx,依選擇的模型)
data/models/recommender.onnx(dummy 模型也行)
🔗 與 Day17 的銜接
Day17:wf_etl_day3 → 更新 evidence
Day18:wf_train_pipeline → 根據 evidence 訓練模型
Day19(展望):整合 ETL + Training,形成 完整 MLOps pipeline
pipeline: ETL → Train → Export → Evaluate → Deploy