iT邦幫忙

2025 iThome 鐵人賽

DAY 18
0
生成式 AI

一起來打造 PTT 文章智慧問答系統!系列 第 18

【Day18】Pinecone 整合實戰 - 從資料向量化到自動任務串接

  • 分享至 

  • xImage
  •  

Hi大家好,
這是我參加 iT 邦幫忙鐵人賽的第 1 次挑戰,這次的主題聚焦在結合 Python 爬蟲、RAG(檢索增強生成)與 AI,打造一套 PTT 文章智慧問答系統。在過程中,我會依照每天進度上傳程式碼到 GitHub ,方便大家參考學習。也歡迎留言或來信討論,我的信箱是 gerryearth@gmail.com


昨天我們完成了將 PTT 文章段落向量化並手動存入 Pinecone 的練習,今天要把這個流程自動化!

我們將透過 Celery 將「資料向量化」與「存入 Pinecone」的步驟整合起來,並且和爬蟲任務 ptt_scrape 串接,讓資料一進入資料庫就能自動完成向量化與同步儲存。這樣不僅能確保資料處理即時,還能為後續的智慧問答功能打下堅實的基礎。


今日目標

  • 實作資料向量化,並存入 Pinecone
  • chain 串接 Celery 任務
  • 完成排程所有任務

資料向量化

昨天我們練習了如何將 PTT 文章的段落向量化並儲存到 Pinecone。今天,我們要實作這個流程。

我們的目標是:當資料儲存到 MariaDB 時,系統也會自動將該資料進行向量化,並同步存入 Pinecone。為了實現這個流程,我們會使用 Celery 自動排程這些任務。

首先請在 celery_app 資料夾中建立一個名為 data_processing.py 的檔案,用來處理資料向量化的相關任務。

from langchain.text_splitter import RecursiveCharacterTextSplitter
from pinecone import Pinecone
from env_settings import EnvSettings
from langchain_core.documents import Document
from langchain_pinecone import PineconeVectorStore
from pydantic import SecretStr
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from ptt_rag_dev.celery import app
from article.models import Article

env_settings = EnvSettings()


@app.task()
def store_data_in_pinecone(article_id_list: list):
    vector_store = PineconeVectorStore(
        index=Pinecone(
            api_key=env_settings.PINECONE_API_KEY
        ).Index(env_settings.PINECONE_INDEX_NAME),
        embedding=GoogleGenerativeAIEmbeddings(model=env_settings.GOOGLE_EMBEDDINGS_MODEL,                                        google_api_key=SecretStr(env_settings.GOOGLE_API_KEY)),
    )
    documents = []
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=300,
        chunk_overlap=50,
    )
    for article in Article.objects.filter(id__in=article_id_list).all():
        chunks = text_splitter.split_text(article.content)
        for i, chunk in enumerate(chunks):
            documents.append(Document(
                page_content=chunk,
                metadata={
                    "article_id": article.id,
                    "board": article.board,
                    "title": article.title,
                    "author": article.author,
                    "post_time": str(article.post_time),
                    "url": article.url,
                    "chunk_index": i
                }
            ))
    vector_store.add_documents(documents=documents)

這個任務是為了跟之前我們做過的爬蟲任務 ptt_scrape 連接起來,也就是我們會用 chain 串接兩個任務,在這裡 ptt_scrape 輸出的 article_id_list 就會變成 store_data_in_pinecone 的輸入。

store_data_in_pinecone 大致上可以分成三個步驟:

  1. 建立放向量的設定與位置(vector_store)
  2. 建立文字分割器(text_splitter)
  3. 透過 id 將文章取出、切割並放入向量

可以參考 PineconeVectorStore官方網站RecursiveCharacterTextSplitter官方網站chunk_sizechunk_overlap 是切割的長度設置,後面幾天會說明選擇此長度的理由。

保存環境變數

env_settings.py 增加:

    PINECONE_API_KEY: str = None
    PINECONE_INDEX_NAME: str = None
    GOOGLE_EMBEDDINGS_MODEL: str = None

並在 .env 新增:

PINECONE_API_KEY="..."
PINECONE_INDEX_NAME="rag"
GOOGLE_EMBEDDINGS_MODEL="models/gemini-embedding-001"

chain 串接任務

接下來在 ptt_rag_dev/celery.py 加入:

app.conf.imports = [
    ...
    "celery_app.data_processing",
]

我們目前只有排程 ptt_scrape 的任務,因此把在 period_send_ptt_scrape_task 改成:

@app.task()
def period_send_ptt_scrape_task():
    board_list = ['Gossiping', 'NBA', 'Stock', 'LoL', 'home-sale']
    for board in board_list:
        chain(ptt_scrape.s(board), store_data_in_pinecone.s())()

這樣寫就可以用 chain 達到我們串接任務的需求,也要注意把 ptt_scrape@app.task() 設成任務。

這裡要引入套件:

from celery import chain
from celery_app.data_processing import store_data_in_pinecone

測試結果

簡單測試後 Pinecone 資料的格式如下:
https://ithelp.ithome.com.tw/upload/images/20250704/20172834iNKys8K2M1.png
如果到這裡都有跟上腳步的話代表你已經完成專案超過一半的進度了!

可以把之前測試的資料清空整理,重新正式啟動服務,讓服務更乾淨順暢。
最好能跑三天都沒問題,我們就可以用這三天的資料來做智慧問答。

分批處理

實際操作後很可能會有配額問題,我建議在這個地方加 分批處理 + 重試機制,避免一次送太多,也能在撞到 quota 限制時自動延遲重試。
下面是改良版的 data_processing.py

from langchain.text_splitter import RecursiveCharacterTextSplitter
from pinecone import Pinecone
from env_settings import EnvSettings
from langchain_core.documents import Document
from langchain_pinecone import PineconeVectorStore
from pydantic import SecretStr
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from ptt_rag_dev.celery import app
from article.models import Article
import time
import random
from math import ceil

env_settings = EnvSettings()


BATCH_SIZE = 50  # 每批處理 50 筆,避免一次塞爆 API
MAX_RETRIES = 5
BASE_DELAY = 2  # 秒

def retry_with_backoff(func, *args, **kwargs):
    for attempt in range(MAX_RETRIES):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            if "ResourceExhausted" in str(e) or "429" in str(e):
                delay = BASE_DELAY * (2 ** attempt) + random.uniform(0, 1)
                print(f"[Quota hit] Retry {attempt+1}/{MAX_RETRIES} after {delay:.2f}s...")
                time.sleep(delay)
            else:
                raise
    raise RuntimeError("Max retries reached for embedding request")

@app.task()
def store_data_in_pinecone(article_id_list: list):
    vector_store = PineconeVectorStore(
        index=Pinecone(api_key=env_settings.PINECONE_API_KEY)
        .Index(env_settings.PINECONE_INDEX_NAME),
        embedding=GoogleGenerativeAIEmbeddings(
            model=env_settings.GOOGLE_EMBEDDINGS_MODEL,
            google_api_key=SecretStr(env_settings.GOOGLE_API_KEY),
        ),
    )

    documents = []
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=300,
        chunk_overlap=50,
    )

    for article in Article.objects.filter(id__in=article_id_list).all():
        chunks = text_splitter.split_text(article.content)
        for i, chunk in enumerate(chunks):
            documents.append(
                Document(
                    page_content=chunk,
                    metadata={
                        "article_id": article.id,
                        "board": article.board,
                        "title": article.title,
                        "author": article.author,
                        "post_time": str(article.post_time),
                        "url": article.url,
                        "chunk_index": i,
                    },
                )
            )

    # 分批送入,避免一次塞太多
    total_batches = ceil(len(documents) / BATCH_SIZE)
    for i in range(total_batches):
        batch_docs = documents[i * BATCH_SIZE : (i + 1) * BATCH_SIZE]
        retry_with_backoff(vector_store.add_documents, documents=batch_docs)
        print(f"[Batch {i+1}/{total_batches}] Uploaded {len(batch_docs)} docs")

明天【Day 19】PTT 文章智慧問答系統 - 整合 Gemini LLM 回答語意查詢問題
我們已經將向量資料存入完畢,接下來就是要取得與使用者的提問相近的資料,為後續的智慧問答功能做好準備!


上一篇
【Day 17】建立向量資料庫基礎 - Gemini Embedding 與 Pinecone 初步串接
下一篇
【Day 19】PTT 文章智慧問答系統 - 整合 Gemini LLM 回答語意查詢問題
系列文
一起來打造 PTT 文章智慧問答系統!20
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言