iT邦幫忙

2025 iThome 鐵人賽

DAY 25
0
Rust

30 天玩轉 Zenoh:Rust 助力物聯網、機器人與自駕的高速通訊系列 第 25

Day 25: Zenoh 在機器人系統的應用全景 Part 2 - ROS 2 與 zenoh-plugin-ros2dds

  • 分享至 

  • xImage
  •  

Zenoh 在機器人系統的應用全景 Part 2 - ROS 2 與 zenoh-plugin-ros2dds

深入探討 zenoh-plugin-ros2dds、其 Rust 架構,以及如何解決常見的 ROS 2 通信挑戰。


機器人作業系統 2(ROS 2)是用於構建現代機器人應用的頂級開源框架。它是對原始 ROS 的全新設計,從零開始滿足商業和工業機器人的需求:即時性能、增強的安全性與多機器人系統支持。

ROS 2 的核心是一套靈活的通信系統,允許機器人軟體中的不同部分(稱為「節點」)通過主題(topics)、服務(services)和動作(actions)交換資料。這得益於 ROS 中介軟體介面(RMW),這是一層抽象,讓開發者可以替換底層的通信技術。預設且最常見的技術是資料分發服務(DDS)。

DDS 的威力

DDS 是一種健全且業界標準的中介軟體協定,專為即時、可靠與可擴展的資料交換設計。在單一穩定的區域網路(LAN)環境中,DDS 是通訊的絕佳選擇。其關鍵特色包括:

  • 去中心化(Decentralization): DDS 無需代理。節點直接在網路上發現彼此,使其對單點故障具韌性。
  • 豐富的服務品質(QoS): DDS 提供全面的 QoS 策略,用以控制可靠性、持久性與延遲,讓開發者可針對不同資料類型微調通信。
  • 互通性: DDS 標準保證不同廠商的實做可相互通信。

然而,DDS 在 LAN 環境中的優勢,在現代機器人推向極限時也會產生挑戰。

DDS 的挑戰:當有線與 Wi-Fi 不足以應付

DDS 的發現機制(主要基於 DDSI-RTPS 協定)過度依賴多重廣播(multicast)公告。每個節點定時向整個網路發送「我在這裡!」的訊號,尋找其他節點。在受控環境下此機制有效,但在複雜場景中會引發幾個問題:

  1. 過度頻繁的發現訊息:「嘮叨 Discovery」
    在大規模系統中,訊息流量龐大,消耗帶寬。對 Wi-Fi 或 5G 這類帶寬有限的無線網路尤其不利。
  2. 廣域網路(WAN)問題:
    多重廣播流量通常不會通過互聯網或不同子網路,讓戶外機器人與雲端伺服器間,或不同建築物中兩台機器人間的 ROS 2 通訊無法透明連通。雖有解決方案,但配置複雜且不穩定。
  3. 整合困難:
    將 ROS 2 資料橋接至其他應用或協定(如網路服務或行動應用)需自訂橋接器,且經常難以完整轉譯 ROS 2 的溝通語意。

這些挑戰對 ROS 2 的預設功能形成限制。如何打造真正分布式、互聯網規模的機器人系統?

png-1

這正是 Zenoh 設計用來解決的問題。

Zenoh 登場:改變通信遊戲規則

Zenoh 是為分布式應用下一時代所設計的發布/訂閱/查詢協定。它極輕量、高效,能運行於從微控制器到資料中心的所有裝置。透過統一處理動態資料、靜態資料和運算,Zenoh 提供一個簡單卻強大的通信抽象。

在分布式機器人場景中,Zenoh 相較於標準 DDS 的主要優勢:

  • 高效的發現機制: Zenoh 的發現機制佔用的頻寬遠低於 DDS,對 Wi-Fi、5G 等頻寬有限環境至關重要。
  • 透明的路由能力: Zenoh 能在多元且異質的網路間,無須複雜設定就能透明路由資料。
  • 無代理去中心化: 與 DDS 一樣,Zenoh 也去中心化,避免單點故障。

那麼,我們如何結合 ROS 2 豐富的生態系統與 Zenoh 強大的網路能力?答案是 zenoh-plugin-ros2dds

兩全其美:zenoh-plugin-ros2dds

zenoh-plugin-ros2dds 是一個高速橋接器,無縫連結 ROS 2 DDS 世界與 Zenoh 網路。它不是通用的 DDS 到 Zenoh 的橋接,而是專為 ROS 2 設計的,深度整合 ROS 圖譜,維護 ROS 2 通信(主題、服務和動作)的語意。

