iT邦幫忙

2025 iThome 鐵人賽

DAY 18
0
Rust

30 天玩轉 Zenoh:Rust 助力物聯網、機器人與自駕的高速通訊系列 第 18

Day 18: Zenoh 如何實作高效能的 Python 綁定 - 第二部

  • 分享至 

  • xImage
  •  

Zenoh 如何實作高效能的 Python 綁定 - 第二部

深入探討:橋接非同步系統、Handler系統與 GIL 管理


第一部分 中,我們探討了 zenoh-python 綁定系統的架構基礎。現在,我們將深入進階實作技術,解釋 zenoh-python 如何成為 Rust 生態系中最優雅的 Python 綁定之一。我們將研究如何精巧橋接非同步 Rust 與同步 Python、實現靈活的處理器系統、以及如何智慧管理 Python 全域直譯器鎖 (GIL) 以達成最佳效能。

非同步挑戰:橋接兩個世界

zenoh-python 面臨的核心挑戰是:如何在 Zenoh 的非同步 Rust 核心與 Python 開發者所習慣的同步程式模型之間橋接。Rust API 大量採用 async/awaitFuture 範式,而 Python 開發者則期望能使用直觀的同步介面,才能無縫整合至現有的程式架構中。

為什麼 zenoh-python 沒有使用 Python asyncio?

儘管 Zenoh 在 Rust 核心完全採用非同步架構,zenoh-python 卻刻意避免使用 Python 的 asyncio 模組。這主要是因為若要將 Rust 的非同步 API 完整對應到 Python 的 asyncio 模型,將需要在 Python 層重新實作或重複許多複雜的非同步邏輯,這會顯著增加維護成本。

