Hi大家好,
這是我參加 iT 邦幫忙鐵人賽的第 1 次挑戰,這次的主題聚焦在結合 Python 爬蟲、RAG(檢索增強生成)與 AI,打造一套 PTT 文章智慧問答系統。在過程中,我會依照每天進度上傳程式碼到 GitHub ,方便大家參考學習。也歡迎留言或來信討論,我的信箱是 gerryearth@gmail.com。
昨天我們完成了將 PTT 文章段落向量化並手動存入 Pinecone 的練習,今天要把這個流程自動化!
我們將透過 Celery 將「資料向量化」與「存入 Pinecone」的步驟整合起來,並且和爬蟲任務 ptt_scrape 串接,讓資料一進入資料庫就能自動完成向量化與同步儲存。這樣不僅能確保資料處理即時,還能為後續的智慧問答功能打下堅實的基礎。
chain
串接 Celery 任務昨天我們練習了如何將 PTT 文章的段落向量化並儲存到 Pinecone。今天,我們要實作這個流程。
我們的目標是:當資料儲存到 MariaDB 時,系統也會自動將該資料進行向量化,並同步存入 Pinecone。為了實現這個流程,我們會使用 Celery 自動排程這些任務。
首先請在 celery_app
資料夾中建立一個名為 data_processing.py
的檔案,用來處理資料向量化的相關任務。
from langchain.text_splitter import RecursiveCharacterTextSplitter
from pinecone import Pinecone
from env_settings import EnvSettings
from langchain_core.documents import Document
from langchain_pinecone import PineconeVectorStore
from pydantic import SecretStr
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from ptt_rag_dev.celery import app
from article.models import Article
env_settings = EnvSettings()
@app.task()
def store_data_in_pinecone(article_id_list: list):
vector_store = PineconeVectorStore(
index=Pinecone(
api_key=env_settings.PINECONE_API_KEY
).Index(env_settings.PINECONE_INDEX_NAME),
embedding=GoogleGenerativeAIEmbeddings(model=env_settings.GOOGLE_EMBEDDINGS_MODEL, google_api_key=SecretStr(env_settings.GOOGLE_API_KEY)),
)
documents = []
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=300,
chunk_overlap=50,
)
for article in Article.objects.filter(id__in=article_id_list).all():
chunks = text_splitter.split_text(article.content)
for i, chunk in enumerate(chunks):
documents.append(Document(
page_content=chunk,
metadata={
"article_id": article.id,
"board": article.board,
"title": article.title,
"author": article.author,
"post_time": str(article.post_time),
"url": article.url,
"chunk_index": i
}
))
vector_store.add_documents(documents=documents)
這個任務是為了跟之前我們做過的爬蟲任務 ptt_scrape
連接起來,也就是我們會用 chain
串接兩個任務,在這裡 ptt_scrape
輸出的 article_id_list
就會變成 store_data_in_pinecone
的輸入。
store_data_in_pinecone
大致上可以分成三個步驟:
vector_store
)text_splitter
)可以參考
PineconeVectorStore
的官方網站 與RecursiveCharacterTextSplitter
的官方網站,chunk_size
跟chunk_overlap
是切割的長度設置,後面幾天會說明選擇此長度的理由。
在 env_settings.py
增加:
PINECONE_API_KEY: str = None
PINECONE_INDEX_NAME: str = None
GOOGLE_EMBEDDINGS_MODEL: str = None
並在 .env
新增:
PINECONE_API_KEY="..."
PINECONE_INDEX_NAME="rag"
GOOGLE_EMBEDDINGS_MODEL="models/gemini-embedding-001"
chain
串接任務接下來在 ptt_rag_dev/celery.py
加入:
app.conf.imports = [
...
"celery_app.data_processing",
]
我們目前只有排程 ptt_scrape
的任務,因此把在 period_send_ptt_scrape_task
改成:
@app.task()
def period_send_ptt_scrape_task():
board_list = ['Gossiping', 'NBA', 'Stock', 'LoL', 'home-sale']
for board in board_list:
chain(ptt_scrape.s(board), store_data_in_pinecone.s())()
這樣寫就可以用 chain
達到我們串接任務的需求,也要注意把 ptt_scrape
用 @app.task()
設成任務。
這裡要引入套件:
from celery import chain from celery_app.data_processing import store_data_in_pinecone
簡單測試後 Pinecone 資料的格式如下:
如果到這裡都有跟上腳步的話代表你已經完成專案超過一半的進度了!
可以把之前測試的資料清空整理,重新正式啟動服務,讓服務更乾淨順暢。
最好能跑三天都沒問題,我們就可以用這三天的資料來做智慧問答。
實際操作後很可能會有配額問題,我建議在這個地方加 分批處理 + 重試機制,避免一次送太多,也能在撞到 quota 限制時自動延遲重試。
下面是改良版的 data_processing.py
:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from pinecone import Pinecone
from env_settings import EnvSettings
from langchain_core.documents import Document
from langchain_pinecone import PineconeVectorStore
from pydantic import SecretStr
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from ptt_rag_dev.celery import app
from article.models import Article
import time
import random
from math import ceil
env_settings = EnvSettings()
BATCH_SIZE = 50 # 每批處理 50 筆,避免一次塞爆 API
MAX_RETRIES = 5
BASE_DELAY = 2 # 秒
def retry_with_backoff(func, *args, **kwargs):
for attempt in range(MAX_RETRIES):
try:
return func(*args, **kwargs)
except Exception as e:
if "ResourceExhausted" in str(e) or "429" in str(e):
delay = BASE_DELAY * (2 ** attempt) + random.uniform(0, 1)
print(f"[Quota hit] Retry {attempt+1}/{MAX_RETRIES} after {delay:.2f}s...")
time.sleep(delay)
else:
raise
raise RuntimeError("Max retries reached for embedding request")
@app.task()
def store_data_in_pinecone(article_id_list: list):
vector_store = PineconeVectorStore(
index=Pinecone(api_key=env_settings.PINECONE_API_KEY)
.Index(env_settings.PINECONE_INDEX_NAME),
embedding=GoogleGenerativeAIEmbeddings(
model=env_settings.GOOGLE_EMBEDDINGS_MODEL,
google_api_key=SecretStr(env_settings.GOOGLE_API_KEY),
),
)
documents = []
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=300,
chunk_overlap=50,
)
for article in Article.objects.filter(id__in=article_id_list).all():
chunks = text_splitter.split_text(article.content)
for i, chunk in enumerate(chunks):
documents.append(
Document(
page_content=chunk,
metadata={
"article_id": article.id,
"board": article.board,
"title": article.title,
"author": article.author,
"post_time": str(article.post_time),
"url": article.url,
"chunk_index": i,
},
)
)
# 分批送入,避免一次塞太多
total_batches = ceil(len(documents) / BATCH_SIZE)
for i in range(total_batches):
batch_docs = documents[i * BATCH_SIZE : (i + 1) * BATCH_SIZE]
retry_with_backoff(vector_store.add_documents, documents=batch_docs)
print(f"[Batch {i+1}/{total_batches}] Uploaded {len(batch_docs)} docs")
明天【Day 19】PTT 文章智慧問答系統 - 整合 Gemini LLM 回答語意查詢問題
我們已經將向量資料存入完畢,接下來就是要取得與使用者的提問相近的資料,為後續的智慧問答功能做好準備!