iT邦幫忙

2025 iThome 鐵人賽

DAY 26
0
Rust

30 天玩轉 Zenoh:Rust 助力物聯網、機器人與自駕的高速通訊系列 第 26

Day 26: 深入解析 Rust 與 CycloneDDS 的橋接利器 —— Cyclors

  • 分享至 

  • xImage
  •  

深入解析 Rust 與 CycloneDDS 的橋接利器 —— Cyclors

在我們先前的文章 Day 25: Zenoh 在機器人系統的應用全景 Part 2 - ROS 2 與 zenoh-plugin-ros2dds 中,曾介紹過 zenoh-plugin-ros2dds 這款強大工具。今天,我們將深入探討其核心元件——Cyclors,一個基於 Rust 的專案,負責提供對 CycloneDDS 的低階綁定。

核心架構:Cyclors 結合 CycloneDDS

Cyclors 基於 CycloneDDS,後者是一款高效且成熟的資料分發服務(DDS)標準實作。CycloneDDS 的強項包括:

  • 網路範圍自動發現:能自動尋找並識別網路上的發布者與訂閱者
  • 完善的服務品質(QoS)設定:支持多種可靠性、持久性及即時行為的配置
  • 型別安全保障:透過介面定義語言(IDL)加強型別檢查
  • 跨平台支援:適用於多種作業系統與硬體架構

Cyclors 用符合 Rust 風格的包裝器將這些特性呈現給開發者,不僅帶來記憶體安全及現代化寫法,也不犧牲底層 C 函式的高效能。

Cyclors 如何整合 CycloneDDS

Cyclors 定位為 CycloneDDS 的輕量 Rust 封裝層,提供以下關鍵功能:

1. FFI(外部函式接口)橋接架構

利用 Rust 的 FFI,Cyclors 安全地與 CycloneDDS 的 C API 互動:

  • 編譯時自動產生綁定:使用 bindgen 根據 CycloneDDS 標頭自動生成 Rust 綁定
  • 記憶體安全保障:借助 Rust 所有權系統,避免常見的 C 語言錯誤如使用後釋放與緩衝區溢位
  • 資源自動管理:透過 Rust 的 Drop trait 負責清理 C 端資源
  • 強型別保護:防止許多 C 系統中容易出錯的運行時問題

2. DDS 核心功能暴露

Cyclors 以安全的 Rust 抽象層完整展現 CycloneDDS 的功能:

  • 參與者(Participants):加入 DDS 域的主體
  • 主題(Topics):型別安全的主題定義與自動序列化/反序列化
  • 發布者與訂閱者:對 DDS 寫入器與讀取器的安全包裝
  • QoS 策略:完整 DDS 服務品質配置支援
  • 自動發現機制:跨網路自動偵測參與者與端點
Discovery Example Flow
──────────────────────

Application Start
        |
        v
dds_create_participant()
        |
        v
Setup C callback listener
        |
        v
dds_create_reader() for builtin topics
        |
        v
Process C callbacks in Rust channels

3. 可選購的 Iceoryx 本地通信整合

除了標準的 CycloneDDS 網路傳輸,Cyclors 還可以選擇整合 Eclipse Iceoryx,提供極致效能的本地(shared memory)通訊:

未啟用 Iceoryx:

  • 所有通訊透過 CycloneDDS 標準 UDP/TCP
  • 分布式系統中仍維持良好效能
  • 支援所有 CycloneDDS 支援的平台
  • 部署更簡單,依賴較少

啟用 Iceoryx(使用 iceoryx feature flag):

  • CycloneDDS 自動判斷同機發布者與訂閱者
  • 本地通訊透明地走共享記憶體路徑
  • 大資料傳輸可零拷貝(zero-copy)提升效率
  • 遠端仍走網路通信保持正常運作

零拷貝資料流水線詳解

下圖展示運用零拷貝技術傳送訊息的流程:

zero-copy


核心解析 Part 1:C層基礎 — Glue Code 與標頭檔

在使用 Rust 前,我們需先讓 C/C++ 函式庫能彼此協調,這正是 cyclocutwrapper.h 的定位。

wrapper.h:FFI 的入口檔案

bindgen 會從 C 的標頭檔自動產生 Rust 綁定,必須指定一個入口標頭檔,也就是 wrapper.h。它會將我們想要使用的所有 C 函式與型別一一引入:

// wrapper.h - Real implementation
#include <string.h>

#include <dds/dds.h>
#include <dds/ddsc/dds_loaned_sample.h>
#include <dds/ddsc/dds_psmx.h>
#include <dds/ddsi/ddsi_serdata.h>
#include <dds/ddsi/ddsi_typelib.h>

#include <cdds/cdds_util.h>

cyclocut:C層工具集

cyclocut 負責提供 CycloneDDS 功能強化的 C 函數工具,內容包括:

  • 自訂主題建立cdds_blob.c 支援為任意資料型態建立 blob 主題
  • 序列化處理器:客製序列化與反序列化的實作
  • PSMX 支援:跨平台共享記憶體傳輸擴展
  • 輔助函式:橋接 Rust 與 C API 的工具函數

