iT邦幫忙

2025 iThome 鐵人賽

DAY 14
0
DevOps

30 天帶你實戰 LLMOps:從 RAG 到觀測與部署系列 第 14

Day14 - LLMOps Pipeline 自動化實戰:用 Prefect 與 Dagster,拯救你的睡眠時間

  • 分享至 

  • xImage
  •  

🔹 前言

終於來到和 DevOps 這個主題比較相關的部分了,感謝前面有耐心跟著讀的讀者 😆

在 RAG 系統裡,我們需要週期性地做很多「重複又必要」的工作來讓知識庫持續更新

  • 每天同步最新文件
  • 清洗並切片 (Cleaning & Chunking)
  • 重新產生 Embedding
  • 上傳向量資料庫

如果這些流程靠工程師手動操作,不僅容易出錯,也無法長期維護。 這就是 DevOps 們要介入的地方:Pipeline 自動化、可觀測性、責任分離

這時候,就會需要 Pipeline 自動化工具 來幫忙排程、監控、重試與管理工作流程。

https://ithelp.ithome.com.tw/upload/images/20250928/201200699udPOfRWrK.png


🔹 Airflow

當我想到跟 ML 有關的自動化工具時,我第一個會想到的是 Apache Airflow

Airflow:傳統的重量級方案

在資料工程領域,最有名的 Workflow Orchestration 工具就是 Apache Airflow

  • 由 Airbnb 開發,後來捐贈給 Apache 基金會。
  • 用 DAG (Directed Acyclic Graph) 來表達任務依賴,適合複雜 ETL 流程。
  • 提供完整的 Scheduler + Web UI,能處理數百上千任務。

https://ithelp.ithome.com.tw/upload/images/20250928/20120069pH34EJNajy.png

這次為什麼我們不選它來 Demo?

  • 架構偏重:需要維護 Scheduler、Worker、Database。
  • 學習曲線高,不太適合小團隊或 side-project。

➡️ 在企業資料平台中它仍是王者,但如果要用在本文的 LLMOps pipeline demo,Airflow 顯得過於笨重。


🔹 Prefect:快速自動化的最佳選擇 🚀

如果你想要「像寫 Python 一樣簡單地寫 pipeline」,Prefect 是很好的選擇。

  • 只要在 function 上加上 @task@flow,就能定義一個 workflow。
  • Prefect Cloud 提供直觀的 UI,能看到任務狀態、錯誤、重試情況。
  • 本地就能跑,不需要維護複雜架構。

https://ithelp.ithome.com.tw/upload/images/20250928/20120069mnXfMheF0B.png

🛠️ Demo:用 Prefect 自動化 RAG 更新

這個範例把 「員工手冊 → 清洗 → Chunk → Embedding →(模擬)向量索引」做成一條 Prefect Flow,程式碼等細節都放在 GitHub - day14_prefect_demo,有興趣的讀者可以自行拉下來研究。

步驟

  1. 讀取員工手冊 (worker_manual.txt)
  2. 文件清洗 & Chunking
  3. Embedding → 產生向量索引 (data/vector_index.json)
  4. 查詢相關內容

支援:

  • 本地一次性執行
  • (可選)建立 Deployment:每日 02:00(Asia/Taipei)自動跑
  • 最小檢索腳本(cosine similarity)

專案結構

day14_prefect_demo/
├─ flows/                 # 流程定義:包含每日更新的 pipeline 與 Prefect Deployment
├─ scripts/               # 輔助腳本:查詢向量、監控檔案變動並自動觸發流程
├─ utils/                 # 工具函式:文件清洗、切片、產生 Embedding
├─ data/                  # 測試資料與輸出:員工手冊 (輸入) / 向量索引 (輸出)
└─ README.md              # 專案說明文件(安裝方式、使用方式)

流程圖

https://ithelp.ithome.com.tw/upload/images/20250928/20120069kVvjT4BciD.png

一次性執行(本地)

