iT邦幫忙

2024 iThome 鐵人賽

DAY 21
1
生成式 AI

從系統設計切入,探索 GenAI 在企業中的實踐系列 第 21

[Day21] 結構化資料前處理:Pydantic 管理資料狀態

  • 分享至 

  • xImage
  •  

以下參考課程 LLM Twin: Building Your Production-Ready AI Replica 撰寫

在課程中,爬取的資料會同時以 RAG 和 fine-tune 兩種方式來提升 LLM 回應的表現,而無論是哪一個流程,都會涉及資料的處理流程。從原始資料到最終結果,資料會經歷多次狀態轉換,包括清理、分塊以及嵌入等步驟。在這些過程中,使用結構化的模型來管理資料狀態不僅能確保資料的一致性,還能有效地減少錯誤發生的機會。

接下來會針對這一天的課程,將核心概念整理成兩天的篇幅,而今天將深入探討如何運用 Pydantic 模型來管理資料狀態,並展示如何通過 分派層(dispatcher) 動態處理不同狀態的資料,以提高整體流程的可靠性與擴展性。

不同狀態下的資料模型

在資料流處理中,資料會經歷四個主要的狀態:原始資料(Raw)清理後的資料(Cleaned)分塊後的資料(Chunked)、以及嵌入後的資料(Embedded)。每個階段的資料具有不同的結構,並且會繼承兩個基礎模型:DataModelDBDataModel

這些模型設計的核心目的是確保在不同階段中,資料的一致性與結構的清晰性,同時提供儲存該階段資料的方法。

基礎模型設計

在深入探討四個階段的資料模型之前,先來看兩個基礎模型:

  1. DataModel
    是所有資料模型的基礎類,包含了所有資料的公共屬性如 entry_idtype,用於標識每筆資料的唯一 ID 及其類型。
from abc import ABC
from pydantic import BaseModel

class DataModel(ABC, BaseModel):
    """
    用於所有資料模型的抽象類。
    """
    entry_id: int
    type: str  # 用來標示資料的類型
  1. DBDataModel
    繼承自 DataModel,並且加入了資料保存方法 save() 的抽象定義,適用於那些需要被序列化並保存到資料庫中的資料模型。
from abc import ABC, abstractmethod

class DBDataModel(DataModel):
    """
    用於需要被序列化,並保存到向量數據庫中的抽象類。
    """
    @abstractmethod
    def save(self):
        pass

四個階段的資料模型

資料在四個主要階段會使用各自的 Pydantic 模型來管理結構和狀態。以下介紹每個階段的模型及其繼承關係。

1. 原始資料模型:PostsRawModel

原始資料(Raw) 是資料流的初始狀態。來自外部數據源(如 RabbitMQ)並未經過處理。在這個階段,資料還沒有被格式化或清理,僅僅是以原始形態存儲。

class PostsRawModel(DataModel):
    platform: str
    content: dict
    author_id: str
    image: Optional[str] = None
  • platform:資料來源,如 "Twitter"。
  • content:貼文的具體內容,使用字典來儲存。
  • author_id:貼文的作者 ID。
  • image:圖片 URL,不一定要填寫。

這個模型代表了數據流的初始形態,無需進行資料保存。

2. 清理後的資料模型:PostCleanedModel

經過清理後,資料轉換為**清理後的資料(Cleaned)**狀態。在這個階段,資料中的無效字符和雜訊被清除,並且格式化為可進行進一步處理的形態。由於這個階段的資料需要保存到資料庫,因此模型繼承了 DBDataModel 並實現了 save() 方法。

class PostCleanedModel(DBDataModel):
    platform: str
    cleaned_content: str
    author_id: str
    image: Optional[str] = None

    def save(self) -> tuple:
        data = {...}
        
        return self.entry_id, data
  • cleaned_content:經過清理的文本內容。
  • author_id:作者 ID,與原始資料一致。
  • image:圖片 URL,與原始資料一致。

PostCleanedModel 實現了 save() 方法,將清理後的資料保存至資料庫,並返回 entry_id 及清理後的資料。

3. 分塊後的資料模型:PostChunkModel

在某些應用中,資料的文本過於龐大,因此需要將其分塊以便於處理,這就是**分塊後的資料(Chunked)**階段。在這裡,我們對資料進行切割,使每個分塊能夠獨立處理。這個模型繼承了 DataModel,不需要實現 save() 方法。

class PostChunkModel(DataModel):
    entry_id: str
    platform: str
    chunk_id: str
    chunk_content: str
    author_id: str
    image: Optional[str] = None
    type: str
  • chunk_id:每個分塊的唯一 ID。
  • chunk_content:分塊後的具體文本內容。
  • author_id:作者 ID,與原始資料一致。
  • image:圖片 URL,與原始資料一致。

這個模型代表了資料被分割為多個塊的狀態,方便後續處理。

4. 嵌入後的資料模型:PostEmbeddedChunkModel

最後一步,**嵌入後的資料(Embedded)**將文本轉換為嵌入向量,這些向量會被用於檢索、分類或其他機器學習任務。由於嵌入後的資料需要保存到向量資料庫,這個模型繼承了 DBDataModel,並實現了 save() 方法。

