iT邦幫忙

2024 iThome 鐵人賽

DAY 19
0
生成式 AI

從系統設計切入,探索 GenAI 在企業中的實踐系列 第 19

[Day19] CDC 實戰-MongoDB 變更流與 RabbitMQ 消息代理

  • 分享至 

  • xImage
  •  

以下參考課程 LLM Twin: Building Your Production-Ready AI Replica 撰寫

在上一篇文章中,我們深入探討了 Change Data Capture (CDC) 的基本概念及其在保持數據一致性方面的重要性。今天,我們將進一步探討如何實際執行 Change Data Capture (CDC),並透過 MongoDB 和 RabbitMQ 這兩個工具來達成。今日會介紹 MongoDB 的操作日誌與變更流功能,以及如何使用 RabbitMQ 作為消息代理,實現高效且可擴展的 CDC 架構。

MongoDB 的 CDC 實踐

MongoDB 複製集與操作日誌

MongoDB 的複製集(Replication Set)是實現變更數據捕獲(Change Data Capture, CDC)的基礎。複製集包含主節點與多個從節點,並通過操作日誌(Oplog)記錄數據修改。這個日誌允許 MongoDB 監控變更事件,進而實現 CDC。相比多次查詢數據庫,複製集的變更流(Change Streams)功能能夠實時監控數據變更,提升運作效率。

變更流

而變更流則基於 MongoDB 的 Oplog,使用 watch() 方法實現對數據庫變更的監控。例如,我們可以監聽插入、更新和刪除等操作:

def watch_collection_changes(collection):
    change_stream = collection.watch()
    for change in change_stream:
        print(change)

也可以設定過濾條件,只捕捉插入和更新的事件:

pipeline = [{'$match': {'operationType': {'$in': ['insert', 'update']}}}]
change_stream = collection.watch(pipeline)

變更事件處理

MongoDB 的變更流能捕捉到多種類型的變更事件,包括插入(insert)、更新(update)、刪除(delete)、和替換(replace),也可以根據這些事件分別處理:

def handle_change(change):
    if change['operationType'] == 'insert':
        handle_insert(change['fullDocument'])
    elif change['operationType'] == 'update':
        handle_update(change['documentKey'], change['updateDescription'])
    elif change['operationType'] == 'delete':
        handle_delete(change['documentKey'])

MongoDB 連接與監控

課程中也撰寫了 MongoDB 的連接類,用以確保連接穩定並處理變更流,而寫法是這裡使用前幾天提到的單例模式,確保全局只會初始化一個 MongoClient:

from pymongo import MongoClient
from pymongo.errors import ConnectionFailure

from rag.settings import settings


class MongoDatabaseConnector:
    _instance: MongoClient = None

    def __new__(cls, *args, **kwargs):
        
        if cls._instance is None:
            try:
                cls._instance = MongoClient(settings.MONGO_DATABASE_HOST)
            except ConnectionFailure as e:
                print(f"Couldn't connect to the database: {str(e)}")
                raise

        print(f"Connection to database with uri: {settings.MONGO_DATABASE_HOST} successful")
        return cls._instance

    def get_database(self):
        return self._instance[settings.MONGO_DATABASE_NAME]

    def close(self):
        if self._instance:
            self._instance.close()
            print("Connected to database has been closed.")

connection = MongoDatabaseConnector()

進階延伸:異步處理和錯誤處理

在進階延伸,為了避免阻塞主線程,也可以將變更事件處理部分改為異步操作:

import asyncio

async def handle_change_async(change):
    await asyncio.sleep(0)  # 模擬異步處理
    if change['operationType'] == 'insert':
        handle_insert(change['fullDocument'])
    elif change['operationType'] == 'update':
        handle_update(change['documentKey'], change['updateDescription'])
    elif change['operationType'] == 'delete':
        handle_delete(change['documentKey'])

同時,也可以在監控變更流時加入錯誤處理,確保系統不會因為連接中斷或其他異常情況而崩潰:

def watch_collection_changes_with_error_handling(collection):
    try:
        change_stream = collection.watch()
        for change in change_stream:
            asyncio.run(handle_change_async(change))
    except Exception as e:
        print(f"Error occurred while watching changes: {str(e)}")

RabbitMQ 在 CDC 中的角色

RabbitMQ 核心功能

RabbitMQ 是一個開源的消息代理服務,基於高級消息隊列協議(AMQP)運行。提供消息傳遞和串流服務,可以簡易部署在雲端、本地環境,並廣泛用於分布式系統中其主要功能包括:

  • 消息隊列:存儲並轉發消息。
  • 發布/訂閱模型:允許多個消費者同時接收相同的消息。
  • 路由:根據規則將消息發送到不同的隊列,實現精確的消息分發。

RabbitMQ 在 CDC 架構中的作用

在 CDC 架構中,RabbitMQ 充當消息代理,負責管理 MongoDB 和其他數據處理服務之間的數據變更傳遞,主要提供以下幾個功能:

  • 解耦:將生產者(如 MongoDB)與消費者(如資料前處理流程)分離,降低系統間的依賴。
  • 緩衝:在生產者和消費者之間提供緩衝,允許資料大批湧入十進行批量處理。
  • 可靠性:確保在系統某部分失效的情況下,消息不會丟失,保證數據的完整性。
  • 異步處理:允許消費者以自己的速度處理消息,而不影響數據生產者的正常運行。

RabbitMQ 連接管理

pika 是一個要在 Python 中執行 RabbitMQ 需要的工具。
課程中也是透過單例模式撰寫 RabbitMQ 的連接:

class RabbitMQConnection:
    """Singleton class to manage RabbitMQ connection."""

    _instance = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, host: str = None, port: int = None, username: str = None, password: str = None, virtual_host: str = "/"):
        self.host = host or settings.RABBITMQ_HOST
        self.port = port or settings.RABBITMQ_PORT
        self.username = username or settings.RABBITMQ_DEFAULT_USERNAME
        self.password = password or settings.RABBITMQ_DEFAULT_PASSWORD
        self.virtual_host = virtual_host
        self._connection = None

    def connect(self):
        try:
            credentials = pika.PlainCredentials(self.username, self.password)
            self._connection = pika.BlockingConnection(
                pika.ConnectionParameters(
                    host=self.host, port=self.port, virtual_host=self.virtual_host, credentials=credentials
                )
            )
            print("Connected to RabbitMQ successfully")
        except pika.exceptions.AMQPConnectionError as e:
            print("Failed to connect to RabbitMQ:", e)
            raise e

    def is_connected(self) -> bool:
        return self._connection is not None and self._connection.is_open

    def get_channel(self):
        if self.is_connected():
            return self._connection.channel()

    def close(self):
        if self.is_connected():
            self._connection.close()
            self._connection = None
            print("Closed RabbitMQ connection")

發布消息到 RabbitMQ

最後是透過下方的範例語法發布消息。
消息會被發送到指定的隊列,供消費者處理:

def publish_to_rabbitmq(queue_name: str, data: str):
    """Publish data to a RabbitMQ queue."""
    try:
        # Create an instance of RabbitMQConnection
        rabbitmq_conn = RabbitMQConnection()

        # Establish connection
        with rabbitmq_conn:
            channel = rabbitmq_conn.get_channel()

            # Ensure the queue exists
            channel.queue_declare(queue=queue_name, durable=True)

            # Delivery confirmation
            channel.confirm_delivery()

            # Send data to the queue
            channel.basic_publish(
                exchange="",
                routing_key=queue_name,
                body=data,
                properties=pika.BasicProperties(
                    delivery_mode=2,  # make message persistent
                ),
            )
            print("Sent data to RabbitMQ:", data)
    except pika.exceptions.UnroutableError:
        print("Message could not be routed")
    except Exception as e:
        print(f"Error publishing to RabbitMQ: {e}")

ref.


上一篇
[Day18] CDC 變更資料捕獲-保持數據一致性
下一篇
[Day20] CDC 實戰-Docker Compose 本地測試到 AWS 雲端部署
系列文
從系統設計切入,探索 GenAI 在企業中的實踐25
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言