直接走一次「員工手冊 → 清洗 → Chunk → Embedding →(模擬)向量索引」這個流程:

python -m flows.daily_pipeline
# 成功後會產生 data/vector_index.json

執行結果:

❯ python -m flows.daily_pipeline
13:46:50.749 | INFO    | prefect.engine - Created flow run 'whimsical-guppy' for flow 'daily_rag_update'
13:46:50.750 | INFO    | Flow run 'whimsical-guppy' - View at http://127.0.0.1:4200/flow-runs/flow-run/d28ddf21-010f-41ef-99f6-066a6c3db4e1
13:46:50.771 | INFO    | Flow run 'whimsical-guppy' - Created task run 'fetch_data-0' for task 'fetch_data'
13:46:50.772 | INFO    | Flow run 'whimsical-guppy' - Executing 'fetch_data-0' immediately...
13:46:50.791 | INFO    | Task run 'fetch_data-0' - Reading file: /Users/hazel/Documents/github/2025-ironman-llmops-demo/day14_prefect_demo/data/worker_manual.txt
13:46:50.798 | INFO    | Task run 'fetch_data-0' - Finished in state Completed()
13:46:50.806 | INFO    | Flow run 'whimsical-guppy' - Created task run 'process_text-0' for task 'process_text'
13:46:50.806 | INFO    | Flow run 'whimsical-guppy' - Executing 'process_text-0' immediately...
13:46:50.826 | INFO    | Task run 'process_text-0' - Finished in state Completed()
13:46:50.834 | INFO    | Flow run 'whimsical-guppy' - Created task run 'build_embeddings-0' for task 'build_embeddings'
13:46:50.835 | INFO    | Flow run 'whimsical-guppy' - Executing 'build_embeddings-0' immediately...
13:46:52.143 | INFO    | Task run 'build_embeddings-0' - Finished in state Completed()
13:46:52.189 | INFO    | Flow run 'whimsical-guppy' - Created task run 'upload-0' for task 'upload'
13:46:52.190 | INFO    | Flow run 'whimsical-guppy' - Executing 'upload-0' immediately...
13:46:52.253 | INFO    | Task run 'upload-0' - Finished in state Completed()
13:46:52.265 | INFO    | Flow run 'whimsical-guppy' - Finished in state Completed('All states completed.')

執行完成會在 data 目錄下生成 vector_index.json 這個檔案。

查詢

(A) 用假向量(不需 API key)

python scripts/query.py "加班規則"

執行結果:

❯ python scripts/query.py "加班規則"
🔎 Query: 加班規則

[1] score=0.0706 | id=0
公司員工手冊 v1.0 第一章:出勤規範 1. 上班時間:上午 9 點至下午 6 點。 2. 請假規則:需提前一天提出申請,緊急情況可事後補辦。 3. 遲到超過 15 分鐘需登記並扣考勤分。 第二章:加班與補休 1. 加班需事前提出申請,經主管核准後方可進行。 2. 加班工時可折換補休,需於一個月內使用完畢。 3. 連續加班超過三日,主管需評估員工狀況。

[2] score=0.0620 | id=1
可折換補休,需於一個月內使用完畢。 3. 連續加班超過三日,主管需評估員工狀況。 第三章:出差與報銷 1. 出差需填寫出差單,並附上詳細行程與預算。 2. 報銷需提供正式發票,金額超過 1000 元需經理簽核。 3. 出差結束後需提交出差報告,三日內完成。 第四章:福利制度 1. 每年提供三天帶薪病假。 2. 員工旅遊每兩年舉辦一次,由公司補助部分費用。

[3] score=0.0556 | id=2
1. 每年提供三天帶薪病假。 2. 員工旅遊每兩年舉辦一次,由公司補助部分費用。 3. 員工可申請教育訓練補助,每年上限 5000 元。 第五章:獎懲制度 1. 表現優異者可獲得年度獎金或晉升機會。 2. 違反公司規範者,視情節輕重給予警告或處分。 3. 貪污、洩密等重大違規行為將直接解僱。

