iT邦幫忙

2024 iThome 鐵人賽

DAY 22
0
生成式 AI

從系統設計切入,探索 GenAI 在企業中的實踐系列 第 22

[Day22] 結構化資料前處理:架設從 RabbitMQ 到 Qdrant 向量儲存的資料流

  • 分享至 

  • xImage
  •  

以下參考課程 LLM Twin: Building Your Production-Ready AI Replica 撰寫

在資料處理領域,資料流處理和批量處理是兩種常見且關鍵的技術。隨著系統需要即時處理大量持續流入的資料,資料流處理相比批量處理,具有更低的延遲,使其特別適合需要快速反應的即時分析和系統監控場景。

在本篇文章中,我們將介紹如何使用 Bytewax 和 Qdrant 來實現資料流處理的架構:

  • Bytewax 是一個資料流處理框架,能夠即時處理資料,並支持並行處理,適合處理來自串流資料來源(如 RabbitMQ)的實時資料。
  • Qdrant 是一個向量資料庫,專門用來儲存高維度向量資料,特別適合於像嵌入向量這類資料的儲存和檢索。

本文將逐步展示如何從 RabbitMQ 隊列接收資料,經過多層處理後,將清理和嵌入的資料存儲到 Qdrant。接著,我們將介紹 Bytewax 和 Qdrant 以及資料處理的完整架構。

資料流處理與批處理的比較

特點 批處理 資料流處理
處理方式 集中處理大批量數據 即時處理每一條數據
延遲
適用場景 非即時任務、批量報告 即時分析、監控
資源利用率 資源利用集中於處理時間點 資源利用平均且持續
可控性 易於管理大規模數據,適合預定計劃 適應動態、連續的數據流

在我們的系統中,資料來自 RabbitMQ 隊列,並且需要進行即時處理。因此,選擇資料流處理是最佳解決方案。資料在處理完畢後會被快速地存入 Qdrant 向量資料庫,以供即時檢索。這種架構可以大幅減少延遲,並確保系統能夠實時處理資料。

Bytewax 資料流架構

Bytewax 是一個靈活的資料流處理框架,允許我們通過 輸入(Input)處理(Processing)輸出(Output) 三個步驟來設計資料流管道。接下來將展示如何使用 Bytewax 實現從 RabbitMQ 到 Qdrant 的即時處理過程。

1. 輸入:從 RabbitMQ 讀取數據

首先,我們從 RabbitMQ 隊列中讀取資料,將其引入 Bytewax 流程中。

flow = Dataflow("Streaming ingestion pipeline")
stream = op.input("input", flow, RabbitMQSource())
  • Dataflow():這行程式碼建立了一個資料流處理管道,並命名為 "Streaming ingestion pipeline"DataflowBytewax 的核心架構,負責組織整個資料流的處理步驟。

  • op.input():這個函式會從定義的資料來源讀取資料。在這個例子中,我們指定了來自 RabbitMQ 的資料源 (RabbitMQSource()),表示資料將從 RabbitMQ 消息隊列中進行讀取。

  • stream:變數 stream 是資料流的起點,它接收到的資料會推送進資料處理管道,後續處理步驟會依次對這些資料進行清理、轉換等操作。

2. 資料處理:多層處理步驟

在此步驟中,資料流管道將接收到的原始資料逐步轉換為最終的嵌入向量。這個過程分為多層處理,包括消息的結構化處理、數據清理、分塊以及嵌入。

(1) 原始消息處理

首先,我們需要將從 RabbitMQ 隊列中讀取的原始數據轉換為結構化的 Pydantic 模型,以便在後續的處理步驟中進行進一步操作。

