繼上一篇 Day 22: 使用 Rust 剖析 Zenoh:深入了解 Wireshark 插件 zenoh-dissector 之後,今天我們來深入探討一下 Protocol 細節吧!
Zenoh Protocol是一種先進的 pub/sub 及查詢/回復訊息協定,設計上能同時高效支援資源受限的物聯網裝置與高通量資料中心應用。本文將探討其分層架構與實作細節。
Zenoh Protocol以清楚分層為核心設計,兼顧分工與運作效率。Protocol 包含兩層:
每一層皆有獨立職責與wire format,但又能協作實現 Zenoh 的訊息能力。
傳輸層構成所有 Zenoh 通訊的基礎,通過 TransportMessage
結構處理連線生命週期、可靠性、訊息包裝等。
Session 管理訊息:
**InitSyn/InitAck **:在特定locator進行 Session 啟動握手
whatami
)、批次大小OpenSyn/OpenAck (0x02):完成成功 INIT 後 final link 初始化
Close (0x03):結束 session 或 link
KeepAlive (0x04):心跳機制
Join (0x07):router-to-router 聯邦,多播通訊用
數據框架訊息:
Frame (0x05):主要的訊息包裝型別
NetworkMessage
Fragment (0x06):處理超大訊息
NetworkMessage
分片不同通訊型態,transport message 限制不同:
pub mod batch_size {
pub const UNICAST: BatchSize = 65535; // 單播最大
pub const MULTICAST: BatchSize = 8192; // 多播 8KB
}
網路層於 Frame
訊息 payload 實現核心 pub/sub 與查詢/回復功能。網路訊息用 0x19-0x1f ID 空間,避免與傳輸 ID 衝突。
Push (0x1d):發佈資料訊息
Put
/Del
操作Request (0x1c):發出查詢請求
RequestId
供回應對應Response (0x1b):個別查詢回應
RequestId
連結(32 bit,協商於 session)ResponseBody
,實際資料ResponseFinal (0x1a):串流查詢結束標記
Interest (0x19):發現暨訂閱管理
Declare (0x1e):資源管理
網路訊息具高效 key expr 編碼方式:
訊息有效資料定義於網路訊息內:
Push 訊息 Payload (PushBody
):
ZBuf
)Request 訊息 Payload (RequestBody
):
Response 訊息 Payload (ResponseBody
):
PushBody
數據所有層都常見的型別:
RequestId
、ExprId
、SubscriberId
管理訊息關聯與資源ext_*
metadata,供 QoS、timestamp、路由等CongestionControl
、Priority
、Reliability
枚舉提供 QoS 設定zenoh-codec
crate 提供簡潔的 trait 為主的序列化設計:
// 讀 operation - 反序列化
pub trait RCodec<Message, Buffer> {
type Error;
fn read(self, buffer: Buffer) -> Result<Message, Self::Error>;
}
// 寫 operation - 序列化
pub trait WCodec<Message, Buffer> {
type Output;
fn write(self, buffer: Buffer, message: Message) -> Self::Output;
}
// 長度計算 - 缓冲區預分配
pub trait LCodec<Message> {
fn w_len(self, message: Message) -> usize;
}
Zenoh080
struct 實作這些 trait,對應Protocol版本。
codec 流程非常有效:
codec 架構採精益求精原則:
完整建立 session 需多重 message:
A B
| InitSyn |
|------------------>| A=0, version, whatami, ZID
| |
| InitAck |
|<------------------| A=1, accepted parameters
| |
| OpenSyn |
|------------------>| A=0, lease period, initial_sn, cookie
| |
| OpenAck |
|<------------------| A=1, lease period, initial_sn
| |
| Frame(...) |
|<----------------->| 雙向數據傳輸
大訊息需分片:
A B
| Fragment(M=1, #1) |
|------------------>| 序列起始片段
| Fragment(M=1, #2) |
|------------------>| 中間片段
| Fragment(M=1, #3) |
|------------------>| 另一中片段
| Fragment(M=0, #4) |
|------------------>| 最後片段 (M=0)
| | B 組回完整訊息
Zenoh 用 Interest 進行發現:
A B
| Interest |
|------------------->| mode=Current, keyexpr="sensors/*"
| |
| Declare(Subscriber)|
|<-------------------| interest_id=456, "sensors/temp"
| Declare(Subscriber)|
|<-------------------| interest_id=456, "sensors/pressure"
| DeclareFinal |
|<-------------------| interest_id=456, end of declarations
多回查詢顯示對應關聯:
A B
| Request(Query) |
|------------------>| RequestId=123, keyexpr="db/*"
| |
| Response(Reply) |
|<------------------| RequestId=123, "db/users" data
| Response(Reply) |
|<------------------| RequestId=123, "db/products" data
| ResponseFinal |
|<------------------| RequestId=123, end of results
發佈端(出自 examples/z_pub.rs
):
// 應用程式碼
let publisher = session.declare_publisher(&key_expr).await.unwrap();
publisher
.put(buf)
.encoding(Encoding::TEXT_PLAIN)
.attachment(attachment.clone())
.await
.unwrap();
Protocol堆疊如下:
訂閱端(出自 examples/z_sub.rs
):
// 應用程式碼
let subscriber = session.declare_subscriber(&key_expr).await.unwrap();
while let Ok(sample) = subscriber.recv_async().await {
let payload = sample.payload().try_to_string().unwrap();
println!("Received: {}", payload);
}
對應流程:
Zenoh Protocol展示了如何以分層設計與 trait 為基礎序列化,打造橫跨 diverse 場景的高效 robust 訊息系統。重點亮點:
ZSlice
/ZBuf
杜絕不必要的記憶體配置