iT邦幫忙

2025 iThome 鐵人賽

DAY 9
0
Rust

30 天玩轉 Zenoh:Rust 助力物聯網、機器人與自駕的高速通訊系列 第 9

Day 09: 🦀 打造各種網路拓撲的Zenoh 微服務

  • 分享至 

  • xImage
  •  

🦀 打造各種網路拓撲的Zenoh 微服務

在這篇文章中,我們將把單一節點的微服務擴展成一個多進程應用程式。這將讓我們體驗 Zenoh 在服務發現可擴展性上的強大功能。

我們將建立五個獨立的可執行檔(binaries):

  • echo_service:一個簡單的服務,會回應接收到的任何訊息。
  • convert_service:將整數轉換成二進位表示的服務。
  • sensor_publisher:模擬溫度感測器並定期發布資料的發佈者。
  • sensor_subscriber:訂閱並接收溫度更新的訂閱者。
  • client:定期查詢 echoconvert 服務的客戶端。

我們將探討微服務的三種不同運行情境:

  1. 點對點模式(Peer-to-Peer Mode):服務透過多播自動在網路中發現彼此。
  2. 路由模式(Router Mode):透過中央 zenohd 路由器進行服務發現,但通訊仍是點對點。
  3. 跨網路通訊(Multi-Network Communication):不同網路中的服務透過公共路由器互相通訊。

專案設定

我們將沿用前一章建立的 micro-service 專案,並把新的可執行檔放在 src/bin 目錄下。

Cargo.toml

請確保你的 Cargo.toml 檔案包含以下相依套件與 binary 定義:


[package]
name = "mirco-service"
version = "0.1.0"
edition = "2024"

[dependencies]
zenoh = "1.5.1"
tokio = { version = "1", features = ["full"] }

[[bin]]
name = "echo_service"
path = "src/bin/echo_service.rs"

[[bin]]
name = "convert_service"
path = "src/bin/convert_service.rs"

[[bin]]
name = "sensor_publisher"
path = "src/bin/sensor_publisher.rs"

[[bin]]
name = "sensor_subscriber"
path = "src/bin/sensor_subscriber.rs"

[[bin]]
name = "client"
path = "src/bin/client.rs"

Case 1: Peer-to-Peer Mode (Multicast Discovery)

在此模式下,每個 Zenoh 節點會使用多播來發現同一網路中的其他節點。
這是預設行為,不需要任何中間的router。

Diagram: Peer-to-Peer Discovery

                      +------------------+
                      | Network (LAN)    |
                      +------------------+
                        ^    ^    ^    ^
                        |    |    |    | Multicast Discovery
+-----------------+     |    |    |    |     +--------------------+
|  echo_service   |<----/    |    |    \---->|  sensor_publisher  |
+-----------------+          |    |          +--------------------+
                             |    |
+------------------+         |    |         +---------------------+
| convert_service  |<-------/     \-------->|  sensor_subscriber  |
+------------------+                        +---------------------+
        ^                                           ^
        |
        \------------------------------------------/
                           |
                   +--------------+
                   |    client    |
                   +--------------+

程式碼

以下是每個檔案的程式碼範例。

echo_service.rs

use zenoh::Config;

#[tokio::main]
async fn main() {
    let session = zenoh::open(Config::default()).await.unwrap();
    println!("Echo service started!");

    let echo = session.declare_queryable("service/echo").await.unwrap();
    tokio::spawn(async move {
        while let Ok(query) = echo.recv_async().await {
            let msg = query.payload()
                .map(|p| p.try_to_string().unwrap_or_default())
                .unwrap_or_default();
            println!("[Echo service] Received request: {}", msg);
            query.reply(query.key_expr().clone(), format!("Echo: {}", msg)).await.unwrap();
        }
    });

    loop { tokio::time::sleep(std::time::Duration::from_secs(60)).await; }
}

convert_service.rs

use zenoh::Config;

