iT邦幫忙

2

pg-walstream:用 Rust 打造高效能的 PostgreSQL WAL Streaming

  • 分享至 

  • xImage
  •  

在資料同步、CDC(Change Data Capture)、事件驅動系統中,「如何即時取得 PostgreSQL 的資料變更」是一個非常核心的問題。pg-walstream 是一個使用 Rust 實作的 PostgreSQL WAL(Write-Ahead Log)Streaming 函式庫,透過 PostgreSQL Logical Replication Protocol,讓開發者可以直接串流並解析資料庫變更事件,作為自訂 CDC 或資料同步系統的底層元件。

專案背景與用途

PostgreSQL 提供 Logical Replication 機制,能將 WAL 中的資料變更以結構化事件的方式輸出,pg-walstream 的目標是:

  • 提供 Rust 原生 的 WAL streaming library
  • 直接實作 PostgreSQL Logical Replication protocol
  • 可嵌入到任何 async Rust 應用中
  • 作為 CDC、資料同步、事件系統的基礎 building block

適合以下場景:

  • 自建 CDC pipeline(不使用 Debezium)
  • PostgreSQL → Kafka / Redis / EventBus
  • 即時資料同步、資料湖 ingestion
  • 高效能、低延遲的資料變更監聽服務

核心特色

pg-walstream 專注於「穩定、高效、可控」:

  • 支援 PostgreSQL Logical Replication protocol v1 ~ v4
  • 使用 async/await,與 Tokio runtime 完整整合
  • Zero-copy buffer(基於 bytes crate)
  • 內建 LSN(Log Sequence Number)追蹤與回報
  • 可自訂 retry / reconnect 策略

架構概念簡介

pg-walstream 並不是一個「完整 CDC 平台」,而是專注於 WAL streaming 的核心邏輯:

我另一個專案 pg2any 專注完成 CDC 平台, 也是使用這個當作 replication protocal library

Architecture

┌─────────────────────────────────────────┐
│         Application Layer               │
│  (Your CDC / Replication Logic)        │
└──────────────┬──────────────────────────┘
               │
┌──────────────▼──────────────────────────┐
│    LogicalReplicationStream             │
│  - Connection management                │
│  - Event processing                     │
│  - LSN feedback                         │
└──────────────┬──────────────────────────┘
               │
┌──────────────▼──────────────────────────┐
│  LogicalReplicationParser               │
│  - Protocol parsing                     │
│  - Message deserialization              │
└──────────────┬──────────────────────────┘
               │
┌──────────────▼──────────────────────────┐
│     BufferReader / BufferWriter         │
│  - Zero-copy operations                 │
│  - Binary protocol handling             │
└─────────────────────────────────────────┘

你可以在「Your application」這一層:

  • 將事件轉成 JSON
  • 發送到 Kafka / Pulsar
  • 同步到另一個資料庫
  • 寫入 data lake

安裝方式

Cargo.toml 中加入:

[dependencies]
pg_walstream = "0.3.0"

系統層級需要 PostgreSQL client library

# Ubuntu / Debian
sudo apt install libpq-dev clang libclang-dev

# RHEL / CentOS / Fedora
sudo dnf install postgresql-devel

PostgreSQL 前置設定

Logical Replication 需要 PostgreSQL 層級設定:

wal_level = logical
max_replication_slots = 4
max_wal_senders = 4

建立 publication 與 replication user:

CREATE PUBLICATION my_publication FOR ALL TABLES;

CREATE USER replication_user
  WITH REPLICATION
  PASSWORD 'secure_password';

GRANT SELECT ON ALL TABLES IN SCHEMA public
  TO replication_user;

使用範例:建立 WAL Streaming Client

以下是一個最小可運作的 async 範例,示範如何使用 pg-walstream 連線並接收 WAL 事件。

