iT邦幫忙

2023 iThome 鐵人賽

DAY 22
1
Software Development

前端? 後端? 摻在一起做成全端就好了系列 第 22

22 是 await 我加了await:rust async runtime ー tokio

  • 分享至 

  • xImage
  •  

談到rust的async runtime就要講tokio

tokio 簡介

tokio官網

from: tokio官網

之前提到tokio是rust裡非同步(異步)的runtime,另外tokio也提供了很多實用的庫:

  • tokio runtime:tokio非同步的底層核心,我們先前已有使用,比如#[tokio::main]
  • hyper:http 通訊的基礎設施,我們先前用過的reqwestwarptonic,都是透過hyper處理http連線協議。
  • tonic:gRPC的客戶端/服務端,我們前一篇才使用過。
  • tower:處理cliet/server間的抽象,處理請求/回應類別的middleware層
  • mio: Metal I/O,處理更底層接近OS作業系統的api,值得注意的是如果專案要編譯成wasm的話,不能包含到mio。
  • tracing:我們先前用來收集日誌的套件。
  • bytes:處理bytes的輔助套件,主要用於網路傳輸資料的存放與處理。

WebAssembly跑在沙盒(Sandbox)裡,因為安全性的考量做了比較嚴格的隔離,所以無法直接呼叫作業系統層的功能。也不能自己開啟TCP連線,而mio就是在處理TCP/UDP等底層的連線

我們以下使用偽代碼(pseudo code)來對比一下:

// 一般的 請求 / 回應 , Request / Response pattern
let 結果 = 發送請求();

// 保持連線的通訊要使用監聽 
loop {
    if 監聽.取得資料 {
        處理取得的資料
    }
}

有看出問題了嗎,其實我們先前寫的 web 服務、gRPC server,都在main裡執行到某一段就掉到loop裡了,什麼有人說沒看到loop,不信你在main後面加上panic,讓程式死掉看看:

// web/src/main.rs
#[tokio::main]
async fn main() {
    config::init();
    let _logger = Logger::builder().use_env().build();
    let routers = routers::all_routers();
    warp::serve(routers).run(([0, 0, 0, 0], config::http_port())).await;

    panic!("讓子彈飛一會兒");
}

跑web服務結果

可以看到main跑到server.run那段跑的很開心,一點也不理我們在後面加的東西,因為在run裡面,有偷偷實作loop無限的迴圈,一直check有沒有用戶端發送請求進來。也很合理呀(?),因為服務器就是隨時都在監聽或等待用戶端的呼叫,只要有請求呼叫就要處理,所以要保持隨時待命的狀態。

當然也有人會說我用polling的方式,定時去查一下就好(還滿常看到大家這樣做),反正偷偷塞每N秒發一個request到後端最簡單, (我不想學,我就爛)

那我們如果想要使用保持連線的雙邊通訊,就會在用戶端遇到這個問題,以往我們在客戶端都是由玩家或使用者進行某項操作後,發動請求(Request)呼叫,取得後端內容進行處理,所以我們的用戶端程式都是依需執行的,大都是一次性的工作。

而如果我們要照剛剛的偽代碼開啟持久性的連線,也就是說要隨時監聽後端可能發送過來的訊息,就勢必要陷入loop中,但是這樣子的話其他程式都不用工作了,這就叫做blocking(有時候會看到翻成阻塞),表示程式執行到這裡就會被卡住,無法再往下運行了,想像我就只有一個人,所以只做一份工作也很合理。

但是我很苦命要兼兩份工怎麼辦,還好我是個練武奇才,從小就學了影分身之術,只要分身就好了(只是我的分身不能像別人一樣賺錢QAQ):

影分身之術

那麼 這個要怎麼用呢

好東西不藏私,那就在這裡免費跟大家分享,我們開一個example實驗一下:

@@ Cargo.toml @@
 [workspace.dependencies]
+chrono = { version = "0.4" }
# examples/tokio/Cargo.toml
[package]
name = "example-tokio"
version = "0.1.0"
edition = "2021"


[dependencies]
chrono = { workspace = true }
rand = { workspace = true }
tokio = { workspace = true }
// examples/tokio/src/main.rs
#[tokio::main]
async fn main() {
    println!("======程式開始======");        // 標注讓我們知道發生什麼事
    tokio::spawn(async {                    // 分身詠唱之術
        for i in 1..6 {
            println!("分身-攻{}", i);        // 模擬分身做一些事情
        }
    });
    println!("======程式結束======");
}

跑起來試試:

cargo run -p example-tokio
# 或是
cargo watch -q -c -w examples -x 'run -p example-tokio'

第一次執行影分身之術

結果怎麼好像怪怪的,順序是不是錯了,再跑一次試試:

重跑一次影分身之術

分身怎麼沒出來?

用過TaskPromise的朋友可能已經猜到答案了,對的,就是剛剛的spawn會開啟另一個執行緒,就像分身會自己行動一樣,但是分身是依附於主體,所以當程式結束的時候(主體已消失),分身自然也不復存在了,那為什麼第一個圖分身還會留著呢,那個只是殘影而已(?),因為我們程式最後的印出結束不是真的結束 (你說的黑不是黑) ,結束是在 main {} 跑完才是真的結束。(執行完println!後,要結束main前還會有一點點時間在進行資源的清理)

我們讓main結束前停下來,以利觀測程式的行為:

println!("======程式結束======");
let mut line = String::new();    
let _ = std::io::stdin().read_line(&mut line).unwrap(); // 等待終端輸入的資料

剛剛這樣看不到主線程(本體)跟副執行緒(分身)同時並存,所以我們讓主線程也同起跑一些東西,另外,為了讓事情變的有趣,我們加一點料:

  • 上色:我們來著色一下(順便回憶一下那些年,我們一起上過的BBS站台),使用的是ANSI的顏色碼
  • 時間序:我們使用rust處理時間的套件chrono,讓我們可以清楚的知道時間序。
  • 隨機數:我們加了隨機等待時間,避免一成不變看不出差異。

這邊先貼上程式碼,再逐項解說:

// examples/tokio/src/main.rs
use rand::random;
use chrono::Utc;