#[tokio::main]
async fn main() {
    let session = zenoh::open(Config::default()).await.unwrap();
    println!("Convert service started!");

    let convert = session.declare_queryable("service/convert").await.unwrap();
    tokio::spawn(async move {
        while let Ok(query) = convert.recv_async().await {
            let msg = query.payload()
                .map(|p| p.try_to_string().unwrap_or_default())
                .unwrap_or_default();
            println!("[Convert service] Received request: {}", msg);
            let reply = msg.parse::<i64>()
                .map(|v| format!("0b{:b}", v))
                .unwrap_or_else(|_| "Error: not an integer".into());
            query.reply(query.key_expr().clone(), reply).await.unwrap();
        }
    });

    loop { tokio::time::sleep(std::time::Duration::from_secs(60)).await; }
}

sensor_publisher.rs

use zenoh::Config;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let session = zenoh::open(Config::default()).await.unwrap();
    let publisher = session.declare_publisher("sensor/temperature").await.unwrap();
    let mut value = 25.0;

    loop {
        let msg = format!("Temp = {:.1}", value);
        publisher.put(msg.clone()).await.unwrap();
        println!("[Publisher] {}", msg);
        value += 0.1;
        sleep(Duration::from_secs(2)).await;
    }
}

sensor_subscriber.rs

use zenoh::Config;

#[tokio::main]
async fn main() {
    let session = zenoh::open(Config::default()).await.unwrap();
    let subscriber = session.declare_subscriber("sensor/**").await.unwrap();

    while let Ok(sample) = subscriber.recv_async().await {
        let payload = sample.payload().try_to_string().unwrap_or_default().to_string();
        println!("[Subscriber] '{}' -> {}", sample.key_expr(), payload);
    }
}

client.rs

use zenoh::Config;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let session = zenoh::open(Config::default()).await.unwrap();
    sleep(Duration::from_secs(5)).await; // wait for services

    let mut counter = 0;
    loop {
        counter += 1;
        let replies = session.get("service/echo")
            .payload(format!("Hello Zenoh! #{}", counter))
            .await.unwrap();
        while let Ok(reply) = replies.recv_async().await {
            if let Ok(sample) = reply.result() {
                println!("[Client] Echo reply: {}", sample.payload().try_to_string().unwrap());
            }
        }

        let test_value = 42 + counter;
        let replies = session.get("service/convert")
            .payload(test_value.to_string())
            .await.unwrap();
        while let Ok(reply) = replies.recv_async().await {
            if let Ok(sample) = reply.result() {
                println!("[Client] Convert reply: {}", sample.payload().try_to_string().unwrap());
            }
        }

        sleep(Duration::from_secs(3)).await;
    }
}

執行方式

開啟五個不同的終端機,並分別執行每個程式:

cargo run --bin echo_service
cargo run --bin convert_service
cargo run --bin sensor_publisher
cargo run --bin sensor_subscriber
cargo run --bin client

這些節點將透過多播自動互相發現,您將看到 client 與服務互動,以及 subscriber 接收來自 publisher 的資料。


Case 2: Router Mode (zenohd)

在此模式下,我們使用一個 zenohd 實例作為集中式路由器來進行服務發現。這在較大規模的部署中,或當多播不可用或不適合使用時特別有用。

Diagram: Router Mode

+-----------------+      +--------------------+      +---------------------+
|  echo_service   |      |  convert_service   |      |  sensor_publisher   |
+-----------------+      +--------------------+      +---------------------+
        |                        |                             |
        \------------------------|----------------------------/
                                 |
                      +------------------+
                      |  Zenoh Router    |
                      | (`zenohd`)       |
                      +------------------+
                                 |
        /------------------------|----------------------------\
        |                        |                             |
+-----------------+      +--------------------+      +---------------------+
|     client      |      | sensor_subscriber  |      | (other services)    |
+-----------------+      +--------------------+      +---------------------+

如何執行

  1. 啟動 Zenoh 路由器

    首先,你需要安裝 zenohd。如果尚未安裝,可以從 Zenoh 官方網站 下載。

    執行以下命令啟動路由器:

    zenohd --listen tcp/0.0.0.0:7447
    
  2. 修改程式碼

    在五個執行檔中,修改 Zenoh session 設定以連接到路由器:

    let config = Config::from_file("config.json5").unwrap();
    let session = zenoh::open(config).await.unwrap();
    

    config.json5

    {
      connect: {
        endpoints: [
          "tcp/127.0.0.1:7447"
        ],
      },
      scouting: {
        multicast: {
          enabled: false,
        }
      }
    }
    
  3. 執行各個執行檔

    接下來,在不同的終端機中運行五個執行檔。它們會透過路由器進行服務發現,然後相互通信。一旦建立連接後,就算中間的zenohd斷掉了也還是可以正常通訊。