該組件以靜態庫形式 (libcdds-util.a) 編譯,讓 Cyclors 能與 CycloneDDS 的進階功能(尤其是共享記憶體傳輸)無縫協同。


核心解析 Part 2:編譯期魔法 (build.rs)

Cargo 在編譯時執行的 build.rs 腳本,負責將 C/C++ 相依重組編譯並產生 Rust FFI 綁定。

// build.rs - Real implementation structure
fn main() {
    let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap());

    // Check features and validate compatibility
    let iceoryx_enabled = is_iceoryx_enabled();
    let prefix_symbols_enabled = is_prefix_symbols_enabled();

    // Build Iceoryx (if enabled) - conditional compilation
    let mut iceoryx = PathBuf::new();
    if iceoryx_enabled {
        let iceoryx_src_dir = Path::new("iceoryx/iceoryx_meta");
        let iceoryx_out_dir = out_dir.join("iceoryx-build");
        iceoryx = build_iceoryx(iceoryx_src_dir, &iceoryx_out_dir);
    }

    // Build CycloneDDS with conditional Iceoryx support
    let cyclonedds_src_dir = prepare_cyclonedds_src("cyclonedds", &out_dir, &prefix);
    let cyclonedds_out_dir = out_dir.join("cyclonedds-build");
    let cyclonedds = build_cyclonedds(&cyclonedds_src_dir, &cyclonedds_out_dir, iceoryx.as_os_str());

    // Build cyclocut utilities
    let cyclocut_src_dir = Path::new("cyclocut");
    let cyclocut_out_dir = out_dir.join("cyclocut-build");
    let cyclocut = build_cyclocut(cyclocut_src_dir, &cyclocut_out_dir, &cyclonedds);

    // Generate Rust FFI bindings with conditional include paths
    let mut bindings = bindgen::Builder::default()
        .header("wrapper.h")
        .clang_arg(format!("-I{}", cyclonedds_include.to_str().unwrap()))
        .clang_arg(format!("-I{}", cyclocut_include.to_str().unwrap()))
        .generate_comments(false);

    // Handle cross-platform library linking
    let bindings = bindings.generate().expect("Unable to generate bindings");
    bindings.write_to_file(out_dir.join("bindings.rs")).expect("Couldn't write bindings!");
}

核心解析 Part 3:安全的 Rust 抽象層 (src/lib.rs)

src/lib.rs 負責在產生的 FFI 綁定之上構建安全且簡潔的 Rust 抽象,同時保有底層 CycloneDDS 功能的直接存取。

基本的 unsafe 綁定

設計中以極簡原則為主,避免過度抽象,聚焦於:

// src/lib.rs - Real implementation approach

// Constants for builtin topics
pub const DDS_BUILTIN_TOPIC_DCPSPARTICIPANT: dds_entity_t = (DDS_MIN_PSEUDO_HANDLE + 1);
pub const DDS_BUILTIN_TOPIC_DCPSPUBLICATION: dds_entity_t = (DDS_MIN_PSEUDO_HANDLE + 3);
pub const DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION: dds_entity_t = (DDS_MIN_PSEUDO_HANDLE + 4);

pub const DDS_DOMAIN_DEFAULT: u32 = 0xffffffff_u32;

// QoS management module
pub mod qos;

// Include generated bindings
#[allow(clippy::all)]
mod bindings {
    include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
}
pub use bindings::*;

// Include additional wrapper functions
include!(concat!(env!("OUT_DIR"), "/functions.rs"));

此方法重點擺在高效能且直接 C API 存取,非打造高階封裝。

QoS 管理系統 (src/qos.rs)

Cyclors 在 QoS 層提供全面且型別安全的封裝,qos.rs 具體建構:

// src/qos.rs - Real QoS implementation
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
pub struct Qos {
    pub user_data: Option<Vec<u8>>,
    pub topic_data: Option<Vec<u8>>,
    pub durability: Option<Durability>,
    pub durability_service: Option<DurabilityService>,
    pub presentation: Option<Presentation>,
    pub deadline: Option<Deadline>,
    pub latency_budget: Option<LatencyBudget>,
    pub ownership: Option<Ownership>,
    pub liveliness: Option<Liveliness>,
    pub time_based_filter: Option<TimeBasedFilter>,
    pub partition: Option<Vec<String>>,
    pub reliability: Option<Reliability>,
    pub transport_priority: Option<TransportPriority>,
    pub lifespan: Option<Lifespan>,
    pub destination_order: Option<DestinationOrder>,
    pub history: Option<History>,
    pub resource_limits: Option<ResourceLimits>,
    pub writer_data_lifecycle: Option<WriterDataLifecycle>,
    // ... 20+ QoS policy types in total
}

此系統具有:

  • 雙向型別轉換:Rust 結構與 C QoS 物件
  • 序列化支援:方便管理與存儲配置
  • 型別安全保障:防止配置錯誤
  • 豐富的策略覆蓋:涵蓋多達 20 幾種 QoS 策略

