iT邦幫忙

2025 iThome 鐵人賽

DAY 11
0

Zenoh 的Runtime: ZRuntime

在前一篇文章中,我們先回顧了 Rust 裡的 macro_rules!proc_macro
今天,讓我們更深入探討 Zenoh 本身是如何利用巨集 來驅動其最核心的組件之一:ZRuntime 併發模型


為什麼需要 ZRuntime

現代分散式系統需要強健且細緻的併發控制。
在 Zenoh 中,像是:

  • 應用邏輯 (Application logic)
  • 接受新的連接 (Link acceptor)
  • 傳送 (TX, transmit)
  • 接收 (RX, receive)
  • 一般網路工作 (General networking)

這些任務都有不同的效能需求。若把它們全部丟進同一個執行緒池,某一部分的繁重工作可能會導致另一部分的重要操作被餓死。

這就是 ZRuntime 的角色:
它是一個 enum,建模了多個邏輯執行環境,每個環境都有可調整的參數(執行緒數量、阻塞限制)。


核心 Enum

以下是 zenoh-runtime 中的實際定義:

#[derive(Hash, Eq, PartialEq, Clone, Copy, Debug, RegisterParam, Deserialize)]
#[param(RuntimeParam)]
pub enum ZRuntime {
    #[serde(rename = "app")]
    #[param(worker_threads = 1)]
    Application,

    #[serde(rename = "acc")]
    #[param(worker_threads = 1)]
    Acceptor,

    #[serde(rename = "tx")]
    #[param(worker_threads = 1)]
    TX,

    #[serde(rename = "rx")]
    #[param(worker_threads = 2)]
    RX,

    #[serde(rename = "net")]
    #[param(worker_threads = 1)]
    Net,
}

需要注意的地方:

  • #[serde(rename = "...")] → 將 enum 變體對應到 RON 設定檔中的鍵值。
  • #[param(...)] → 編碼預設值(例如 RX 預設 2 個工作執行緒)。
  • #[param(RuntimeParam)](在 enum 上)→ 將此 enum 與其設定結構體關聯。

透過環境變數進行設定

使用者可以完全透過環境變數 ZENOH_RUNTIME 來設定 runtimes。
範例如下:

ZENOH_RUNTIME='(
  rx: (handover: app),
  acc: (handover: app),
  app: (worker_threads: 2),
  tx: (max_blocking_threads: 1)
)'

這段 RON 字串會被解析成 RuntimeParam 結構:

#[derive(Deserialize, Debug, GenericRuntimeParam)]
#[serde(deny_unknown_fields, default)]
pub struct RuntimeParam {
    pub worker_threads: usize,
    pub max_blocking_threads: usize,
    pub handover: Option<ZRuntime>,
}

祕訣所在:Zenoh 的衍生巨集

連接 ZRuntimeRuntimeParam 與全域環境解析邏輯的關鍵,是兩個自訂的 derive 巨集:

  1. #[derive(GenericRuntimeParam)]
    → 套用在 RuntimeParam 結構上
    → 產生一個 輔助結構,讓 serde 能注入每個變體的預設值

  2. #[derive(RegisterParam)]
    → 套用在 ZRuntime enum 上
    → 會產生:
    * 每個變體的預設提供者
    * 全域設定載入器 (lazy_static!)
    * 針對遍歷、初始化與參數借用的實作區塊


範例

為了清楚理解,我們將 Zenoh 的設定簡化成一個玩具模型。

開發者編寫的程式碼

params.rs

#[derive(Debug, GenericRuntimeParam)]
pub struct MyParams {
    pub threads: usize,
}

impl Default for MyParams {
    fn default() -> Self {
        Self { threads: 1 }
    }
}

runtime.rs

#[derive(Debug, RegisterParam)]
#[param(MyParams)]
pub enum MyRuntime {
    #[serde(rename = "app")]
    #[param(threads = 1)]
    Application,

    #[serde(rename = "net")]
    #[param(threads = 2)]
    Network,
}

這就是 全部 開發者需要寫的程式碼,其餘交給巨集處理,非常的方便呢!


階段 1:展開 GenericRuntimeParam

這個 derive 會建立一個 泛型輔助結構

struct MyParamsHelper<T>
where
    T: DefaultParam,
{
    pub threads: usize,
    _phantom: PhantomData<T>,
}

impl<T> Default for MyParamsHelper<T>
where
    T: DefaultParam,
{
    fn default() -> Self {
        T::param().into()
    }
}

這讓 serde 可以說:
「如果設定檔中沒有提到 app,那就呼叫 DefaultParamOfApplication::param()吧。」


階段 2:展開 RegisterParam