因此,Zenoh 團隊採取了更加務實的方法:提供 簡潔直觀的同步 API,以降低使用複雜度,同時在底層透過精巧的阻塞橋接機制讓 Rust 非同步核心正常運作。儘管社群對 asyncio 支援仍有需求(如 Issue #95),但官方 API 仍以同步為主要設計。

這種設計哲學完美平衡了 效能與可用性:確保 Python 開發者能以熟悉的方式使用 API,同時將複雜的非同步運作完全封裝在底層實作中。


wait 函式:非同步橋接核心

核心橋接函式在 utils.rs 中:

pub(crate) fn wait<T: Send>(
    py: Python,
    resolve: impl zenoh::Resolve<zenoh::Result<T>>,
) -> PyResult<T> {
    // 在呼叫 Rust 非同步等待前,暫時釋放 GIL
    py.allow_threads(|| resolve.wait()).into_pyres()
}

關鍵技術要點

  1. py.allow_threads():執行 Rust 阻塞操作前智慧釋放 Python GIL,避免整個直譯器停頓
  2. resolve.wait():將 Rust 非同步 Future 轉換為同步等待操作
  3. .into_pyres():將 Rust Result 型別無縫轉換為 Python 例外機制

這樣的設計確保在執行網路 I/O 等長時間操作時,不會阻塞整個 Python 應用程式。


實務應用:Session 操作中的 put

#[pyo3(signature = (key_expr, payload, *, encoding = None, /* ... */))]
fn put(
    &self,
    py: Python,
    key_expr: KeyExpr,
    payload: ZBytes,
    encoding: Option<Encoding>,
    // ... 其他參數
) -> PyResult<()> {
    let build = build!(
        self.0.put(key_expr, payload),
        encoding,
        congestion_control,
        priority,
        express,
        attachment,
        timestamp,
        allowed_destination,
    );
    // 將 Rust 非同步操作轉換為 Python 同步呼叫
    wait(py, build)
}

Python 側的使用方式非常直觀:

session.put("sensors/temperature", "23.5°C")

對 Python 開發者而言,這只是一個普通的同步函式呼叫,但底層實際執行的流程是:釋放 GIL → 等待 Rust 非同步操作完成 → 重獲 GIL 並回傳結果


Handler 系統:統一回呼與訊息管道

zenoh-python 的另一亮點是 通用處理器系統,能支援多種資料消費模式:

  • Callback 函式:事件驅動的即時處理
  • Channel (FIFO/RingBuffer):基於佇列的批次消費
  • 背景執行:避免阻塞主執行緒

統一介面設計

pub(crate) trait Receiver {
    fn type_name(&self) -> &'static str;
    fn try_recv(&self, py: Python) -> PyResult<PyObject>;
    fn recv(&self, py: Python) -> PyResult<PyObject>;
}

提供統一介面,讓各種 handler 能以 Python 友好方式接收訊息。

Handler 實作類別

#[pyclass]
pub(crate) struct DefaultHandler;   // 簡單回呼佇列

#[pyclass]
pub(crate) struct FifoChannel(usize);  // FIFO 佇列

#[pyclass]
pub(crate) struct RingChannel(usize);  // 環狀佇列(丟棄舊訊息)

每種 handler 都能轉換為 Zenoh 原生處理器,在 Rust 與 Python 兩側都得到最佳化。

統一的 HandlerImpl 列舉

精妙之處體現在 HandlerImpl 列舉中:

pub(crate) enum HandlerImpl<T> {
    Rust(Py<Handler>, PhantomData<T>),   // 原生 Rust 處理器
    Python(PyObject),                    // 純 Python 處理器
}

此列舉允許相同的 API 同時支援 Rust 最佳化處理器任意 Python 可呼叫對象,並在執行期自動選擇最佳路徑。

回呼處理:執行緒安全與效能

對於 Python 回呼,系統實作了進階的執行緒管理:

fn python_callback<T: IntoPython + CallbackParameter>(
    callback: &Bound<PyAny>,
) -> PyResult<RustCallback<T>> {
    let py = callback.py();
    let callback = PythonCallback::new(callback);
    Ok(if callback.0.indirect {
        // 背景執行緒處理
        let (rust_callback, receiver) = DefaultHandler.into_rust().into_handler();
        let target = PyCFunction::new_closure(py, None, None, move |args, _| {
            let py = args.py();
            while let Ok(x) = py.allow_threads(|| receiver.recv()) {
                callback.call(py, x);
            }
        })?;
        let thread = import!(py, threading.Thread).call((), Some(&kwargs))?;
        thread.call_method0("start")?;
        rust_callback
    } else {
        // 直接回呼(必須確保 GIL 安全)
        RustCallback::new(Arc::new(move |t| {
            Python::with_gil(|gil| callback.call(gil, t))
        }))
    })
}

此實作提供兩種回呼模式:

  1. 間接模式(預設):回呼在背景 Python 執行緒中執行,可避免阻塞 GIL
  2. 直接模式:回呼立即執行,但必須具備 GIL 安全性

型別轉換:downcast_or_new 設計模式

zenoh-python 最優雅的設計模式之一是 downcast_or_new 巨集,它提供了靈活的型別轉換功能:

macro_rules! downcast_or_new {
    ($ty:ty $(=> $new:ty)? $(, $other:expr)*) => {
        impl $ty {
            pub(crate) fn from_py(obj: &Bound<PyAny>) -> PyResult<Self> {
                if let Ok(obj) = obj.extract::<Self>() {
                    return Ok(obj);  // 已是正確型別
                }
                // 嘗試從 Python 型別建構
                let this = Self::new(PyResult::Ok(obj)$(.and_then(|obj| obj.extract::<$new>()))??.into(), $($other,)*);
                IntoResult::into_result(this)
            }
        }
    };
}

應用於 ZBytes 型別:

wrapper!(zenoh::bytes::ZBytes: Clone, Default);
downcast_or_new!(ZBytes);

#[pymethods]
impl ZBytes {
    #[new]
    fn new(obj: Option<&Bound<PyAny>>) -> PyResult<Self> {
        let Some(obj) = obj else { return Ok(Self::default()); };
        if let Ok(bytes) = obj.downcast::<PyByteArray>() {
            Ok(Self(bytes.to_vec().into()))
        } else if let Ok(bytes) = obj.downcast::<PyBytes>() {
            Ok(Self(bytes.as_bytes().into()))
        } else if let Ok(string) = obj.downcast::<PyString>() {
            Ok(Self(string.to_string().into()))
        } else {
            Err(PyTypeError::new_err(format!(
                "expected bytes/str type, found '{}'",
                obj.get_type().name().unwrap()
            )))
        }
    }
}

此模式允許 ZBytes 從以下型別建構:

  • 已存在的 ZBytes 物件(零拷貝)
  • Python bytes 物件
  • Python str 物件
  • Python bytearray 物件

統一的 API 可自動選擇最佳轉換路徑,實現 Python 資料型別與 Rust 型別之間的無縫轉換。


資源生命週期管理:Option 包裝器模式

在 Python 與 Rust 之間管理資源生命週期非常複雜。zenoh-python 使用一種「option 包裝器」模式來處理可被關閉或解除宣告的資源:

macro_rules! option_wrapper {
    ($($path:ident)::*, $error:literal) => {
        #[pyclass]
        pub(crate) struct $ty(pub(crate) Option<$path>);

        impl $ty {
            fn none() -> PyErr {
                zerror!($error)
            }
            fn get_ref(&self) -> PyResult<&$path> {
                self.0.as_ref().ok_or_else(Self::none)
            }
            fn take(&mut self) -> PyResult<$path> {
                self.0.take().ok_or_else(Self::none)
            }
        }
    };
}

應用於 Publisher:

option_wrapper!(zenoh::pubsub::Publisher<'static>, "Undeclared publisher");

此模式將資源狀態編碼於型別系統中:

  • Some(publisher):活躍可用的資源
  • None:已關閉/解除宣告的資源,使用時觸發錯誤

與 Context Manager 整合

#[pymethods]
impl Publisher {
    fn __enter__<'a, 'py>(this: &'a Bound<'py, Self>) -> PyResult<&'a Bound<'py, Self>> {
        Self::check(this)  // 驗證資源仍有效
    }

    fn __exit__(&mut self, py: Python, /* ... */) -> PyResult<PyObject> {
        self.undeclare(py)?;  // 自動清理
        Ok(py.None())
    }

    fn undeclare(&mut self, py: Python) -> PyResult<()> {
        wait(py, self.take()?.undeclare())  // 從 Option 移出並清理
    }
}

Python 使用方式:

with session.declare_publisher("demo/hello") as pub:
    pub.put("Hello World!")
# Publisher 在此自動清理

錯誤處理:上下文完整保留

zenoh-python 提供完整錯誤處理,跨越 Rust 與 Python 邊界保留完整錯誤訊息:

pyo3::create_exception!(zenoh, ZError, pyo3::exceptions::PyException);
pyo3::create_exception!(zenoh, ZDeserializeError, pyo3::exceptions::PyException);

pub(crate) trait IntoPyErr {
    fn into_pyerr(self) -> PyErr;
}

impl<E: ToString> IntoPyErr for E {
    fn into_pyerr(self) -> PyErr {
        ZError::new_err(self.to_string())
    }
}

特點:

  • 保留完整錯誤訊息
  • 不同錯誤類別 → 不同 Python 例外型別
  • 相容 Python try/except 模式

效能最佳化:GIL 管理與靜態快取

GIL 管理策略

// 長時操作會釋放 GIL
pub(crate) fn wait<T: Send>(py: Python, resolve: impl zenoh::Resolve<zenoh::Result<T>>) -> PyResult<T> {
    py.allow_threads(|| resolve.wait()).into_pyres()
}

// 資源清理同樣釋放 GIL
impl Drop for $ty {
    fn drop(&mut self) {
        Python::with_gil(|gil| gil.allow_threads(|| drop(self.0.take())));
    }
}

靜態快取

透過快取常用物件避免重複查找:

macro_rules! py_static {
    ($py:expr, $tp:ty, $expr:expr) => {{
        static CELL: pyo3::sync::GILOnceCell<Py<$tp>> = pyo3::sync::GILOnceCell::new();
        CELL.get_or_try_init($py, $expr).map(|obj| obj.bind($py))
    }};
}

常用於熱路徑,例如 logging handler,顯著提升頻繁操作的效能。


進階功能:迭代器協定與泛型支援

迭代器整合

#[pymethods]
impl Subscriber {
    fn __iter__<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyIterator>> {
        self.handler(py)?.bind(py).try_iter()
    }
}

impl Handler {
    fn __next__(&self, py: Python) -> PyResult<Option<PyObject>> {
        match self.0.recv(py) {
            Ok(obj) => Ok(Some(obj)),
            Err(err) if err.is_instance_of::<ZError>(py) => Ok(None),  // StopIteration
            Err(err) => Err(err),
        }
    }
}

Python 使用:

for sample in subscriber:
    print(f"Received: {sample}")

泛型型別支援

#[pymethods]
impl Handler {
    #[classmethod]
    fn __class_getitem__(cls: &Bound<PyType>, args: &Bound<PyAny>) -> PyObject {
        generic(cls, args)  // 建立 Handler[Sample] 等
    }
}

Python 類型提示:

from zenoh import Handler, Sample
handler: Handler[Sample] = session.declare_subscriber("demo/**")

結論

zenoh-python 展現了 Rust 高效能Python 易用性 的完美結合,其關鍵亮點包括:

  1. 明確的非同步策略:底層 Rust 採用 async,對外提供同步 Python API
  2. 非同步橋接 wait:釋放 GIL + 同步等待 Rust Future 完成
  3. 零拷貝型別轉換 (downcast_or_new):優先零拷貝,必要時自動轉換
  4. 統一的Handler系統:支援 Callback、FIFO、RingBuffer 等多種接收模式
  5. 效能最佳化:有效管理 GIL 與靜態快取
  6. 資源生命周期管理:型別系統保證正確與自動清理
  7. 完整錯誤處理:保持 Rust → Python 錯誤上下文,支援 Pythonic 例外處理

這些設計模式不僅適用於 zenoh-python,更能為其他 Rust-Python 綁定專案提供珍貴的參考範本,完美詮釋了如何在保持系統程式語言高效能的同時,為應用開發者提供直觀易用的介面。


上一篇
Day 17: Zenoh 如何實作高效能 Python 綁定 - 第一部分
下一篇
Day 19: Zenoh Kotlin:結合 Rust 的效能與 Kotlin 的優雅
系列文
30 天玩轉 Zenoh:Rust 助力物聯網、機器人與自駕的高速通訊19
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言