iT邦幫忙

2025 iThome 鐵人賽

DAY 14
0

⚡《AI 知識系統建造日誌》這不是一篇純技術文章,而是一場工程師的魔法冒險。程式是咒語、流程是魔法陣、錯誤訊息則是黑暗詛咒。請準備好你的魔杖(鍵盤),今天,我們要踏入魔法學院的基礎魔法課,打造穩定、可擴展的 AI 知識系統。


前言

延續先前Ollam, RAG 介紹,若你錯過前情,可以先看看這幾篇:


環境infra

noteserver Container

  noteserver:
    build:
      context: .
      dockerfile: ./services/noteservice/Dockerfile.noteserver
    image: noteserver:latest
    container_name: note
    volumes:
      - ./note:/app
      - ./data/noteserver:/data
      - ./docker_cache/hf_cache:/root/.cache/huggingface # avoid space out, you don't need to add this.
    ports:
      - "8022:8000"
    env_file:
      - .env
    networks:
      - langfuse-otel-net
    

Dockerfile.noteserver

FROM python:3.10-slim

RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    git \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

COPY services/noteservice/requirements.txt ./requirements.txt

COPY ./note /app

RUN pip install --no-cache-dir --upgrade pip
RUN pip install --no-cache-dir -r requirements.txt

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"]



回顧 rag pipeplne

  1. get_embedding
  2. qdrant_client.search()
  3. re_ranking
  4. langchain_client.prompt_builder.create_structured_prompt
  5. langchain_client.llm_context
query_embedding = get_embedding(query)
chunks, sources, msg, arxiv_ids, total_hits = qdrant_client.search(
        query=query,
        query_vector=query_embedding,
        size=system_settings.top_k,
        min_score=0.3,
    )
    
if chunks:
    reranked = re_ranking(
                chunks,
                query,
                vector_weight=vector_weight,
                bm25_weight=bm25_weight,
            ) # 💡是可選步驟
    prompt_data = langchain_client.prompt_builder.create_structured_prompt(
                    query, reranked, system_settings.user_language
                )
    resp = langchain_client.llm_context(
                final_prompt,
                query,
                user_language=system_settings.user_language,
                system_prompt=system_settings.system_prompt,
            )

get_embedding 已經於 Day 7 | 穿越 RAG 魔法迷宮:打造智慧問答系統的秘訣 - RAG Pipeline 跟大家見面了。今天終於可以來看看 其餘幾個如何實現。 將目標轉向 qdrant_client.search rerank

rag pipeplne 說穿了就是 🔮 Embedding → 📚 Qdrant Hybrid Search → ⚖️ Re-ranking → 🏗️ Prompt Build → 🧠 LLM Generate


由於 過了一段時間,魔法也迭代換新了(畢竟連iphone 每年都可以迭代換新了),不過分吧,會將

  • Day 5|資料的家:MinIO , Qdrant, PostgreSQL 的故事 — 文件與資料庫 的 Qdrant 施展小魔法 修改了一下 qdrant_client.search 使用 hybird search
  • langchain_client.prompt_builder.create_structured_prompt 換成 ollama_client.prompt_builder.create_structured_prompt 以及 ollama_client.prompt_builder.create_rag_prompt (其實就是 prompt 不同而已)
  • langchain_client.llm_context 蛻變成 ollama_client.generate_rag_answer

🌟 核心魔法流程(RAG Pipeline 更新簡化版)

Query
  │
  ▼
Embedding(get_embedding)  →  Dense Vector
  │
  ▼
Qdrant Hybrid Search(Dense + BM25)→ Top K Candidates
  │
  ▼
Optional Re-ranking(vector_weight + bm25_weight)→ 精準排序
  │
  ▼
Prompt Builder(ollama_client.create_structured_prompt / create_rag_prompt)
  │
  ▼
LLM Generate Answer(ollama_client.generate_rag_answer)

