This post is based on the course LLM Twin: Building Your Production-Ready AI Replica.
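In the previous post we introduced the basic concepts of Change Data Capture (CDC). Today we go a step further and look at how to actually implement CDC with two tools, MongoDB and RabbitMQ. The focus is on MongoDB's operation log and change streams, and on RabbitMQ in its role as the message broker.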
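MongoDB's replica set is the foundation for implementing CDC. A replica set consists of a primary node and secondary nodes, and it records data modifications in the **operation log (oplog)**. This log is what lets MongoDB track change events and therefore makes CDC possible. Compared with repeatedly polling the database, the replica set's **change streams** feature monitors data changes in real time and is more efficient.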
Change streams are built on top of the oplog and are opened with the `watch()` method. For example, we can listen for insert, update, and delete operations:
```python
def watch_collection_changes(collection):
    # watch() opens a change stream; iterating it blocks until events arrive.
    change_stream = collection.watch()
    for change in change_stream:
        print(change)
```
We can also set a filter so that only insert and update events are captured:
```python
pipeline = [{'$match': {'operationType': {'$in': ['insert', 'update']}}}]
change_stream = collection.watch(pipeline)
```
MongoDB change streams capture several types of change events, including insert, update, delete, and replace, and each type can be handled separately:
```python
def handle_change(change):
    if change['operationType'] == 'insert':
        handle_insert(change['fullDocument'])
    elif change['operationType'] == 'update':
        handle_update(change['documentKey'], change['updateDescription'])
    elif change['operationType'] == 'delete':
        handle_delete(change['documentKey'])
```
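To try the dispatch logic without a running MongoDB instance, here is a minimal sketch that feeds hand-written event dictionaries through a dispatcher of the same shape. The handler bodies and the sample events are assumptions made up for illustration; real change events carry more fields, such as `_id`, `ns`, and `clusterTime`.

```python
# Hypothetical handlers: they only record what they were called with.
processed = []

def handle_insert(document):
    processed.append(("insert", document["_id"]))

def handle_update(document_key, update_description):
    # updatedFields holds only the fields that changed.
    changed = sorted(update_description["updatedFields"])
    processed.append(("update", document_key["_id"], changed))

def handle_delete(document_key):
    processed.append(("delete", document_key["_id"]))

def handle_change(change):
    """Route a change event to the handler for its operation type."""
    op = change["operationType"]
    if op == "insert":
        handle_insert(change["fullDocument"])
    elif op == "update":
        handle_update(change["documentKey"], change["updateDescription"])
    elif op == "delete":
        handle_delete(change["documentKey"])
    # Other event types (for example "replace") are ignored in this sketch.

# Simplified sample events, shaped like the documents a change stream yields.
sample_events = [
    {"operationType": "insert", "documentKey": {"_id": 1},
     "fullDocument": {"_id": 1, "title": "draft"}},
    {"operationType": "update", "documentKey": {"_id": 1},
     "updateDescription": {"updatedFields": {"title": "final"}, "removedFields": []}},
    {"operationType": "delete", "documentKey": {"_id": 1}},
]

for event in sample_events:
    handle_change(event)

print(processed)
```

Note that an update event does not include the full document by default, which is why the update branch works from `documentKey` and `updateDescription` rather than `fullDocument`.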
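The course also provides a MongoDB connection class that keeps the connection stable and serves the change stream. It uses the singleton pattern mentioned a few days ago, so only one MongoClient is ever initialized globally: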
```python
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure

from rag.settings import settings


class MongoDatabaseConnector:
    """Singleton that holds one shared MongoClient."""

    _instance: MongoClient = None

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            try:
                # MongoClient connects lazily, so a connection failure may
                # only surface on the first real operation.
                cls._instance = MongoClient(settings.MONGO_DATABASE_HOST)
            except ConnectionFailure as e:
                print(f"Couldn't connect to the database: {str(e)}")
                raise

        print(f"Connection to database with uri: {settings.MONGO_DATABASE_HOST} successful")
        # Returns the shared MongoClient itself, not a connector instance.
        return cls._instance

    def get_database(self):
        return self._instance[settings.MONGO_DATABASE_NAME]

    def close(self):
        if self._instance:
            self._instance.close()
            print("Connection to database has been closed.")


connection = MongoDatabaseConnector()
```
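The singleton behaviour is easier to see in isolation. The sketch below uses the same `__new__`-based idea with a stand-in client so that it runs without a database; `FakeClient`, `Connector`, and the default URI are hypothetical names introduced only for this example.

```python
class FakeClient:
    """Stand-in for MongoClient so the example runs without a database."""
    created = 0

    def __init__(self, uri):
        FakeClient.created += 1
        self.uri = uri


class Connector:
    """Hypothetical connector using the same __new__-based singleton idea."""
    _instance = None

    def __new__(cls, uri="mongodb://localhost:27017"):
        if cls._instance is None:
            cls._instance = FakeClient(uri)
        # __new__ returns the client, not a Connector, so __init__ is skipped.
        return cls._instance


first = Connector()
second = Connector()
print(first is second)      # both names point at the same object
print(FakeClient.created)   # the client was constructed only once
```

One consequence worth knowing: because `__new__` returns the client rather than an instance of the connector class, methods defined on the connector (such as `get_database` and `close` above) are not available on the returned object. In practice `connection` behaves like a plain `MongoClient`.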
To avoid blocking the main thread, the change event handling can be made asynchronous:
```python
import asyncio


async def handle_change_async(change):
    await asyncio.sleep(0)  # simulate asynchronous work (yields to the event loop)
    if change['operationType'] == 'insert':
        handle_insert(change['fullDocument'])
    elif change['operationType'] == 'update':
        handle_update(change['documentKey'], change['updateDescription'])
    elif change['operationType'] == 'delete':
        handle_delete(change['documentKey'])
```
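If the handlers themselves are blocking (for example, they call another service synchronously), one option is to push them onto worker threads so the event loop stays free. This is only a sketch under that assumption: `slow_handler` and the sample events are hypothetical, and `asyncio.to_thread` requires Python 3.9 or later.

```python
import asyncio
import time


def slow_handler(change):
    """Hypothetical blocking handler, e.g. one that writes to another service."""
    time.sleep(0.05)
    return change["documentKey"]["_id"]


async def process_changes(changes):
    # to_thread runs each blocking call in a worker thread,
    # so the event loop is not blocked while the handlers run.
    tasks = [asyncio.to_thread(slow_handler, change) for change in changes]
    # gather returns results in the order the tasks were passed in.
    return await asyncio.gather(*tasks)


changes = [{"operationType": "insert", "documentKey": {"_id": i}} for i in range(3)]
results = asyncio.run(process_changes(changes))
print(results)
```

Running handlers concurrently means they may finish in a different order than the events arrived. When the order of changes matters, which is common in CDC, processing them one at a time is the safer choice.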
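Error handling can be added as well, so that a dropped connection or any other exception is caught and logged instead of crashing the process: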
```python
def watch_collection_changes_with_error_handling(collection):
    try:
        change_stream = collection.watch()
        for change in change_stream:
            asyncio.run(handle_change_async(change))
    except Exception as e:
        print(f"Error occurred while watching changes: {str(e)}")
```
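The function above logs the error and then stops watching. To actually keep going after a failure, the stream has to be reopened from where it left off. Each change event's `_id` is its resume token, and PyMongo's `watch()` accepts it through the `resume_after` parameter. Here is a minimal sketch of that loop; `watch_with_resume`, its parameters, and the fake stream are assumptions for illustration, and the fake stream uses plain integers where MongoDB would use token documents.

```python
import time


def watch_with_resume(open_stream, handle, max_retries=3, backoff_seconds=0.0):
    """Consume change events, reopening the stream after an error.

    open_stream(resume_token) must return an iterable of change events.
    With PyMongo this could be:
        lambda token: collection.watch(resume_after=token)
    """
    resume_token = None
    retries = 0
    while True:
        try:
            for change in open_stream(resume_token):
                handle(change)
                # Remember the token only after the event was handled,
                # so a failed event is delivered again after resuming.
                resume_token = change["_id"]
                retries = 0
            return  # the stream ended normally
        except Exception as exc:
            retries += 1
            if retries > max_retries:
                raise
            print(f"Stream failed ({exc}); retry {retries}/{max_retries}")
            time.sleep(backoff_seconds)


# Fake stream for demonstration: it fails once, just before the third event.
EVENTS = [{"_id": i, "operationType": "insert"} for i in range(1, 5)]
state = {"failed": False}


def fake_open_stream(resume_token):
    # Ids are 1-based positions, so the token doubles as the next list index.
    start = 0 if resume_token is None else resume_token
    for event in EVENTS[start:]:
        if event["_id"] == 3 and not state["failed"]:
            state["failed"] = True
            raise ConnectionError("simulated network drop")
        yield event


seen = []
watch_with_resume(fake_open_stream, lambda change: seen.append(change["_id"]))
print(seen)
```

Catching every `Exception` keeps the sketch short; in a real service it is probably better to retry only on connection-related errors and let programming errors propagate.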
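RabbitMQ is an open-source message broker built on the **Advanced Message Queuing Protocol (AMQP)**. It is easy to deploy both in the cloud and on-premises, and it is widely used in distributed systems.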
In a CDC architecture, RabbitMQ acts as the message broker: it manages the transport of data changes between MongoDB and the other data processing services.
`pika` is the client library needed to talk to RabbitMQ from Python.
The course also wraps the RabbitMQ connection in a singleton:
```python
import pika

# Assumed: the same settings object used by the MongoDB connector.
from rag.settings import settings


class RabbitMQConnection:
    """Singleton class to manage RabbitMQ connection."""

    _instance = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, host: str = None, port: int = None, username: str = None, password: str = None, virtual_host: str = "/"):
        self.host = host or settings.RABBITMQ_HOST
        self.port = port or settings.RABBITMQ_PORT
        self.username = username or settings.RABBITMQ_DEFAULT_USERNAME
        self.password = password or settings.RABBITMQ_DEFAULT_PASSWORD
        self.virtual_host = virtual_host
        self._connection = None

    # Context manager support, needed for `with RabbitMQConnection() as conn:`.
    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def connect(self):
        try:
            credentials = pika.PlainCredentials(self.username, self.password)
            self._connection = pika.BlockingConnection(
                pika.ConnectionParameters(
                    host=self.host, port=self.port, virtual_host=self.virtual_host, credentials=credentials
                )
            )
            print("Connected to RabbitMQ successfully")
        except pika.exceptions.AMQPConnectionError as e:
            print("Failed to connect to RabbitMQ:", e)
            raise

    def is_connected(self) -> bool:
        return self._connection is not None and self._connection.is_open

    def get_channel(self):
        # Returns None when there is no open connection.
        if self.is_connected():
            return self._connection.channel()

    def close(self):
        if self.is_connected():
            self._connection.close()
            self._connection = None
            print("Closed RabbitMQ connection")
```
Finally, messages are published with the example code below. Each message is sent to the specified queue, where consumers pick it up:
```python
def publish_to_rabbitmq(queue_name: str, data: str):
    """Publish data to a RabbitMQ queue."""
    try:
        # Create an instance of RabbitMQConnection
        rabbitmq_conn = RabbitMQConnection()

        # Establish connection
        with rabbitmq_conn:
            channel = rabbitmq_conn.get_channel()

            # Ensure the queue exists
            channel.queue_declare(queue=queue_name, durable=True)

            # Delivery confirmation
            channel.confirm_delivery()

            # Send data to the queue
            channel.basic_publish(
                exchange="",
                routing_key=queue_name,
                body=data,
                properties=pika.BasicProperties(
                    delivery_mode=2,  # make message persistent
                ),
            )
            print("Sent data to RabbitMQ:", data)
    except pika.exceptions.UnroutableError:
        print("Message could not be routed")
    except Exception as e:
        print(f"Error publishing to RabbitMQ: {e}")
```
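Since `publish_to_rabbitmq` takes its payload as a string, a change event has to be serialized first. The sketch below shows one possible way to do that with only the standard library; `change_to_message`, the payload field names, and the sample event are assumptions for illustration, not the course's format.

```python
import json
from datetime import datetime, timezone


def change_to_message(change):
    """Build a JSON string from the parts of a change event we care about."""
    payload = {
        "operation": change["operationType"],
        "document_key": change["documentKey"],
        # fullDocument is absent on delete events, so fall back to None.
        "document": change.get("fullDocument"),
    }
    # default=str converts values json cannot encode (ObjectId, datetime) to strings.
    return json.dumps(payload, default=str)


sample_change = {
    "operationType": "insert",
    "documentKey": {"_id": "abc123"},
    "fullDocument": {
        "_id": "abc123",
        "created_at": datetime(2024, 1, 1, tzinfo=timezone.utc),
    },
}

message = change_to_message(sample_change)
print(message)
```

The resulting string can be handed to `publish_to_rabbitmq(queue_name, message)`. Converting with `default=str` is lossy, because the consumer receives plain strings; PyMongo's `bson.json_util` is an alternative when BSON types need to survive the round trip.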