iT邦幫忙

2025 iThome 鐵人賽

DAY 7
0

Zenoh 會議層(Session Layer)

延續前一篇介紹的 連結層(Link Layer),今天我們將探索 Zenoh 的 會議層(Session Layer)
Session 層 是 Zenoh 應用程式互動的 核心 API。它抽象化了傳輸與路由的複雜性,使開發者能專注於資料語意:發布(Publish)、訂閱(Subscribe)、查詢(Query)以及資源管理。


建立與配置 Session

在深入介紹通訊原語(communication primitives)之前,先來看看如何建立與配置 Zenoh Session:

use zenoh::Config;

// 預設組態
let session = zenoh::open(Config::default()).await.unwrap();

// 自訂組態
let mut config = Config::default();
config.set_mode(Some(zenoh::config::WhatAmI::Peer)).unwrap();
config.transport.unicast.max_sessions = Some(1000);

let session = zenoh::open(config).await.unwrap();

Session 層負責處理:

  • 組態管理 — 傳輸設定、安全性、模式(Peer/Router/Client)
  • 執行期初始化 — 建立網路連線與路由表
  • 資源生命週期 — 在 Session 關閉時自動清理
  • 鍵表達式最佳化 — 快取與高效的協定編碼

Session API 概觀

Session API 是 Zenoh 應用程式的主要對外介面,提供:

  • Session 生命週期管理 — 建立、配置與優雅終止
  • 通訊原語的宣告與管理PublisherSubscriberQueryable
  • 直接通訊操作putgetdelete 以操作資源
  • 活性(Liveliness)管理 — 偵測應用或資源是否仍在運行

Session 結構是應用程式中所有 Zenoh 操作的 核心調度者


核心通訊原語(Core Communication Primitives)

1. 發布者(Publisher, pub)

Publisher 用來將資料值發送到指定的資源鍵,其他有匹配訂閱的節點會自動接收更新。

// 建立 Publisher
let publisher = session.declare_publisher("demo/example").await.unwrap();

// 發布資料
publisher.put("Hello from Zenoh!").await.unwrap();

進階 Publisher 設定:

use zenoh::qos::{Priority, CongestionControl};
use zenoh::bytes::Encoding;

// 進階 Publisher 組態
let publisher = session.declare_publisher("sensor/temperature")
    .priority(Priority::RealTime)
    .congestion_control(CongestionControl::Block)
    .await.unwrap();

// 發布帶有中繼資料的資料
publisher
    .put("25.5")
    .encoding(Encoding::TEXT_PLAIN)
    .timestamp(zenoh_util::timestamp::Timestamp::now())
    .await.unwrap();

特色:

  • QoS 政策 (QoS Policy) — 優先權(Background, DataHigh, RealTime)、可靠性(Reliable, BestEffort)
  • 壅塞控制 (Congestion Control) — Block 或 Drop 策略
  • 編碼中繼資料 (Encoding) — 不同資料型別的序列化提示
  • 附加資料 (Attachment) — 每則訊息可攜帶使用者定義的額外中繼資料

2. 訂閱者(Subscriber, sub)

Subscriber 會在有匹配 Publisher 發送更新時接收資料。

// 訂閱某個 key
let subscriber = session.declare_subscriber("demo/example").await.unwrap();

// 持續接收資料
while let Ok(sample) = subscriber.recv_async().await {
    let payload = sample.payload().try_to_string().unwrap_or_else(|e| e.to_string().into());
    println!("Received: {} ('{}': '{}')", sample.kind(), sample.key_expr().as_str(), payload);
}

進階 Subscriber 設定:

// 設定 QoS 的 Subscriber
let subscriber = session.declare_subscriber("sensor/**")
    .reliability(zenoh::qos::Reliability::Reliable)
    .locality(zenoh::sample::Locality::Remote) // 僅接收遠端樣本
    .await.unwrap();

// 使用 callback 取代 polling
let _subscriber = session.declare_subscriber("sensor/**")
    .callback(|sample| {
        println!("Received: {} = {}", sample.key_expr(), sample.payload());
    })
    .await.unwrap();

