iT邦幫忙

2025 iThome 鐵人賽

DAY 13
0
AI & Data

論文流浪記:我與AI 探索工具、組合流程、挑戰完整平台系列 第 14

Day 13|資料管線魔法進階:Prefect 架構與自動化全解析(下)

  • 分享至 

  • xImage
  •  

⚡《AI 知識系統建造日誌》這不是一篇純技術文章,而是一場工程師的魔法冒險。程式是咒語、流程是魔法陣、錯誤訊息則是黑暗詛咒。請準備好你的魔杖(鍵盤),今天,我們要踏入魔法學院的基礎魔法課,打造穩定、可擴展的 AI 知識系統。


前言

延續昨天的資料管線架構介紹,若你錯過前情,可以先看看這四篇:
Day 12|資料管線魔法初探:讓 AI 系統每天自動抓論文(上)
Day 3 | 攻略第一個據點 — Arxiv Pipeline 技術拆解(上):Metadata 抓取與 PDF 處理
Day X|資料才是英雄——Docling 的 PDF 解析秘笈 📄🛡️
Day 4 | 征服第二個據點 — Arxiv Pipeline 技術拆解(下):PDF 向量化與 Qdrant 上傳

Prefect 架構說明

環境設定 infra

  • Orion Container
orion:
    image: prefecthq/prefect:3-latest
    container_name: prefect-orion
    command: prefect server start --host 0.0.0.0 --port 4200
    ports:
      - "4200:4200" # Web UI + API
    environment:
      PREFECT_API_DATABASE_CONNECTION_URL: sqlite+aiosqlite:///prefect.db
    volumes:
      - ./data/prefect_data:/root/.prefect
    networks:
      - langfuse-otel-net
  • 功能:

    1. 提供 Web UI + APIhttp://localhost:4200
    2. 讓程式或其他容器可以透過 PREFECT_API_URL 與 Orion 通訊
  • arxiv-flow Container

arxiv-flow:
    build:
      context: .
      dockerfile: ./services/arxivservice/Dockerfile.arxiv
    image: arxiv-flow:latest
    container_name: arxiv-flow
    command: bash /app/deploy_flows.sh
    environment:
      PREFECT_API_URL: http://orion:4200/api
    volumes:
      - ./arxiv:/app
      - ./docker_cache/hf_cache:/root/.cache/huggingface # avoid space out, you don't need to add this.
      - ./data/arxiv_worker:/data
    depends_on:
      - orion
    env_file:
      - .env # <- 指定要讀的環境變數檔
    networks:
      - langfuse-otel-net

Dockerfile.arxiv

FROM prefecthq/prefect:3.4.18-python3.10-conda

WORKDIR /app

RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    git \
    curl \
    && rm -rf /var/lib/apt/lists/*

COPY services/arxivservice/requirements.txt ./requirements.txt

# --- 安裝 Python 套件 ---
RUN pip install --no-cache-dir --upgrade pip
RUN pip install --no-cache-dir -r requirements.txt

# --- 複製程式碼 ---
COPY ./arxiv /app

# 預設 command 可以空著,由 docker-compose override
CMD ["bash"]


Prefect 執行架構與資料流。

Prefect 執行架構概覽

[Developer]  ──編寫 Flow / Task
[Flow]       ──定義 Task 執行順序
[Task]       ──實際執行邏輯
[Orion]      ──接收 Flow Run / Task 日誌
   │
   └─(deploy_flows.sh 自動部署)
       ├─ 等待 Orion 就緒
       ├─ 建立 Work Pool / Queue
       └─ 註冊 Flow → 可排程自動執行

1️⃣ Developer(開發者)

  • 角色:設計者與操控者

  • 職責

    • 編寫 FlowTask
    • 定義流程邏輯、資料依賴、錯誤處理、重試策略
    • 決定 Flow 的參數、排程方式(Cron / Interval / Manual)
  • 工具

    • Python、IDE
    • Prefect CLI / SDK

可以把 Developer 想像成魔法師,手持魔杖(鍵盤),設計魔法陣(Flow)與咒語(Task)。

from datetime import timedelta, datetime
from prefect.schedules import Cron
from arxiv_pipeline import arxiv_pipeline


# 動態計算日期
today = datetime.utcnow()
date_to = today.strftime("%Y%m%d")
date_from = (today - timedelta(days=30)).strftime("%Y%m%d")  # 過去 30 天

# 建立 Interval schedule
# 每天 14:01 台北時間 (等於 UTC 06:01)
interval_schedule = Cron("1 6 * * *", timezone="UTC")

# 使用 flow.serve 註冊 flow 並套用 schedule
arxiv_pipeline.serve(
    schedule=interval_schedule,
    parameters={"date_from": date_from, "date_to": date_to, "max_results": 20},
)

2️⃣ Flow(流程)

  • 角色:流程編排者 / 魔法陣

  • 概念

    • Flow 是 一組 Task 的有序或依賴關係集合
    • 管理 Task 執行順序
    • 可接受參數(例如抓取日期範圍 date_fromdate_to
    • 可以設定排程、重試、超時等屬性
  • 特性

    • 流程級監控:每次 Flow Run 都有唯一 ID,便於追蹤
    • 可重用與參數化
  • 範例

    @flow(name="arxiv-pipeline-flow")
    def arxiv_pipeline(date_from, date_to, max_results=10):
        papers = fetch_papers_task(date_from, date_to, max_results)
        pdf_results = process_pdfs_task(papers)
        qdrant_index_task(papers, pdf_results.get("parsed_papers", {}))
        generate_report_task({"papers_fetched": len(papers)})
    

Flow 就像完整的魔法陣,把單一咒語(Task)串起來施法。


3️⃣ Task(任務)

  • 角色:最小單位 / 魔法咒語

  • 概念

    • Task 是 可執行的單位,專注於一件事
    • 可以被 Flow 調用,也可以單獨測試
  • 特性

    • 支援 重試timeout資源限制
    • 可返回結果,供其他 Task 使用
  • 範例

    from prefect import task
    
    @task(retries=3, retry_delay_seconds=10)
    def fetch_papers_task(date_from, date_to, max_results):
        # 呼叫 arXiv API,抓取論文資料
        ...
        return papers
    

Task 就像一個咒語,專門完成單一任務。技術上,Task 是可執行的最小單位


4️⃣ Orion(Prefect UI + API / 監控者)

  • 角色:監控者 / 魔法觀測塔

  • 概念

    • Orion 提供 Web UI + API
    • 可以查看 Flow Run 與 Task 的狀態、日誌、結果
    • 支援手動觸發或透過 Deployment 自動排程
  • 功能

    • Flow Run 管理:追蹤每次執行
    • Task 日誌:查看執行結果、錯誤訊息
    • 部署與排程:設定 Cron / Interval
  • 可視化

    Flow Run: arxiv_pipeline - 2025-09-18 (魔法陣)
        ├─ Task: fetch_papers_task ✅ (咒語1)
        ├─ Task: process_pdfs_task ✅ (咒語2)
        ├─ Task: qdrant_index_task ✅ (咒語3)
        └─ Task: generate_report_task ✅ (咒語4)
    

那實務上,我們通常會準備一個腳本來等待 Orion 啟動,並完成 Flow 的部署:

  • deploy_flows.sh
#!/bin/bash
set -e

# 等待 Orion 啟動
echo "Waiting for Orion API..."
until curl -s http://orion:4200/api/health | grep "true"; do
    echo -n "."
    sleep 2
done
echo "Orion is ready ✅"

# 印 Prefect 版本
prefect version

# 建立 Work Pool / Queue(容錯)
prefect work-pool create default --type process || true
prefect work-queue create ingest-queue --pool default || true

# 部署 flow
echo "Deploying flows..."
python /app/prefect_entrypoint.py

Orion 就像魔法學院的觀測塔,你可以即時監控魔法施法過程,查看每個 Task 是否成功、是否有錯誤、執行時間等。

https://ithelp.ithome.com.tw/upload/images/20250918/20136781dQQIIgtmnC.png

整體執行流程(文字流程圖)

[Developer]
      │ 寫 Flow / Task
      ▼
[Flow] ----------------→ 定義 Task 執行順序
      │
      ▼
[Task] ----------------→ 實際執行邏輯(抓資料、解析 PDF、索引)
      │
      ▼
[Orion] ----------------→ 記錄每次 Flow Run 與 Task 執行狀態、日誌、結果

4️⃣ 小結

  • 你已經有 Prefect 核心 UI + API(Orion Container)
  • Flow 與 Task 可以直接執行並被 Orion 記錄
    1. 開發 Flow / Task
    2. 手動呼叫 Flow
    3. 在 Orion UI 查看每次 Flow Run 的狀態與日誌

補充 - 實戰中的完整Pipeline程式碼細節

程式碼

以下是實戰中的完整版本,包含紀錄處理結果與錯誤統計。

import asyncio
from datetime import datetime, timedelta
from prefect import flow

from logger import AppLogger
from tasks.fetch_papers import fetch_papers_task
from tasks.generate_report import generate_report_task
from tasks.process_pdfs import process_pdfs_task
from tasks.qdrant_index import qdrant_index_task

logger = AppLogger(__name__).get_logger()


@flow(name="arxiv-pipeline-flow")
def arxiv_pipeline(
    date_from: str, date_to: str, max_results: int = 10, store_to_db: bool = True
):
    results = {
        "papers_fetched": 0,
        "pdfs_downloaded": 0,
        "pdfs_parsed": 0,
        "papers_stored": 0,
        "papers_indexed": 0,
        "errors": [],
        "processing_time": 0,
    }
    logger.info("results")
    start_time = datetime.now()

    # Step 1: Fetch paper metadata from arXiv
    papers = asyncio.run(fetch_papers_task(date_from, date_to, max_results))

    logger.info("fetch_papers_task")
    results["papers_fetched"] = len(papers)

    # Step 2: Process PDFs if requested
    pdf_results = {}
    if papers:
        pdf_results = asyncio.run(process_pdfs_task(papers, store_to_db=True))
        results["pdfs_downloaded"] = pdf_results["downloaded"]
        results["pdfs_parsed"] = pdf_results["parsed"]
        results["errors"].extend(pdf_results["errors"])
        results["papers_stored"] = pdf_results["papers_stored"]
        logger.info(f"Stored {pdf_results['papers_stored']} papers in DB")

    # Step 3: Qdrant Index
    indexed_count, _ = qdrant_index_task(papers, pdf_results.get("parsed_papers", {}))
    results["papers_indexed"] = indexed_count
    logger.info(f"Qdrant Index {indexed_count}")

    # Calculate total processing time
    processing_time = (datetime.now() - start_time).total_seconds()
    results["processing_time"] = processing_time

    result_summary = {
        "papers_fetched": len(papers),
        "pdfs_downloaded": results.get("pdfs_downloaded", 0),
        "pdfs_parsed": results.get("pdfs_parsed", 0),
        "papers_indexed": results.get("papers_indexed", 0),
        "papers_stored": results.get("papers_stored", 0),
        "errors": pdf_results.get("errors", []),
    }

    # 呼叫日報告 task
    report = generate_report_task(result_summary)
    logger.info(f"\n{report}")



上一篇
Day 12|資料管線魔法初探:讓 AI 系統每天自動抓論文(上)
下一篇
Day14|RAG 魔法課 (上):Hybrid Search 與 Re-ranking 完整實戰
系列文
論文流浪記:我與AI 探索工具、組合流程、挑戰完整平台16
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言