這個 enum 巨集會做更多事:

  1. 定義 DefaultParam trait:

    trait DefaultParam {
        fn param() -> MyParams;
    }
    
  2. 為每個變體產生結構並實作它:

    struct DefaultParamOfApplication;
    impl DefaultParam for DefaultParamOfApplication {
        fn param() -> MyParams {
            MyParams { threads: 1, ..Default::default() }
        }
    }
    
    struct DefaultParamOfNetwork;
    impl DefaultParam for DefaultParamOfNetwork {
        fn param() -> MyParams {
            MyParams { threads: 2, ..Default::default() }
        }
    }
    
  3. 建立給 serde 使用的設定結構:

    struct AbstractRuntimeParam {
        #[serde(default)]
        app: MyParamsHelper<DefaultParamOfApplication>,
        #[serde(default)]
        net: MyParamsHelper<DefaultParamOfNetwork>,
    }
    
  4. 構建最終全域設定:

    pub struct GlobalRuntimeParam {
        pub app: MyParams,
        pub net: MyParams,
    }
    
    lazy_static! {
        pub static ref ZRUNTIME_PARAM: GlobalRuntimeParam = parse_env_var().unwrap();
    }
    
  5. 加入 trait 實作:

    impl Borrow<MyParams> for MyRuntime {
        fn borrow(&self) -> &MyParams {
            match self {
                MyRuntime::Application => &ZRUNTIME_PARAM.app,
                MyRuntime::Network => &ZRUNTIME_PARAM.net,
            }
        }
    }
    

階段 3:執行流程

假設程式以以下設定執行:

ZENOH_RUNTIME='(net: (threads: 8))'
  1. 啟動時,lazy_static! 將其解析為 AbstractRuntimeParam

  2. net → 明確設定:threads = 8

  3. app → 未設定,因此 serde 呼叫 MyParamsHelper<DefaultParamOfApplication>::default()

  4. 最終 ZRUNTIME_PARAM 為:

    app: MyParams { threads: 1 }
    net: MyParams { threads: 8 }
    
  5. 後續程式碼只需這樣呼叫:

    let params: &MyParams = MyRuntime::Network.borrow();
    println!("{}", params.threads); // 8
    

回到 Zenoh

Zenoh 的真實巨集遵循相同模式,只是規模更大:

  • GenericRuntimeParam 確保每個 runtime 都能內建預設值
  • RegisterParam 將 enum 變體、serde 解析、環境設定與全域初始化串接起來
  • 開發者只需標註 enum 與結構體 —— 不需要手動撰寫膠水程式碼

這使得 Zenoh 的併發模型既符合人體工學又高度可調


Runtime Pool 基礎設施

除了設定層,Zenoh 的 runtime 系統還包含多個關鍵基礎元件,用來管理執行環境實例並提供安全的存取模式。

ZRUNTIME_POOL

ZRUNTIME_POOL 是一個全域 lazy-static,管理所有 runtime 實例:

lazy_static! {
    pub static ref ZRUNTIME_POOL: ZRuntimePool = ZRuntimePool::new();
}

ZRuntimePoolHashMap<ZRuntime, OnceLock<Runtime>> 的封裝:

pub struct ZRuntimePool(HashMap<ZRuntime, OnceLock<Runtime>>);

impl ZRuntimePool {
    fn new() -> Self {
        Self(ZRuntime::iter().map(|zrt| (zrt, OnceLock::new())).collect())
    }

    pub fn get(&self, zrt: &ZRuntime) -> &Handle {
        // 檢查 handover 設定
        let param: &RuntimeParam = zrt.borrow();
        let zrt = match param.handover {
            Some(handover) => handover,
            None => *zrt,
        };

        // Lazy 初始化 runtime
        self.0
            .get(&zrt)
            .unwrap()
            .get_or_init(|| {
                zrt.init()
                    .unwrap_or_else(|_| panic!("Failed to init {zrt}"))
            })
            .handle()
    }
}

主要特性:

  • Lazy 初始化:執行環境僅在第一次被存取時建立
  • handover 支援:可根據設定將請求導向其他 runtime
  • 執行緒安全存取:多執行緒可同時安全存取

ZRUNTIME_INDEX

ZRUNTIME_INDEX 維護每個 runtime 的執行緒命名計數器:

lazy_static! {
    pub static ref ZRUNTIME_INDEX: HashMap<ZRuntime, AtomicUsize> = ZRuntime::iter()
        .map(|zrt| (zrt, AtomicUsize::new(0)))
        .collect();
}

這在 runtime 建構器中用來建立唯一的執行緒名稱:

impl RuntimeParam {
    pub fn build(&self, zrt: ZRuntime) -> Result<Runtime> {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(self.worker_threads)
            .max_blocking_threads(self.max_blocking_threads)
            .enable_io()
            .enable_time()
            .thread_name_fn(move || {
                let id = ZRUNTIME_INDEX
                    .get(&zrt)
                    .unwrap()
                    .fetch_add(1, Ordering::SeqCst);
                format!("{zrt}-{id}")
            })
            .build()?;
        Ok(rt)
    }
}