stream = op.map("raw dispatch", stream, RawDispatcher.handle_mq_message)
  • op.map()mapBytewax 中的一個操作,它會對每條資料進行處理並返回一個新的資料流。在這裡,我們將原始資料進行轉換。
  • `"raw dispatch" :這個是操作的名稱標籤,用於追蹤資料流中的步驟名稱。
  • RawDispatcher.handle_mq_message:這裡調用了 RawDispatcher 中的 handle_mq_message 方法,該方法將從 RabbitMQ 接收到的原始資料轉換為 Pydantic 模型,為後續的處理步驟做準備。

這個步驟是將原始資料從無結構的狀態轉換為可以操作的結構化資料,方便後續的數據清理和處理。

(2) 資料清理

接下來,我們對資料進行清理,移除無效字符、符號等不必要的內容,並保留對後續處理有用的部分。

stream = op.map("clean dispatch", stream, CleaningDispatcher.dispatch_cleaner)
  • op.map():同樣使用 map 操作來對每一條資料進行轉換。
  • "clean dispatch":這是清理操作的名稱標籤,標示這個步驟在資料流中的作用。
  • CleaningDispatcher.dispatch_cleaner:這裡調用了 CleaningDispatcher 中的 dispatch_cleaner 方法,該方法負責根據資料的類型,選擇合適的清理器來清理數據。

這一步是確保資料在進入下一個階段前,已經去除了不必要的雜訊,並且保持資料的一致性。

(3) 分塊處理

數據清理後,我們需要將其分塊,這樣可以更有效地進行嵌入處理。使用 ChunkingDispatcher 進行分塊操作。

stream = op.flat_map("chunk dispatch", stream, ChunkingDispatcher.dispatch_chunker)
  • op.flat_map()flat_map 是一個特別的操作,它可以將單個資料轉換為多個輸出。例如,在這裡,清理後的資料會被分割成多個塊(chunk),每個塊會作為獨立的資料進行後續處理。
  • "chunk dispatch":這是分塊操作的標籤,用於追蹤這一步的操作。
  • ChunkingDispatcher.dispatch_chunker:這裡調用了 ChunkingDispatcher 中的 dispatch_chunker 方法,該方法負責將資料分塊,使每個塊都能獨立進行嵌入處理。

這步驟主要是將資料分割,以用作接下來的嵌入。

(4) 嵌入處理

最後,我們對分塊後的資料進行嵌入處理,將文本轉換為向量表示形式,這些向量將用於後續的檢索或分析工作。

stream = op.map("embedded chunk dispatch", stream, EmbeddingDispatcher.dispatch_embedder)
  • op.map():這裡再次使用 map 操作來對每個資料塊進行嵌入處理。
  • "embedded chunk dispatch":這是嵌入操作的標籤,用來追蹤這一步驟。
  • EmbeddingDispatcher.dispatch_embedder:這裡調用了 EmbeddingDispatcherdispatch_embedder 方法,該方法負責將分塊後的數據轉換為嵌入向量。

這步驟是將文本數據轉換為向量表示,這些向量會被儲存到向量資料庫中,供後續的檢索和分析使用。

3. 輸出:存儲到 Qdrant 向量資料庫

在資料處理完畢後,最後一步是將處理後的數據存儲到 Qdrant 向量資料庫。

op.output("cleaned data insert to qdrant", stream, QdrantOutput(connection=connection, sink_type="clean"))
op.output("embedded data insert to qdrant", stream, QdrantOutput(connection=connection, sink_type="vector"))
  • op.output():這是 Bytewax 用來將資料流輸出到外部系統的操作。這裡指定了數據的目標系統為 Qdrant
  • "cleaned data insert to qdrant":這個操作標籤表示將清理後的數據插入到 Qdrant 中。
  • stream:資料流,這裡表示要處理的資料流是已經完成清理和嵌入的數據。
  • QdrantOutput(connection=connection, sink_type="clean"):這裡我們定義了輸出的數據類型和數據庫連接。sink_type="clean" 表示存入的是清理後的數據。
  • QdrantOutput(connection=connection, sink_type="vector"):這裡定義了嵌入後數據的輸出,將嵌入向量存儲到 Qdrant 的向量集合中。

Qdrant 資料存儲與輸出架構

Qdrant 是一個專門用來存儲向量數據的資料庫,特別適合於像嵌入向量這樣的大規模高維數據。在資料流處理過程中,處理完的數據需要被高效地存儲到 Qdrant 向量資料庫中。為了實現這個過程,Bytewax 提供了兩個工具:DynamicSinkStatelessSinkPartition

Bytewax:DynamicSinkStatelessSinkPartition

  • DynamicSink:這個工具允許我們根據資料的不同狀態或類型動態構建資料輸出管道,並將資料輸送到不同的目的地。它可以靈活地根據處理邏輯,選擇不同的接收器來存儲資料,這對於多種類型資料的處理系統尤為重要。

  • StatelessSinkPartition:這個工具負責處理具體的資料寫入邏輯,將處理後的資料分批寫入資料庫。它是 DynamicSink 實際寫入資料的具體實現。在這裡,我們使用不同的 StatelessSinkPartition 類別來處理清理後和嵌入後的資料,並將它們分別存入 Qdrant 的不同集合。

QdrantOutput 的實現

以下程式碼展示了如何使用 DynamicSinkStatelessSinkPartition,根據資料狀態選擇不同的接收器來處理資料。

from bytewax.outputs import DynamicSink, StatelessSinkPartition
from db.qdrant import QdrantDatabaseConnector

class QdrantOutput(DynamicSink):
    def __init__(self, connection: QdrantDatabaseConnector, sink_type: str) -> None:
        self._connection = connection
        self._sink_type = sink_type

    def build(self, worker_index: int, worker_count: int) -> StatelessSinkPartition:
        if self._sink_type == "clean":
            return QdrantCleanedDataSink(self._connection)
        elif self._sink_type == "vector":
            return QdrantVectorDataSink(self._connection)
  • QdrantOutput(DynamicSink)QdrantOutput 繼承了 DynamicSink,並根據 sink_type 動態選擇要使用的接收器。sink_type 會根據資料是清理後的還是嵌入後的來選擇不同的寫入邏輯。
  • build()build() 是一個動態方法,根據傳入的 sink_type 返回相應的資料接收器。例如,當 sink_type"clean" 時,它會返回 QdrantCleanedDataSink;當 sink_type"vector" 時,則返回 QdrantVectorDataSink

這段程式碼的作用是根據數據的不同狀態(清理後或嵌入後),自動選擇適當的接收器來將數據寫入 Qdrant

清理後資料的儲存邏輯

這一部分的邏輯負責將清理後的數據寫入到 Qdrant 的指定集合中。QdrantCleanedDataSink 是用來處理清理後資料的接收器,它使用 write_batch() 方法來將資料批量寫入資料庫。

class QdrantCleanedDataSink(StatelessSinkPartition):
    def __init__(self, connection: QdrantDatabaseConnector):
        self._client = connection

    def write_batch(self, items: List[DBDataModel]) -> None:
        payloads = [item.save() for item in items]
        ids, data = zip(*payloads)
        collection_name = get_clean_collection(...)
        self._client.write_data(
            collection_name=collection_name,
            points=Batch(ids=ids, vectors={}, payloads=data),
        )
  • QdrantCleanedDataSink(StatelessSinkPartition):這是處理清理後資料的接收器,它繼承了 StatelessSinkPartition,負責具體的資料寫入邏輯。
  • write_batch():這個方法會接收批量的資料(items),並將其保存到指定的 Qdrant 集合中。每一個資料會先進行序列化(item.save()),並提取出需要儲存的 idsdata,然後調用 self._client.write_data() 將它們寫入到對應的集合中。

這段程式碼實現了將清理後的資料(通常是結構化的文字資料)批量寫入到 Qdrant 中,用於後續的檢索和分析`

