深入探討 zenoh-plugin-ros2dds
、其 Rust 架構,以及如何解決常見的 ROS 2 通信挑戰。
機器人作業系統 2(ROS 2)是用於構建現代機器人應用的頂級開源框架。它是對原始 ROS 的全新設計,從零開始滿足商業和工業機器人的需求:即時性能、增強的安全性與多機器人系統支持。
ROS 2 的核心是一套靈活的通信系統,允許機器人軟體中的不同部分(稱為「節點」)通過主題(topics)、服務(services)和動作(actions)交換資料。這得益於 ROS 中介軟體介面(RMW),這是一層抽象,讓開發者可以替換底層的通信技術。預設且最常見的技術是資料分發服務(DDS)。
DDS 是一種健全且業界標準的中介軟體協定,專為即時、可靠與可擴展的資料交換設計。在單一穩定的區域網路(LAN)環境中,DDS 是通訊的絕佳選擇。其關鍵特色包括:
然而,DDS 在 LAN 環境中的優勢,在現代機器人推向極限時也會產生挑戰。
DDS 的發現機制(主要基於 DDSI-RTPS 協定)過度依賴多重廣播(multicast)公告。每個節點定時向整個網路發送「我在這裡!」的訊號,尋找其他節點。在受控環境下此機制有效,但在複雜場景中會引發幾個問題:
這些挑戰對 ROS 2 的預設功能形成限制。如何打造真正分布式、互聯網規模的機器人系統?
這正是 Zenoh 設計用來解決的問題。
Zenoh 是為分布式應用下一時代所設計的發布/訂閱/查詢協定。它極輕量、高效,能運行於從微控制器到資料中心的所有裝置。透過統一處理動態資料、靜態資料和運算,Zenoh 提供一個簡單卻強大的通信抽象。
在分布式機器人場景中,Zenoh 相較於標準 DDS 的主要優勢:
那麼,我們如何結合 ROS 2 豐富的生態系統與 Zenoh 強大的網路能力?答案是 zenoh-plugin-ros2dds
。
zenoh-plugin-ros2dds
zenoh-plugin-ros2dds
是一個高速橋接器,無縫連結 ROS 2 DDS 世界與 Zenoh 網路。它不是通用的 DDS 到 Zenoh 的橋接,而是專為 ROS 2 設計的,深度整合 ROS 圖譜,維護 ROS 2 通信(主題、服務和動作)的語意。
意即你可以在不同網路上運行 ROS 2 節點,它們會像在同一 LAN 般被發現並交換資料,全賴 Zenoh 高效的路由。
zenoh-plugin-ros2dds
使用 Rust 編寫,利用其效能、安全性與高階併發功能。架構核心是一個非同步事件迴圈,協調三大任務:本地 ROS 2 實體的發現、監聽遠端橋接公告,以及處理管理查詢。
// The main event loop in ROS2PluginRuntime
loop {
select! {
// A new local ROS 2 entity was discovered
evt = discovery_rcv.recv_async() => {
// ... handle local discovery
},
// An announcement from a remote bridge was received
liveliness_event = liveliness_subscriber.recv_async() => {
// ... handle remote announcement
},
// An admin query was received
get_request = admin_queryable.recv_async() => {
// ... handle admin query
}
)
}
非同步事件驅動模式貫穿整個插件架構,select!
巨集(Rust futures 庫)是核心工廠模組。
例如,負責處理發現事件的 DiscoveryMgr
擁有自己的 select!
迴圈,分別監聽 DDS 內建主題公告及新版 ROS 2 發布的 ros_discovery_info
主題。
// from zenoh-plugin-ros2dds/zenoh-plugin-ros2dds/src/discovery_mgr.rs
loop {
select! {
// Raw discovery event from DDS built-in topics
evt = dds_disco_rcv.recv_async() => {
// ... process raw DDS entity ...
}
// Periodic timer to check the ros_discovery_info topic
_ = ros_disco_timer_rcv.recv_async() => {
let infos = ros_discovery_mgr.read();
// ... process entities from ros_discovery_info ...
}
)
}
RosDiscoveryInfoMgr
也使用類似循環,定期檢查自身狀態變更,必要時發佈至 ros_discovery_info
。
// from zenoh-plugin-ros2dds/zenoh-plugin-ros2dds/src/ros_discovery.rs
loop {
select! {
// A periodic timer fires, e.g., every 100ms
_ = ros_disco_timer_rcv.recv_async() => {
let (ref msg, ref mut has_changed) = *zwrite!(participant_entities_state);
if *has_changed {
// If state has changed, publish an update to the DDS topic
Self::write(writer, msg).unwrap_or_else(|e|
tracing::error!("Failed to publish update on 'ros_discovery_info' topic: {e}")
);
*has_changed = false;
}
}
)
}
這種一致的 select!
模式實現多來源事件非阻塞、高效且可擴展的處理,對高速網路橋至關重要。
插件透過 DDS 內建 DCPSPublication
和 DCPSSubscription
主題讀者偵測本地 ROS 2 發布者與訂閱者。
// from dds_discovery.rs
pub fn run_discovery(dp: dds_entity_t, tx: Sender<DDSDiscoveryEvent>) {
unsafe {
// ... create listener ...
// Create a DDS reader for the built-in topic to discover publications
let _pr = dds_create_reader(
dp,
DDS_BUILTIN_TOPIC_DCPSPUBLICATION,
std::ptr::null(),
sub_listener,
);
// ... create another reader for subscriptions ...
}
}
新 ROS 2 節點出現時觸發 on_data
回調,插件送出 ROS2DiscoveryEvent
到主迴圈並建立路由。
插件真正亮點是動態、按需路由:
當本地 ROS 發布者出現(DDS到Zenoh):
RoutePublisher
,用輕量級Liveliness Token在 Zenoh 宣告。此「匹配激活」機制節省系統資源,避免創建無觀眾 DDS 閱讀者。
當本地 ROS 訂閱者出現(Zenoh到DDS):
插件將 ROS 2 通信模式映射至 Zenoh,以下示範其中橋接主題/發布。
從 DDS 到 Zenoh(路由 ROS 發布者資料)
插件透過 DDS 讀者回呼接收原始 DDS 取樣,並發佈到相應 Zenoh 金鑰。
ROS 2 Publisher (Local)
|
| 1. Publishes data via DDS
v
+---------------------+
| DDS Reader |
| (in zenoh-bridge) |
+---------------------+
|
| 2. Callback forwards data
v
+---------------------+
| Zenoh Publisher |
| (in zenoh-bridge) |
+---------------------+
|
| 3. Data sent over Zenoh network (WAN)
v
+---------------------+
| Zenoh Subscriber |
| (remote bridge) |
+---------------------+
|
| 4. Callback forwards data
v
+---------------------+
| DDS Writer |
| (remote bridge) |
+---------------------+
|
| 5. Publishes data via DDS
v
ROS 2 Subscriber (Remote)
// from zenoh-plugin-ros2dds/zenoh-plugin-ros2dds/src/route_publisher.rs
fn route_dds_message_to_zenoh(
sample: &DDSRawSample,
publisher: &Arc<AdvancedPublisher>,
route_id: &str,
) {
if *LOG_PAYLOAD {
tracing::debug!("{route_id}: routing message - payload: {:02x?}", sample);
} else {
tracing::trace!("{route_id}: routing message - {} bytes", sample.len());
}
// The raw DDS sample is published to Zenoh
if let Err(e) = publisher.put(sample).wait() {
tracing::error!("{route_id}: failed to route message: {e}");
}
}
從 Zenoh 到 DDS(路由資料至 ROS 訂閱者)
Zenoh 訂閱者收到訊息後,回呼將原始資料直接寫入本地 DDS 寫入者。
// from zenoh-plugin-ros2dds/zenoh-plugin-ros2dds/src/route_subscriber.rs
fn route_zenoh_message_to_dds(s: Sample, ros2_name: &str, data_writer: dds_entity_t) {
// ... logging ...
unsafe {
// ... prepare data from the Zenoh Sample ...
// Write the raw CDR data directly to the DDS writer
let ret = dds_writecdr(data_writer, fwdp);
if ret < 0 {
tracing::warn!(
"Route Subscriber (Zenoh:{} -> ROS:{}): DDS write failed",
s.key_expr(),
ros2_name,
);
return;
}
// ... memory cleanup ...
}
}
ROS 2 服務完全匹配 Zenoh 的查詢/回覆模型。
ROS 2 Client (Local)
|
| 1. Sends DDS Request
v
+---------------------+
| DDS Service Reader |
| (in zenoh-bridge) |
+---------------------+
|
| 2. Callback sends Zenoh Query
v
+---------------------+
| Zenoh Session (get) |
| (in zenoh-bridge) |
+---------------------+
|
| 3. Zenoh Query over WAN
v
+---------------------+
| Zenoh Queryable |
| (remote bridge) |
+---------------------+
|
| 4. Handler calls remote ROS Service
v
ROS 2 Service (Remote)
^
| 5. Service returns response
v
+---------------------+
| Zenoh Queryable |
| (remote bridge) |
+---------------------+
|
| 6. Sends Zenoh Reply
v
+---------------------+
| Zenoh Session (get) |
| (in zenoh-bridge) |
+---------------------+
|
| 7. `await` returns reply
v
+---------------------+
| DDS Service Writer |
| (in zenoh-bridge) |
+---------------------+
|
| 8. Sends DDS Reply
v
ROS 2 Client (Local)
從 DDS 到 Zenoh(公開 ROS 服務伺服器)
發現本地 ROS 服務伺服器後,插件建立 Zenoh Queryable。收到查詢時,Queryable 處理器作為 ROS 服務客戶端呼叫本地服務並將結果回覆給查詢端。
// from zenoh-plugin-ros2dds/zenoh-plugin-ros2dds/src/route_service_srv.rs
// This is the callback for the Zenoh Queryable
async fn reply_to_query(
query: &Query,
context: &Context,
ros2_name: &str,
type_name: &str,
_type_info: &Option<Arc<TypeInfo>>,
) {
// ... create a temporary ROS2 service client ...
// Send the request to the local ROS2 service server
let result = dds_service_client
.send_request(query.value().payload.to_vec())
.await;
// Send the result back as a reply to the Zenoh query
match result {
Ok(payload) => {
let _ = query.reply(payload).await;
}
Err(e) => {
// ... handle error ...
}
}
}
從 Zenoh 到 DDS(呼叫遠端 ROS 服務)
本地 ROS 服務客戶端發送請求時,插件攔截並透過 Zenoh query 發送至遠端服務,並等待回覆。
// from zenoh-plugin-ros2dds/zenoh-plugin-ros2dds/src/route_service_cli.rs
// This is the callback for the DDS reader that receives local service requests
async fn forward_request_to_zenoh(
sample: &DDSRawSample,
context: &Context,
zenoh_key_expr: &OwnedKeyExpr,
queries_timeout: Duration,
dds_reply_writer: dds_entity_t,
) {
// Send a Zenoh query (get) and wait for the reply
match context
.zsession
.get(zenoh_key_expr)
.value(sample.payload.clone())
.timeout(queries_timeout)
.await
{
Ok(replies) => {
// Forward the first valid reply to the local DDS reply writer
if let Some(Ok(reply)) = replies.first() {
// ... write reply.payload to dds_reply_writer ...
}
}
Err(e) => {
// ... handle error ...
}
}
}
這種優雅映射是橋接如此透明強大的原因。
插件可智慧地從程式碼動態配 CycloneDDS 設定環境變數,因應不同 ROS 2 版本及用戶配置。
// from lib.rs
// Dynamically create a CycloneDDS configuration string
let cyclonedds_config = create_cyclonedds_config(
ros_automatic_discovery_range.unwrap_or(RosAutomaticDiscoveryRange::Subnet),
ros_static_peers.unwrap_or(Vec::new())
);
// Set the environment variable for CycloneDDS to use
env::set_var(
"CYCLONEDDS_URI",
format!("{}{}", cyclonedds_config, env::var("CYCLONEDDS_URI").unwrap_or_default()),
);
QoS 也有智慧映射,例如 TRANSIENT_LOCAL
持久性 ROS 發布者使用 Zenoh Publication Cache 支持,保留網路間 QoS 語意。
安裝方式多樣:Debian 套件、Docker 或從原始碼編譯。
echo "deb [trusted=yes] [https://download.eclipse.org/zenoh/debian-repo/](https://download.eclipse.org/zenoh/debian-repo/) /" | sudo tee -a /etc/apt/sources.list
sudo apt update
sudo apt install zenoh-bridge-ros2dds
talker
與 listener
示範在一台機器(「機器人」)執行 talker
,另一台(「主機」)執行 listener
,透過 Zenoh 橋接通信。
準備四個終端視窗。
機器人端(IP 例如 192.168.1.100)
終端 1:啟動 Zenoh 橋接
# 啟動橋接,開始偵聽 Zenoh 节点
zenoh-bridge-ros2dds
終端 2:執行 ROS 2 talker
# 載入 ROS 2 環境
source /opt/ros/humble/setup.bash
# 執行 C++ talker 範例
ros2 run demo_nodes_cpp talker
主機端
終端 3:啟動 Zenoh 橋接並連接機器人橋接
zenoh-bridge-ros2dds -e tcp/192.168.1.100:7447
終端 4:執行 ROS 2 listener
source /opt/ros/humble/setup.bash
# 先列出主題驗證遠端可見
ros2 topic list
# 你應看到 '/chatter' 主題。
# 執行 C++ listener 範例
ros2 run demo_nodes_cpp listener
此時,主機端 listener 會接收並顯示機器人端 talker 發佈的訊息,Zenoh 無縫跨網路橋接通信。
zenoh-plugin-ros2dds
為分布式 ROS 2 通信挑戰提供強大優雅的解決方案。藉由 Zenoh 高效網路與自身智能按需路由(Rust 實作),它成就透明且高效的橋接,在 LAN、WAN 與互聯網間擴展 ROS 2 應用範圍。無論是多機器人系統、遠端操控還是任何分布式機器人應用,此插件都是不可或缺的利器。