在我們先前的文章 Day 25: Zenoh 在機器人系統的應用全景 Part 2 - ROS 2 與 zenoh-plugin-ros2dds 中,曾介紹過 zenoh-plugin-ros2dds 這款強大工具。今天,我們將深入探討其核心元件——Cyclors,一個基於 Rust 的專案,負責提供對 CycloneDDS 的低階綁定。
Cyclors 基於 CycloneDDS,後者是一款高效且成熟的資料分發服務(DDS)標準實作。CycloneDDS 的強項包括:
Cyclors 用符合 Rust 風格的包裝器將這些特性呈現給開發者,不僅帶來記憶體安全及現代化寫法,也不犧牲底層 C 函式的高效能。
Cyclors 定位為 CycloneDDS 的輕量 Rust 封裝層,提供以下關鍵功能:
利用 Rust 的 FFI,Cyclors 安全地與 CycloneDDS 的 C API 互動:
bindgen
根據 CycloneDDS 標頭自動生成 Rust 綁定Drop
trait 負責清理 C 端資源Cyclors 以安全的 Rust 抽象層完整展現 CycloneDDS 的功能:
Discovery Example Flow
──────────────────────
Application Start
|
v
dds_create_participant()
|
v
Setup C callback listener
|
v
dds_create_reader() for builtin topics
|
v
Process C callbacks in Rust channels
除了標準的 CycloneDDS 網路傳輸,Cyclors 還可以選擇整合 Eclipse Iceoryx,提供極致效能的本地(shared memory)通訊:
未啟用 Iceoryx:
啟用 Iceoryx(使用 iceoryx
feature flag):
下圖展示運用零拷貝技術傳送訊息的流程:
在使用 Rust 前,我們需先讓 C/C++ 函式庫能彼此協調,這正是 cyclocut
與 wrapper.h
的定位。
wrapper.h
:FFI 的入口檔案bindgen
會從 C 的標頭檔自動產生 Rust 綁定,必須指定一個入口標頭檔,也就是 wrapper.h
。它會將我們想要使用的所有 C 函式與型別一一引入:
// wrapper.h - Real implementation
#include <string.h>
#include <dds/dds.h>
#include <dds/ddsc/dds_loaned_sample.h>
#include <dds/ddsc/dds_psmx.h>
#include <dds/ddsi/ddsi_serdata.h>
#include <dds/ddsi/ddsi_typelib.h>
#include <cdds/cdds_util.h>
cyclocut
:C層工具集cyclocut
負責提供 CycloneDDS 功能強化的 C 函數工具,內容包括:
cdds_blob.c
支援為任意資料型態建立 blob 主題該組件以靜態庫形式 (libcdds-util.a
) 編譯,讓 Cyclors 能與 CycloneDDS 的進階功能(尤其是共享記憶體傳輸)無縫協同。
build.rs
)Cargo 在編譯時執行的 build.rs
腳本,負責將 C/C++ 相依重組編譯並產生 Rust FFI 綁定。
// build.rs - Real implementation structure
fn main() {
let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap());
// Check features and validate compatibility
let iceoryx_enabled = is_iceoryx_enabled();
let prefix_symbols_enabled = is_prefix_symbols_enabled();
// Build Iceoryx (if enabled) - conditional compilation
let mut iceoryx = PathBuf::new();
if iceoryx_enabled {
let iceoryx_src_dir = Path::new("iceoryx/iceoryx_meta");
let iceoryx_out_dir = out_dir.join("iceoryx-build");
iceoryx = build_iceoryx(iceoryx_src_dir, &iceoryx_out_dir);
}
// Build CycloneDDS with conditional Iceoryx support
let cyclonedds_src_dir = prepare_cyclonedds_src("cyclonedds", &out_dir, &prefix);
let cyclonedds_out_dir = out_dir.join("cyclonedds-build");
let cyclonedds = build_cyclonedds(&cyclonedds_src_dir, &cyclonedds_out_dir, iceoryx.as_os_str());
// Build cyclocut utilities
let cyclocut_src_dir = Path::new("cyclocut");
let cyclocut_out_dir = out_dir.join("cyclocut-build");
let cyclocut = build_cyclocut(cyclocut_src_dir, &cyclocut_out_dir, &cyclonedds);
// Generate Rust FFI bindings with conditional include paths
let mut bindings = bindgen::Builder::default()
.header("wrapper.h")
.clang_arg(format!("-I{}", cyclonedds_include.to_str().unwrap()))
.clang_arg(format!("-I{}", cyclocut_include.to_str().unwrap()))
.generate_comments(false);
// Handle cross-platform library linking
let bindings = bindings.generate().expect("Unable to generate bindings");
bindings.write_to_file(out_dir.join("bindings.rs")).expect("Couldn't write bindings!");
}
src/lib.rs
)src/lib.rs
負責在產生的 FFI 綁定之上構建安全且簡潔的 Rust 抽象,同時保有底層 CycloneDDS 功能的直接存取。
unsafe
綁定設計中以極簡原則為主,避免過度抽象,聚焦於:
// src/lib.rs - Real implementation approach
// Constants for builtin topics
pub const DDS_BUILTIN_TOPIC_DCPSPARTICIPANT: dds_entity_t = (DDS_MIN_PSEUDO_HANDLE + 1);
pub const DDS_BUILTIN_TOPIC_DCPSPUBLICATION: dds_entity_t = (DDS_MIN_PSEUDO_HANDLE + 3);
pub const DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION: dds_entity_t = (DDS_MIN_PSEUDO_HANDLE + 4);
pub const DDS_DOMAIN_DEFAULT: u32 = 0xffffffff_u32;
// QoS management module
pub mod qos;
// Include generated bindings
#[allow(clippy::all)]
mod bindings {
include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
}
pub use bindings::*;
// Include additional wrapper functions
include!(concat!(env!("OUT_DIR"), "/functions.rs"));
此方法重點擺在高效能且直接 C API 存取,非打造高階封裝。
src/qos.rs
)Cyclors 在 QoS 層提供全面且型別安全的封裝,qos.rs
具體建構:
// src/qos.rs - Real QoS implementation
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
pub struct Qos {
pub user_data: Option<Vec<u8>>,
pub topic_data: Option<Vec<u8>>,
pub durability: Option<Durability>,
pub durability_service: Option<DurabilityService>,
pub presentation: Option<Presentation>,
pub deadline: Option<Deadline>,
pub latency_budget: Option<LatencyBudget>,
pub ownership: Option<Ownership>,
pub liveliness: Option<Liveliness>,
pub time_based_filter: Option<TimeBasedFilter>,
pub partition: Option<Vec<String>>,
pub reliability: Option<Reliability>,
pub transport_priority: Option<TransportPriority>,
pub lifespan: Option<Lifespan>,
pub destination_order: Option<DestinationOrder>,
pub history: Option<History>,
pub resource_limits: Option<ResourceLimits>,
pub writer_data_lifecycle: Option<WriterDataLifecycle>,
// ... 20+ QoS policy types in total
}
此系統具有:
示範 Publisher
:
// In src/lib.rs
// The safe, high-level Publisher struct
pub struct Publisher {
dds_writer: dds_entity_t,
iox_publisher: iox_pub_t,
}
impl Publisher {
// The constructor hides the unsafe initialization logic
pub fn new(topic_name: &str) -> Result<Self, CyclorsError> {
unsafe {
// ... (unsafe code to initialize DDS and Iceoryx) ...
Ok(Publisher { dds_writer, iox_publisher })
}
}
// The publish method takes a safe Rust slice
pub fn publish(&mut self, payload: &[u8]) -> Result<(), CyclorsError> {
unsafe {
// 1. Get a shared memory chunk from Iceoryx
// 2. Copy the slice data into the shared memory
// 3. Create our C-level "blob" to point to the chunk
// 4. "Write" the blob pointer using the DDS API
}
}
}
// Implement the Drop trait to automatically clean up C resources!
impl Drop for Publisher {
fn drop(&mut self) {
unsafe {
// ... (call C functions to destroy the dds_writer and iox_publisher) ...
}
}
}
該封裝實現資源取得即初始化(RAII),確保 Rust 物件釋放時,C 資源不遺漏,杜絕記憶體洩漏。
examples/disco.rs
)examples/disco.rs
展示 Cyclors 實際應用狀況,直接利用 unsafe
FFI 呼叫操作 CycloneDDS,演示 DDS 中發布與訂閱的發現機制:
// examples/disco.rs - Real implementation
use cyclors::*;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
#[derive(Debug)]
pub enum MatchedEntity {
DiscoveredPublication {
topic_name: String,
type_name: String,
partition: Option<String>,
qos: Arc<*mut dds_qos_t>,
},
UndiscoveredPublication {
topic_name: String,
type_name: String,
partition: Option<String>,
},
// Similar variants for subscriptions...
}
unsafe extern "C" fn on_data(dr: dds_entity_t, arg: *mut std::os::raw::c_void) {
let btx = Box::from_raw(arg as *mut (bool, Sender<MatchedEntity>));
// Complex manual sample handling with MaybeUninit
let mut si = MaybeUninit::<[dds_sample_info_t; MAX_SAMPLES]>::uninit();
let mut samples: [*mut ::std::os::raw::c_void; MAX_SAMPLES] = [std::ptr::null_mut(); MAX_SAMPLES];
let n = dds_take(dr, samples.as_mut_ptr() as *mut *mut libc::c_void,
si.as_mut_ptr() as *mut dds_sample_info_t, MAX_SAMPLES, MAX_SAMPLES as u32);
// Process each discovered endpoint...
for i in 0..n {
if si[i as usize].valid_data {
let sample = samples[i as usize] as *mut dds_builtintopic_endpoint_t;
let topic_name = CStr::from_ptr((*sample).topic_name).to_str().unwrap();
let type_name = CStr::from_ptr((*sample).type_name).to_str().unwrap();
// Extract QoS and partition information...
}
}
}
fn main() {
unsafe {
let (tx, rx) = channel();
let dp = dds_create_participant(DDS_DOMAIN_DEFAULT, std::ptr::null(), std::ptr::null());
// Set up discovery listeners for publications and subscriptions
let pub_listener = dds_create_listener(Box::into_raw(ptx) as *mut std::os::raw::c_void);
dds_lset_data_available(pub_listener, Some(on_data));
let _pr = dds_create_reader(dp, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, std::ptr::null(), pub_listener);
// Process discovered entities
while let Ok(me) = rx.recv() {
println!("{:?}", me);
}
}
}
此範例彰顯 Cyclors 的實務風格:
unsafe
的 C 函式調用Cyclors 不透過笨重的高階封裝掩飾複雜度,而是提供可靠且高效的基礎工具,特別適合效能至上的系統。
在下一篇文章,我們將繼續探討 Zenoh 與 ROS 2 的更多應用,敬請期待!