class PostEmbeddedChunkModel(DBDataModel):
    platform: str
    chunk_id: str
    embedded_content: np.ndarray
    author_id: str

    def save(self) -> tuple:
        data = {...}
        
        return self.chunk_id, self.embedded_content, data
  • embedded_content:通過嵌入模型生成的數值向量(通常是 Numpy 陣列)。
  • chunk_id:對應於分塊資料中的 chunk_id,用來標識哪個分塊。
  • author_id:與原始資料保持一致。

PostEmbeddedChunkModel 實現了 save() 方法,將嵌入後的向量數據保存至資料庫,並返回相關數據。

資料清理邏輯:處理器與分派層的設計

在資料處理管道中,資料的清理是一個至關重要的環節。為了實現靈活且擴展性強的資料清理邏輯,我們可以設計一個基於抽象類(abstract class)具體處理器(concrete handler)工廠模式(factory pattern)分派層(dispatcher) 的架構。

這一章節將逐步介紹每個組件的角色和其相互之間的協作方式。

1. CleaningDataHandler:抽象清理處理器類別

首先,我們定義一個抽象基礎類 CleaningDataHandler,它作為所有清理邏輯處理器的父類。這個類別包含一個抽象方法 clean(),每個具體的清理處理器都需要實現該方法。

from abc import ABC, abstractmethod

class CleaningDataHandler(ABC):
    """
    Abstract class for all cleaning data handlers.
    """
    @abstractmethod 
    def clean(self, data_model: DataModel) -> DataModel:
        pass
  • clean():該方法將根據具體資料模型的狀態進行清理,並返回清理後的資料模型。
  • 抽象類的設計使得我們能夠確保所有子類別都必須實現 clean() 方法,這樣可以在不同資料類型之間統一清理邏輯的接口。

2. PostCleaningHandler:具體的清理處理器

具體的資料類型清理邏輯會由繼承自 CleaningDataHandler 的處理器來實現。在這裡,以 PostCleaningHandler 為例,展示如何清理貼文資料(PostsRawModel)並將其轉換為清理後的資料模型(PostCleanedModel)。

class PostCleaningHandler(CleaningDataHandler):
    def clean(self, data_model: PostsRawModel) -> PostCleanedModel:
        return PostCleanedModel(
            entry_id=data_model.entry_id,
            platform=data_model.platform,
            cleaned_content=clean_text("".join(data_model.content.values())),
            author_id=data_model.author_id,
            image=data_model.image if data_model.image else None,
            type=data_model.type,
        )


  • clean() 方法:該方法接受 PostsRawModel 類型的資料模型,對其內容進行清理(如移除無效字符),並返回一個新的 PostCleanedModel
  • 貼文內容被清理,最終生成了 cleaned_content,其他屬性如 platformauthor_id 則保持原樣。

3. CleaningHandlerFactory:處理器工廠

為了簡化清理邏輯處理器的選擇過程,我們引入工廠模式CleaningHandlerFactory 根據資料的 type 屬性,動態生成並返回相應的清理處理器。

class CleaningHandlerFactory:
    @staticmethod
    def create_handler(data_type) -> CleaningDataHandler:
        if data_type == "posts":
            return PostCleaningHandler()
        elif data_type == "articles":
            return ArticleCleaningHandler()
        elif data_type == "repositories":
            return RepositoryCleaningHandler()
  • create_handler() 方法:根據資料的 type,該方法會返回相應的處理器類別。例如,對於 type = "posts",將返回 PostCleaningHandler;對於 type = "articles",將返回 ArticleCleaningHandler
  • 工廠模式允許我們輕鬆地擴展更多處理器類型,只需在方法中添加新的分支即可,無需修改核心邏輯。

4. CleaningDispatcher:清理邏輯分派層

為了靈活處理不同類型的資料,我們設計了一個分派層(dispatcher),負責根據資料的 type 動態選擇合適的清理處理器。在這裡,CleaningDispatcher 根據資料類型進行分派,並調用相應的清理邏輯。

class CleaningDispatcher:
    cleaning_factory = CleaningHandlerFactory()

    @classmethod
    def dispatch_cleaner(cls, data_model: DataModel) -> DataModel:
        data_type = data_model.type
        handler = cls.cleaning_factory.create_handler(data_type)
        clean_model = handler.clean(data_model)
        return clean_model
  • dispatch_cleaner() 方法:根據資料模型的 type 屬性,分派器會選擇合適的處理器,並調用處理器的 clean() 方法進行資料清理。
  • 這個分派層使得我們可以根據資料類型動態選擇清理邏輯,並可擴展更多類型的處理器。

ref.


上一篇
[Day20] CDC 實戰-Docker Compose 本地測試到 AWS 雲端部署
下一篇
[Day22] 結構化資料前處理:架設從 RabbitMQ 到 Qdrant 向量儲存的資料流
系列文
從系統設計切入,探索 GenAI 在企業中的實踐25
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言