意即你可以在不同網路上運行 ROS 2 節點,它們會像在同一 LAN 般被發現並交換資料,全賴 Zenoh 高效的路由。

png-2

運作原理:Rust 架構一瞥

zenoh-plugin-ros2dds 使用 Rust 編寫,利用其效能、安全性與高階併發功能。架構核心是一個非同步事件迴圈,協調三大任務:本地 ROS 2 實體的發現、監聽遠端橋接公告,以及處理管理查詢。

// The main event loop in ROS2PluginRuntime
loop {
    select! {
        // A new local ROS 2 entity was discovered
        evt = discovery_rcv.recv_async() => {
            // ... handle local discovery
        },

        // An announcement from a remote bridge was received
        liveliness_event = liveliness_subscriber.recv_async() => {
            // ... handle remote announcement
        },

        // An admin query was received
        get_request = admin_queryable.recv_async() => {
            // ... handle admin query
        }
    )
}

普遍採用的非同步模型

非同步事件驅動模式貫穿整個插件架構,select! 巨集(Rust futures 庫)是核心工廠模組。

例如,負責處理發現事件的 DiscoveryMgr 擁有自己的 select! 迴圈,分別監聽 DDS 內建主題公告及新版 ROS 2 發布的 ros_discovery_info 主題。

// from zenoh-plugin-ros2dds/zenoh-plugin-ros2dds/src/discovery_mgr.rs
loop {
    select! {
        // Raw discovery event from DDS built-in topics
        evt = dds_disco_rcv.recv_async() => {
            // ... process raw DDS entity ...
        }

        // Periodic timer to check the ros_discovery_info topic
        _ = ros_disco_timer_rcv.recv_async() => {
            let infos = ros_discovery_mgr.read();
            // ... process entities from ros_discovery_info ...
        }
    )
}

RosDiscoveryInfoMgr 也使用類似循環,定期檢查自身狀態變更,必要時發佈至 ros_discovery_info

// from zenoh-plugin-ros2dds/zenoh-plugin-ros2dds/src/ros_discovery.rs
loop {
    select! {
        // A periodic timer fires, e.g., every 100ms
        _ = ros_disco_timer_rcv.recv_async() => {
            let (ref msg, ref mut has_changed) = *zwrite!(participant_entities_state);
            if *has_changed {
                // If state has changed, publish an update to the DDS topic
                Self::write(writer, msg).unwrap_or_else(|e|
                    tracing::error!("Failed to publish update on 'ros_discovery_info' topic: {e}")
                );
                *has_changed = false;
            }
        }
    )
}

這種一致的 select! 模式實現多來源事件非阻塞、高效且可擴展的處理,對高速網路橋至關重要。

動態發現

插件透過 DDS 內建 DCPSPublicationDCPSSubscription 主題讀者偵測本地 ROS 2 發布者與訂閱者。

// from dds_discovery.rs
pub fn run_discovery(dp: dds_entity_t, tx: Sender<DDSDiscoveryEvent>) {
    unsafe {
        // ... create listener ...

        // Create a DDS reader for the built-in topic to discover publications
        let _pr = dds_create_reader(
            dp,
            DDS_BUILTIN_TOPIC_DCPSPUBLICATION,
            std::ptr::null(),
            sub_listener,
        );

        // ... create another reader for subscriptions ...
    }
}

新 ROS 2 節點出現時觸發 on_data 回調,插件送出 ROS2DiscoveryEvent 到主迴圈並建立路由。

按需路由引擎

插件真正亮點是動態、按需路由:

當本地 ROS 發布者出現(DDS到Zenoh):

  1. 發現發布者。
  2. 建立 RoutePublisher,用輕量級Liveliness Token在 Zenoh 宣告。
  3. 不立刻建立 DDS 閱讀者,待遠端 Zenoh 訂閱者感興趣時才創建。
  4. 只有匹配訂閱者出現時開始接收與轉發資料。

此「匹配激活」機制節省系統資源,避免創建無觀眾 DDS 閱讀者。

當本地 ROS 訂閱者出現(Zenoh到DDS):

  1. 發現訂閱者。
  2. 立即為該主題建立 DDS 寫入者,供本地訂閱者連接。
  3. 創建 Zenoh 訂閱者接收網路資料並宣告存在。

深入:ROS 2 與 Zenoh 對應

插件將 ROS 2 通信模式映射至 Zenoh,以下示範其中橋接主題/發布。

發布/訂閱橋接

從 DDS 到 Zenoh(路由 ROS 發布者資料)