(B) 用 OpenAI Embedding(真實向量)

python scripts/query_with_openai.py "加班規則"

執行結果:

❯ python scripts/query_with_openai.py "加班規則"
🔎 Query: 加班規則
🔧 Query embedding using: text-embedding-3-small

[1] score=0.5728 | id=0
公司員工手冊 v1.0 第一章:出勤規範 1. 上班時間:上午 9 點至下午 6 點。 2. 請假規則:需提前一天提出申請,緊急情況可事後補辦。 3. 遲到超過 15 分鐘需登記並扣考勤分。 第二章:加班與補休 1. 加班需事前提出申請,經主管核准後方可進行。 2. 加班工時可折換補休,需於一個月內使用完畢。 3. 連續加班超過三日,主管需評估員工狀況。

[2] score=0.4492 | id=1
可折換補休,需於一個月內使用完畢。 3. 連續加班超過三日,主管需評估員工狀況。 第三章:出差與報銷 1. 出差需填寫出差單,並附上詳細行程與預算。 2. 報銷需提供正式發票,金額超過 1000 元需經理簽核。 3. 出差結束後需提交出差報告,三日內完成。 第四章:福利制度 1. 每年提供三天帶薪病假。 2. 員工旅遊每兩年舉辦一次,由公司補助部分費用。

[3] score=0.3585 | id=2
1. 每年提供三天帶薪病假。 2. 員工旅遊每兩年舉辦一次,由公司補助部分費用。 3. 員工可申請教育訓練補助,每年上限 5000 元。 第五章:獎懲制度 1. 表現優異者可獲得年度獎金或晉升機會。 2. 違反公司規範者,視情節輕重給予警告或處分。 3. 貪污、洩密等重大違規行為將直接解僱。

🛠️ 使用 Prefect UI 建立 Deployment + 每日 02:00 自動排程

Prefect 的畫面:

https://ithelp.ithome.com.tw/upload/images/20250928/20120069rGcMf9vqeF.png

接下來我們要把 Prefect 的本機 server 跑起來,使用介面來管理流程(適用 Prefect Cloud):

prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api
prefect server start

然後部署 Deploymentschedule

"""
flows/deploy.py

這支程式的用途:
1. 匯入 daily_pipeline(主要的 Prefect Flow)。
2. 建立一個 Deployment,將 daily_pipeline 綁定到 Prefect 的工作排程。
3. 設定 Cron 排程:每天 02:00 (Asia/Taipei) 自動執行。
4. 指定 work_queue_name="default",方便 Agent 撿取任務。
5. 套用 Deployment → 註冊到 Prefect Server/Cloud。
   - 之後只要有 Prefect Agent 在跑,流程就會每天 02:00 自動觸發。

流程說明:
    deploy.py → 建立 Deployment → Prefect Server 記錄排程 → Agent 撿取 → 執行 daily_pipeline

使用方式:
    python -m flows.deploy
    # 建立 Deployment 後,可用以下指令確認:
    prefect deployment ls
    # 啟動 Agent(確保有 worker 來執行 flow):
    prefect agent start -q default
"""
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule

# 相對匯入:在套件語境下正確
from .daily_pipeline import daily_pipeline

if __name__ == "__main__":
    dep = Deployment.build_from_flow(
        flow=daily_pipeline,
        name="daily-2am",
        schedule=CronSchedule(cron="0 2 * * *", timezone="Asia/Taipei"),
        work_queue_name="default",
    )
    dep.apply()
    print("✅ Deployment created: daily-2am (02:00 Asia/Taipei)")
python -m flows.deploy

在瀏覽器輸入 http://127.0.0.1:4200,接著就可以在 UI 畫面看到:

  • Flows tab 中的 daily_rag_update:「員工手冊 → 清洗 → Chunk → Embedding →(模擬)向量索引」的 RAG Flow。
  • Deployments tab : daily_rag_update (Flow name) / daily-2am (Deployment name),這個 Deployment 會在每天台北時間 02:00 自動執行該 RAG Flow。