#[tokio::main]
async fn main() {
    println!("======程式開始======");
    tokio::spawn(async {                             // === 分身 part ===
        for i in 1..6 {
            let rand = random::<u64>() % 1000;       // 產生隨機 0~1000 豪秒數
            let delay = std::time::Duration::from_millis(rand);    // 時間區段
            tokio::time::sleep(delay).await;              // 進行等待
            let padding = " ".repeat((1 * 9) as usize);   // 插入空白讓版面整齊
            println!(
                "{:12}: {} \x1b[94m分身-攻{}\x1b[0m",
                 Utc::now().format("%S%.f"),              // 時間戳記
                 padding,                                 // 空白
                 i);                                      // 序

        }
    });
    for i in 1..6 {                                  // === 主體 part ===
        let rand = random::<u64>() % 1000;           // 產生隨機 0~1000 豪秒數
        let delay = std::time::Duration::from_millis(rand);
        tokio::time::sleep(delay).await;
        println!(
            "{:12}: \x1b[93m主體-攻{}\x1b[0m",
            Utc::now().format("%S%.f"),
            i);
    }
    println!("======程式結束======");
    let mut line = String::new();
    let _ = std::io::stdin().read_line(&mut line).unwrap();
}
  • \x1b[0m:有點開剛剛色碼連結看的話,就知道ASCII的跳脫字元ESC對應的16進位代碼是0x1b,所以rust裡字串插入跳脫字元使用\,其他字串使用方式可以看官方說明,後面數字則是色碼。
  • UTC應該不用解釋,格式化日期時間有點小麻煩的點是,每個程式語言或框架都不見得一樣的格式標記,所以每次用之前最好再check一下,避免誤用造成災難(?),因為我們在這裡跑的小程序幾秒就結束,所以我省略分鐘以上的部分,只留秒以下的部分。

敝人工作上就遇過因為前後端語言不同,使用時間格式的不一樣,所以把hh和HH誤用,一般走 12 小時制只在早上8點起排程沒有問題(?),某天突然要加班 突然晚上8點也跟著跑起來就GG了。

附帶一提,聰明的你應該可以想到平常寫的批次或console程式也可以上色嘍,對的,不管是powershell還是bash,大家有興趣可以自己去查怎麼使用。

跑完結果如下,是不是感覺看到兩條線了?就是2個程序(主、副)同時在跑:

一主體一分身同時跑

我們還可以開啟多個分身,在開分身外面再加個for iteration:

println!("======程式開始======");
for p in 1..6 {                    // 再多包一層 p for person
    tokio::spawn(async move {      // 這裡必需多一個 move
        for i in 1..6 {
            let rand = random::<u64>() % 1000;
            let delay = std::time::Duration::from_millis(rand);
            tokio::time::sleep(delay).await;
            let padding = " ".repeat((p * 9) as usize); // 把外面的p move進來
            println!("{:12}: {} \x1b[9{}m分{}-攻{}\x1b[0m",
                Utc::now().format("%S%.f"), padding, p, p, i);
        }
    });
}

跑起來長得像下面這樣:

開5個分身的結果

剛剛出現一個move關聯字,因為在我們的子程序裡,會使用到spawn外面的p,為了遵照rust的所有權借用機制,要把p移動到async的fn裡面,不過move到裡面的東西就出不來了(?)。跨不同執行緒資料同步是個不容易處理的議題,我們先一步步來,rust裡面有一個常見的跨執行緒溝通機制mpsc(Multi-producer, single-consumer FIFO queue),就是透過事件通道,實現多個事件發佈者,一個事件接受者,我們看一下具體怎麼使用:

#[tokio::main]
async fn main() {  // mpsc
    println!("======程式開始======");
    let (tx, rx) = std::sync::mpsc::channel();   // 產生事件通道
    let tx1 = tx.clone();                  // tx是發送,透過clone產生多個發送者
    tokio::spawn(async move {
        for i in 1..6 {
            let rand = random::<u64>() % 1000;
            let delay = std::time::Duration::from_millis(rand);
            tokio::time::sleep(delay).await;
            let padding = " ".repeat((1 * 14) as usize);
            println!("{:12}: {} \x1b[93m客戶1 tx:{}\x1b[0m",
                     Utc::now().format("%S%.f"), padding, rand);
            tx1.send((i, rand)).unwrap();      // tx1 被move進來,後面不能再使用
        }
    });
    let tx2 = tx.clone();                      // 產生另一個發送者tx
    tokio::spawn(async move {
        for i in 1..6 {
            let rand = random::<u64>() % 1000;
            let delay = std::time::Duration::from_millis(rand);
            tokio::time::sleep(delay).await;
            let padding = " ".repeat((2 * 14) as usize);
            println!("{:12}: {} \x1b[91m客戶2 tx:{}\x1b[0m",
                     Utc::now().format("%S%.f"), padding, rand);
            tx2.send((i, rand)).unwrap();      // 傳遞資料進channel queue
        }
    });
    loop {                                     // 一直在監聽
        let msg = rx.recv().unwrap();          // 如果接收到訊息再往下處理
        let (i, rand) = msg;
        println!("{:12}: \x1b[95m主機rx:{}\x1b[0m",
                 Utc::now().format("%S%.f"), rand);
    }                                          // forever loop
}

大家習慣用縮寫 tx, rx來代表資料送出(transmitter)及資料接收(receiver),如果看一下USB3的規格,也會提到tx, rx等縮寫。

使用mpsc開2客戶端同時跑

可以看到主機(主線程)正確接收到不同客戶端(子線程)發送過來的訊息,這個監聽的感覺,就很像我們平常寫的後端,web server跑起來就是這樣,一直等待著有人來呼叫我們寫好的rest api,處理好後繼續等待,只是很多東西都是framework幫我們做掉了,所以有些人沒有細究的話,就比較不知道底層的運作邏輯。

補充說明

Future

rust 裡的 async/await 是使用了 Future trait,我們寫的fn如果是async fn,rust編譯會讓這個fn回傳Future,有點類似C#裡的Task或Js裡的Promise

而在rust裡面的Future是lazy的,就是不使用 .await 呼叫的話則不會執行。

作業系統的小遊戲

這裡有個扮演作業系統的小遊戲https://plbrault.github.io/youre-the-os/第7篇有提到,還沒玩的朋友可以點開玩看看),玩家在遊戲裡面扮演著作業系統的角色,要分派工作給CPU,其中工作有CPU bounded和IO bounded,要如何切換工作以善加利用CPU的時間,分配記憶體的空間,對於理解多執行緒應該會有幫助(?)。

執行緒也有分硬體層軟體層,而rust號稱可以讓我們無懼並行(Fearless Concurrency),就是因為這個很特別,讓Rustacean又愛又恨的所有權機制,可以讓Rust處理Concurrency時,安全且有效的使用、傳遞及同步資料,不安全的寫法基本上就會被這個機制過濾掉而無法編譯通過。

Rustacean

剛剛提到的 Rustacean 是 Rust 程式設計師。你,今天rusty了嗎。

Ferris說明

From: https://www.rust-lang.org/zh-TW/learn/get-started

參考資料

本系列專案源始碼放置於 https://github.com/kenstt/demo-app


上一篇
21 CRUD w/ rust gRPC
下一篇
23 是websocket,不是socket。使用rust websocket
系列文
前端? 後端? 摻在一起做成全端就好了30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言