def retrieval_pipeline(
    query: str,
    system_settings: SystemSettings,
    qdrant_client: QdrantClient,
    search_mode: str = "hybrid",
    rag_tracer: RAGTracer = None,
    trace=None,
    categories=None,
) -> Tuple[List[dict], List[str]]:

    logger.info("Step 0: Embedding")

    query_embedding = get_embedding(query)


    logger.info("Step 1: Retrieval")
    top_k = max(1, system_settings.top_k)  # avoid zero
    chunks, sources, msg, arxiv_ids, total_hits = qdrant_client.search(
        query=query,
        query_vector=query_embedding,
        size=top_k * 2,  # retrieve more for reranking
        min_score=0.3,
        hybrid=system_settings.hybrid_search,
        categories=categories,
    )
        
    if not chunks:
        logger.warning("No chunks retrieved, fallback to query as prompt")
        return [], []

    if not system_settings.reranker_enabled:
        logger.info("Reranker disabled, skip reranking step")
        return chunks[:top_k], sources

    logger.info("Step 2: Re-ranking ")
    vector_weight = 0.6
    bm25_weight = 0.3
    reranked = re_ranking(
        chunks,
        query,
        vector_weight=vector_weight,
        bm25_weight=bm25_weight,
    )
    
    return reranked[:top_k], sources



# --- Full RAG pipeline ---
async def ask_flow(
    query: str,
    system_settings: SystemSettings,
    ollama_client: OllamaClient,
    qdrant_client: QdrantClient,
    rag_tracer: RAGTracer,
    trace=None,
    user_id: str = "anonymous",
    model: str = "gpt-oss:20b",
) -> AskResponse:
    search_mode = "hybrid" if system_settings.hybrid_search else "dense-only"

    chunks, sources = retrieval_pipeline(
        query, system_settings, qdrant_client, search_mode, rag_tracer, trace
    )
    if not chunks:
        response = AskResponse(
            query=query,
            answer="I couldn't find any relevant information in the papers to answer your question.",
            sources=[],
            chunks_used=0,
            search_mode=search_mode,
        )
        return response

    logger.info("Step 4: Build prompt")
    try:
        prompt_data = ollama_client.prompt_builder.create_structured_prompt(
            query, chunks, system_settings.user_language
        )
        final_prompt = prompt_data["prompt"]
    except Exception:
        final_prompt = ollama_client.prompt_builder.create_rag_prompt(
            query, chunks, system_settings.user_language
        )

   
    parsed_response, response = await ollama_client.generate_rag_answer(
        query=query,
        chunks=chunks,
        user_language=system_settings.user_language,
        use_structured_output=True,
        temperature=system_settings.temperature,
    )


    return AskResponse(
        query=query,
        answer=parsed_response.get("answer", "Unable to generate answer"),
        sources=sources,
        chunks_used=len(chunks),
        search_mode=search_mode,
    )


技術重點 - hybird search

在現代資訊檢索系統中,單一的搜尋方法往往難以滿足不同查詢類型的需求。Dense embedding(稠密向量)可以捕捉語義,讓使用者即使查詢與文件中使用不同詞彙,也能找到相關內容;而傳統的 keyword-based 方法(如 Bag-of-Words、TFIDF、BM25)在精確匹配和特殊名稱識別上仍具有不可替代的優勢。本段落介紹如何在 Qdrant 中實現混合搜尋(Hybrid Search),同時利用稠密向量與稀疏向量的優勢。

一句話 : **Hybrid Search(混合搜尋)**是一種同時結合 向量搜尋(Vector Search) 與 傳統文字搜尋(Text Search) 的搜尋策略。

我們來談 Hybrid Search(混合搜尋)。我會從概念到實作都說清楚,並且跟我現在用的 Qdrant 範例做連結。


稀疏向量(Sparse Vectors)

傳統的基於關鍵字的檢索,也可以視為向量搜尋,只不過這些向量大部分維度為 0,即稀疏向量。向量的每個非零維度對應字典中某個詞的出現。稀疏向量的優勢在於:

  • 能精確匹配專有名詞、代碼或隨機字符等文本。
  • 字典可以動態擴充,不需要重新訓練模型。

BM25

BM25(Best Matching 25)是經典的稀疏向量方法,基於統計模型,不涉及神經網路,因此非常快速輕量,常作為搜尋基準。

BM25 的核心公式包含:

  • Term Frequency (TF):同一詞彙在文件中出現多次會增加相關性,但有遞減效果。
  • Inverse Document Frequency (IDF):提升稀有詞彙的重要性,降低常見詞權重。
  • Document Length Normalization:防止長文檔因字數多而獲得不公平的高分。

為什麼要用 Hybrid Search

Dense 主要抓語意,Sparse 精確匹配,Hybrid 則兼顧兩者。

