在這篇文章中,我們將把單一節點的微服務擴展成一個多進程應用程式。這將讓我們體驗 Zenoh 在服務發現與可擴展性上的強大功能。
我們將建立五個獨立的可執行檔(binaries):
echo_service
:一個簡單的服務,會回應接收到的任何訊息。convert_service
:將整數轉換成二進位表示的服務。sensor_publisher
:模擬溫度感測器並定期發布資料的發佈者。sensor_subscriber
:訂閱並接收溫度更新的訂閱者。client
:定期查詢 echo
和 convert
服務的客戶端。我們將探討微服務的三種不同運行情境:
zenohd
路由器進行服務發現,但通訊仍是點對點。我們將沿用前一章建立的 micro-service
專案,並把新的可執行檔放在 src/bin
目錄下。
Cargo.toml
請確保你的 Cargo.toml
檔案包含以下相依套件與 binary 定義:
[package]
name = "mirco-service"
version = "0.1.0"
edition = "2024"
[dependencies]
zenoh = "1.5.1"
tokio = { version = "1", features = ["full"] }
[[bin]]
name = "echo_service"
path = "src/bin/echo_service.rs"
[[bin]]
name = "convert_service"
path = "src/bin/convert_service.rs"
[[bin]]
name = "sensor_publisher"
path = "src/bin/sensor_publisher.rs"
[[bin]]
name = "sensor_subscriber"
path = "src/bin/sensor_subscriber.rs"
[[bin]]
name = "client"
path = "src/bin/client.rs"
在此模式下,每個 Zenoh 節點會使用多播來發現同一網路中的其他節點。
這是預設行為,不需要任何中間的router。
+------------------+
| Network (LAN) |
+------------------+
^ ^ ^ ^
| | | | Multicast Discovery
+-----------------+ | | | | +--------------------+
| echo_service |<----/ | | \---->| sensor_publisher |
+-----------------+ | | +--------------------+
| |
+------------------+ | | +---------------------+
| convert_service |<-------/ \-------->| sensor_subscriber |
+------------------+ +---------------------+
^ ^
|
\------------------------------------------/
|
+--------------+
| client |
+--------------+
以下是每個檔案的程式碼範例。
echo_service.rs
use zenoh::Config;
#[tokio::main]
async fn main() {
let session = zenoh::open(Config::default()).await.unwrap();
println!("Echo service started!");
let echo = session.declare_queryable("service/echo").await.unwrap();
tokio::spawn(async move {
while let Ok(query) = echo.recv_async().await {
let msg = query.payload()
.map(|p| p.try_to_string().unwrap_or_default())
.unwrap_or_default();
println!("[Echo service] Received request: {}", msg);
query.reply(query.key_expr().clone(), format!("Echo: {}", msg)).await.unwrap();
}
});
loop { tokio::time::sleep(std::time::Duration::from_secs(60)).await; }
}
convert_service.rs
use zenoh::Config;
#[tokio::main]
async fn main() {
let session = zenoh::open(Config::default()).await.unwrap();
println!("Convert service started!");
let convert = session.declare_queryable("service/convert").await.unwrap();
tokio::spawn(async move {
while let Ok(query) = convert.recv_async().await {
let msg = query.payload()
.map(|p| p.try_to_string().unwrap_or_default())
.unwrap_or_default();
println!("[Convert service] Received request: {}", msg);
let reply = msg.parse::<i64>()
.map(|v| format!("0b{:b}", v))
.unwrap_or_else(|_| "Error: not an integer".into());
query.reply(query.key_expr().clone(), reply).await.unwrap();
}
});
loop { tokio::time::sleep(std::time::Duration::from_secs(60)).await; }
}
sensor_publisher.rs
use zenoh::Config;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let session = zenoh::open(Config::default()).await.unwrap();
let publisher = session.declare_publisher("sensor/temperature").await.unwrap();
let mut value = 25.0;
loop {
let msg = format!("Temp = {:.1}", value);
publisher.put(msg.clone()).await.unwrap();
println!("[Publisher] {}", msg);
value += 0.1;
sleep(Duration::from_secs(2)).await;
}
}
sensor_subscriber.rs
use zenoh::Config;
#[tokio::main]
async fn main() {
let session = zenoh::open(Config::default()).await.unwrap();
let subscriber = session.declare_subscriber("sensor/**").await.unwrap();
while let Ok(sample) = subscriber.recv_async().await {
let payload = sample.payload().try_to_string().unwrap_or_default().to_string();
println!("[Subscriber] '{}' -> {}", sample.key_expr(), payload);
}
}
client.rs
use zenoh::Config;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let session = zenoh::open(Config::default()).await.unwrap();
sleep(Duration::from_secs(5)).await; // wait for services
let mut counter = 0;
loop {
counter += 1;
let replies = session.get("service/echo")
.payload(format!("Hello Zenoh! #{}", counter))
.await.unwrap();
while let Ok(reply) = replies.recv_async().await {
if let Ok(sample) = reply.result() {
println!("[Client] Echo reply: {}", sample.payload().try_to_string().unwrap());
}
}
let test_value = 42 + counter;
let replies = session.get("service/convert")
.payload(test_value.to_string())
.await.unwrap();
while let Ok(reply) = replies.recv_async().await {
if let Ok(sample) = reply.result() {
println!("[Client] Convert reply: {}", sample.payload().try_to_string().unwrap());
}
}
sleep(Duration::from_secs(3)).await;
}
}
開啟五個不同的終端機,並分別執行每個程式:
cargo run --bin echo_service
cargo run --bin convert_service
cargo run --bin sensor_publisher
cargo run --bin sensor_subscriber
cargo run --bin client
這些節點將透過多播自動互相發現,您將看到 client 與服務互動,以及 subscriber 接收來自 publisher 的資料。
zenohd
)在此模式下,我們使用一個 zenohd
實例作為集中式路由器來進行服務發現。這在較大規模的部署中,或當多播不可用或不適合使用時特別有用。
+-----------------+ +--------------------+ +---------------------+
| echo_service | | convert_service | | sensor_publisher |
+-----------------+ +--------------------+ +---------------------+
| | |
\------------------------|----------------------------/
|
+------------------+
| Zenoh Router |
| (`zenohd`) |
+------------------+
|
/------------------------|----------------------------\
| | |
+-----------------+ +--------------------+ +---------------------+
| client | | sensor_subscriber | | (other services) |
+-----------------+ +--------------------+ +---------------------+
啟動 Zenoh 路由器
首先,你需要安裝 zenohd
。如果尚未安裝,可以從 Zenoh 官方網站 下載。
執行以下命令啟動路由器:
zenohd --listen tcp/0.0.0.0:7447
修改程式碼
在五個執行檔中,修改 Zenoh session 設定以連接到路由器:
let config = Config::from_file("config.json5").unwrap();
let session = zenoh::open(config).await.unwrap();
config.json5
{
connect: {
endpoints: [
"tcp/127.0.0.1:7447"
],
},
scouting: {
multicast: {
enabled: false,
}
}
}
執行各個執行檔
接下來,在不同的終端機中運行五個執行檔。它們會透過路由器進行服務發現,然後相互通信。一旦建立連接後,就算中間的zenohd斷掉了也還是可以正常通訊。
Zenoh 的路由器模式非常適合服務分散在不同網路的情境。只要每個服務能夠連接到路由器的公共 IP 位址,它們就可以相互通信。
+-----------------+ +--------------------+
| Machine 1 | | Public Internet |
| (Network A) | | |
+-----------------+ +--------------------+
| | | |
| +-------------+ | | +---------------+ |
| | echo_service| |<------>| Zenoh Router |<------>
| +-------------+ | | |(Public IP) | |
| | | +---------------+ |
| +-------------+ | | |
| | client | |<------> |
| +-------------+ | | |
+-----------------+ +--------------------+
^
|
+-----------------+ |
| Machine 2 | |
| (Network B) | |
+-----------------+ |
| | |
| +-------------+ | |
| |convert_service| -----/
| +-------------+ |
| |
| +-------------+ |
| |sensor_pub | | -----\
| +-------------+ | |
| | |
| +-------------+ | |
| |sensor_sub | | <----/
| +-------------+ |
+-----------------+
在公共伺服器啟動 Zenoh 路由器
你需要一台具有公共 IP 位址的伺服器(例如雲端伺服器)。假設公共 IP 是 203.0.113.1
,在此伺服器上啟動路由器:
zenohd --listen tcp/0.0.0.0:7447
在機器 1(網路 A)配置並執行服務
在第一台機器上,修改 echo_service
和 client
的程式碼,讓它們連接到公共路由器:
let config = Config::from_file("config.json5").unwrap();
let session = zenoh::open(config).await.unwrap();
config.json5
{
connect: {
endpoints: [
"tcp/203.0.113.1:7447"
],
},
scouting: {
multicast: {
enabled: false,
}
}
}
然後執行服務:
cargo run --bin echo_service
cargo run --bin client
在機器 2(網路 B)配置並執行服務
在第二台機器上,修改 convert_service
、sensor_publisher
和 sensor_subscriber
的程式碼,連接到相同的公共路由器:
let mut config = Config::default();
config.connect.push("tcp/203.0.113.1:7447".to_string());
let session = zenoh::open(config).await.unwrap();
然後執行服務:
cargo run --bin convert_service
cargo run --bin sensor_publisher
cargo run --bin sensor_subscriber
現在,機器 1 上的 client 將能夠查詢機器 2 上的 convert_service
,而機器 2 上的 sensor_subscriber
會接收到同一台機器上 sensor_publisher
發佈的資料,整個過程透過公共路由器完成。
多進程微服務:Zenoh 讓建立多進程的分散式系統變得容易。
服務發現:Zenoh 提供兩種主要的發現模式:
通訊模式:Pub/Sub 與 Query/Reply 模式在所有模式下都能順暢運作。
位置透明性:Zenoh 提供位置透明性,使服務跨網路通訊就像在同一網路中一樣。使用者只要專注在設定資料的Key Expression就可以找到對應的服務。