https://ithelp.ithome.com.tw/upload/images/20250928/20120069mf8wyKB6Dn.png

https://ithelp.ithome.com.tw/upload/images/20250928/20120069dINu0Ei2my.png

👀 自動監控 (可選)

除了排程,你也可以在 修改 data/worker_manual.txt 後自動觸發流程。

啟動監控腳本

pip install watchdog
python scripts/watch_and_trigger.py

預設行為

  • 監控 data/worker_manual.txt
  • 檔案被修改時,自動執行 flows/daily_pipeline.py

切換成觸發 Prefect Deployment

如果已經有 deployment(例如 daily_rag_update/daily-2am),可以改成用 deployment 方式觸發:

export USE_PREFECT_DEPLOYMENT=true
export PREFECT_DEPLOYMENT_NAME="daily_rag_update/daily-2am"

python scripts/watch_and_trigger.py

其他參數

# 修改監控的檔案路徑
export WATCH_FILE="data/worker_manual.txt"
# 設定去彈跳秒數 (避免一次儲存多次觸發)
export DEBOUNCE_SEC=1

➡️ 對於 LLMOps side-project / demo,非常適合用 Prefect 開始專案自動化。


🔹 Dagster:資料資產導向,企業級的 LLMOps 超強幫手 🏢

Dagster 和 Prefect 最大的不同是:它不是任務導向,而是 資料資產 (Data Asset) 導向

  • 每個步驟產出的結果(cleaned_text、chunks、embedding)都被視為一個資產。

  • Dagster 會自動幫你建立「資料血緣 (lineage)」,能清楚追蹤:

    • 這個 embedding 是由哪個版本的 chunks 生成?
    • 這份 chunks 又是由哪個文件清洗而來?
  • 內建型別檢查,避免髒資料流入下游。

  • Dagit UI 提供漂亮的資料血緣圖。

https://ithelp.ithome.com.tw/upload/images/20250928/20120069mIfcfJPwmz.png

🛠️ Demo: Dagster Pipeline

這是一個使用 Dagster 的 Asset 驅動設計 來管理 RAG 知識庫 Pipeline Demo 的資料轉換流程。本文會省略較多專案的細節,細節等資訊會寫在 GitHub Repo 的 README.md 裡面,有興趣的讀者可以自行拉下來研究。

步驟:

  1. 讀取原始文本(raw_text)- data/worker_manual.txt
  2. 文字清理(cleaned_text)
  3. Chunk 切片(chunks)
  4. 產生向量(vectors,呼叫 OpenAI Embedding API)
  5. 建立向量索引(vector_index → 輸出 JSON 檔案)

最終產物是 data/vector_index.json,可以直接拿來做檢索。此外,這個 demo 也示範了 定時排程(每日 02:00)檔案變更觸發(Sensor),讓知識庫能自動更新。

支援:

  • 每日 02:00 (Asia/Taipei) 自動排程
  • 檔案變更監控(修改 worker_manual.txt 會自動觸發重新產生 index)

專案結構:

day14_dagster_demo/
├── assets/          # 資產定義:raw_text、cleaned_text、chunks、vectors、vector_index
├── data/            # 測試檔案(員工手冊)
├── defs.py          # 資產組裝、Schedule、Sensor
└── README.md

▶️ 啟動開發模式

dagster dev -m defs

執行結果:

❯ conda activate day14_dagster_demo
❯ dagster dev -m defs
2025-09-09 15:24:04 +0800 - dagster - INFO - Loaded environment variables from .env file: OPENAI_API_KEY,QUERY_EMBEDDING_MODEL
2025-09-09 15:24:04 +0800 - dagster - INFO - Using temporary directory /Users/hazel/Documents/github/2025-ironman-llmops-demo/day14_dagster_demo/tmp6oa6slnb for storage. This will be removed when dagster dev exits.
2025-09-09 15:24:04 +0800 - dagster - INFO - To persist information across sessions, set the environment variable DAGSTER_HOME to a directory to use.
2025-09-09 15:24:04 +0800 - dagster - INFO - Launching Dagster services...
2025-09-09 15:24:05 +0800 - dagster-webserver - INFO - Loaded environment variables from .env file: OPENAI_API_KEY,QUERY_EMBEDDING_MODEL
2025-09-09 15:24:05 +0800 - dagster - WARNING - /Users/hazel/Documents/github/2025-ironman-llmops-demo/day14_dagster_demo/defs.py:57: ExperimentalWarning: Parameter `target` of initializer `SensorDefinition.__init__` is experimental. It may break in future versions, even between dot releases. To mute warnings for experimental functionality, invoke warnings.filterwarnings("ignore", category=dagster.ExperimentalWarning) or use one of the other methods described at https://docs.python.org/3/library/warnings.html#describing-warning-filters.
  file_change_sensor = SensorDefinition(

2025-09-09 15:24:05 +0800 - dagster.daemon - INFO - Instance is configured with the following daemons: ['AssetDaemon', 'BackfillDaemon', 'SchedulerDaemon', 'SensorDaemon']
2025-09-09 15:24:05 +0800 - dagster - WARNING - /Users/hazel/Documents/github/2025-ironman-llmops-demo/day14_dagster_demo/defs.py:57: ExperimentalWarning: Parameter `target` of initializer `SensorDefinition.__init__` is experimental. It may break in future versions, even between dot releases. To mute warnings for experimental functionality, invoke warnings.filterwarnings("ignore", category=dagster.ExperimentalWarning) or use one of the other methods described at https://docs.python.org/3/library/warnings.html#describing-warning-filters.
  file_change_sensor = SensorDefinition(

2025-09-09 15:24:06 +0800 - dagster-webserver - INFO - Serving dagster-webserver on http://127.0.0.1:3000 in process 3446

打開 UI 入口: http://127.0.0.1:3000 介面長這樣:

https://ithelp.ithome.com.tw/upload/images/20250928/201200696f06VnmGCQ.png

⏱ 手動一次性執行

dagster asset materialize -m defs --select "*"

會自動跑完整流程,並在 data/vector_index.json 產生最新索引。

✅ 開啟每日 02:00 自動排程

方法 A:使用 Dagster Dev(推薦)

dagster dev -m defs

https://ithelp.ithome.com.tw/upload/images/20250928/20120069Vea3f6TUJl.png

  • 找到 daily_2am_taipei → 切換 Running 為 ON:

https://ithelp.ithome.com.tw/upload/images/20250928/20120069bzRZ9MHYtx.png

只要這個 Process 在跑,Dagster Daemon 就會在每天 02:00 (Asia/Taipei) 自動觸發 pipeline。

方法 B:純 CLI

⚠️ 若用 CLI,需要額外啟動 daemon:

README.md 裡面有寫一些前置設定,一定要看才跑得起來 !

dagster-daemon run
# 啟用每日 02:00 schedule
dagster schedule start -m defs daily_2am_taipei
# 確認啟用成功
dagster schedule list -m defs

✅ 開啟檔案變更 Sensor

如果要在 worker_manual.txt 修改後自動重跑:

方法 A:在 Dagster UI 啟用

  • 進入 http://127.0.0.1:3000
  • 點選上面選單的 Deployment -> Sensors
  • 找到 on_worker_manual_change → 切換 Running 為 ON

https://ithelp.ithome.com.tw/upload/images/20250928/20120069YS6HeSzSju.png

方法 B:CLI

dagster sensor start -m defs on_worker_manual_change

同樣需要 dagster-daemon run 來常駐執行。

➡️ 對於需要 資料可追溯性、型別驗證、法規合規 的 LLMOps(金融、醫療、企業級 RAG pipeline),Dagster 的價值非常大,我會用 3 個案例展示 Dagster 的不同之處:

1. Lineage - 透明可追蹤

https://ithelp.ithome.com.tw/upload/images/20250928/20120069lRGc2m93P8.png

在這個分頁可以看到資產依賴圖(Asset Lineage Graph),讓我們一眼看到數據是如何一步步從原始檔案流動到最後的索引檔:

https://ithelp.ithome.com.tw/upload/images/20250928/20120069Xo0L26wjNW.png

2. 文件變動以及版本顯示

如果手動修改了 worker_manual.txt,會顯示 文件有更新

https://ithelp.ithome.com.tw/upload/images/20250928/20120069jBl1T1h5aT.png

https://ithelp.ithome.com.tw/upload/images/20250928/20120069WJwPkPmXWp.png

3. 型別驗證

打開 assets/vectors.py,找到 vectors 資產:

@asset
def vectors(chunks: list[str]) -> list[list[float]]:
    ...

嘗試把回傳型別故意改錯,例如回傳 str

context.log.info(f"嵌入完成:{len(vecs)} 向量,模型={model}")
# return vecs
return "not a vector"

再次 materialize pipeline 會立刻顯示錯誤,指出回傳的型別不符合:

https://ithelp.ithome.com.tw/upload/images/20250928/20120069IIyFYw2cau.png

https://ithelp.ithome.com.tw/upload/images/20250928/20120069ZsGz56U9PB.png

這就是 Dagster 的價值:不只跑流程,還能守住資料品質


🔹 Airflow / Prefect / Dagster 比較表

面向 Airflow Prefect Dagster
核心概念 DAG (Task 依賴) Flow (Pythonic 任務流) Data Asset (資料驅動)
定位 傳統 ETL / 企業級排程 輕量、自動化快 資料工程 / ML pipeline
UI / 監控 任務監控強大 Prefect Cloud → 直觀 Dagit → lineage、型別
型別支持 幾乎沒有 基本 內建完整型別驗證
上手難度 中(熟 Python 即可) 中偏高(需理解 asset 思維)
適合情境 大型企業 Data Platform(有專門維護團隊) Side-project、個人專案,或是小企業想快速自動化 若專案需要資料血緣(lineage)與治理能力,就選 Dagster。
對 LLMOps 的幫助 排程 OK,但缺乏資料追蹤 快速更新知識庫 pipeline,適合中小型專案 最適合追蹤 embedding lineage 與資料版本
實務案例 每晚抽取交易資料 → 清理轉換 → 載入數據倉庫,生成月度報表 每日 02:00 自動更新 FAQ Bot 知識庫 每次文件改動觸發,重新產生向量索引

🔹 小結

Day 14 的重點:讓 LLMOps 做到 Pipeline 自動化,並且融合 DevOps 三大基礎概念:

  • Pipeline 自動化 = LLMOps 的 CI/CD
    • Prefect 讓我們快速實現「知識庫的自動更新」。
    • 不再靠工程師每天手動跑 script,而是交給 Orchestrator 工具。
  • 可觀測性 (Observability)
    • Prefect 提供 UI 監控任務狀態、並提供錯誤偵測與重試功能。
    • Dagster 提供 lineage 與型別驗證,確保資料可追溯。
  • 責任分離 (Responsibility Separation)
    • 工程師設計流程,而 Pipeline 工具執行、排程、監控。
    • 減少人工作業,降低錯誤風險,責任邊界更清晰。

明天(Day 15),我們會深入探討 Prompt Engineering 系統化,從零散的 Prompt 試驗走向可維護的 Prompt 管理。

📚 引用 / 延伸閱讀


上一篇
Day13 - 為什麼知識會「過期」?Data Drift 偵測與更新策略實作
下一篇
Day15 - Prompt Generation:用模板和版本管理 Prompt,規範 LLM 的回應
系列文
30 天帶你實戰 LLMOps:從 RAG 到觀測與部署17
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言