特性 / 搜尋方式 Dense Vector Search (Embedding) Sparse Search (BM25 / TF-IDF) Hybrid Search (Dense + Sparse)
原理 文字向量化 → 比較語意相似度 傳統文字檢索 → 關鍵字匹配 同時使用語意向量 + 關鍵字匹配,結果融合
優點 抓語意相關、關鍵字不同也能找到答案 精確匹配關鍵字、處理專有名詞或代碼有效 - 結合雙方優勢、語意 + 精確匹配
缺點 精確字面匹配差、需要向量模型 無法捕捉語意相似、 容易漏掉同義詞 - 較複雜、需要融合策略與計算成本
排序方式 Cosine / Dot-product 相似度 BM25 分數 Dense + Sparse 結果融合 (如 RRF)
適合場景 問答系統、語意搜索、相似文件查找 文件檢索、專有名詞檢索、精確匹配 大型知識庫、RAG pipeline、LLM 上下文準備

對應的修改

Day 5|資料的家:MinIO , Qdrant, PostgreSQL 的故事 — 文件與資料庫 的 Qdrant 施展小魔法 修改了一下 qdrant_client.search 使用 hybird search ,來展開談談吧

qdrant_client = QdrantClient("http://localhost:6333")

建魔法陣(Create Collection)

qdrant_client.create_collection(
        collection_name=COLLECTION_NAME,
        vectors_config={
            "dense": models.VectorParams(
                size=768,
                distance=models.Distance.COSINE,
            ),
        },
        sparse_vectors_config={
            "bm25": models.SparseVectorParams(
                modifier=models.Modifier.IDF,
            )
        },
    )

召喚資料(Upsert Points)

for chunk_idx, chunk in enumerate(chunks):
    point = models.PointStruct(
            id=uuid.uuid4().hex,
            vector={
                # dense embedding 使用指定模型 all-mpnet-base-v2
                "dense": vector,  # 自己計算的向量
                # sparse embedding (BM25) 自動計算
                "bm25": models.Document(
                    text=chunk,
                    model="Qdrant/bm25",
                ),
            },
            payload={
                **metadata,
                "text": chunk,
                "chunk_idx": chunk_idx,
            },
        )

    points.append(point)
    
qdrant_client.upsert(
        collection_name=COLLECTION_NAME, points=batch_points
    )
    

query_vector 這邊使用自己的 embedding 。雖然models.Document 會自動幫我 做 Vector Embedding,但是只限一些 hugging face 的embedding模型

施展雙重搜尋(Hybrid Search + Fusion)


query_vector = embedding(query)

query_result = self.client.query_points(
            collection_name=self.settings.COLLECTION_NAME,
            prefetch=[
                models.Prefetch(query=query_vector, using="dense", limit=5 * size),
                models.Prefetch(
                    query=models.Document(
                        text=query,
                        model="Qdrant/bm25",
                    ),
                    using="bm25",
                    limit=5 * size,
                ),
            ],
            # Fusion query enables fusion on the prefetched results
            query=models.FusionQuery(fusion=models.Fusion.RRF),
            limit=size,
            with_payload=True,
            ).points
  1. Prefetchusing="dense" → 語意向量搜尋。
  2. Prefetchusing="bm25" → 傳統文字搜尋。
  3. FusionQuery(fusion=RRF) → 將兩個搜尋結果融合,排序出最終結果。

RRF = Reciprocal Rank Fusion,一種融合多個搜尋結果排序的方法。


Rerank 依舊可選。可是 qdrant 不是 會自行排序? 👍

敏感的你可能觀察到 Qdrant 的 query_points 已經會回傳 排序好的結果(不論是 dense-only 或 hybrid with RRF),那為什麼還要額外做 re_ranking

Qdrant 幫你快速抓 Top-K 候選,Re-rank 才是讓 LLM 看見真正最 relevant 資料的關鍵。

🔎 Qdrant 排序 vs. 自行 Re-ranking

1. Qdrant 排序

  • Dense-only 模式 → 依照 embedding 的 cosine / dot-product 相似度排序。
  • BM25-only 模式 → 依照 BM25 分數排序。
  • Hybrid (FusionQuery, RRF) → 把 dense 與 BM25 的排名融合,產生新的分數,再排序後回傳 top K。

👉 也就是說,當你呼叫 Qdrant,得到的 points 已經是 依照 RRF 或某一種策略排序的。


2. 為什麼還要 Re-ranking?

