Day 18 | Daily Knowledge Express Goes Live! (Part 1): Hit a Key and AI Delivers Paper Highlights Straight to Your Inbox

⚡ The AI Knowledge System Build Log: this is not a purely technical article but an engineer's magical adventure. Code is the incantation, the workflow is the magic circle, and error messages are dark curses. Grab your wand (keyboard): today we step into the academy's foundational magic class and build a stable, extensible AI knowledge system.


Introduction

Yesterday we brought the RAG API online, so the system can finally answer questions; the magic circle is carved into the floor.
But here's the problem: with only incantations and no one to hear them, aren't we just talking to ourselves in a dark forest?

So today we level up and package that knowledge into a daily express delivery to every subscriber's inbox. This is more than just sending an email; it's a full-scale magic experiment:

  • Scan ArXiv for the latest papers every morning, so fresh knowledge lands before you finish breakfast.
  • Use an LLM to distill summaries, compressing long academic incantations into short, sharp takeaways.
  • Optionally apply a translation spell so wizards of any language can read them.
  • Lay out the result as an HTML "magic scroll", far easier on the eyes than plain text.
  • Finally, let Prefect trigger the whole thing on schedule and deliver it to users on time every day.

Today's mission is to make this pipeline run by itself, instead of relying on an engineer pressing Send in the middle of the night.

If you missed the earlier episodes, these posts cover the groundwork:
Day 8 | Email Pipeline Deep Dive (Part 1): Building the Subscription System
Day 9 | Email Pipeline Deep Dive (Part 2): Building the Subscription System
Day 13 | Advanced Data Pipeline Magic: Prefect Architecture and Automation Explained (Part 2)

Today's goal is to make all of this run on a schedule. The core code in pipeline.py is only revisited briefly and not explained line by line.

Revisiting pipeline.py and the container setup

Container setup

services:
    email-flow:
      build:
        context: .
        dockerfile: ./services/emailservice/Dockerfile.email
      image: email-flow:latest
      container_name: email-flow
      command: bash /app/deploy_flows.sh
      environment:
        PREFECT_API_URL: http://orion:4200/api
      volumes:
        - ./email:/app
      env_file:
        - .env
      networks:
        - ai-net
    
    orion:
      image: prefecthq/prefect:3-latest
      container_name: prefect-orion
      command: prefect server start --host 0.0.0.0 --port 4200
      ports:
        - "4200:4200" # Web UI + API
      environment:
        PREFECT_API_DATABASE_CONNECTION_URL: sqlite+aiosqlite:///prefect.db
      volumes:
        - ./data/prefect_data:/root/.prefect
      networks:
        - ai-net
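
A quick sanity check: the only wiring between the two containers is PREFECT_API_URL, so it's worth confirming that email-flow really talks to Orion instead of spinning up a local ephemeral API. A minimal check, run from inside the email-flow container (the file name check_api.py is just illustrative):

# e.g. docker compose exec email-flow python check_api.py
from prefect.settings import PREFECT_API_URL

# Should print http://orion:4200/api, as injected by the environment block above
print(PREFECT_API_URL.value())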

Dockerfile.email

# --- Base image ---
# FROM python:3.11-slim
FROM prefecthq/prefect:3.4.18-python3.10-conda

# --- Working directory ---
WORKDIR /app

# --- Install system dependencies ---
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    git \
    curl \
    && rm -rf /var/lib/apt/lists/*

# --- Copy dependency manifest ---
COPY services/emailservice/requirements.txt ./requirements.txt

# --- Install Python packages ---
RUN pip install --no-cache-dir --upgrade pip
RUN pip install --no-cache-dir -r requirements.txt

# --- Copy application code ---
COPY ./email /app


# The default command can stay minimal; docker-compose overrides it
CMD ["bash"]

pipeline.py (flow excerpt)


import random
import time

import firebase_admin
from prefect import flow, get_run_logger

# ----------------------
# Flow
# ----------------------
# (Tasks like get_users_task / fetch_papers_task and FIREBASE_KEY_PATH come from
#  elsewhere in pipeline.py / config.py and are omitted here.)

@flow(name="Daily Subscribe Flow")
def daily_papers_flow(top_k: int = 3):
    # Initialize Firebase (guard against re-initialization when the served
    # process runs this flow more than once)
    if not firebase_admin._apps:
        cred = firebase_admin.credentials.Certificate(
            f"{FIREBASE_KEY_PATH}/serviceAccountKey.json"
        )
        firebase_admin.initialize_app(cred)

    logger = get_run_logger()

    start_flow = time.time()
    logger.info("Daily Subscribe Flow started")

    # Fetch all subscribed user emails, user_id, translate, and user_language
    users = get_users_task()
    papers = fetch_papers_task()
    if not papers:
        logger.info("No new papers found, skipping flow")
        return

    content_map = fetch_paper_content_task(papers)

    for user in users:
        user_unsent_papers = filter_already_sent_papers(
            user["user_id"], papers
        )  # returns a list of dicts
        logger.info(f"user {user['user_id']}")
        logger.info(f"user_unsent_papers: {len(user_unsent_papers)}")
        if not user_unsent_papers:
            logger.info(f"user {user['user_id']} - All papers have been sent, skipping")
            continue

        # Shuffle so each user gets a different mix
        random.shuffle(user_unsent_papers)

        # Take at most top_k papers
        assigned_papers = user_unsent_papers[:top_k]
        logger.info(f"top_k={top_k}, assigned papers: {len(assigned_papers)}")
        # Send to the user
        result = process_user_task(user, assigned_papers, content_map)
        logger.info(result)

    logger.info(f"Daily Subscribe Flow completed in {time.time() - start_flow:.2f}s")



Concept recap: Prefect + Flow + Task + Orion + Developer

Execution flow overview (text diagram)

[Developer] ──> writes Flows / Tasks
      │
      ▼
[Flow] ──> defines Task ordering and dependencies
      │
      ▼
[Task] ──> runs the actual logic (fetch / parse / index)
      │
      ▼
[Orion] ──> records every Flow Run and Task state, logs, and results

  • Developer: the designer, in control of the flow and its parameters.
  • Flow: orchestration, chaining multiple Tasks together.
  • Task: a single execution unit focused on one job.
  • Orion: monitoring and scheduling, with a visual run history.
  • In practice: deploy_flows.sh waits for Orion to be ready → creates the Queue → registers the Flow → then runs happen automatically or on demand.

Flow, Task, and Orion were introduced before, so let's look at deploy_flows.sh, creating the Queue, and registering the Flow.

prefect_entrypoint.py

Responsible for registering the Flow and configuring its schedule.

from pipeline import daily_papers_flow
from prefect.schedules import Cron

# Create a Cron schedule:
# every day at 06:00 Taipei time (= 22:00 UTC the previous day)
cron_schedule = Cron("0 22 * * *", timezone="UTC")


daily_papers_flow.serve(
    schedule=cron_schedule,
    parameters={"top_k": 3},
)
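
One small aside: since Cron accepts a timezone argument, the same schedule can also be written in local time and Prefect will handle the conversion. A minimal variation on the snippet above:

from prefect.schedules import Cron

# Equivalent schedule expressed in Taipei local time (UTC+8):
# 06:00 Asia/Taipei corresponds to 22:00 UTC on the previous day.
taipei_schedule = Cron("0 6 * * *", timezone="Asia/Taipei")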

deploy_flows.sh

The automated deployment script. Its steps:

  1. Wait for Orion to start → make sure the Prefect API is reachable.
  2. Print the Prefect version → sanity-check the environment.
  3. Create the Work Pool and Queue → designate the execution resources and task queue.
  4. Deploy the Flow → invoke prefect_entrypoint.py to register the Flow.

#!/bin/bash
set -e

# Wait for Orion to come up
echo "Waiting for Orion API..."
until curl -s http://orion:4200/api/health | grep "true"; do
    echo -n "."
    sleep 2
done
echo "Orion is ready ✅"

# Print the Prefect version
prefect version

# Create the Work Pool / Queue
prefect work-pool create default --type process || true
prefect work-queue create email-queue --pool default || true

# Deploy the flow
echo "Deploying flows..."
python /app/prefect_entrypoint.py
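
Before trusting the schedule, it's worth smoke-testing the flow once by hand. A Prefect flow is still just a callable Python function, so, assuming the container has the same .env and serviceAccountKey.json available, a one-off run can be triggered directly (run_once.py is a hypothetical helper, not part of the layout below):

# e.g. docker compose exec email-flow python run_once.py
from pipeline import daily_papers_flow

if __name__ == "__main__":
    # Send at most one paper per user, without waiting for the 22:00 UTC schedule
    daily_papers_flow(top_k=1)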

Results

https://ithelp.ithome.com.tw/upload/images/20250919/20136781zNGWANZAkk.png

https://ithelp.ithome.com.tw/upload/images/20250919/201367814hzS0jovGo.png

Architecture


├── deploy_flows.sh                    # Deployment script: waits for Orion, creates pool/queue, registers the flow
├── prefect_entrypoint.py              # Registers the flow via flow.serve and applies the schedule
├── config.py                          # Configuration settings (API keys, DB connections, environment variables)
├── pipeline.py                        # Main pipeline orchestrating the process of fetching, summarizing, and emailing papers
├── logger.py                          # Centralized logging utilities
├── serviceAccountKey.json             # Firebase service account credentials for authentication
│
├── services/                          # Core services for fetching, processing, and delivering papers
│   ├── embedding.py                   # Functions for generating embeddings from paper text
│   ├── fetch_new_papers.py            # Fetch newly published papers from source (e.g., ArXiv API)
│   ├── fetch_paper_content.py         # Retrieve and parse the full content of papers
│   ├── filter_already_sent_papers.py  # Filter out papers that have already been sent to users
│   ├── generate_summary.py            # Generate summaries of papers using LLM
│   ├── get_subscribed_users.py        # Fetch the list of subscribed users from Firebase/DB
│   ├── get_user_email_from_firebase.py # Retrieve user email addresses from Firebase
│   ├── langchain_client.py            # Wrapper for interacting with LangChain (LLM interface)
│   ├── prompt_template.txt            # Prompt template used for LLM summarization
│   ├── record_sent_papers.py          # Log and persist which papers have been sent
│   ├── send_email.py                  # Utility to send emails with paper summaries to users
│   └── template.html                  # HTML template for summary emails
│
└── storage/                           # Storage and persistence layer
    ├── __init__.py                    # Package initialization
    ├── minio.py                       # MinIO client for storing PDFs or assets
    ├── model.py                       # ORM models or schema definitions for stored entities
    ├── qdrant_client.py               # Qdrant client for managing vector storage (embeddings, similarity search)
    ├── storage_metrics.py             # Metrics/logging for storage operations
    └── wait_minio.py                  # Utility to wait for MinIO service readiness before startup
    

Layer-by-Layer Breakdown

Pipeline Layer (pipeline.py)

  • Orchestrates the full workflow: fetching papers → summarizing → sending emails
  • Defines task sequences and background job scheduling via Prefect + Orion (prefect_entrypoint.py)
  • Handles alerting and scheduled dispatch of paper summaries

Services Layer (services/)

  • Core business logic for paper retrieval, processing, summarization, and delivery
  • Integration with external APIs: ArXiv for new papers, LangChain/LLM for summaries, Firebase for user info
  • Handles filtering of already sent papers and ensures deduplication
  • Embedding generation and vector similarity utilities (embedding.py)
  • Email generation and formatting, including HTML templates (template.html)
  • Error handling, retry mechanisms, and logging
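
The summarization step in the list above is the heart of the service. Its real implementation isn't shown today (the next post covers a staged approach that keeps the token budget under control), but a naive, hypothetical sketch of generate_summary.py might look like this, assuming langchain_client.py exposes a get_llm() helper that returns a LangChain chat model and that prompt_template.txt uses {paper} and {language} placeholders:

# Hypothetical sketch of services/generate_summary.py; helper and placeholder names are assumptions
from pathlib import Path

from langchain_client import get_llm  # assumed wrapper around the LLM

PROMPT = Path("services/prompt_template.txt").read_text(encoding="utf-8")


def generate_summary(paper_text: str, language: str = "en") -> str:
    """Fill the prompt template with the paper text and ask the LLM for a summary."""
    llm = get_llm()
    # Naive truncation to stay under the context limit; a staged strategy replaces this later
    prompt = PROMPT.format(paper=paper_text[:8000], language=language)
    return llm.invoke(prompt).content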

Storage Layer (storage/)

  • Persistence and storage management for PDFs, embeddings, and metadata
  • MinIO client for asset storage (minio.py)
  • Qdrant client for embedding storage and similarity search (qdrant_client.py)
  • ORM models or schema definitions for persisted entities (model.py)
  • Metrics collection and monitoring (storage_metrics.py)
  • Utilities to ensure service readiness (e.g., wait_minio.py)
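
wait_minio.py plays the same role for MinIO that the curl loop in deploy_flows.sh plays for Orion: don't start work until the dependency answers. A minimal sketch, assuming credentials come from the environment; the actual module may differ:

# Hypothetical sketch of storage/wait_minio.py; environment variable names are assumptions
import os
import time

from minio import Minio


def wait_for_minio(endpoint: str = "minio:9000", timeout: int = 60) -> Minio:
    client = Minio(
        endpoint,
        access_key=os.getenv("MINIO_ROOT_USER"),
        secret_key=os.getenv("MINIO_ROOT_PASSWORD"),
        secure=False,
    )
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            client.list_buckets()  # cheap call that fails until MinIO is ready
            return client
        except Exception:
            time.sleep(2)
    raise TimeoutError("MinIO did not become ready in time")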

Configuration & Utilities (config.py, logger.py, serviceAccountKey.json)

  • Centralized configuration for API keys, DB connections, and environment variables
  • Logging utilities for structured and centralized logs
  • Firebase credentials for user management and authentication
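
Concretely, config.py most likely just reads these values from the .env file that docker-compose mounts via env_file. A hypothetical excerpt; apart from FIREBASE_KEY_PATH (used by the flow above), the variable names are illustrative:

# Hypothetical excerpt of config.py
import os

# Directory containing serviceAccountKey.json, used by daily_papers_flow
FIREBASE_KEY_PATH = os.getenv("FIREBASE_KEY_PATH", "/app")

# Illustrative examples of other settings the services might need
SMTP_HOST = os.getenv("SMTP_HOST", "smtp.gmail.com")
SMTP_PORT = int(os.getenv("SMTP_PORT", "587"))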

Wrap-up

At this point we have a daily knowledge express service that runs itself:

  • Morning fetch → AI summarization → translation (optional) → HTML formatting → automatic delivery
  • A clear three-layer architecture: Pipeline → Services → Storage
  • Prefect + Orion handle the scheduling and monitoring behind the scenes; the engineer just sets up the spells and the system casts them on time
