iT邦幫忙

2024 iThome 鐵人賽

DAY 22
0
生成式 AI

從系統設計切入,探索 GenAI 在企業中的實踐系列 第 22

[Day22] 結構化資料前處理:架設從 RabbitMQ 到 Qdrant 向量儲存的資料流

  • 分享至 

  • xImage
  •  

以下參考課程 LLM Twin: Building Your Production-Ready AI Replica 撰寫

在資料處理領域,資料流處理和批量處理是兩種常見且關鍵的技術。隨著系統需要即時處理大量持續流入的資料,資料流處理相比批量處理,具有更低的延遲,使其特別適合需要快速反應的即時分析和系統監控場景。

在本篇文章中,我們將介紹如何使用 Bytewax 和 Qdrant 來實現資料流處理的架構:

  • Bytewax 是一個資料流處理框架,能夠即時處理資料,並支持並行處理,適合處理來自串流資料來源(如 RabbitMQ)的實時資料。
  • Qdrant 是一個向量資料庫,專門用來儲存高維度向量資料,特別適合於像嵌入向量這類資料的儲存和檢索。

本文將逐步展示如何從 RabbitMQ 隊列接收資料,經過多層處理後,將清理和嵌入的資料儲存到 Qdrant。我們將首先比較資料流處理與批量處理的異同,然後展示具體的資料處理架構。

資料流處理與批處理的比較

特點 批處理 資料流處理
處理方式 集中處理大批量數據 即時處理每一條數據
延遲
適用場景 非即時任務、批量報告 即時分析、監控
資源利用率 資源利用集中於處理時間點 資源利用平均且持續
可控性 易於管理大規模數據,適合預定計劃 適應動態、連續的數據流

在我們的系統中,資料來自 RabbitMQ 隊列,並且需要進行即時處理。因此,選擇資料流處理是最佳解決方案。資料在處理完畢後會被快速地存入 Qdrant 向量資料庫,以供即時檢索。這種架構可以大幅減少延遲,並確保系統能夠實時處理資料。

Bytewax 資料流架構

Bytewax 是一個靈活的資料流處理框架,允許我們通過 輸入(Input)處理(Processing)輸出(Output) 三個步驟來設計資料流管道。接下來將展示如何使用 Bytewax 實現從 RabbitMQ 到 Qdrant 的即時處理過程。

1. 輸入:從 RabbitMQ 讀取數據

首先,我們從 RabbitMQ 隊列中讀取資料,將其引入 Bytewax 流程中。

flow = Dataflow("Streaming ingestion pipeline")
stream = op.input("input", flow, RabbitMQSource())
  • Dataflow():這行程式碼建立了一個資料流處理管道,並命名為 "Streaming ingestion pipeline"DataflowBytewax 的核心架構,負責組織整個資料流的處理步驟。

  • op.input():這個函式會從定義的資料來源讀取資料。在這個例子中,我們指定了來自 RabbitMQ 的資料源 (RabbitMQSource()),表示資料將從 RabbitMQ 消息隊列中進行讀取。

  • stream:變數 stream 是資料流的起點,它接收到的資料會推送進資料處理管道,後續處理步驟會依次對這些資料進行清理、轉換等操作。

2. 資料處理:多層處理步驟

在此步驟中,資料流管道將接收到的原始資料逐步轉換為最終的嵌入向量。這個過程分為多層處理,包括消息的結構化處理、數據清理、分塊以及嵌入。

(1) 原始消息處理

首先,我們需要將從 RabbitMQ 隊列中讀取的原始數據轉換為結構化的 Pydantic 模型,以便在後續的處理步驟中進行進一步操作。

stream = op.map("raw dispatch", stream, RawDispatcher.handle_mq_message)
  • op.map()mapBytewax 中的一個操作,它會對每條資料進行處理並返回一個新的資料流。在這裡,我們將原始資料進行轉換。
  • RawDispatcher.handle_mq_message:這裡調用了 RawDispatcher 中的 handle_mq_message 方法,該方法將從 RabbitMQ 接收到的原始資料轉換為 Pydantic 模型,為後續的處理步驟做準備。

(2) 資料清理

接下來,我們對資料進行清理,移除無效字符、符號等不必要的內容,並保留對後續處理有用的部分。

stream = op.map("clean dispatch", stream, CleaningDispatcher.dispatch_cleaner)
  • CleaningDispatcher.dispatch_cleaner:這裡調用了 CleaningDispatcher 中的 dispatch_cleaner 方法,該方法負責根據資料的類型,選擇合適的清理器來清理數據。

(3) 分塊處理

對清理後的數據進行分塊,便於嵌入處理:

stream = op.flat_map("chunk dispatch", stream, ChunkingDispatcher.dispatch_chunker)
  • op.flat_map():將單個數據轉換為多個塊(chunk),每個塊會作為獨立的資料進行後續處理。
  • ChunkingDispatcher.dispatch_chunker:這裡調用了 ChunkingDispatcher 中的 dispatch_chunker 方法,該方法負責將資料分塊,使每個塊都能獨立進行嵌入處理。

(4) 嵌入處理

最後,將分塊後的數據轉換為向量表示形式,這些向量將用於後續檢索或分析:

stream = op.map("embedded chunk dispatch", stream, EmbeddingDispatcher.dispatch_embedder)
  • EmbeddingDispatcher.dispatch_embedder:這裡調用了 EmbeddingDispatcherdispatch_embedder 方法,該方法負責將分塊後的數據轉換為嵌入向量。

3. 輸出:存儲到 Qdrant 向量資料庫

在資料處理完畢後,最後一步是將處理後的數據存儲到 Qdrant 向量資料庫。