特色:

  • 鍵表達式萬用字元*(單層)與 **(多層)
  • 處理方式靈活 — 可選擇 FIFO channel、環狀緩衝區(ring buffer)、或 callback
  • QoS 過濾 — 僅接收可靠或最佳努力訊息
  • 本地性控制 (Locality) — 僅本地、僅遠端、或全部樣本
  • 內建Backpressure控制 — 自動流量控制避免記憶體耗盡

3. 查詢(Query)與 可查詢(Queryable)

Zenoh 提供 分散式查詢,透過 getqueryable 原語:

  • 查詢(Query, get — 依需求擷取資料
  • 可查詢(Queryable) — 對傳入查詢動態回應
// 發出查詢
let replies = session.get("demo/info").await.unwrap();
while let Ok(reply) = replies.recv_async().await {
    match reply.result() {
        Ok(sample) => {
            let payload = sample.payload().try_to_string().unwrap_or_else(|e| e.to_string().into());
            println!("Got reply: ('{}': '{}')", sample.key_expr().as_str(), payload);
        }
        Err(err) => {
            let payload = err.payload().try_to_string().unwrap_or_else(|e| e.to_string().into());
            println!("Got error reply: {}", payload);
        }
    }
}

// 宣告一個 Queryable 來回應查詢
let queryable = session.declare_queryable("demo/info").await.unwrap();

tokio::spawn(async move {
    while let Ok(query) = queryable.recv_async().await {
        println!("Received Query: '{}'", query.selector());
        query.reply("demo/info", "Dynamic response").await.unwrap();
    }
});

進階查詢設定:

use zenoh::query::{QueryTarget, ConsolidationMode};
use std::time::Duration;

// 帶有進階選項的查詢
let replies = session.get("config/**")
    .target(QueryTarget::AllComplete) // 等待完整回應
    .consolidation(ConsolidationMode::Latest) // 只保留最新回覆
    .timeout(Duration::from_secs(5)) // 設定timeout
    .payload("payload") // 查詢payload
    .await.unwrap();

// 帶有完成標記的 Queryable
let queryable = session.declare_queryable("database/users/*")
    .complete(true) // 標註為此 key 空間的完整來源
    .await.unwrap();

tokio::spawn(async move {
    while let Ok(query) = queryable.recv_async().await {
        if let Some(user_id) = query.key_expr().as_str().split('/').last() {
            let user_data = format!("{{\"id\":\"{}\", \"name\":\"User {}\"}}", user_id, user_id);
            query.reply(query.key_expr().clone(), user_data)
                .encoding(Encoding::APPLICATION_JSON)
                .await.unwrap();
        }
    }
});

功能特色:

  • 查詢目標(Query Target) — BestMatching(最快)、All(全部)、AllComplete(完整回覆集合)
  • 回覆整合模式 — None、Latest、Monotonic,用於處理重複回覆
  • 超時控制 — 可設定查詢超時與回覆收集時間
  • Queryable 完整性 — 可標註 Queryable 為某範圍的權威來源
  • 分散式路由 — 自動多跳查詢傳遞與回覆聚合

4. 活性標記(Liveliness Tokens)

Zenoh 支援 活性檢測,讓應用程式監控哪些節點仍存活。

// 宣告一個活性標記
let token = session.liveliness().declare_token("demo/alive").await.unwrap();

// 在 session 存活期間保持標記
println!("Liveliness token declared. Others can detect I'm alive!");

活性查詢 可讓應用探索目前有哪些節點處於活躍狀態:

// 查詢目前的活性狀態
let replies = session.liveliness().get("group1/**").await.unwrap();
while let Ok(reply) = replies.recv_async().await {
    if let Ok(sample) = reply.result() {
        println!("Currently alive: {}", sample.key_expr());
    }
}

// 訂閱活性變化
let subscriber = session.liveliness().declare_subscriber("group1/**").await.unwrap();
while let Ok(sample) = subscriber.recv_async().await {
    match sample.kind() {
        zenoh::sample::SampleKind::Put => println!("'{}' 我跳進來了!", sample.key_expr()),
        zenoh::sample::SampleKind::Delete => println!("'{}' 我跳出去了!", sample.key_expr()),
    }
}

活性檢測功能:

  • 自動清理 — Session 關閉時,標記自動釋放
  • 網路失效檢測 — 偵測網路分割或節點故障
  • 階層化組織 — 使用鍵表達式進行邏輯分群
  • 即時狀態查詢 — 隨時探索目前存活節點

直接資料操作(Direct Data Operations)

除了 Publisher 與 Subscriber 之外,Zenoh 也支援對資源進行 直接操作

// 直接寫入一個值
session.put("demo/direct", "Direct put value").await.unwrap();

// 刪除一個資源
session.delete("demo/direct").await.unwrap();

直接操作 提供即時的資料互動,而不需要維護長時間存活的實體:

// 帶有選項的進階 put
session.put("config/database_url", "postgresql://localhost/mydb")
    .encoding(Encoding::TEXT_PLAIN)
    .priority(Priority::DataHigh)
    .congestion_control(CongestionControl::Block)
    .await.unwrap();

// 條件式操作(若儲存端支援)
session.put("counter?condition=true", "1")
    .await.unwrap();

// 帶時間戳記的刪除
session.delete("temp/cache")
    .timestamp(session.new_timestamp())
    .await.unwrap();

這些操作的特性包括:

  • 無狀態操作 — 適合一次性的操作,無需宣告 Publisher
  • 完整 QoS 支援 — 與 Publisher 相同的優先權與可靠性選項
  • 中繼資料支援 — 編碼(Encoding)、時間戳記與附件(Attachment)
  • 儲存整合 — 可與 Zenoh 的儲存外掛無縫配合

Session 資訊與管理

Session 提供執行階段的檢視與管理能力:

// 取得 Session 資訊
let info = session.info();
println!("Session ID: {}", info.zid());
println!("網路中的 Routers: {:?}", info.routers_zid().await);
println!("已連線的 Peers: {:?}", info.peers_zid().await);

// Key expression 最佳化
let optimized_ke = session.declare_keyexpr("sensor/temperature/room1").await.unwrap();
// 使用最佳化後的 key expression 以獲得更佳效能
let publisher = session.declare_publisher(&optimized_ke).await.unwrap();

// 優雅地關閉 session
session.close().await.unwrap();

Session 內部架構

Session 層位於 RuntimeTransport 層之上:

  • SessionInner — 管理核心的 session 狀態與通訊原語
  • Primitive handlers — Publisher、Subscriber、Queryable 的狀態機
  • Key expression 快取 — 針對線路協議進行編碼最佳化
  • QoS 執行 — 優先權排程與壅塞控制
  • Callback 執行 — 非同步任務與事件處理

Session 會自動處理:

  • 資源釋放 — 確保所有宣告的實體都能正確關閉
  • 傳輸多工 — 多個 session 可共享相同的網路連線
  • 線路協議最佳化 — Key expression 與路由會被自動優化
  • 錯誤傳遞 — 將傳輸層與網路錯誤上報給應用程式

總結

Session 層 是 Zenoh 應用程式的基石,提供:

核心通訊模式:

  • Pub/Sub — 高效能的資料分發並具備 QoS 保證
  • Query/Reply — 分散式請求-回覆,適合服務與資料庫查詢
  • Direct operations — 不需持續實體的即時資料存取
  • Liveliness monitoring — 內建的節點存活偵測與發現

進階功能:

  • QoS 政策 — 優先權、可靠性與壅塞控制
  • Key expression 最佳化 — 高效的線路協議與路由
  • 處理器彈性 — 支援 callbacks、channels 或 polling
  • Session 檢視 — 執行階段的網路拓撲與效能指標

開發效益:

  • 自動資源管理 — 無記憶體洩漏或資源清理問題
  • 傳輸獨立性 — 可運行於 TCP、UDP、QUIC、共享記憶體等多種傳輸
  • 可擴展架構 — 從嵌入式裝置到雲端部署皆可適用

藉由這些原語,開發者能構建出反應式、分散式、具容錯能力的系統,並能無縫擴展至嵌入式裝置、資料中心或雲-邊緣混合環境。Session 層將複雜性封裝起來,同時保留足夠的控制能力以支援效能關鍵的應用。


上一篇
Day 06: Zenoh 內部結構:理解 Link Layer(連結層)
下一篇
Day 08: 🦀 用 Rust 來撰寫一個 Zenoh 微服務
系列文
30 天玩轉 Zenoh:Rust 助力物聯網、機器人與自駕的高速通訊10
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言