談到rust的async runtime就要講tokio。
from: tokio官網
之前提到tokio是rust裡非同步(異步)的runtime,另外tokio也提供了很多實用的庫:
#[tokio::main]
。WebAssembly跑在沙盒(Sandbox)裡,因為安全性的考量做了比較嚴格的隔離,所以無法直接呼叫作業系統層的功能。也不能自己開啟TCP連線,而mio就是在處理TCP/UDP等底層的連線
我們以下使用偽代碼(pseudo code)來對比一下:
// 一般的 請求 / 回應 , Request / Response pattern
let 結果 = 發送請求();
// 保持連線的通訊要使用監聽
loop {
if 監聽.取得資料 {
處理取得的資料
}
}
有看出問題了嗎,其實我們先前寫的 web 服務、gRPC server,都在main裡執行到某一段就掉到loop裡了,什麼有人說沒看到loop,不信你在main後面加上panic,讓程式死掉看看:
// web/src/main.rs
#[tokio::main]
async fn main() {
config::init();
let _logger = Logger::builder().use_env().build();
let routers = routers::all_routers();
warp::serve(routers).run(([0, 0, 0, 0], config::http_port())).await;
panic!("讓子彈飛一會兒");
}
可以看到main跑到server.run那段跑的很開心,一點也不理我們在後面加的東西,因為在run裡面,有偷偷實作loop無限的迴圈,一直check有沒有用戶端發送請求進來。也很合理呀(?),因為服務器就是隨時都在監聽或等待用戶端的呼叫,只要有請求呼叫就要處理,所以要保持隨時待命的狀態。
當然也有人會說我用polling的方式,定時去查一下就好(還滿常看到大家這樣做),反正偷偷塞每N秒發一個request到後端最簡單,
(我不想學,我就爛)。
那我們如果想要使用保持連線的雙邊通訊,就會在用戶端遇到這個問題,以往我們在客戶端都是由玩家或使用者進行某項操作後,發動請求(Request)呼叫,取得後端內容進行處理,所以我們的用戶端程式都是依需執行的,大都是一次性的工作。
而如果我們要照剛剛的偽代碼開啟持久性的連線,也就是說要隨時監聽後端可能發送過來的訊息,就勢必要陷入loop中,但是這樣子的話其他程式都不用工作了,這就叫做blocking(有時候會看到翻成阻塞),表示程式執行到這裡就會被卡住,無法再往下運行了,想像我就只有一個人,所以只做一份工作也很合理。
但是我很苦命要兼兩份工怎麼辦,還好我是個練武奇才,從小就學了影分身之術,只要分身就好了(只是我的分身不能像別人一樣賺錢QAQ):
好東西不藏私,那就在這裡免費跟大家分享,我們開一個example實驗一下:
@@ Cargo.toml @@
[workspace.dependencies]
+chrono = { version = "0.4" }
# examples/tokio/Cargo.toml
[package]
name = "example-tokio"
version = "0.1.0"
edition = "2021"
[dependencies]
chrono = { workspace = true }
rand = { workspace = true }
tokio = { workspace = true }
// examples/tokio/src/main.rs
#[tokio::main]
async fn main() {
println!("======程式開始======"); // 標注讓我們知道發生什麼事
tokio::spawn(async { // 分身詠唱之術
for i in 1..6 {
println!("分身-攻{}", i); // 模擬分身做一些事情
}
});
println!("======程式結束======");
}
跑起來試試:
cargo run -p example-tokio
# 或是
cargo watch -q -c -w examples -x 'run -p example-tokio'
結果怎麼好像怪怪的,順序是不是錯了,再跑一次試試:
分身怎麼沒出來?
用過Task或Promise的朋友可能已經猜到答案了,對的,就是剛剛的spawn會開啟另一個執行緒,就像分身會自己行動一樣,但是分身是依附於主體,所以當程式結束的時候(主體已消失),分身自然也不復存在了,那為什麼第一個圖分身還會留著呢,那個只是殘影而已(?),因為我們程式最後的印出結束不是真的結束 (你說的黑不是黑) ,結束是在 main {}
跑完才是真的結束。(執行完println!
後,要結束main
前還會有一點點時間在進行資源的清理)
我們讓main
結束前停下來,以利觀測程式的行為:
println!("======程式結束======");
let mut line = String::new();
let _ = std::io::stdin().read_line(&mut line).unwrap(); // 等待終端輸入的資料
剛剛這樣看不到主線程(本體)跟副執行緒(分身)同時並存,所以我們讓主線程也同起跑一些東西,另外,為了讓事情變的有趣,我們加一點料:
這邊先貼上程式碼,再逐項解說:
// examples/tokio/src/main.rs
use rand::random;
use chrono::Utc;
#[tokio::main]
async fn main() {
println!("======程式開始======");
tokio::spawn(async { // === 分身 part ===
for i in 1..6 {
let rand = random::<u64>() % 1000; // 產生隨機 0~1000 豪秒數
let delay = std::time::Duration::from_millis(rand); // 時間區段
tokio::time::sleep(delay).await; // 進行等待
let padding = " ".repeat((1 * 9) as usize); // 插入空白讓版面整齊
println!(
"{:12}: {} \x1b[94m分身-攻{}\x1b[0m",
Utc::now().format("%S%.f"), // 時間戳記
padding, // 空白
i); // 序
}
});
for i in 1..6 { // === 主體 part ===
let rand = random::<u64>() % 1000; // 產生隨機 0~1000 豪秒數
let delay = std::time::Duration::from_millis(rand);
tokio::time::sleep(delay).await;
println!(
"{:12}: \x1b[93m主體-攻{}\x1b[0m",
Utc::now().format("%S%.f"),
i);
}
println!("======程式結束======");
let mut line = String::new();
let _ = std::io::stdin().read_line(&mut line).unwrap();
}
\x1b[0m
:有點開剛剛色碼連結看的話,就知道ASCII的跳脫字元ESC
對應的16進位代碼是0x1b
,所以rust裡字串插入跳脫字元使用\
,其他字串使用方式可以看官方說明,後面數字則是色碼。敝人工作上就遇過因為前後端語言不同,使用時間格式的不一樣,所以把hh和HH誤用,一般走 12 小時制只在早上8點起排程沒有問題(?),某天突然要加班 突然晚上8點也跟著跑起來就GG了。
附帶一提,聰明的你應該可以想到平常寫的批次或console程式也可以上色嘍,對的,不管是powershell還是bash,大家有興趣可以自己去查怎麼使用。
跑完結果如下,是不是感覺看到兩條線了?就是2個程序(主、副)同時在跑:
我們還可以開啟多個分身,在開分身外面再加個for iteration:
println!("======程式開始======");
for p in 1..6 { // 再多包一層 p for person
tokio::spawn(async move { // 這裡必需多一個 move
for i in 1..6 {
let rand = random::<u64>() % 1000;
let delay = std::time::Duration::from_millis(rand);
tokio::time::sleep(delay).await;
let padding = " ".repeat((p * 9) as usize); // 把外面的p move進來
println!("{:12}: {} \x1b[9{}m分{}-攻{}\x1b[0m",
Utc::now().format("%S%.f"), padding, p, p, i);
}
});
}
跑起來長得像下面這樣:
剛剛出現一個move關聯字,因為在我們的子程序裡,會使用到spawn外面的p
,為了遵照rust的所有權借用機制,要把p
移動到async的fn裡面,不過move到裡面的東西就出不來了(?)。跨不同執行緒資料同步是個不容易處理的議題,我們先一步步來,rust裡面有一個常見的跨執行緒溝通機制mpsc(Multi-producer, single-consumer FIFO queue),就是透過事件通道,實現多個事件發佈者,一個事件接受者,我們看一下具體怎麼使用:
#[tokio::main]
async fn main() { // mpsc
println!("======程式開始======");
let (tx, rx) = std::sync::mpsc::channel(); // 產生事件通道
let tx1 = tx.clone(); // tx是發送,透過clone產生多個發送者
tokio::spawn(async move {
for i in 1..6 {
let rand = random::<u64>() % 1000;
let delay = std::time::Duration::from_millis(rand);
tokio::time::sleep(delay).await;
let padding = " ".repeat((1 * 14) as usize);
println!("{:12}: {} \x1b[93m客戶1 tx:{}\x1b[0m",
Utc::now().format("%S%.f"), padding, rand);
tx1.send((i, rand)).unwrap(); // tx1 被move進來,後面不能再使用
}
});
let tx2 = tx.clone(); // 產生另一個發送者tx
tokio::spawn(async move {
for i in 1..6 {
let rand = random::<u64>() % 1000;
let delay = std::time::Duration::from_millis(rand);
tokio::time::sleep(delay).await;
let padding = " ".repeat((2 * 14) as usize);
println!("{:12}: {} \x1b[91m客戶2 tx:{}\x1b[0m",
Utc::now().format("%S%.f"), padding, rand);
tx2.send((i, rand)).unwrap(); // 傳遞資料進channel queue
}
});
loop { // 一直在監聽
let msg = rx.recv().unwrap(); // 如果接收到訊息再往下處理
let (i, rand) = msg;
println!("{:12}: \x1b[95m主機rx:{}\x1b[0m",
Utc::now().format("%S%.f"), rand);
} // forever loop
}
大家習慣用縮寫 tx, rx來代表資料送出(transmitter)及資料接收(receiver),如果看一下USB3的規格,也會提到tx, rx等縮寫。
可以看到主機(主線程)正確接收到不同客戶端(子線程)發送過來的訊息,這個監聽的感覺,就很像我們平常寫的後端,web server跑起來就是這樣,一直等待著有人來呼叫我們寫好的rest api,處理好後繼續等待,只是很多東西都是framework幫我們做掉了,所以有些人沒有細究的話,就比較不知道底層的運作邏輯。
rust 裡的 async/await 是使用了 Future trait,我們寫的fn如果是async fn
,rust編譯會讓這個fn回傳Future,有點類似C#裡的Task或Js裡的Promise,
而在rust裡面的Future是lazy的,就是不使用 .await
呼叫的話則不會執行。
這裡有個扮演作業系統的小遊戲https://plbrault.github.io/youre-the-os/(第7篇有提到,還沒玩的朋友可以點開玩看看),玩家在遊戲裡面扮演著作業系統的角色,要分派工作給CPU,其中工作有CPU bounded和IO bounded,要如何切換工作以善加利用CPU的時間,分配記憶體的空間,對於理解多執行緒應該會有幫助(?)。
執行緒也有分硬體層和軟體層,而rust號稱可以讓我們無懼並行(Fearless Concurrency),就是因為這個很特別,讓Rustacean又愛又恨的所有權機制,可以讓Rust處理Concurrency時,安全且有效的使用、傳遞及同步資料,不安全的寫法基本上就會被這個機制過濾掉而無法編譯通過。
剛剛提到的 Rustacean 是 Rust 程式設計師。你,今天rusty了嗎。
本系列專案源始碼放置於 https://github.com/kenstt/demo-app