Zenoh Kotlin 是一個先進的語言綁定專案,將 Zenoh 高效能的 Rust 實作帶到了 Kotlin 生態系統。
此專案展現了極致的跨語言互操作性,透過 Java Native Interface(JNI),為 JVM 及 Android 平台創建無縫綁定。
┌─────────────────────────────────────┐
│ Kotlin API Layer │ ← High-level, idiomatic APIs
├─────────────────────────────────────┤
│ JNI Bridge Layer │ ← Native interface binding
├─────────────────────────────────────┤
│ Rust Core (Zenoh) │ ← High-performance core
└─────────────────────────────────────┘
zenoh-jni
)綁定系統的核心是 zenoh-jni
crate,它透過 JNI 對外傳遞 Zenoh 功能:
// 範例 JNI 函式模式
#[no_mangle]
pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_openSessionViaJNI(
mut env: JNIEnv, // Java 互操作的 JNI 環境
_: JClass, // Java 類參考(未使用)
config_ptr: *const Config, // 指向 Zenoh 組態的原始指標
) -> *const Session {
match open_session(config_ptr) {
// 將 Session 的所有權以原始指標形式轉移給 Kotlin
Ok(session) => Arc::into_raw(Arc::new(session)),
Err(err) => {
throw_exception!(env, zerror!(err)); // 將 Rust 錯誤轉為 Java 例外
null()
}
}
}
主要特點:
#[no_mangle]
和 extern "C"
匯出 C 介面。jni
crate(v0.21.1)實現順暢的 Java 互操作。Gradle 建構流程協調 Rust 編譯與函式庫封裝:
// Gradle 建構設定
fun buildZenohJNI(mode: BuildMode = BuildMode.DEBUG) {
val cargoCommand = mutableListOf("cargo", "build")
if (mode == BuildMode.RELEASE) cargoCommand.add("--release")
// 在 Kotlin 編譯前執行 Rust 建構
project.exec {
commandLine(*(cargoCommand.toTypedArray()),
"--manifest-path", "../zenoh-jni/Cargo.toml")
}
}
// 確保依賴順序
tasks.named("compileKotlinJvm") {
dependsOn("buildZenohJni")
}
函式庫自動載入,且有補救的備案:
// JVM 實作,並自動偵測目標平台
init {
if (tryLoadingLocalLibrary().isFailure) {
val target = determineTarget().getOrThrow() // 偵測作業系統/架構
tryLoadingLibraryFromJarPackage(target).getOrThrow() // 從 JAR 抽取
}
}
以 Rust 的 Arc<T>
進行跨語言安全的所有權轉移:
// 建立:將所有權轉給 Kotlin
#[no_mangle]
pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_openSessionViaJNI(
/* ... */
) -> *const Session {
Arc::into_raw(Arc::new(session)) // 將 Arc 轉為原始指標
}
// 操作:暫時回收所有權
pub unsafe extern "C" fn Java_io_zenoh_jni_JNIQuerier_getViaJNI(
/* ... */
querier_ptr: *const Querier,
/* ... */
) {
let querier = Arc::from_raw(querier_ptr); // 從原始指標回收 Arc
// 使用 querier 進行非同步操作 ...
let get_builder = querier.get().callback(move |reply| {
// 處理非同步回應 ...
});
get_builder.wait().map_err(|err| throw_exception!(env, err));
std::mem::forget(querier); // 將所有權歸還給 Kotlin
}
// 清理:最終回收並丟棄所有權
pub unsafe extern "C" fn Java_io_zenoh_jni_JNIQuerier_freePtrViaJNI(
_env: JNIEnv,
_: JClass,
querier_ptr: *const Querier,
) {
Arc::from_raw(querier_ptr); // 回收並自動釋放
}
// 利用 finalizer 進行資源管理
class Session internal constructor(private val config: Config) : AutoCloseable {
private var jniSession: JNISession? = null
override fun close() {
// 清理強參考與弱參考的宣告
strongDeclarations.removeIf { it.undeclare(); true }
weakDeclarations.removeIf { it.get()?.undeclare(); true }
jniSession?.close()
jniSession = null
}
protected fun finalize() {
close() // 作為安全補救,防止用戶忘記呼叫 close()
}
}
pub struct SampleCallback {
callback_global_ref: GlobalRef, // 防止 Kotlin 回呼被 GC 回收
java_vm: JavaVM, // JVM 全域參考
}
impl SampleCallback {
pub fn call(&self, sample: Sample) {
// 將目前線程附加到 JVM(線程安全)
let env = self.java_vm.attach_current_thread().unwrap();
// 將 Rust 資料轉為 Java 物件
let java_sample = convert_sample_to_java(&env, sample);
// 呼叫 Kotlin 回呼:(Sample)-> Unit
env.call_method(
&self.callback_global_ref,
"call",
"(Lio/zenoh/sample/Sample;)V", // JNI 方法簽名
&[JValue::Object(&java_sample)]
).unwrap();
}
}
Zenoh Kotlin 採用回呼和 Channel 模式取代傳統協程:
// 回呼方式—即時處理
fun Querier.get(callback: Callback<Reply>) {
jniQuerier?.performGet(keyExpr, parameters, callback, {}, Unit)
}
// Channel 方式—收集後處理
fun Querier.get(channel: Channel<Reply>): Result<Channel<Reply>> {
val handler = ChannelHandler(channel)
return jniQuerier?.performGet(
keyExpr, parameters,
handler::handle, // 處理個別回應
handler::onClose, // 完成時清理
handler.receiver() // 回傳 channel 便於取用
)
}
利用 Kotlin 類型系統進行進階序列化:
// 帶有運行時類型資訊的序列化
#[no_mangle]
pub extern "C" fn Java_io_zenoh_jni_JNIZBytes_serializeViaJNI(
mut env: JNIEnv,
_class: JClass,
any: JObject, // 待序列化的 Kotlin 物件
ktype: JObject, // Kotlin 反射的運行時類型資訊
) -> jobject {
let mut serializer = ZSerializer::new();
let ktype = decode_ktype(&mut env, ktype)?; // 解析 Kotlin 類型資訊
serialize(&mut env, &mut serializer, any, &ktype)?;
// 將序列化完成的 ZBytes 物件回傳給 Kotlin
let zbytes = serializer.finish();
create_zbytes_object(&env, zbytes)
}
支援型別:
Boolean
、String
、ByteArray
、Byte
、Short
、Int
、Long
、Float
、Double
UByte
、UShort
、UInt
、ULong
List<T>
、Map<K,V>
(以 VarInt 長度編碼)Pair<A,B>
、Triple<A,B,C>
跨語言結構化錯誤傳遞:
// 宏式錯誤處理
#[macro_export]
macro_rules! throw_exception {
($env:expr, $err:expr) => {{
let _ = $err.throw_on_jvm(&mut $env); // 轉換為 Java 例外
}};
}
// 與 JVM 整合的自訂錯誤型別
#[derive(Debug)]
pub(crate) struct ZError(pub String);
impl ZError {
const KOTLIN_EXCEPTION_NAME: &'static str = "io/zenoh/exceptions/ZError";
pub fn throw_on_jvm(&self, env: &mut JNIEnv) -> ZResult<()> {
let exception_class = env.find_class(Self::KOTLIN_EXCEPTION_NAME)?;
env.throw_new(exception_class, self.to_string()) // 丟給 Kotlin
}
}
// JNI 函式中的安全錯誤處理
#[no_mangle]
pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_putViaJNI(/* ... */) {
let _ = || -> ZResult<()> {
// 核心操作邏輯...
session.put(&key_expr, payload).wait().map_err(|err| zerror!(err))
}()
.map_err(|err| throw_exception!(env, err)); // 錯誤轉為例外
std::mem::forget(session); // 維持所有權
}
// 初始化與設定
Zenoh.initLogFromEnvOr("error")
val session = Zenoh.open(Config.default()).getOrThrow()
// 發布者
val pubKeyExpr = "demo/example/pub".intoKeyExpr().getOrThrow()
val publisher = session.declarePublisher(pubKeyExpr).getOrThrow()
publisher.put(ZBytes.from("Hello Zenoh!")).getOrThrow()
// 以 Channel 訂閱(coroutine-friendly)
val subKeyExpr = "demo/example/**".intoKeyExpr().getOrThrow()
val channel = Channel<Sample>()
val subscriber = session.declareSubscriber(subKeyExpr, channel).getOrThrow()
runBlocking {
for (sample in subscriber.receiver) {
println("Received: ${sample.payload}")
}
}
// 以回呼訂閱(即時處理)
val callbackSubscriber = session.declareSubscriber(subKeyExpr) { sample ->
println("Callback: ${sample.payload}")
}.getOrThrow()
// 以 Channel 收集回覆
val selector = "demo/example/**".intoSelector().getOrThrow()
val replyChannel = Channel<Reply>()
session.get(replyChannel, selector).getOrThrow()
runBlocking {
for (reply in replyChannel) {
reply.result.onSuccess { sample ->
println("Reply: ${sample.payload}")
}
}
}
// 常駐查詢器
val querier = session.declareQuerier(selector.keyExpr).getOrThrow()
querier.get { reply ->
reply.result.onSuccess { sample ->
println("Querier result: ${sample.payload}")
}
}
Zenoh Kotlin 展現了先進的跨語言綁定開發典範,以慎密的架構設計完美銜接高效能系統語言與開發者友善平台,在安全性、效能與易用性之間取得平衡,值得作為同類型跨語言整合的典範參考。