案例 3:跨多網路的通訊(使用公共路由器)

Zenoh 的路由器模式非常適合服務分散在不同網路的情境。只要每個服務能夠連接到路由器的公共 IP 位址,它們就可以相互通信。

Diagram: Multi-Network Communication

+-----------------+      +--------------------+
|    Machine 1    |      |   Public Internet  |
| (Network A)     |      |                    |
+-----------------+      +--------------------+
|                 |      |                    |
| +-------------+ |      |  +---------------+ |
| | echo_service| |<------>| Zenoh Router  |<------>
| +-------------+ |      |  |(Public IP)    | |
|                 |      |  +---------------+ |
| +-------------+ |      |                    |
| | client      | |<------>                  |
| +-------------+ |      |                    |
+-----------------+      +--------------------+
                         ^
                         |
+-----------------+      |
|    Machine 2    |      |
| (Network B)     |      |
+-----------------+      |
|                 |      |
| +-------------+ |      |
| |convert_service| -----/
| +-------------+ |
|                 |
| +-------------+ |
| |sensor_pub   | | -----\
| +-------------+ |      |
|                 |      |
| +-------------+ |      |
| |sensor_sub   | | <----/
| +-------------+ |
+-----------------+

執行方式

  1. 在公共伺服器啟動 Zenoh 路由器

    你需要一台具有公共 IP 位址的伺服器(例如雲端伺服器)。假設公共 IP 是 203.0.113.1,在此伺服器上啟動路由器:

    zenohd --listen tcp/0.0.0.0:7447
    
  2. 在機器 1(網路 A)配置並執行服務

    在第一台機器上,修改 echo_serviceclient 的程式碼,讓它們連接到公共路由器:

    let config = Config::from_file("config.json5").unwrap();
    let session = zenoh::open(config).await.unwrap();
    

    config.json5

    {
      connect: {
        endpoints: [
          "tcp/203.0.113.1:7447"
        ],
      },
      scouting: {
        multicast: {
          enabled: false,
        }
      }
    }
    

    然後執行服務:

    cargo run --bin echo_service
    cargo run --bin client
    
  3. 在機器 2(網路 B)配置並執行服務

    在第二台機器上,修改 convert_servicesensor_publishersensor_subscriber 的程式碼,連接到相同的公共路由器:

    let mut config = Config::default();
    config.connect.push("tcp/203.0.113.1:7447".to_string());
    let session = zenoh::open(config).await.unwrap();
    

    然後執行服務:

    cargo run --bin convert_service
    cargo run --bin sensor_publisher
    cargo run --bin sensor_subscriber
    

現在,機器 1 上的 client 將能夠查詢機器 2 上的 convert_service,而機器 2 上的 sensor_subscriber 會接收到同一台機器上 sensor_publisher 發佈的資料,整個過程透過公共路由器完成。


回顧

  • 多進程微服務:Zenoh 讓建立多進程的分散式系統變得容易。

  • 服務發現:Zenoh 提供兩種主要的發現模式:

    • 點對點模式 (透過多播來找尋) :簡單,無單點故障,但可能造成網路噪音。
    • 路由器模式:集中式發現,更可擴展,避免 multicast。
  • 通訊模式:Pub/Sub 與 Query/Reply 模式在所有模式下都能順暢運作。

  • 位置透明性:Zenoh 提供位置透明性,使服務跨網路通訊就像在同一網路中一樣。使用者只要專注在設定資料的Key Expression就可以找到對應的服務。


上一篇
Day 08: 🦀 用 Rust 來撰寫一個 Zenoh 微服務
下一篇
Day 10: Rust Macro 熱身:從 `macro_rules!` 到 Derive Macro
系列文
30 天玩轉 Zenoh:Rust 助力物聯網、機器人與自駕的高速通訊10
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言