原因在於 控制權策略細緻化

  • 自訂權重
    Qdrant 的 RRF 是一種 rank fusion,並不是「可調權重」的融合。

    • RRF = Reciprocal Rank Fusion,本質上是依照排名位置加分,公式是

      (每個檢索系統的影響力相同)

    • 這是 RRF 沒有提供的功能,所以才需要 後處理 re-ranking

  • 加入更多訊號
    除了 Qdrant 原本的分數,你可能還想引入:

    • 文件的 metadata(例如發表時間越近越重要)
    • 使用者偏好(例如某些分類的 paper 要優先)
    • 外部 reranker 模型(cross-encoder BERT / LLM-based scorer)
  • 實驗迭代
    透過自己 re-ranking,可以隨時更換公式,而不用依賴 Qdrant 內建的 fusion 模式。


3. 最佳實務

通常會有兩層 ranking:

  1. Retriever ranking (Qdrant)

    • 用 dense / bm25 / hybrid 快速縮小候選(例如 200 筆)。
    • 保證 recall,不漏掉潛在 relevant 文件。
  2. Reranker ranking (本地 re-ranking)

    • 用更複雜或自訂的方法排序,取最終 top K(例如 20 筆)。
    • 提升 precision,讓 LLM 看見最 relevant 的內容。

4. 程式碼

  1. 計算 vector score
  2. 計算 BM25 score
  3. Combine + 排序
    • 以下程式碼裡的 vector_weight = 0.6, bm25_weight = 0.3,就是想要更細緻地控制 dense 與 bm25 的影響力。
from typing import Any, List, Dict

import jieba
from rank_bm25 import BM25Okapi


def re_ranking(
    chunks: List[Dict[str, Any]],
    query: str,
    vector_weight: float = 0.6,
    bm25_weight: float = 0.3,
    field_weights: dict = None,
) -> List[Dict[str, Any]]:
    """
    Hybrid reranking: vector similarity + BM25 text matching + 欄位加權
    chunks: list of payload dict,需包含 text/title/abstract 與向量相似度 score
    field_weights: 欄位權重,例如 {'text':1.0, 'title':0.8, 'abstract':0.5}
    """
    if field_weights is None:
        field_weights = {"text": 1.0, "title": 0.8, "abstract": 0.5}

    query_tokens = list(jieba.cut(query.lower()))

    scored_chunks = []

    for chunk in chunks:
        # 1️⃣ Vector similarity
        vector_score = chunk.get("score", 1.0)  # 假設 retrieval 已給向量相似度

        # 2️⃣ BM25 score
        bm25_total = 0.0
        for field, weight in field_weights.items():
            content = chunk.get(field, "")
            if not content:
                continue
            tokens = list(jieba.cut(content.lower()))
            bm25 = BM25Okapi([tokens])
            score = bm25.get_scores(query_tokens)[0]  # 單篇文件的 BM25 分數
            bm25_total += weight * score

        # 3️⃣ Combine scores
        total_score = vector_weight * vector_score + bm25_weight * bm25_total
        chunk_with_score = chunk.copy()
        chunk_with_score["vector_score"] = vector_score
        chunk_with_score["bm25_score"] = bm25_total
        chunk_with_score["total_score"] = total_score

        scored_chunks.append((total_score, chunk_with_score))

    # 排序
    scored_chunks.sort(key=lambda x: x[0], reverse=True)
    reranked_chunks = [chunk for score, chunk in scored_chunks]
    return reranked_chunks


# {
#     "id": "doc2",
#     "title": "深度學習應用",
#     "abstract": "卷積神經網路在影像辨識的應用。",
#     "text": "包含 CNN、RNN、Transformer 的案例。",
#     "score": 0.75,
#     "vector_score": 0.75,       # 向量分數
#     "bm25_score": 2.1,          # BM25 欄位加權後分數
#     "total_score": 1.38         # 最終分數 (排序依據)
# }

結論

Qdrant 排序提供一個 快速、通用的初始 ranking

  • Re-ranking 的必要性在於:
    • 補足 Qdrant fusion 的限制(RRF 無法調權重)
    • 引入更多 domain-specific 訊號
    • 精緻化最終給 LLM 的上下文
  • Future work
    • 加入更多訊號
      • 例如「論文時間越新越高分」、「作者權威度加權」。
    • 高級 reranker
      • 可以接上 Cross-Encoder / LLM-based Scorer,提升最終答案品質。

小結

今天的魔法課帶大家認識了 Qdrant Hybrid Search(提升 recall) 與 自訂 re-ranking(提升 precision),兩者搭配才能讓 LLM 吃到最 relevant 的知識

明天再來討論 build_prompt -> generate。必經魔法需要CD 時間。


補充 QdrantClient

from typing import Dict, List, Optional