op.output("cleaned data insert to qdrant", stream, QdrantOutput(connection=connection, sink_type="clean"))
op.output("embedded data insert to qdrant", stream, QdrantOutput(connection=connection, sink_type="vector"))
  • op.output():這是 Bytewax 用來將資料流輸出到外部系統的操作。這裡指定了數據的目標系統為 Qdrant
  • stream:資料流,這裡表示要處理的資料流是已經完成清理和嵌入的數據。
  • QdrantOutput(connection=connection, sink_type="clean"):這裡我們定義了輸出的數據類型和數據庫連接。sink_type="clean" 表示存入的是清理後的數據。
  • QdrantOutput(connection=connection, sink_type="vector"):這裡定義了嵌入後數據的輸出,將嵌入向量存儲到 Qdrant 的向量集合中。

Qdrant 資料存儲與輸出架構

Qdrant 是一個專門用來存儲向量數據的資料庫,特別適合於像嵌入向量這樣的大規模高維數據。在資料流處理過程中,處理完的數據需要被高效地存儲到 Qdrant 向量資料庫中。為了實現這個過程,Bytewax 提供了兩個工具:DynamicSinkStatelessSinkPartition

Bytewax:DynamicSinkStatelessSinkPartition

  • DynamicSink:這個工具允許我們根據資料的不同狀態或類型動態構建資料輸出管道,並將資料輸送到不同的目的地。它可以靈活地根據處理邏輯,選擇不同的接收器來存儲資料,這對於多種類型資料的處理系統尤為重要。

  • StatelessSinkPartition:這個工具負責處理具體的資料寫入邏輯,將處理後的資料分批寫入資料庫。它是 DynamicSink 實際寫入資料的具體實現。在這裡,我們使用不同的 StatelessSinkPartition 類別來處理清理後和嵌入後的資料,並將它們分別存入 Qdrant 的不同集合。

以下展示如何使用 DynamicSinkStatelessSinkPartition 根據資料狀態選擇不同的接收器來處理資料:

from bytewax.outputs import DynamicSink, StatelessSinkPartition
from db.qdrant import QdrantDatabaseConnector

class QdrantOutput(DynamicSink):
    def __init__(self, connection: QdrantDatabaseConnector, sink_type: str) -> None:
        self._connection = connection
        self._sink_type = sink_type

    def build(self, worker_index: int, worker_count: int) -> StatelessSinkPartition:
        if self._sink_type == "clean":
            return QdrantCleanedDataSink(self._connection)
        elif self._sink_type == "vector":
            return QdrantVectorDataSink(self._connection)
  • QdrantOutput(DynamicSink)QdrantOutput 繼承了 DynamicSink,並根據 sink_type 動態選擇要使用的接收器。sink_type 會根據資料是清理後的還是嵌入後的來選擇不同的寫入邏輯。
  • build()build() 是一個動態方法,根據傳入的 sink_type 返回相應的資料接收器。例如,當 sink_type"clean" 時,它會返回 QdrantCleanedDataSink;當 sink_type"vector" 時,則返回 QdrantVectorDataSink

這段程式碼的作用是根據數據的不同狀態(清理後或嵌入後),自動選擇適當的接收器來將數據寫入 Qdrant

清理後資料的儲存邏輯

這一部分的邏輯負責將清理後的數據寫入到 Qdrant 的指定集合中。QdrantCleanedDataSink 是用來處理清理後資料的接收器,它使用 write_batch() 方法來將資料批量寫入資料庫。

class QdrantCleanedDataSink(StatelessSinkPartition):
    def __init__(self, connection: QdrantDatabaseConnector):
        self._client = connection

    def write_batch(self, items: List[DBDataModel]) -> None:
        payloads = [item.save() for item in items]
        ids, data = zip(*payloads)
        collection_name = get_clean_collection(...)
        self._client.write_data(
            collection_name=collection_name,
            points=Batch(ids=ids, vectors={}, payloads=data),
        )
  • QdrantCleanedDataSink(StatelessSinkPartition):這是處理清理後資料的接收器,它繼承了 StatelessSinkPartition,負責具體的資料寫入邏輯。
  • write_batch():這個方法會接收批量的資料(items),並將其保存到指定的 Qdrant 集合中。每一個資料會先進行序列化(item.save()),並提取出需要儲存的 idsdata,然後調用 self._client.write_data() 將它們寫入到對應的集合中。

嵌入後資料的儲存邏輯

與清理後資料的邏輯相似,嵌入後的數據會被存儲到向量集合中。QdrantVectorDataSink 負責處理這些嵌入後的向量數據,並將它們存入 Qdrant

class QdrantVectorDataSink(StatelessSinkPartition):
    def __init__(self, connection: QdrantDatabaseConnector):
        self._client = connection

    def write_batch(self, items: List[DBDataModel]) -> None:
        payloads = [item.save() for item in items]
        ids, vectors, metadata = zip(*payloads)
        collection_name = get_vector_collection(...)
        self._client.write_data(
            collection_name=collection_name,
            points=Batch(ids=ids, vectors=vectors, payloads=metadata),
        )
  • QdrantVectorDataSink(StatelessSinkPartition):這個接收器專門用來處理嵌入後的向量資料,它會將嵌入的向量和相關的 metadata 批量寫入 Qdrant 的向量集合中。
  • write_batch():與清理後數據的邏輯類似,這個方法會接收批量的嵌入後資料據(items),並通過 item.save() 來序列化每條資料。然後將 idsvectorsmetadata 提取出來,最終寫入到 Qdrant 中的指定向量集合。

ref.


上一篇
[Day21] 結構化資料前處理:Pydantic 管理資料狀態
下一篇
[Day23] 結構化資料前處理:清理、分塊與嵌入
系列文
從系統設計切入,探索 GenAI 在企業中的實踐30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言