插件透過 DDS 讀者回呼接收原始 DDS 取樣,並發佈到相應 Zenoh 金鑰。

ROS 2 Publisher (Local)
       |
       | 1. Publishes data via DDS
       v
+---------------------+
| DDS Reader          |
| (in zenoh-bridge)   |
+---------------------+
       |
       | 2. Callback forwards data
       v
+---------------------+
| Zenoh Publisher     |
| (in zenoh-bridge)   |
+---------------------+
       |
       | 3. Data sent over Zenoh network (WAN)
       v
+---------------------+
| Zenoh Subscriber    |
| (remote bridge)     |
+---------------------+
       |
       | 4. Callback forwards data
       v
+---------------------+
| DDS Writer          |
| (remote bridge)     |
+---------------------+
       |
       | 5. Publishes data via DDS
       v
ROS 2 Subscriber (Remote)
// from zenoh-plugin-ros2dds/zenoh-plugin-ros2dds/src/route_publisher.rs

fn route_dds_message_to_zenoh(
    sample: &DDSRawSample,
    publisher: &Arc<AdvancedPublisher>,
    route_id: &str,
) {
    if *LOG_PAYLOAD {
        tracing::debug!("{route_id}: routing message - payload: {:02x?}", sample);
    } else {
        tracing::trace!("{route_id}: routing message - {} bytes", sample.len());
    }
    // The raw DDS sample is published to Zenoh
    if let Err(e) = publisher.put(sample).wait() {
        tracing::error!("{route_id}: failed to route message: {e}");
    }
}

從 Zenoh 到 DDS(路由資料至 ROS 訂閱者)

Zenoh 訂閱者收到訊息後,回呼將原始資料直接寫入本地 DDS 寫入者。

// from zenoh-plugin-ros2dds/zenoh-plugin-ros2dds/src/route_subscriber.rs

fn route_zenoh_message_to_dds(s: Sample, ros2_name: &str, data_writer: dds_entity_t) {
    // ... logging ...
    unsafe {
        // ... prepare data from the Zenoh Sample ...

        // Write the raw CDR data directly to the DDS writer
        let ret = dds_writecdr(data_writer, fwdp);
        if ret < 0 {
            tracing::warn!(
                "Route Subscriber (Zenoh:{} -> ROS:{}): DDS write failed",
                s.key_expr(),
                ros2_name,
            );
            return;
        }
        // ... memory cleanup ...
    }
}

服務橋接

ROS 2 服務完全匹配 Zenoh 的查詢/回覆模型。

ROS 2 Client (Local)
       |
       | 1. Sends DDS Request
       v
+---------------------+
| DDS Service Reader  |
| (in zenoh-bridge)   |
+---------------------+
       |
       | 2. Callback sends Zenoh Query
       v
+---------------------+
| Zenoh Session (get) |
| (in zenoh-bridge)   |
+---------------------+
       |
       | 3. Zenoh Query over WAN
       v
+---------------------+
| Zenoh Queryable     |
| (remote bridge)     |
+---------------------+
       |
       | 4. Handler calls remote ROS Service
       v
ROS 2 Service (Remote)
       ^
       | 5. Service returns response
       v
+---------------------+
| Zenoh Queryable     |
| (remote bridge)     |
+---------------------+
       |
       | 6. Sends Zenoh Reply
       v
+---------------------+
| Zenoh Session (get) |
| (in zenoh-bridge)   |
+---------------------+
       |
       | 7. `await` returns reply
       v
+---------------------+
| DDS Service Writer  |
| (in zenoh-bridge)   |
+---------------------+
       |
       | 8. Sends DDS Reply
       v
ROS 2 Client (Local)

從 DDS 到 Zenoh(公開 ROS 服務伺服器)

發現本地 ROS 服務伺服器後,插件建立 Zenoh Queryable。收到查詢時,Queryable 處理器作為 ROS 服務客戶端呼叫本地服務並將結果回覆給查詢端。

// from zenoh-plugin-ros2dds/zenoh-plugin-ros2dds/src/route_service_srv.rs

// This is the callback for the Zenoh Queryable
async fn reply_to_query(
    query: &Query,
    context: &Context,
    ros2_name: &str,
    type_name: &str,
    _type_info: &Option<Arc<TypeInfo>>,
) {
    // ... create a temporary ROS2 service client ...

    // Send the request to the local ROS2 service server
    let result = dds_service_client
        .send_request(query.value().payload.to_vec())
        .await;

    // Send the result back as a reply to the Zenoh query
    match result {
        Ok(payload) => {
            let _ = query.reply(payload).await;
        }
        Err(e) => {
            // ... handle error ...
        }
    }
}