use futures::stream::{self, StreamExt};
use pg_walstream::{
    CancellationToken, LogicalReplicationStream, ReplicationStreamConfig, RetryConfig,
};
use std::env;
use std::time::Duration;
use tracing::{error, info, Level};
use tracing_subscriber;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Initialize logging
    tracing_subscriber::fmt()
        .with_max_level(Level::INFO)
        .with_target(false)
        .init();

    info!("Starting PostgreSQL WAL streaming example");

    // Get connection string from environment or use default
    let connection_string = env::var("DATABASE_URL").unwrap_or_else(|_| {
        "postgresql://postgres:password@localhost:5432/postgres?replication=database".to_string()
    });

    info!("Connection string: {}", mask_password(&connection_string));

    // Configure the replication stream
    let config = ReplicationStreamConfig::new(
        "example_slot".to_string(),   // Replication slot name
        "my_publication".to_string(), // Publication name (must exist)
        2,                            // Protocol version (2 supports streaming)
        true,                         // Enable streaming of large transactions
        Duration::from_secs(10),      // Send feedback every 10 seconds
        Duration::from_secs(30),      // Connection timeout
        Duration::from_secs(60),      // Health check interval
        RetryConfig::default(),       // Use default retry strategy
    );

    info!("Creating replication stream...");

    // Create and initialize the stream
    let mut stream = LogicalReplicationStream::new(&connection_string, config).await?;

    info!("Stream created successfully");

    // Start replication from the latest position (None = latest)
    stream.start(None).await?;

    info!("Replication started successfully");
    info!("Listening for changes... (Press Ctrl+C to stop)");
    info!("You can now make changes to your database tables to see events");

    // Create cancellation token for graceful shutdown
    let cancel_token = CancellationToken::new();
    let cancel_token_clone = cancel_token.clone();

    // Setup Ctrl+C handler
    tokio::spawn(async move {
        tokio::signal::ctrl_c()
            .await
            .expect("Failed to listen for ctrl-c");
        info!("Received shutdown signal, cleaning up...");
        cancel_token_clone.cancel();
    });

    // Convert to EventStream
    let event_stream = stream.into_stream(cancel_token);

    // Wrap with futures::stream::unfold to get a proper futures::Stream
    // This allows us to use stream combinators!
    // Use Box::pin to pin the stream on the heap so we can reuse it
    let mut pg_stream = Box::pin(stream::unfold(
        event_stream,
        |mut event_stream| async move {
            match event_stream.next().await {
                Ok(event) => {
                    // Update applied LSN after successful event retrieval
                    event_stream.update_applied_lsn(event.lsn.value());
                    Some((Ok(event), event_stream))
                }
                Err(e) => {
                    // Return error and stop the stream
                    Some((Err(e), event_stream))
                }
            }
        },
    ));

    // Now we can use stream combinators!
    info!("Using futures::Stream combinators for advanced processing");

    while let Some(result) = pg_stream.as_mut().next().await {
        match result {
            Ok(event) => {
                info!("Received event: {:?}", event);
            }
            Err(e) => {
                error!("Error: {}", e);
                break;
            }
        }
    }

    info!("Graceful shutdown complete");

    Ok(())
}

/// Mask password in connection string for logging
fn mask_password(conn_str: &str) -> String {
    if let Some(proto_end) = conn_str.find("://") {
        let proto = &conn_str[..proto_end + 3]; // e.g., "postgresql://"
        let rest = &conn_str[proto_end + 3..];

        if let Some(at_pos) = rest.find('@') {
            let credentials = &rest[..at_pos];
            let after_at = &rest[at_pos..];

            if let Some(colon_pos) = credentials.find(':') {
                let user = &credentials[..colon_pos];
                return format!("{}{}:****{}", proto, user, after_at);
            }
        }
    }
    conn_str.to_string()
}

更多 example 可以參考 https://github.com/isdaniel/pg-walstream/tree/main/examples

總結

pg-walstream 提供了一個乾淨、可擴展的 Rust API,讓開發者可以直接使用 PostgreSQL Logical Replication

如果你正在:

  • 用 Rust 開發資料基礎設施
  • 想深入理解 PostgreSQL WAL / replication protocol
  • 或需要一個可嵌入的 CDC 核心元件

pg-walstream 會是一個非常適合的起點。

專案連結:
https://github.com/isdaniel/pg-walstream


圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言