嵌入後資料的儲存邏輯

與清理後資料的邏輯相似,嵌入後的數據會被存儲到向量集合中。QdrantVectorDataSink 負責處理這些嵌入後的向量數據,並將它們存入 Qdrant

class QdrantVectorDataSink(StatelessSinkPartition):
    def __init__(self, connection: QdrantDatabaseConnector):
        self._client = connection

    def write_batch(self, items: List[DBDataModel]) -> None:
        payloads = [item.save() for item in items]
        ids, vectors, metadata = zip(*payloads)
        collection_name = get_vector_collection(...)
        self._client.write_data(
            collection_name=collection_name,
            points=Batch(ids=ids, vectors=vectors, payloads=metadata),
        )
  • QdrantVectorDataSink(StatelessSinkPartition):這個接收器專門用來處理嵌入後的向量資料,它會將嵌入的向量和相關的 metadata 批量寫入 Qdrant 的向量集合中。
  • write_batch():與清理後數據的邏輯類似,這個方法會接收批量的嵌入後資料據(items),並通過 item.save() 來序列化每條資料。然後將 idsvectorsmetadata 提取出來,最終寫入到 Qdrant 中的指定向量集合。

這段程式碼負責將嵌入後的向量批量寫入 Qdrant,用於後續的向量檢索、相似度計算等應用。


ref.


上一篇
[Day21] 結構化資料前處理:Pydantic 管理資料狀態
下一篇
[Day23] 結構化資料前處理:清理、分塊與嵌入的實踐
系列文
從系統設計切入,探索 GenAI 在企業中的實踐25
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言