今天主要介紹 arxiv pipeline 前半段的兩個task fetch_papers_task
, process_pdfs_task
arxiv_pipeline
├─ fetch_papers_task → 抓取 metadata
├─ process_pdfs_task
│ ├─ process_pdfs_batch → 下載與解析 PDF
│ └─ store_to_db → 存入資料庫
├─ qdrant_index_task → 向量化並上傳 Qdrant
└─ generate_report_task → 生成日報告
def arxiv_pipeline(
date_from: str, date_to: str, max_results: int = 10, store_to_db: bool = True
):
results = {
"papers_fetched": 0,
"pdfs_downloaded": 0,
"pdfs_parsed": 0,
"papers_stored": 0,
"papers_indexed": 0,
"errors": [],
"processing_time": 0,
}
logger.info("results")
start_time = datetime.now()
# Step 1: Fetch paper metadata from arXiv
papers = asyncio.run(fetch_papers_task(date_from, date_to, max_results))
results["papers_fetched"] = len(papers)
# Step 2: Process PDFs if requested
pdf_results = {}
if papers:
pdf_results = asyncio.run(process_pdfs_task(papers, store_to_db=True))
results["pdfs_downloaded"] = pdf_results["downloaded"]
results["pdfs_parsed"] = pdf_results["parsed"]
results["errors"].extend(pdf_results["errors"])
results["papers_stored"] = pdf_results["papers_stored"]
print(f"Stored {pdf_results['papers_stored']} papers in DB")
# Step 3: Qdrant Index
indexed_count, _ = qdrant_index_task(papers, pdf_results.get("parsed_papers", {}))
results["papers_indexed"] = indexed_count
result_summary = {
"papers_fetched": len(papers),
"pdfs_downloaded": pdf_results.get("downloaded", 0),
"pdfs_parsed": pdf_results.get("parsed", 0),
"papers_indexed": indexed_count,
"papers_stored": pdf_results["papers_stored"],
"errors": pdf_results.get("errors", []),
}
# 呼叫日報告 task
report = generate_report_task(result_summary)
fetch_papers_task
: 抓取 metadata (fetch_papers_task)process_pdfs_task
: 下載、解析 PDF將抓取 metadata 與 PDF 處理拆開,有以下好處:
fetch_papers_task
:只抓論文資訊,流程單純、易維護。process_pdfs_task
:單獨負責下載、快取、解析 PDF,方便重試與並行處理。在此之前 先來看看 arXiv API 的使用限制、抓取方式及 PDF 處理建議。
提供程式化存取 arXiv 電子印刷品(e-print)資料
功能:
特性:
http://export.arxiv.org/api/query?search_query=all:electron&start=0&max_results=10
參數:
search_query
→ 搜尋關鍵字start
→ 從第幾筆開始max_results
→ 回傳筆數上限回傳資料:每篇論文包含:
<title>
<summary>
<author>
<link>
<arxiv:primary_category>
抓 metadata(標題、摘要、作者) ✅
抓 PDF ⚠️
程式語言支援:
urllib
/ httpx
/ feedparser
XML::Atom
feedtools
SimplePie
arxiv_pipeline
├─ fetch_papers_task → 抓取 metadata
├─ process_pdfs_task
│ ├─ process_pdfs_batch → 下載與解析 PDF
│ └─ store_to_db → 存入資料庫
├─ qdrant_index_task → 向量化並上傳 Qdrant
└─ generate_report_task → 生成日報告
fetch_papers_task
抓取 metadata 就像派出探險船,先找到海上的小島位置。我使用 非同步 HTTP 請求,可以同時抓取多筆資料,提高效率。基本流程如下:
async def fetch_papers_task(
date_from: str, date_to: str, max_results: int = 5
) -> List[ArxivPaper]:
client = get_cached_services()
papers = await client.fetch_papers(
from_date=date_from, to_date=date_to, max_results=max_results
)
print(f"Fetched {len(papers)} papers from {date_from} to {date_to}")
return papers
⚡ 內部 fetch_papers 會處理:
- Rate limit 控制
- API URL 組裝與參數處理
- XML 解析、封裝成 ArxivPaper 物件
process_pdfs_task
抓到 metadata 只是第一步,PDF 才是真正的寶藏。這個模組負責:
async def process_pdfs_task(papers: PDFParserService, store_to_db: bool):
pdf_parser = PDFParserService(max_pages=20, max_file_size_mb=10)
metadata_fetcher = MetadataFetcher(client, pdf_parser)
pdf_results = await metadata_fetcher.process_pdfs_batch(papers)
print(f"Processed PDFs: {pdf_results.get('parsed', 0)} parsed ")
if store_to_db:
stored_count = metadata_fetcher.store_to_db(
papers, pdf_results.get("parsed_papers", {})
)
return pdf_results
⚡ 注意:
- process_pdfs_batch 可重試與並行下載
- 解析後的文字可存入資料庫或用於後續向量化