iT邦幫忙

2025 iThome 鐵人賽

DAY 12
0
AI & Data

論文流浪記:我與AI 探索工具、組合流程、挑戰完整平台系列 第 13

Day 12|資料管線魔法初探:讓 AI 系統每天自動抓論文(上)

  • 分享至 

  • xImage
  •  

⚡《AI 知識系統建造日誌》這不是一篇純技術文章,而是一場工程師的魔法冒險。程式是咒語、流程是魔法陣、錯誤訊息則是黑暗詛咒。請準備好你的魔杖(鍵盤),今天,我們要踏入魔法學院的基礎魔法課,打造穩定、可擴展的 AI 知識系統。


前言

上一篇文章,我們建立了完美但 空的基礎設施。PostgreSQL、向量資料庫、Docker 等都已就位,但資料庫裡沒有任何資料。漂亮的舞台,卻沒有任何演員上場。這一刻,對每位 AI 工程師來說都很熟悉:基礎打好了,但資料還沒流進系統。

這篇文章,我們要把這個空舞台變成一個 自動化的研究論文資料管線,讓 AI 知識系統每天自動抓取、處理、存儲資料,真正活起來。


為什麼資料管線至關重要

所有成功 AI 系統背後都有穩健的資料管線:

  • Google 搜尋:每天抓取並處理整個網路
  • Netflix 推薦:實時處理數百萬用戶的觀看數據
  • ChatGPT:處理數 TB 文本,保證資料品質
    沒有可靠的資料,算法再強也無用。 如同

核心挑戰

  1. 空資料庫:基礎設施完美,但資料零。
  2. 資料管線的痛點
    • API 限流(rate limit)
    • PDF 格式複雜,容易解析錯誤
    • 網路錯誤、下載損壞
  3. 資料工程佔比超高:實際上 AI 系統 80~90% 的複雜度來自可靠的資料流。

建立的資料管線架構


arxiv_ingestion/
├── readme.md                  # Project overview and setup instructions
├── deploy_flows.sh             # deploy flow
├── prefect_entrypoint.py       #  flow.serve 註冊 flow 並套用 schedule
├── arxiv_pipeline.py      # Main pipeline for fetching, processing, and storing ArXiv papers
├── config.py                  # Configuration settings (e.g., API URLs, collection names, environment variables)
├── logger.py                  # Centralized logging utilities
├── exceptions.py              # Custom exception classes for error handling
│
├── db/                        # Database and storage layer
│   ├── PaperRepository.py     # Repository for paper-related CRUD operations
│   ├── factory.py             # Database session/factory creation
│   ├── minio.py               # MinIO client setup for storing PDFs
│   ├── models.py              # ORM models for entities (Paper, User, etc.)
│   └── qdrant.py              # Qdrant client setup and utilities for vector DB
│
├── services/                  # External services and processing modules
│   ├── arxiv_client.py        # Client for querying the ArXiv API
│   ├── embedding.py           # Embedding utilities for converting text into vectors
│   ├── docling.py             # PDF parsing utilities (Docling integration)
│   ├── metadata_fetcher.py    # Extracting and normalizing metadata from ArXiv papers
│   ├── pdf_parser.py          # Parsing PDFs to extract raw text or structured sections
│   └── schemas.py             # Pydantic schemas and data models
│
└── tasks/                     # Prefect tasks, modular building blocks for the pipeline
    ├── fetch_papers.py        # Task to fetch papers from ArXiv
    ├── generate_report.py     # Task to generate summary reports
    ├── process_pdfs.py        # Task to parse and extract text from PDFs
    └── qdrant_index.py        # Task to index paper chunks into Qdrant


Layer-by-Layer Breakdown

Pipeline Layer (arxiv_pipeline.py)

  • Orchestrates the full workflow for fetching, processing, and storing ArXiv papers
  • Coordinates Prefect tasks to ensure sequential execution and dependency management
  • Handles integration points between data retrieval, processing, embedding, and storage