從 Zenoh 到 DDS(呼叫遠端 ROS 服務)

本地 ROS 服務客戶端發送請求時,插件攔截並透過 Zenoh query 發送至遠端服務,並等待回覆。

// from zenoh-plugin-ros2dds/zenoh-plugin-ros2dds/src/route_service_cli.rs

// This is the callback for the DDS reader that receives local service requests
async fn forward_request_to_zenoh(
    sample: &DDSRawSample,
    context: &Context,
    zenoh_key_expr: &OwnedKeyExpr,
    queries_timeout: Duration,
    dds_reply_writer: dds_entity_t,
) {
    // Send a Zenoh query (get) and wait for the reply
    match context
        .zsession
        .get(zenoh_key_expr)
        .value(sample.payload.clone())
        .timeout(queries_timeout)
        .await
    {
        Ok(replies) => {
            // Forward the first valid reply to the local DDS reply writer
            if let Some(Ok(reply)) = replies.first() {
                // ... write reply.payload to dds_reply_writer ...
            }
        }
        Err(e) => {
            // ... handle error ...
        }
    }
}

這種優雅映射是橋接如此透明強大的原因。

智能配置與 QoS 映射

插件可智慧地從程式碼動態配 CycloneDDS 設定環境變數,因應不同 ROS 2 版本及用戶配置。

// from lib.rs
// Dynamically create a CycloneDDS configuration string
let cyclonedds_config = create_cyclonedds_config(
    ros_automatic_discovery_range.unwrap_or(RosAutomaticDiscoveryRange::Subnet),
    ros_static_peers.unwrap_or(Vec::new())
);
// Set the environment variable for CycloneDDS to use
env::set_var(
    "CYCLONEDDS_URI",
    format!("{}{}", cyclonedds_config, env::var("CYCLONEDDS_URI").unwrap_or_default()),
);

QoS 也有智慧映射,例如 TRANSIENT_LOCAL 持久性 ROS 發布者使用 Zenoh Publication Cache 支持,保留網路間 QoS 語意。

整合實踐指南

安裝方式多樣:Debian 套件、Docker 或從原始碼編譯。

安裝 (Debian/Ubuntu)

echo "deb [trusted=yes] [https://download.eclipse.org/zenoh/debian-repo/](https://download.eclipse.org/zenoh/debian-repo/) /" | sudo tee -a /etc/apt/sources.list
sudo apt update
sudo apt install zenoh-bridge-ros2dds

範例:talkerlistener

示範在一台機器(「機器人」)執行 talker,另一台(「主機」)執行 listener,透過 Zenoh 橋接通信。

準備四個終端視窗。

機器人端(IP 例如 192.168.1.100)

終端 1:啟動 Zenoh 橋接

# 啟動橋接,開始偵聽 Zenoh 节点
zenoh-bridge-ros2dds

終端 2:執行 ROS 2 talker

# 載入 ROS 2 環境
source /opt/ros/humble/setup.bash

# 執行 C++ talker 範例
ros2 run demo_nodes_cpp talker

主機端

終端 3:啟動 Zenoh 橋接並連接機器人橋接

zenoh-bridge-ros2dds -e tcp/192.168.1.100:7447

終端 4:執行 ROS 2 listener

source /opt/ros/humble/setup.bash

# 先列出主題驗證遠端可見
ros2 topic list
# 你應看到 '/chatter' 主題。

# 執行 C++ listener 範例
ros2 run demo_nodes_cpp listener

此時,主機端 listener 會接收並顯示機器人端 talker 發佈的訊息,Zenoh 無縫跨網路橋接通信。

結語

zenoh-plugin-ros2dds 為分布式 ROS 2 通信挑戰提供強大優雅的解決方案。藉由 Zenoh 高效網路與自身智能按需路由(Rust 實作),它成就透明且高效的橋接,在 LAN、WAN 與互聯網間擴展 ROS 2 應用範圍。無論是多機器人系統、遠端操控還是任何分布式機器人應用,此插件都是不可或缺的利器。


上一篇
Day 24: Zenoh 在機器人系統的應用全景 Part 1 - ROS1 與 zenoh-plugin-ros1:傳統機器人網路的新橋梁
下一篇
Day 26: 深入解析 Rust 與 CycloneDDS 的橋接利器 —— Cyclors
系列文
30 天玩轉 Zenoh:Rust 助力物聯網、機器人與自駕的高速通訊30
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言