在前一篇文章中,我們先回顧了 Rust 裡的 macro_rules!
與 proc_macro
。
今天,讓我們更深入探討 Zenoh 本身是如何利用巨集 來驅動其最核心的組件之一:ZRuntime
併發模型。
ZRuntime
?現代分散式系統需要強健且細緻的併發控制。
在 Zenoh 中,像是:
這些任務都有不同的效能需求。若把它們全部丟進同一個執行緒池,某一部分的繁重工作可能會導致另一部分的重要操作被餓死。
這就是 ZRuntime
的角色:
它是一個 enum,建模了多個邏輯執行環境,每個環境都有可調整的參數(執行緒數量、阻塞限制)。
以下是 zenoh-runtime
中的實際定義:
#[derive(Hash, Eq, PartialEq, Clone, Copy, Debug, RegisterParam, Deserialize)]
#[param(RuntimeParam)]
pub enum ZRuntime {
#[serde(rename = "app")]
#[param(worker_threads = 1)]
Application,
#[serde(rename = "acc")]
#[param(worker_threads = 1)]
Acceptor,
#[serde(rename = "tx")]
#[param(worker_threads = 1)]
TX,
#[serde(rename = "rx")]
#[param(worker_threads = 2)]
RX,
#[serde(rename = "net")]
#[param(worker_threads = 1)]
Net,
}
需要注意的地方:
#[serde(rename = "...")]
→ 將 enum 變體對應到 RON 設定檔中的鍵值。#[param(...)]
→ 編碼預設值(例如 RX 預設 2 個工作執行緒)。#[param(RuntimeParam)]
(在 enum 上)→ 將此 enum 與其設定結構體關聯。使用者可以完全透過環境變數 ZENOH_RUNTIME
來設定 runtimes。
範例如下:
ZENOH_RUNTIME='(
rx: (handover: app),
acc: (handover: app),
app: (worker_threads: 2),
tx: (max_blocking_threads: 1)
)'
這段 RON 字串會被解析成 RuntimeParam
結構:
#[derive(Deserialize, Debug, GenericRuntimeParam)]
#[serde(deny_unknown_fields, default)]
pub struct RuntimeParam {
pub worker_threads: usize,
pub max_blocking_threads: usize,
pub handover: Option<ZRuntime>,
}
連接 ZRuntime
、RuntimeParam
與全域環境解析邏輯的關鍵,是兩個自訂的 derive 巨集:
#[derive(GenericRuntimeParam)]
→ 套用在 RuntimeParam
結構上
→ 產生一個 輔助結構,讓 serde 能注入每個變體的預設值
#[derive(RegisterParam)]
→ 套用在 ZRuntime
enum 上
→ 會產生:
* 每個變體的預設提供者
* 全域設定載入器 (lazy_static!
)
* 針對遍歷、初始化與參數借用的實作區塊
為了清楚理解,我們將 Zenoh 的設定簡化成一個玩具模型。
params.rs
#[derive(Debug, GenericRuntimeParam)]
pub struct MyParams {
pub threads: usize,
}
impl Default for MyParams {
fn default() -> Self {
Self { threads: 1 }
}
}
runtime.rs
#[derive(Debug, RegisterParam)]
#[param(MyParams)]
pub enum MyRuntime {
#[serde(rename = "app")]
#[param(threads = 1)]
Application,
#[serde(rename = "net")]
#[param(threads = 2)]
Network,
}
這就是 全部 開發者需要寫的程式碼,其餘交給巨集處理,非常的方便呢!
GenericRuntimeParam
這個 derive 會建立一個 泛型輔助結構:
struct MyParamsHelper<T>
where
T: DefaultParam,
{
pub threads: usize,
_phantom: PhantomData<T>,
}
impl<T> Default for MyParamsHelper<T>
where
T: DefaultParam,
{
fn default() -> Self {
T::param().into()
}
}
這讓 serde 可以說:
「如果設定檔中沒有提到 app
,那就呼叫 DefaultParamOfApplication::param()
吧。」
RegisterParam
這個 enum 巨集會做更多事:
定義 DefaultParam
trait:
trait DefaultParam {
fn param() -> MyParams;
}
為每個變體產生結構並實作它:
struct DefaultParamOfApplication;
impl DefaultParam for DefaultParamOfApplication {
fn param() -> MyParams {
MyParams { threads: 1, ..Default::default() }
}
}
struct DefaultParamOfNetwork;
impl DefaultParam for DefaultParamOfNetwork {
fn param() -> MyParams {
MyParams { threads: 2, ..Default::default() }
}
}
建立給 serde 使用的設定結構:
struct AbstractRuntimeParam {
#[serde(default)]
app: MyParamsHelper<DefaultParamOfApplication>,
#[serde(default)]
net: MyParamsHelper<DefaultParamOfNetwork>,
}
構建最終全域設定:
pub struct GlobalRuntimeParam {
pub app: MyParams,
pub net: MyParams,
}
lazy_static! {
pub static ref ZRUNTIME_PARAM: GlobalRuntimeParam = parse_env_var().unwrap();
}
加入 trait 實作:
impl Borrow<MyParams> for MyRuntime {
fn borrow(&self) -> &MyParams {
match self {
MyRuntime::Application => &ZRUNTIME_PARAM.app,
MyRuntime::Network => &ZRUNTIME_PARAM.net,
}
}
}
假設程式以以下設定執行:
ZENOH_RUNTIME='(net: (threads: 8))'
啟動時,lazy_static!
將其解析為 AbstractRuntimeParam
。
net
→ 明確設定:threads = 8
。
app
→ 未設定,因此 serde 呼叫 MyParamsHelper<DefaultParamOfApplication>::default()
。
最終 ZRUNTIME_PARAM
為:
app: MyParams { threads: 1 }
net: MyParams { threads: 8 }
後續程式碼只需這樣呼叫:
let params: &MyParams = MyRuntime::Network.borrow();
println!("{}", params.threads); // 8
Zenoh 的真實巨集遵循相同模式,只是規模更大:
GenericRuntimeParam
確保每個 runtime 都能內建預設值RegisterParam
將 enum 變體、serde 解析、環境設定與全域初始化串接起來這使得 Zenoh 的併發模型既符合人體工學又高度可調。
除了設定層,Zenoh 的 runtime 系統還包含多個關鍵基礎元件,用來管理執行環境實例並提供安全的存取模式。
ZRUNTIME_POOL
是一個全域 lazy-static,管理所有 runtime 實例:
lazy_static! {
pub static ref ZRUNTIME_POOL: ZRuntimePool = ZRuntimePool::new();
}
ZRuntimePool
是 HashMap<ZRuntime, OnceLock<Runtime>>
的封裝:
pub struct ZRuntimePool(HashMap<ZRuntime, OnceLock<Runtime>>);
impl ZRuntimePool {
fn new() -> Self {
Self(ZRuntime::iter().map(|zrt| (zrt, OnceLock::new())).collect())
}
pub fn get(&self, zrt: &ZRuntime) -> &Handle {
// 檢查 handover 設定
let param: &RuntimeParam = zrt.borrow();
let zrt = match param.handover {
Some(handover) => handover,
None => *zrt,
};
// Lazy 初始化 runtime
self.0
.get(&zrt)
.unwrap()
.get_or_init(|| {
zrt.init()
.unwrap_or_else(|_| panic!("Failed to init {zrt}"))
})
.handle()
}
}
主要特性:
ZRUNTIME_INDEX
維護每個 runtime 的執行緒命名計數器:
lazy_static! {
pub static ref ZRUNTIME_INDEX: HashMap<ZRuntime, AtomicUsize> = ZRuntime::iter()
.map(|zrt| (zrt, AtomicUsize::new(0)))
.collect();
}
這在 runtime 建構器中用來建立唯一的執行緒名稱:
impl RuntimeParam {
pub fn build(&self, zrt: ZRuntime) -> Result<Runtime> {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(self.worker_threads)
.max_blocking_threads(self.max_blocking_threads)
.enable_io()
.enable_time()
.thread_name_fn(move || {
let id = ZRUNTIME_INDEX
.get(&zrt)
.unwrap()
.fetch_add(1, Ordering::SeqCst);
format!("{zrt}-{id}")
})
.build()?;
Ok(rt)
}
}
產生的執行緒名稱如:
Application-0
, Application-1
, …RX-0
, RX-1
, …TX-0
, Net-0
, …讓除錯與效能剖析更容易。
ZRuntimePoolGuard
是 RAII 守衛,用來確保全域 runtime 資源被正確清理:
pub struct ZRuntimePoolGuard;
impl Drop for ZRuntimePoolGuard {
fn drop(&mut self) {
unsafe {
std::mem::drop((ZRUNTIME_POOL.deref() as *const ZRuntimePool).read());
std::mem::drop(
(ZRUNTIME_INDEX.deref() as *const HashMap<ZRuntime, AtomicUsize>).read(),
);
}
}
}
為什麼需要它?
Rust 的 lazy_static!
產生的靜態變數不會自動被 drop。大多數應用程式這沒問題,因為作業系統會在程序結束時清理資源。
但在某些情境(例如測試、動態程式庫、嵌入式系統),你可能需要明確清理。
ZRuntimePoolGuard
提供顯式釋放資源的方法,例如:
fn main() {
// 應用程式邏輯
// 結束前進行清理
let _guard = ZRuntimePoolGuard;
// 當 _guard 離開作用域時,會清理全域資源
}
安全性注意:這裡的 unsafe
會手動讀取並 drop 靜態參考。必須保證之後不再有程式碼存取這些靜態變數。
ZRuntime
enum 也提供方便的方法來在適當的 runtime 上排程任務或執行阻塞操作。
spawn
方法允許在指定的 runtime 上執行 future:
impl ZRuntime {
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
#[cfg(feature = "tracing-instrument")]
let future = tracing::Instrument::instrument(future, tracing::Span::current());
self.deref().spawn(future)
}
}
使用範例:
// 在 RX runtime 上排程任務
let handle = ZRuntime::RX.spawn(async {
process_incoming_data().await
});
// 在 TX runtime 上排程任務
ZRuntime::TX.spawn(async {
send_data_to_peers().await
});
主要特性:
tracing-instrument
功能時,自動傳遞 spanSend + 'static
以確保跨執行緒安全block_in_place
允許執行阻塞操作而不會卡住整個 runtime 的事件迴圈:
impl ZRuntime {
pub fn block_in_place<F, R>(&self, f: F) -> R
where
F: Future<Output = R>,
{
match Handle::try_current() {
Ok(handle) => {
if handle.runtime_flavor() == RuntimeFlavor::CurrentThread {
panic!("Zenoh runtime doesn't support Tokio's current thread scheduler. Please use multi thread scheduler instead, e.g. a multi thread scheduler with one worker thread: `#[tokio::main(flavor = \"multi_thread\", worker_threads = 1)]`");
}
}
Err(e) => {
if e.is_thread_local_destroyed() {
panic!("The Thread Local Storage inside Tokio is destroyed. This might happen when Zenoh API is called at process exit, e.g. in the atexit handler. Calling the Zenoh API at process exit is not supported and should be avoided.");
}
}
}
#[cfg(feature = "tracing-instrument")]
let f = tracing::Instrument::instrument(f, tracing::Span::current());
tokio::task::block_in_place(move || self.block_on(f))
}
}
使用範例:
// 在 Application runtime 中執行阻塞操作
let result = ZRuntime::Application.block_in_place(async {
heavy_synchronous_computation().await
});
安全檢查:
適用場景:
注意:若在 Tokio 的 current-thread runtime 或進程結束時呼叫,會觸發 panic,因為這些情境與 Zenoh 的執行緒模型不相容。