from config import Settings
from logger import AppLogger
from qdrant_client import QdrantClient as Client
from qdrant_client import models
from qdrant_client.http.exceptions import UnexpectedResponse

logger = AppLogger(__name__).get_logger()


class QdrantClient:
    def __init__(self, settings: Settings):
        self.settings = settings

        self.client = Client(
            url=settings.QDRANT_URL,
            timeout=60,
        )

    def create_collection(self):
        try:
            self.client.create_collection(
                collection_name=self.settings.COLLECTION_NAME,
                vectors_config={
                    "dense": models.VectorParams(
                        size=768,
                        distance=models.Distance.COSINE,
                    ),
                },
                sparse_vectors_config={
                    "bm25": models.SparseVectorParams(
                        modifier=models.Modifier.IDF,
                    )
                },
            )
            print(
                f"✅ Qdrant collection `{self.settings.COLLECTION_NAME}` created successfully."
            )
        except UnexpectedResponse as e:
            # 如果已存在就當作正常,不丟錯
            if "already exists" in str(e):
                logger.info(
                    f"ℹ️ Qdrant collection `{self.settings.COLLECTION_NAME}` already exists, skipping creation."
                )
            else:
                raise  # 其他 UnexpectedResponse 直接丟出

    def get_collections(self):
        return self.client.get_collections()


    def search(
        self,
        query: str,
        query_vector: Optional[List[float]] = None,
        size: int = 10,
        categories: Optional[List[str]] = None,
        min_score: float = 0.25,
        hybrid: bool = True,
    ) -> tuple[List[Dict], List[str], str, List[str], int]:
        # Step 1: 建立 Qdrant filter
        must_conditions = []

        if categories:
            for cat in categories:
                if cat:
                    must_conditions.append(
                        models.FieldCondition(
                            key="categories", match=models.MatchValue(value=cat)
                        )
                    )

        filter_cond = models.Filter(must=must_conditions) if must_conditions else None

        logger.info(f"filter_cond {filter_cond}")
        # Step 2: Qdrant search
        # Hybrid search:向量 + filter

        if hybrid:
            # 🚀 Hybrid search (dense + sparse)
            logger.info("Hybrid search (dense + sparse)")

            query_result = self.client.query_points(
                collection_name=self.settings.COLLECTION_NAME,
                prefetch=[
                    models.Prefetch(query=query_vector, using="dense", limit=5 * size),
                    models.Prefetch(
                        query=models.Document(
                            text=query,
                            model="Qdrant/bm25",
                        ),
                        using="bm25",
                        limit=5 * size,
                    ),
                ],
                # Fusion query enables fusion on the prefetched results
                query=models.FusionQuery(fusion=models.Fusion.RRF),
                limit=size,
                with_payload=True,
            )

        else:
            logger.info("Dense-only search")
            # 🚀 Dense-only search
            query_result = self.client.query_points(
                collection_name=self.settings.COLLECTION_NAME,
                query=query_vector,
                using="dense",
                limit=size,
                with_payload=True,
            )
            

        # Extract essential data for LLM
        chunks = []
        arxiv_ids = []
        sources = set()

        msg = ""
        for hit in query_result.points:
            if hit.score < min_score:
                continue
            payload = hit.payload
            logger.info(f"payload {payload}")
            arxiv_id = payload.get("arxiv_id", "")

            # Minimal chunk data for LLM
            chunks.append(
                {
                    "arxiv_id": arxiv_id,
                    "chunk_text": payload.get("text", payload.get("abstract", "")),
                }
            )

            if arxiv_id:
                arxiv_ids.append(arxiv_id)
                arxiv_id_clean = arxiv_id.split("v")[0] if "v" in arxiv_id else arxiv_id
                sources.add(f"https://arxiv.org/pdf/{arxiv_id_clean}.pdf")

            info_str = f"title: {payload['title']} \n Score: {hit.score}\n arxiv_id : {payload['arxiv_id']}"
            msg += f"Retrieved {len(chunks)} chunks from collection {self.settings.COLLECTION_NAME}\n{info_str}\n\n\n"

        logger.info(
            f"search found {len(chunks)} chunks, {len(sources)} sources\n\n with filter {filter_cond}\n\n"
        )

        return chunks, list(sources), msg, arxiv_ids, len(query_result.points)



上一篇
Day 13|資料管線魔法進階:Prefect 架構與自動化全解析(下)
下一篇
Day15|RAG 魔法課 (下):Prompt 煉金術與答案生成
系列文
論文流浪記:我與AI 探索工具、組合流程、挑戰完整平台16
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言