以下參考課程 LLM Twin: Building Your Production-Ready AI Replica 撰寫
在課程中,爬取的資料會同時以 RAG 和 fine-tune 兩種方式來提升 LLM 回應的表現,而無論是哪一個流程,都會涉及資料的處理流程。從原始資料到最終結果,資料會經歷多次狀態轉換,包括清理、分塊以及嵌入等步驟。在這些過程中,使用結構化的模型來管理資料狀態不僅能確保資料的一致性,還能有效地減少錯誤發生的機會。
接下來會針對這一天的課程,將核心概念整理成兩天的篇幅,而今天將深入探討如何運用 Pydantic 模型來管理資料狀態,並展示如何通過 分派層(dispatcher) 動態處理不同狀態的資料,以提高整體流程的可靠性與擴展性。
在資料流處理中,資料會經歷四個主要的狀態:原始資料(Raw)、清理後的資料(Cleaned)、分塊後的資料(Chunked)、以及嵌入後的資料(Embedded)。每個階段的資料具有不同的結構,並且會繼承兩個基礎模型:DataModel
和 DBDataModel
。
這些模型設計的核心目的是確保在不同階段中,資料的一致性與結構的清晰性,同時提供儲存該階段資料的方法。
在深入探討四個階段的資料模型之前,先來看兩個基礎模型:
DataModel
entry_id
和 type
,用於標識每筆資料的唯一 ID 及其類型。from abc import ABC
from pydantic import BaseModel
class DataModel(ABC, BaseModel):
"""
用於所有資料模型的抽象類。
"""
entry_id: int
type: str # 用來標示資料的類型
DBDataModel
DataModel
,並且加入了資料保存方法 save()
的抽象定義,適用於那些需要被序列化並保存到資料庫中的資料模型。from abc import ABC, abstractmethod
class DBDataModel(DataModel):
"""
用於需要被序列化,並保存到向量數據庫中的抽象類。
"""
@abstractmethod
def save(self):
pass
資料在四個主要階段會使用各自的 Pydantic 模型來管理結構和狀態。以下介紹每個階段的模型及其繼承關係。
PostsRawModel
原始資料(Raw) 是資料流的初始狀態。來自外部數據源(如 RabbitMQ)並未經過處理。在這個階段,資料還沒有被格式化或清理,僅僅是以原始形態存儲。
class PostsRawModel(DataModel):
platform: str
content: dict
author_id: str
image: Optional[str] = None
platform
:資料來源,如 "Twitter"。content
:貼文的具體內容,使用字典來儲存。author_id
:貼文的作者 ID。image
:圖片 URL,不一定要填寫。這個模型代表了數據流的初始形態,無需進行資料保存。
PostCleanedModel
經過清理後,資料轉換為**清理後的資料(Cleaned)**狀態。在這個階段,資料中的無效字符和雜訊被清除,並且格式化為可進行進一步處理的形態。由於這個階段的資料需要保存到資料庫,因此模型繼承了 DBDataModel
並實現了 save()
方法。
class PostCleanedModel(DBDataModel):
platform: str
cleaned_content: str
author_id: str
image: Optional[str] = None
def save(self) -> tuple:
data = {...}
return self.entry_id, data
PostCleanedModel
實現了 save()
方法,將清理後的資料保存至資料庫,並返回 entry_id
及清理後的資料。
PostChunkModel
在某些應用中,資料的文本過於龐大,因此需要將其分塊以便於處理,這就是**分塊後的資料(Chunked)**階段。在這裡,我們對資料進行切割,使每個分塊能夠獨立處理。這個模型繼承了 DataModel
,不需要實現 save()
方法。
class PostChunkModel(DataModel):
entry_id: str
platform: str
chunk_id: str
chunk_content: str
author_id: str
image: Optional[str] = None
type: str
chunk_id
:每個分塊的唯一 ID。chunk_content
:分塊後的具體文本內容。author_id
:作者 ID,與原始資料一致。image
:圖片 URL,與原始資料一致。這個模型代表了資料被分割為多個塊的狀態,方便後續處理。
PostEmbeddedChunkModel
最後一步,**嵌入後的資料(Embedded)**將文本轉換為嵌入向量,這些向量會被用於檢索、分類或其他機器學習任務。由於嵌入後的資料需要保存到向量資料庫,這個模型繼承了 DBDataModel
,並實現了 save()
方法。
class PostEmbeddedChunkModel(DBDataModel):
platform: str
chunk_id: str
embedded_content: np.ndarray
author_id: str
def save(self) -> tuple:
data = {...}
return self.chunk_id, self.embedded_content, data
embedded_content
:通過嵌入模型生成的數值向量(通常是 Numpy 陣列)。chunk_id
:對應於分塊資料中的 chunk_id,用來標識哪個分塊。author_id
:與原始資料保持一致。PostEmbeddedChunkModel
實現了 save()
方法,將嵌入後的向量數據保存至資料庫,並返回相關數據。
在資料處理管道中,資料的清理是一個至關重要的環節。為了實現靈活且擴展性強的資料清理邏輯,我們可以設計一個基於抽象類(abstract class) 、 具體處理器(concrete handler) 、 工廠模式(factory pattern) 和 分派層(dispatcher) 的架構。
這一章節將逐步介紹每個組件的角色和其相互之間的協作方式。
CleaningDataHandler
:抽象清理處理器類別首先,我們定義一個抽象基礎類 CleaningDataHandler
,它作為所有清理邏輯處理器的父類。這個類別包含一個抽象方法 clean()
,每個具體的清理處理器都需要實現該方法。
from abc import ABC, abstractmethod
class CleaningDataHandler(ABC):
"""
Abstract class for all cleaning data handlers.
"""
@abstractmethod
def clean(self, data_model: DataModel) -> DataModel:
pass
clean()
:該方法將根據具體資料模型的狀態進行清理,並返回清理後的資料模型。clean()
方法,這樣可以在不同資料類型之間統一清理邏輯的接口。PostCleaningHandler
:具體的清理處理器具體的資料類型清理邏輯會由繼承自 CleaningDataHandler
的處理器來實現。在這裡,以 PostCleaningHandler
為例,展示如何清理貼文資料(PostsRawModel
)並將其轉換為清理後的資料模型(PostCleanedModel
)。
class PostCleaningHandler(CleaningDataHandler):
def clean(self, data_model: PostsRawModel) -> PostCleanedModel:
return PostCleanedModel(
entry_id=data_model.entry_id,
platform=data_model.platform,
cleaned_content=clean_text("".join(data_model.content.values())),
author_id=data_model.author_id,
image=data_model.image if data_model.image else None,
type=data_model.type,
)
clean()
方法:該方法接受 PostsRawModel
類型的資料模型,對其內容進行清理(如移除無效字符),並返回一個新的 PostCleanedModel
。cleaned_content
,其他屬性如 platform
和 author_id
則保持原樣。CleaningHandlerFactory
:處理器工廠為了簡化清理邏輯處理器的選擇過程,我們引入工廠模式。CleaningHandlerFactory
根據資料的 type
屬性,動態生成並返回相應的清理處理器。
class CleaningHandlerFactory:
@staticmethod
def create_handler(data_type) -> CleaningDataHandler:
if data_type == "posts":
return PostCleaningHandler()
elif data_type == "articles":
return ArticleCleaningHandler()
elif data_type == "repositories":
return RepositoryCleaningHandler()
create_handler()
方法:根據資料的 type
,該方法會返回相應的處理器類別。例如,對於 type = "posts"
,將返回 PostCleaningHandler
;對於 type = "articles"
,將返回 ArticleCleaningHandler
。CleaningDispatcher
:清理邏輯分派層為了靈活處理不同類型的資料,我們設計了一個分派層(dispatcher),負責根據資料的 type
動態選擇合適的清理處理器。在這裡,CleaningDispatcher
根據資料類型進行分派,並調用相應的清理邏輯。
class CleaningDispatcher:
cleaning_factory = CleaningHandlerFactory()
@classmethod
def dispatch_cleaner(cls, data_model: DataModel) -> DataModel:
data_type = data_model.type
handler = cls.cleaning_factory.create_handler(data_type)
clean_model = handler.clean(data_model)
return clean_model
dispatch_cleaner()
方法:根據資料模型的 type
屬性,分派器會選擇合適的處理器,並調用處理器的 clean()
方法進行資料清理。ref.