延續前一篇介紹的 連結層(Link Layer),今天我們將探索 Zenoh 的 會議層(Session Layer)。
Session 層 是 Zenoh 應用程式互動的 核心 API。它抽象化了傳輸與路由的複雜性,使開發者能專注於資料語意:發布(Publish)、訂閱(Subscribe)、查詢(Query)以及資源管理。
在深入介紹通訊原語(communication primitives)之前,先來看看如何建立與配置 Zenoh Session:
use zenoh::Config;
// 預設組態
let session = zenoh::open(Config::default()).await.unwrap();
// 自訂組態
let mut config = Config::default();
config.set_mode(Some(zenoh::config::WhatAmI::Peer)).unwrap();
config.transport.unicast.max_sessions = Some(1000);
let session = zenoh::open(config).await.unwrap();
Session 層負責處理:
Session API 是 Zenoh 應用程式的主要對外介面,提供:
put
、get
、delete
以操作資源Session
結構是應用程式中所有 Zenoh 操作的 核心調度者。
Publisher 用來將資料值發送到指定的資源鍵,其他有匹配訂閱的節點會自動接收更新。
// 建立 Publisher
let publisher = session.declare_publisher("demo/example").await.unwrap();
// 發布資料
publisher.put("Hello from Zenoh!").await.unwrap();
進階 Publisher 設定:
use zenoh::qos::{Priority, CongestionControl};
use zenoh::bytes::Encoding;
// 進階 Publisher 組態
let publisher = session.declare_publisher("sensor/temperature")
.priority(Priority::RealTime)
.congestion_control(CongestionControl::Block)
.await.unwrap();
// 發布帶有中繼資料的資料
publisher
.put("25.5")
.encoding(Encoding::TEXT_PLAIN)
.timestamp(zenoh_util::timestamp::Timestamp::now())
.await.unwrap();
特色:
Subscriber 會在有匹配 Publisher 發送更新時接收資料。
// 訂閱某個 key
let subscriber = session.declare_subscriber("demo/example").await.unwrap();
// 持續接收資料
while let Ok(sample) = subscriber.recv_async().await {
let payload = sample.payload().try_to_string().unwrap_or_else(|e| e.to_string().into());
println!("Received: {} ('{}': '{}')", sample.kind(), sample.key_expr().as_str(), payload);
}
進階 Subscriber 設定:
// 設定 QoS 的 Subscriber
let subscriber = session.declare_subscriber("sensor/**")
.reliability(zenoh::qos::Reliability::Reliable)
.locality(zenoh::sample::Locality::Remote) // 僅接收遠端樣本
.await.unwrap();
// 使用 callback 取代 polling
let _subscriber = session.declare_subscriber("sensor/**")
.callback(|sample| {
println!("Received: {} = {}", sample.key_expr(), sample.payload());
})
.await.unwrap();
特色:
*
(單層)與 **
(多層)Zenoh 提供 分散式查詢,透過 get
與 queryable
原語:
get
) — 依需求擷取資料// 發出查詢
let replies = session.get("demo/info").await.unwrap();
while let Ok(reply) = replies.recv_async().await {
match reply.result() {
Ok(sample) => {
let payload = sample.payload().try_to_string().unwrap_or_else(|e| e.to_string().into());
println!("Got reply: ('{}': '{}')", sample.key_expr().as_str(), payload);
}
Err(err) => {
let payload = err.payload().try_to_string().unwrap_or_else(|e| e.to_string().into());
println!("Got error reply: {}", payload);
}
}
}
// 宣告一個 Queryable 來回應查詢
let queryable = session.declare_queryable("demo/info").await.unwrap();
tokio::spawn(async move {
while let Ok(query) = queryable.recv_async().await {
println!("Received Query: '{}'", query.selector());
query.reply("demo/info", "Dynamic response").await.unwrap();
}
});
進階查詢設定:
use zenoh::query::{QueryTarget, ConsolidationMode};
use std::time::Duration;
// 帶有進階選項的查詢
let replies = session.get("config/**")
.target(QueryTarget::AllComplete) // 等待完整回應
.consolidation(ConsolidationMode::Latest) // 只保留最新回覆
.timeout(Duration::from_secs(5)) // 設定timeout
.payload("payload") // 查詢payload
.await.unwrap();
// 帶有完成標記的 Queryable
let queryable = session.declare_queryable("database/users/*")
.complete(true) // 標註為此 key 空間的完整來源
.await.unwrap();
tokio::spawn(async move {
while let Ok(query) = queryable.recv_async().await {
if let Some(user_id) = query.key_expr().as_str().split('/').last() {
let user_data = format!("{{\"id\":\"{}\", \"name\":\"User {}\"}}", user_id, user_id);
query.reply(query.key_expr().clone(), user_data)
.encoding(Encoding::APPLICATION_JSON)
.await.unwrap();
}
}
});
功能特色:
Zenoh 支援 活性檢測,讓應用程式監控哪些節點仍存活。
// 宣告一個活性標記
let token = session.liveliness().declare_token("demo/alive").await.unwrap();
// 在 session 存活期間保持標記
println!("Liveliness token declared. Others can detect I'm alive!");
活性查詢 可讓應用探索目前有哪些節點處於活躍狀態:
// 查詢目前的活性狀態
let replies = session.liveliness().get("group1/**").await.unwrap();
while let Ok(reply) = replies.recv_async().await {
if let Ok(sample) = reply.result() {
println!("Currently alive: {}", sample.key_expr());
}
}
// 訂閱活性變化
let subscriber = session.liveliness().declare_subscriber("group1/**").await.unwrap();
while let Ok(sample) = subscriber.recv_async().await {
match sample.kind() {
zenoh::sample::SampleKind::Put => println!("'{}' 我跳進來了!", sample.key_expr()),
zenoh::sample::SampleKind::Delete => println!("'{}' 我跳出去了!", sample.key_expr()),
}
}
活性檢測功能:
除了 Publisher 與 Subscriber 之外,Zenoh 也支援對資源進行 直接操作:
// 直接寫入一個值
session.put("demo/direct", "Direct put value").await.unwrap();
// 刪除一個資源
session.delete("demo/direct").await.unwrap();
直接操作 提供即時的資料互動,而不需要維護長時間存活的實體:
// 帶有選項的進階 put
session.put("config/database_url", "postgresql://localhost/mydb")
.encoding(Encoding::TEXT_PLAIN)
.priority(Priority::DataHigh)
.congestion_control(CongestionControl::Block)
.await.unwrap();
// 條件式操作(若儲存端支援)
session.put("counter?condition=true", "1")
.await.unwrap();
// 帶時間戳記的刪除
session.delete("temp/cache")
.timestamp(session.new_timestamp())
.await.unwrap();
這些操作的特性包括:
Session
提供執行階段的檢視與管理能力:
// 取得 Session 資訊
let info = session.info();
println!("Session ID: {}", info.zid());
println!("網路中的 Routers: {:?}", info.routers_zid().await);
println!("已連線的 Peers: {:?}", info.peers_zid().await);
// Key expression 最佳化
let optimized_ke = session.declare_keyexpr("sensor/temperature/room1").await.unwrap();
// 使用最佳化後的 key expression 以獲得更佳效能
let publisher = session.declare_publisher(&optimized_ke).await.unwrap();
// 優雅地關閉 session
session.close().await.unwrap();
Session 層位於 Runtime 與 Transport 層之上:
Session 會自動處理:
Session 層 是 Zenoh 應用程式的基石,提供:
核心通訊模式:
進階功能:
開發效益:
藉由這些原語,開發者能構建出反應式、分散式、具容錯能力的系統,並能無縫擴展至嵌入式裝置、資料中心或雲-邊緣混合環境。Session 層將複雜性封裝起來,同時保留足夠的控制能力以支援效能關鍵的應用。