iT邦幫忙

2025 iThome 鐵人賽

DAY 8
0
Rust

30 天玩轉 Zenoh:Rust 助力物聯網、機器人與自駕的高速通訊系列 第 8

Day 08: 🦀 用 Rust 來撰寫一個 Zenoh 微服務

  • 分享至 

  • xImage
  •  

🦀 用 Rust 來撰寫一個 Zenoh 微服務

今天,我們將透過撰寫一個小型應用程式,來熟悉如何在 Zenoh 中操作 Session

Zenoh 是一個設計用於分散式系統的 發布/訂閱/查詢 通訊協定,能夠從雲端延伸到邊緣裝置。
只需幾行 Rust 程式碼,就可以建立功能強大的微服務,並在各種網路拓撲中無縫溝通。

在這篇文章中,我們將撰寫一個單一的 Zenoh 節點,並具備以下功能:

  • 使用查詢提供兩個 服務(echo 與 convert)
  • 發布 感測器資料(溫度)
  • 訂閱 感測器資料 的更新
  • 作為 客戶端 查詢自身的服務

這樣一來,它就成為一個完整自足的 Zenoh 微服務


環境設定

建立一個新的專案:

cargo new micro-service

修改 Cargo.toml

[dependencies]
zenoh = "1.0.0"
tokio = { version = "1", features = ["full"] }

完整程式碼

接下來修改 _src/main.rs_

// 導入 Zenoh 通訊及非同步操作所需的模組
use zenoh::Config;
use tokio::time::{sleep, Duration};

/// Zenoh 多服務節點的主要進入點
/// 此應用展示各種 Zenoh 模式:
/// - Queryable 服務(echo 和 convert)
/// - 發佈/訂閱模式(Pub/Sub)
/// - 客戶端查詢
#[tokio::main]
async fn main() {
    // 使用預設設定初始化 Zenoh session
    // Session 是所有 Zenoh 操作的主要入口
    let session = zenoh::open(Config::default()).await.unwrap();
    println!("Zenoh 多服務節點啟動!");

    // -----------------------------
    // Echo 服務
    // -----------------------------
    // 宣告一個可查詢服務,回應 "service/echo" 的請求
    // 此服務會簡單地回傳收到的訊息
    let echo = session.declare_queryable("service/echo").await.unwrap();
    tokio::spawn(async move {
        // 持續監聽傳入的查詢
        while let Ok(query) = echo.recv_async().await {
            // 從查詢中提取訊息內容
            let msg = query.payload().map(|p| p.try_to_string().unwrap_or_default().to_string()).unwrap_or_default();
            println!("[Echo 服務] 收到: {}", msg);

            // 回應查詢,回傳 echo 後的訊息
            query.reply(query.key_expr().clone(), format!("Echo: {}", msg))
                .await.unwrap();
        }
    });

    // -----------------------------
    // 二進位轉換服務
    // -----------------------------
    // 宣告一個可查詢服務,將數字轉換為二進位格式
    let convert = session.declare_queryable("service/convert").await.unwrap();
    tokio::spawn(async move {
        // 持續監聽傳入的查詢
        while let Ok(query) = convert.recv_async().await {
            // 從查詢中提取訊息內容
            let msg = query.payload().map(|p| p.try_to_string().unwrap_or_default().to_string()).unwrap_or_default();
            println!("[Binary Convert 服務] 收到: {}", msg);

            // 嘗試解析訊息為整數,並轉換為二進位
            // 若解析失敗,回傳錯誤訊息
            let reply = msg.parse::<i64>()
                .map(|v| format!("{} 的二進位格式為 0b{:b}", v, v))
                .unwrap_or_else(|_| "錯誤:不是有效的整數".into());

            // 回應查詢,傳回二進位轉換結果
            query.reply(query.key_expr().clone(), reply).await.unwrap();
        }
    });

    // -----------------------------
    // Sensor 發佈者
    // -----------------------------
    // 宣告一個發佈者,用於模擬溫度感測器資料
    // 此發佈者將溫度資料發佈到 "sensor/temperature" 主題
    let publisher = session.declare_publisher("sensor/temperature").await.unwrap();
    tokio::spawn(async move {
        // 初始化溫度值
        let mut value = 25.0;
        loop {
            // 格式化並發佈溫度資料
            let msg = format!("Temp = {:.1}", value);
            publisher.put(msg.clone()).await.unwrap();
            println!("[Publisher] 發佈: {}", msg);

            // 模擬溫度變化,略微增加
            value += 0.1;
            // 等待 2 秒再發佈下一筆資料
            sleep(Duration::from_secs(2)).await;
        }
    });

    // -----------------------------
    // 訂閱者
    // -----------------------------
    // 宣告一個訂閱者,監聽所有感測器資料,使用通配符模式
    // "sensor/**" 會匹配任何以 "sensor/" 開頭的 key expression
    let subscriber = session.declare_subscriber("sensor/**").await.unwrap();
    tokio::spawn(async move {
        // 持續監聽感測器主題的發佈資料
        while let Ok(sample) = subscriber.recv_async().await {
            // 取出 payload 並顯示對應的主題 key
            let payload = sample.payload().try_to_string().unwrap_or_default().to_string();
            println!("[Subscriber] '{}' -> {}", sample.key_expr(), payload);
        }
    });

    // -----------------------------
    // 客戶端定期查詢
    // -----------------------------
    // 建立一個客戶端任務,定期查詢服務
    // 展示 Zenoh 的請求-回應模式
    tokio::spawn({
        let session = session.clone();
        async move {
            // 等待服務啟動完成,再發送查詢
            sleep(Duration::from_secs(5)).await;

            let mut counter = 0;
            loop {
                counter += 1;
                println!("[Client] 發送查詢 #{}", counter);

                // 查詢 echo 服務
                let replies = session.get("service/echo")
                    .payload(format!("Hello Zenoh! #{}", counter))
                    .await.unwrap();

                // 處理所有 echo 服務回覆
                while let Ok(reply) = replies.recv_async().await {
                    if let Ok(sample) = reply.result() {
                        println!("[Client] Echo 回覆: {}", sample.payload().try_to_string().unwrap());
                    }
                }

                // 遞增整數並查詢 convert 服務
                let test_value = 42 + counter;
                let replies = session.get("service/convert")
                    .payload(test_value.to_string())
                    .await.unwrap();

                // 處理 convert 服務所有回覆
                while let Ok(reply) = replies.recv_async().await {
                    if let Ok(sample) = reply.result() {
                        println!("[Client] Convert 回覆: {}", sample.payload().try_to_string().unwrap());
                    }
                }

                // 等待 3 秒後再發送下一批查詢
                sleep(Duration::from_secs(3)).await;
            }
        }
    });

    // 保持主執行緒存活,以允許所有 spawned 任務持續運行
    // 無限迴圈防止程式結束
    loop {
        sleep(Duration::from_secs(60)).await;
    }
}