安全包裝層示範

示範 Publisher

// In src/lib.rs

// The safe, high-level Publisher struct
pub struct Publisher {
    dds_writer: dds_entity_t,
    iox_publisher: iox_pub_t,
}

impl Publisher {
    // The constructor hides the unsafe initialization logic
    pub fn new(topic_name: &str) -> Result<Self, CyclorsError> {
        unsafe {
            // ... (unsafe code to initialize DDS and Iceoryx) ...
            Ok(Publisher { dds_writer, iox_publisher })
        }
    }

    // The publish method takes a safe Rust slice
    pub fn publish(&mut self, payload: &[u8]) -> Result<(), CyclorsError> {
        unsafe {
            // 1. Get a shared memory chunk from Iceoryx
            // 2. Copy the slice data into the shared memory
            // 3. Create our C-level "blob" to point to the chunk
            // 4. "Write" the blob pointer using the DDS API
        }
    }
}

// Implement the Drop trait to automatically clean up C resources!
impl Drop for Publisher {
    fn drop(&mut self) {
        unsafe {
            // ... (call C functions to destroy the dds_writer and iox_publisher) ...
        }
    }
}

該封裝實現資源取得即初始化(RAII),確保 Rust 物件釋放時,C 資源不遺漏,杜絕記憶體洩漏。


範例展示:發現機制(examples/disco.rs

examples/disco.rs 展示 Cyclors 實際應用狀況,直接利用 unsafe FFI 呼叫操作 CycloneDDS,演示 DDS 中發布與訂閱的發現機制:

實作

// examples/disco.rs - Real implementation
use cyclors::*;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;

#[derive(Debug)]
pub enum MatchedEntity {
    DiscoveredPublication {
        topic_name: String,
        type_name: String,
        partition: Option<String>,
        qos: Arc<*mut dds_qos_t>,
    },
    UndiscoveredPublication {
        topic_name: String,
        type_name: String,
        partition: Option<String>,
    },
    // Similar variants for subscriptions...
}

unsafe extern "C" fn on_data(dr: dds_entity_t, arg: *mut std::os::raw::c_void) {
    let btx = Box::from_raw(arg as *mut (bool, Sender<MatchedEntity>));

    // Complex manual sample handling with MaybeUninit
    let mut si = MaybeUninit::<[dds_sample_info_t; MAX_SAMPLES]>::uninit();
    let mut samples: [*mut ::std::os::raw::c_void; MAX_SAMPLES] = [std::ptr::null_mut(); MAX_SAMPLES];

    let n = dds_take(dr, samples.as_mut_ptr() as *mut *mut libc::c_void,
                     si.as_mut_ptr() as *mut dds_sample_info_t, MAX_SAMPLES, MAX_SAMPLES as u32);

    // Process each discovered endpoint...
    for i in 0..n {
        if si[i as usize].valid_data {
            let sample = samples[i as usize] as *mut dds_builtintopic_endpoint_t;
            let topic_name = CStr::from_ptr((*sample).topic_name).to_str().unwrap();
            let type_name = CStr::from_ptr((*sample).type_name).to_str().unwrap();
            // Extract QoS and partition information...
        }
    }
}

fn main() {
    unsafe {
        let (tx, rx) = channel();
        let dp = dds_create_participant(DDS_DOMAIN_DEFAULT, std::ptr::null(), std::ptr::null());

        // Set up discovery listeners for publications and subscriptions
        let pub_listener = dds_create_listener(Box::into_raw(ptx) as *mut std::os::raw::c_void);
        dds_lset_data_available(pub_listener, Some(on_data));

        let _pr = dds_create_reader(dp, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, std::ptr::null(), pub_listener);

        // Process discovered entities
        while let Ok(me) = rx.recv() {
            println!("{:?}", me);
        }
    }
}

小結

此範例彰顯 Cyclors 的實務風格:

  • 直接使用 FFI:原生、unsafe 的 C 函式調用
  • 手動管理記憶體:仔細處理 C 指標與樣本陣列
  • Rust 頻道橋接:將 C 回調事件傳遞至 Rust 程式碼
  • 複雜資料解析:手工讀取 C 結構與字串
  • 資源管理責任明確:由使用者控管 C 端資源週期

Cyclors 不透過笨重的高階封裝掩飾複雜度,而是提供可靠且高效的基礎工具,特別適合效能至上的系統。

在下一篇文章,我們將繼續探討 Zenoh 與 ROS 2 的更多應用,敬請期待!


上一篇
Day 25: Zenoh 在機器人系統的應用全景 Part 2 - ROS 2 與 zenoh-plugin-ros2dds
下一篇
Day 27: Zenoh 在機器人系統的應用全景 Part 3 - CARLA x Autoware x Zenoh 自駕車隊管理展示
系列文
30 天玩轉 Zenoh:Rust 助力物聯網、機器人與自駕的高速通訊30
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言