Tasks Layer (tasks/)

  • Modular building blocks for the pipeline
  • fetch_papers.py: Fetches newly published papers from ArXiv
  • generate_report.py: Generates summary reports from paper content
  • process_pdfs.py: Parses PDF files to extract text or structured sections
  • qdrant_index.py: Indexes paper chunks into Qdrant vector database for retrieval

Services Layer (services/)

  • Implements core business logic and processing utilities
  • arxiv_client.py: Queries ArXiv API for paper metadata and PDFs
  • embedding.py: Generates embeddings from paper text for semantic search
  • docling.py: PDF parsing utilities and extraction logic
  • metadata_fetcher.py: Extracts and normalizes metadata from papers
  • pdf_parser.py: Converts PDFs into raw text or structured content
  • schemas.py: Pydantic models for validation and type safety

Database & Storage Layer (db/)

  • Handles data persistence and storage management
  • PaperRepository.py: CRUD operations for paper entities
  • factory.py: Database session creation and management
  • minio.py: MinIO client setup for storing PDF files
  • models.py: ORM models representing Papers, Users, and related entities
  • qdrant.py: Qdrant client and utilities for storing embeddings and enabling semantic search

Configuration & Utilities (config.py, logger.py, exceptions.py)

  • config.py: Centralized configuration for API URLs, collection names, and environment variables
  • logger.py: Logging utilities for structured logging across the pipeline
  • exceptions.py: Custom exception classes for centralized error handling

特點

  • 工廠模式:依賴注入,方便替換實作
  • 抽象接口:模組化、可替換
  • 非同步處理:併發操作,效率高
  • 錯誤處理完整:重試、限流、緩存、防止破壞系統

技術重點

1️⃣ arXiv API 限流

  • 必須精確遵守 3 秒間隔

  • 機制:

    • 請求時間戳記
    • 智能延遲
    • 指數退避
    • 緩存 PDF

2️⃣ PDF 科學文件解析

  • 傳統 PDF 解析庫失敗率高(數學公式、表格、雙欄排版)。

  • 選擇 Docling:

    • 保留文件結構
    • 生成結構化元資料
    • OCR & VLM 支援
    • 可擴充到 HTML、DOCX 等格式

細節可參考 Day X|資料才是英雄——Docling 的 PDF 解析秘笈 📄🛡️

3️⃣ Prefect 生產級工作流

  • 每天自動抓取最新論文

  • 四階段流程:

    1. 初始化環境
    2. 抓取當日論文
    3. 處理失敗 PDF
    4. 生成日報
  • 特色:

    • 錯誤隔離
    • 重試邏輯
    • 詳細監控
    • 資源管理,避免過度併發

挑戰 解法 工具
arXiv 限流 智能延遲 + 指數退避 Python + asyncio
PDF 複雜格式 保留結構化資料 Docling
每日自動化 Flow + Schedule Prefect

成果

  • 每日自動抓取 10~100+ 論文
  • 成功率 90%+
  • 完整結構化資料庫:
    • 元資料(title、authors、abstract…)
    • 全文內容
    • 正規化 DB schema
  • 跨平台可靠(macOS、Linux、WSL)

教訓

  1. 資料工程 = AI 工程 80%
  2. 錯誤處理佔一半程式碼
  3. 選擇領域專用工具(Docling)
  4. 設定管理很重要(限流、重試、併發、timeout)
  5. 寫文件也是在幫自己 debug

小結

我們把空的基礎設施變成了一個 生產級資料管線:自動抓取、解析、存儲、監控,為後續的資料搜尋和 AI 知識生成奠定了堅實基礎。模組化架構、異步處理、錯誤處理和可靠性,是打造真正可運行系統的關鍵。



上一篇
Day 11|Infra | 容器魔法:Docker Compose 讓服務共舞 實戰篇
系列文
論文流浪記:我與AI 探索工具、組合流程、挑戰完整平台13
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言