架構圖

以下展示 Zenoh 微服務節點內各個元件如何協作:

          ┌───────────────────────────────────────────┐
          │           Zenoh 微服務節點                │
          │                                           │
          │   ┌──────────────┐     ┌──────────────┐   │
Client ───┼──▶│ Echo 服務    │     │ Convert 服務 │◀──┼── Client
(查詢)    │   └──────────────┘     └──────────────┘   │  (查詢)
          │                                           │
          │   ┌──────────────┐     ┌──────────────┐   │
Publisher ───▶│ 溫度發佈器   │────▶│ 訂閱者監聽器 │───┼── 控制台
 (發佈)   │   │              │     │              │   │ (紀錄資料)
          │   └──────────────┘     └──────────────┘   │
          └───────────────────────────────────────────┘
  • Echo 服務:回傳 "Echo: <msg>"
  • Convert 服務:解析數字並回傳二進位表示。
  • 發佈者 (Publisher):持續發送溫度數據。
  • 訂閱者 (Subscriber):即時監聽溫度更新。
  • 客戶端 (Client):定期查詢服務以展示請求-回應模式。

運行微服務

執行應用程式:

cargo run

輸出範例:

Zenoh 多服務節點啟動!
[Publisher] 發佈: Temp = 25.0
[Subscriber] 'sensor/temperature' -> Temp = 25.0
[Publisher] 發佈: Temp = 25.1
[Subscriber] 'sensor/temperature' -> Temp = 25.1
[Publisher] 發佈: Temp = 25.2
[Subscriber] 'sensor/temperature' -> Temp = 25.2
[Client] 發送查詢 #1
[Echo 服務] 收到: Hello Zenoh! #1
[Client] Echo 回覆: Echo: Hello Zenoh! #1
[Binary Convert 服務] 收到: 43
[Client] Convert 回覆: 43 的二進位格式為 0b101011
[Publisher] 發佈: Temp = 25.3
[Subscriber] 'sensor/temperature' -> Temp = 25.3
[Client] 發送查詢 #2
[Echo 服務] 收到: Hello Zenoh! #2
[Client] Echo 回覆: Echo: Hello Zenoh! #2
[Binary Convert 服務] 收到: 44
[Client] Convert 回覆: 44 的二進位格式為 0b101100

回顧

  • Pub/Sub — 模擬感測器持續發佈,訂閱者即時接收。
  • Query/Reply — 服務可回應即時請求(/service/echo/service/convert)。
  • 客戶端整合 — 節點可以像其他 peer 一樣查詢自己的服務。
  • 整合微服務節點 — 所有功能在單一 Rust 程式中運行,方便擴展。

之後我們會將這個範例拆分成獨立微服務,在不同進程中運行,或分散到叢集。藉此來展示Zenoh 會自動處理發現、路由與訊息傳遞。我們下一篇文章見!


上一篇
Day 07: Zenoh 會議層(Session Layer)
下一篇
Day 09: 🦀 打造各種網路拓撲的Zenoh 微服務
系列文
30 天玩轉 Zenoh:Rust 助力物聯網、機器人與自駕的高速通訊10
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言