產生的執行緒名稱如:

  • Application-0, Application-1, …
  • RX-0, RX-1, …
  • TX-0, Net-0, …

讓除錯與效能剖析更容易。


ZRuntimePoolGuard

ZRuntimePoolGuard 是 RAII 守衛,用來確保全域 runtime 資源被正確清理:

pub struct ZRuntimePoolGuard;

impl Drop for ZRuntimePoolGuard {
    fn drop(&mut self) {
        unsafe {
            std::mem::drop((ZRUNTIME_POOL.deref() as *const ZRuntimePool).read());
            std::mem::drop(
                (ZRUNTIME_INDEX.deref() as *const HashMap<ZRuntime, AtomicUsize>).read(),
            );
        }
    }
}

為什麼需要它?

Rust 的 lazy_static! 產生的靜態變數不會自動被 drop。大多數應用程式這沒問題,因為作業系統會在程序結束時清理資源。
但在某些情境(例如測試、動態程式庫、嵌入式系統),你可能需要明確清理。

ZRuntimePoolGuard 提供顯式釋放資源的方法,例如:

fn main() {
    // 應用程式邏輯

    // 結束前進行清理
    let _guard = ZRuntimePoolGuard;
    // 當 _guard 離開作用域時,會清理全域資源
}

安全性注意:這裡的 unsafe 會手動讀取並 drop 靜態參考。必須保證之後不再有程式碼存取這些靜態變數。


Runtime 任務管理

ZRuntime enum 也提供方便的方法來在適當的 runtime 上排程任務或執行阻塞操作。

spawn

spawn 方法允許在指定的 runtime 上執行 future:

impl ZRuntime {
    pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        #[cfg(feature = "tracing-instrument")]
        let future = tracing::Instrument::instrument(future, tracing::Span::current());

        self.deref().spawn(future)
    }
}

使用範例:

// 在 RX runtime 上排程任務
let handle = ZRuntime::RX.spawn(async {
    process_incoming_data().await
});

// 在 TX runtime 上排程任務
ZRuntime::TX.spawn(async {
    send_data_to_peers().await
});

主要特性:

  • runtime 定向:任務在指定 runtime 的執行緒池中執行
  • 追蹤整合:啟用 tracing-instrument 功能時,自動傳遞 span
  • 型別安全:強制 Send + 'static 以確保跨執行緒安全

block_in_place

block_in_place 允許執行阻塞操作而不會卡住整個 runtime 的事件迴圈:

impl ZRuntime {
    pub fn block_in_place<F, R>(&self, f: F) -> R
    where
        F: Future<Output = R>,
    {
        match Handle::try_current() {
            Ok(handle) => {
                if handle.runtime_flavor() == RuntimeFlavor::CurrentThread {
                    panic!("Zenoh runtime doesn't support Tokio's current thread scheduler. Please use multi thread scheduler instead, e.g. a multi thread scheduler with one worker thread: `#[tokio::main(flavor = \"multi_thread\", worker_threads = 1)]`");
                }
            }
            Err(e) => {
                if e.is_thread_local_destroyed() {
                    panic!("The Thread Local Storage inside Tokio is destroyed. This might happen when Zenoh API is called at process exit, e.g. in the atexit handler. Calling the Zenoh API at process exit is not supported and should be avoided.");
                }
            }
        }

        #[cfg(feature = "tracing-instrument")]
        let f = tracing::Instrument::instrument(f, tracing::Span::current());

        tokio::task::block_in_place(move || self.block_on(f))
    }
}

使用範例:

// 在 Application runtime 中執行阻塞操作
let result = ZRuntime::Application.block_in_place(async {
    heavy_synchronous_computation().await
});

安全檢查:

  1. runtime 類型驗證:確保使用的是 multi-thread runtime
  2. TLS (Thread Local Storage) 健康檢查:避免在進程關閉時呼叫 API
  3. 追蹤整合:保留 span 上下文

適用場景:

  • 在特定 runtime 中將 async 程式轉為同步執行
  • 在 async 環境呼叫阻塞 API 而不阻塞整個 executor
  • 確保操作在正確的 runtime 執行緒池上執行

注意:若在 Tokio 的 current-thread runtime 或進程結束時呼叫,會觸發 panic,因為這些情境與 Zenoh 的執行緒模型不相容。


上一篇
Day 10: Rust Macro 熱身:從 `macro_rules!` 到 Derive Macro
下一篇
Day 12: Zenoh 的 非同步 Runtime 抉擇之路
系列文
30 天玩轉 Zenoh:Rust 助力物聯網、機器人與自駕的高速通訊19
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言