iT邦幫忙

2024 iThome 鐵人賽

DAY 30
0

簡介

Rust 設計的目標除了安全性、效能以外,還有並發性。現代電腦普遍配備多核心處理器。為了充分利用這些額外的運算能力,讓程式可以同時執行多個任務,提升整體效率,並行處理的能力也是近代程式語言很重要的指標之一。

多執行緒是並行的實現之一,可以想像成單執行緒是一個人負責所有事情,所以它知道所有的狀態,也只有它自己會改變狀態,然而多執行緒就相當於一個團隊分工合作,任務如何切分、分配,整體如何銜接與運作,會不會同時處理或是漏處理,這些問題讓多執行緒程式的設計變得複雜且充滿挑戰性。

在大部分的現代作業系統中,被執行的程式碼會在程序(process)中執行,在程序中處理獨立任務的是執行緒(threads),透過建立多個執行緒,我們可以同時處理多個任務。
然而,因為執行緒同時執行,無法保證執行的順序,因此也帶來競爭條件(race conditions)或死結(Deadlocks)等等問題。

最初 Rust 團隊認為記憶體安全與並行處理是獨立的議題,要用不同的方式處理,他們後來發現所有權與型別系統同時也可以應用在預防並行處理問題上
傳統的並行錯誤往往要等到程式執行之後才會發現,並且因為其複雜性可能難以重現或找出真正有問題的程式碼,然而 Rust 的編譯器可以在編譯期就找出有問題的程式碼,大幅降低並行問題的風險以及修正的成本。Rust 的這種特色被稱呼無懼並行(fearless concurrency)。

接下來就來介紹 Rust 並行處理的設計。

建立與等待執行緒

建立執行緒

在 Rust 標準庫 thread 提供建立新執行緒的函數 spawn,它採用的是 1:1 的執行緒實作模型,也就是程式語言產生的每一個執行緒會對應到一個作業系統實際的執行緒。

use std::thread;
use std::time::Duration;

fn main() {
    thread::spawn(|| {
        for i in 11..20 {
            println!("新執行緒: {}", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("主執行緒: {}", i);
        thread::sleep(Duration::from_millis(1));
    }
} 

spawn接受一個閉包參數,代表新建立的執行緒要執行的任務,sleep則會讓執行緒暫停並進入睡眠狀態,時間到了才會繼續執行,我們用它來模擬運算時間。
在上面的程式碼,我們建立了一個新的執行緒,然後還會有一個原本程式執行時就會有的主執行緒,預期主執行緒要印出 5 以下的數字,新執行緒要印出 10 以上的數字,實際每次執行的結果可能會有些微不同。
其中一次輸出:

主執行緒: 1
新執行緒: 11
主執行緒: 2
新執行緒: 12
主執行緒: 3
新執行緒: 13
主執行緒: 4
新執行緒: 14
新執行緒: 15

從輸出可以觀察到幾件事情,主執行緒和子執行緒是同時執行,所以印出的結果會交錯,另一個是主執行緒執行完不會等新的執行緒就結束整個程式了

等待其他執行緒完成

要避免這件事我們可以用變數儲存 thread::spawn 回傳的數值為變數,這個數值的型別為 JoinHandle。當我們在主執行緒對它呼叫 join 方法時,主執行緒就會等待對應的執行緒完成。

fn main() {
    let handle = thread::spawn(|| {
        for i in 11..20 {
            println!("新執行緒: {}", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("主執行緒: {}", i);
        thread::sleep(Duration::from_millis(1));
    }

    handle.join().unwrap();
}

輸出就可以觀察到新執行緒也執行完了。

主執行緒: 1
新執行緒: 11
主執行緒: 2
新執行緒: 12
主執行緒: 3
新執行緒: 13
主執行緒: 4
新執行緒: 14
新執行緒: 15
新執行緒: 16
新執行緒: 17
新執行緒: 18
新執行緒: 19

呼叫 join方法的位置也是有意義的,它代表要在哪些邏輯之前或之後等待,所以如果把它移到兩個 for 迴圈中間,就會先執行完新執行緒的迴圈才開始執行主執行緒的迴圈。

上面的例子其實主執行緒和新執行緒關聯不大,主執行緒也用不到新執行緒的結果,所以處理上單純很多,我們只需要注意需不需要等新的執行緒任務執行完成。

然而實務上更常是主執行緒會把複雜的任務交由其他執行緒處理,除了可能會需要傳遞必要參數外,也需要搜集結果做後續處理,這就牽涉到資料的同步,這種情況有兩種主要的處理方式:訊息傳遞共享狀態

多執行緒訊息傳遞:使用 channel

訊息傳遞是的概念是執行緒透過傳遞包含資料的訊息來溝通,資料會從一個執行緒傳到另外一個。
Rust 的標準函數庫有提供通道的實作,一個通道會有發送者(transmitter)與接收者(receiver)的角色,資料由發送者傳給接收者(這個函數庫是單向的)。通道可以重複利用傳遞多個訊息,而一旦發送者或接收者有一方被釋放掉時,該通道就會被關閉,關閉後不能再執行傳送或接收操作

實作通道

以下是用通道來實現並行的程式碼:

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    // Create multiple sender threads
    for i in 0..5 {
        let tx = tx.clone(); // Clone the sender for each thread
        thread::spawn(move || {
            let val = format!("Message from sender {}", i);
            tx.send(val).unwrap();
        });
    }

    // Receive messages from the channel
    for received in rx {
        println!("Received: {}", received);
    }
}

這邊用到的mpsc指的是多重生產者、唯一消費者(multiple producer, single consumer),通過呼叫 channel關聯函數,建立了一個通道並且回傳的元組包含 tx(發送者) 和 rx(接收者),如這個函數庫的名稱,tx可以呼叫clone,在最後一個tx被釋放前也不會造成通道被關閉,而 rx沒有則是為了保證資料安全性避免資料不一致。
spawn傳入的閉包如之前在閉包章節提到, 因為閉包捕獲了外面的變數 i,所以必須要用 move 關鍵字把所有權轉移。
呼叫send方法可以把訊息傳進通道,會回傳 Result<T, E> 代表成功或失敗,比如通道被關閉的時候就會失敗。雖然沒有處理這個 Result<T, E>編譯也會成功,但會跳警告訊息建議處理,這邊就簡單用 unwrap讓程式在失敗的時候崩潰,實務上可能不宜這麼暴力,有更好的錯誤處理方式。

for received in rx 是一種語法糖,讓我們可以持續處理傳來的訊息,等同於下面的程式碼:

    loop {
        thread::sleep(Duration::from_millis(1000));
        match rx.recv() {
            Ok(value) => {
                let received = value;
                println!("Received: {}", received);
            },
            Err(_) => break,
        }
    }

可以看到其實 recv也是回傳 Result<T, E>

如果執行上面的 main,還會遇到一個問題:程式沒有自動結束。
仔細看的話可以發現其中的原因:因為 txrx 都沒有被釋放。
在傳送訊息的迴圈裡我們是用 tx.clone 來讓多個執行緒傳送訊息,但原始的那個沒有人使用,而 rx 這頭則還在等發送者發訊息到通道。
所以我們要加一行 drop(tx); 放在傳送訊息的迴圈之後,代表訊息都傳送完了,那通道就會關閉,接收的迴圈就會中斷,程式結束。

fn main() {
    let (tx, rx) = mpsc::channel();

    // Create multiple sender threads
    for i in 0..5 {
        let tx = tx.clone(); // Clone the sender for each thread
        thread::spawn(move || {
            let val = format!("Message from sender {}", i);
            tx.send(val).unwrap();
        });
    }
    drop(tx);

    // Receive messages from the channel
    for received in rx {
        println!("Received: {}", received);
    }
}

阻塞與非阻塞的接收訊息

rx除了 recv還有提供另外一個方法 try_recv取得訊息,兩者最大的差別在於沒有訊息時的行為,recv阻塞執行緒,執行緒會等到有訊息為止,中間沒辦法做其他操作,**try_recv**則是當下沒收到訊息就會繼續往下走,不會阻塞執行緒

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        thread::sleep(Duration::from_secs(3)); // 故意等3秒後才傳送訊息
        let hello = String::from("Hello");
        tx.send(hello).unwrap();
    });

    println!("Trying to receive a message immediately:");
    match rx.try_recv() { // 這個時候還沒有訊息
        Ok(msg) => println!("Received: {}", msg),
        Err(mpsc::TryRecvError::Empty) => println!("No message available yet"),
        Err(e) => println!("Error: {:?}", e),
    }

    match rx.recv() { // 會等到有訊息為止
        Ok(msg) => println!("Received: {}", msg),
        Err(e) => println!("Error: {:?}", e),
    }
}

通道與所有權的關係

前面提到無懼並行的特色,所以來探討一下所有權系統怎麼避免並行的一些問題的。
我們改一下上面的程式碼,如果在訊息傳遞之後再嘗試使用該變數:

    thread::spawn(move || {
        thread::sleep(Duration::from_secs(3));
        let hello = String::from("Hello");
        tx.send(hello).unwrap();
        println!("{}", hello); // 借用錯誤
    });

預料內的錯誤,hello傳進send之後所有權就轉移後,後續就不能再使用,所以權在這裡的作用和並行的設計相性很好。因為如果這個值還可以用的話,原本的執行緒不會知道它有沒有被改變或清除,允許它被使用會造成資料不一致或空指標,而所有權機制漂亮的透過限制這個操作來避免並行可能造成的問題。

error[E0382]: borrow of moved value: `hello`
  --> src/main.rs:12:24
   |
10 |         let hello = String::from("Hello");
   |             ----- move occurs because `hello` has type `String`, which does not implement the `Copy` trait
11 |         tx.send(hello).unwrap();
   |                 ----- value moved here
12 |         println!("{}", hello);
   |                        ^^^^^ value borrowed here after move

多執行緒共享記憶體:使用 Arc<T>Mutex<T>

互斥鎖簡介

並行程式設計和所有權的設計有些相似性,通道就有點像是單一所有權的模式,訊息傳進通道之後原本的訊息傳遞者就不應該再對原本的資料做操作,就像是所有權從一個變數轉移到另外一個變數一樣。
與此相對,共享記憶體就像是共享所有權一樣,會有複數的擁有者同時對同一塊記憶體操作,更糟的是我們在 Rc<T>看到排除掉內部可變性的情況,至少還是不可變的,但共享記憶體就不是了,記憶體的資料會被不同執行緒更動,所以需要其他機制來管理誰可以存取這塊記憶體,其中之一就是互斥鎖Mutex<T>

互斥鎖的概念是,同一時間只允許一個執行緒存取特定資料,互斥鎖的鎖代表的是獨佔權,只有擁有鎖的執行緒可以對特定資料存取,所以任何執行緒在操作之前都要先嘗試索取互斥鎖的鎖,如果索取失敗代表有其他執行緒正在使用,那它的操作就會被拒絕。

也因此互斥鎖有兩個前提:

  1. 存取資料之前必須獲得鎖
  2. 操作完成後要釋放鎖,否則其他人無法使用

讀寫鎖RwLock 是互斥鎖另外一個更靈活的替代方案,它允許多個讀取者同時存在,但在寫入時仍然要求獨佔訪問,這邊就不多做介紹。

Mutex<T> 基本用法

先來看 Mutex<T> 的基本用法:

use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);
    println!("m = {:?}", m); // m = Mutex { data: 5, poisoned: false, .. }

    {
        let mut num = m.lock().unwrap();
        *num = 6;
    } // 這裡,num 變數離開作用域,自動釋放鎖

    println!("modified m = {:?}", m); // modified m = Mutex { data: 6, poisoned: false, .. }
}

透過 Mutex::new 關聯函數,數值 5 現在受到互斥鎖管理,如果存取裡面的數值必須要呼叫 lock 方法,它會回傳 LockResult,沒錯誤的話我們會取得** MutexGuard,這也是一個智慧指標**,所以可以用解參考的方式讀取被管理的數值,另外它在離開作用域的時候也會透過 Drop 特徵自動把鎖釋放掉。

如果我們沒有把呼叫 lock 的地方用大括號獨立一個作用域,那我們就必須要自己手動釋放鎖:

use std::sync::Mutex;
use std::mem::drop;

fn main() {
    let m = Mutex::new(5);

    let mut num = m.lock().unwrap();
    *num = 6;

    drop(num); // 手動釋放鎖

    println!("number = {:?}", m.lock().unwrap()); // number = 6
}

lockresc 一樣會阻塞執行緒,所以上面的程式碼如果把 drop 註解掉程式就會一直卡在那邊了,這是在使用上要很小心的地方。
同樣地,lock也有對應不會卡住執行緒的版本:try_lock,把上述最後一句的 m.lock().unwrap() 改成 m.try_lock().unwrap()就可以觀察到程式執行中報錯,因為使用 unwrap的關係,這裡會程式會崩潰。

thread 'main' panicked at src/main.rs:12:44:
called `Result::unwrap()` on an `Err` value: "WouldBlock"
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

至於要不要阻塞執行緒則是自己依照情況決定。

結合互斥鎖與多執行緒

接下來我們要結合互斥鎖和多執行緒來使用,我們用一個計數器來舉例,我們把計數器的累計放在閉包裡讓不同的執行緒來執行,不過 Mutex<T> ,本身並不支援共享所有權,馬上就會想到使用 Rc<T>,但如當初Rc<T>有提到它非執行緒安全的,所以我們最終要使用具備原子性的 Arc<T>,它的用法和 Rc<T>是相同的。

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![]; // 搜集所有執行緒的狀態

    for _ in 0..10 {
        let counter = Arc::clone(&counter); // 取得共享所有權
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap(); // 取得 MutexGuard

            *num += 1; // 解參考操作數值
        });
        handles.push(handle);
    }

    for handle in handles { // 等待所有的執行緒做完
        handle.join().unwrap();
    }

    println!("{}", counter.lock().unwrap()); // 10
}

lock 的錯誤

既然 lock會阻塞等到鎖被釋放,那 lock 怎樣的情況會取得錯誤呢?
通常有幾種情況,鎖的持有者執行緒發生恐慌或 Mutex<T> 的內部錯誤。

Mutex<T> 的內部錯誤,理論上如果操作系統或硬體層級發生了問題,導致鎖的行為異常,可能會導致lock返回錯誤。然而這種情況極為罕見。

另一種則是持有鎖的的執行緒在持有鎖的過程中發生了恐慌,那麼這個鎖就會被認為是中毒的(poisoned)。當其他執行緒嘗試獲取這個鎖時,lock會返回一個錯誤,因為數據可能處於不一致的狀態,這是 lock產生錯誤最常見的原因。

舉個例子:

fn main() {
    let m = Arc::new(Mutex::new(5));

    let m1 = Arc::clone(&m);
    let handle = thread::spawn(move || {
        let mut num = m1.lock().unwrap();
        *num = 6;
        panic!("Oops! Something went wrong");
    });

    handle.join().unwrap_err(); // 確保執行緒已經發生 panic

    println!("{:?}", m); // Mutex { data: 6, poisoned: true, .. }
}

在這段程式碼中,由於執行緒在持有鎖的過程中發生了恐慌,我們可以和基本用法那邊的輸出比較,便可以觀察到鎖的狀態變成中毒了(poisoned: true)。
Rust 使用中毒鎖來提醒某個執行緒在持有鎖時崩潰了,這可能導致共享資料的損壞,至於後續怎麼處理也是看狀況決定。

Rayon 庫

前面介紹的都是來自標準函數庫比較底層的操作,我們來看看別的 crate 會怎麼處理並行。
Rayon是一個簡化數據並行(data parallelism)處理的庫,透過它我們可以很容易地把本來需要依序執行的程式邏輯改成並行執行,提高效能。

Rayon最大的特色在於並行疊代器(parallel iterators),它和一般的疊代器 API 高度相容,所以我們可以很簡單的把 iter 替換成 Rayonpar_iter 就完成轉換成並行版本。

基本用法

記得要先把 Rayon 加到專案的依賴中。

use rayon::prelude::*;

fn main() {
    let numbers = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    let sum: i32 = numbers.par_iter().sum();
    println!("Sum: {}", sum);
}

除了 sum 它也支援mapfor_eachfilter等等常見操作,同樣可以做串接,讓複雜的並行操作變得很精簡並且直覺。

fn main() {
    let vec: Vec<i32> = (0..100).collect();
    let even_numbers: Vec<i32> = vec.par_iter()
        .filter(|&&x| x % 2 == 0)
        .map(|&x| x * 2)
        .collect();
    println!("{:?}", even_numbers);
}

同時Rayon也保證不會可以避免資料競爭,並在內部實作了自動負載平衡(load balancing)的機制,
Rayon 的負載平衡算法是基於工作竊取(work-stealing)模型,當一個執行緒完成任務並處於空閒狀態時,它會從其他執行緒的工作隊列的隊尾中竊取一些未完成的工作,而原本的執行緒會優先從頭端開始做,透過這種方式動態分配任務給執行緒,減少任務分配不均導致效能浪費的情況,保持高效的 CPU 利用率。

Rayon就比較不適合用在 I/O 密集型操作,因為並行化效益較低,反而因為執行緒切換的開銷而降低性能,另外 Rayon 的並行運算可能會消耗更多的記憶體,因為每個執行緒可能需要複製一份獨立資料。

排序

除了這些 Rayon 也提供了並序排序的方法 par_sort,我們可以看一下和原始版本的比較:

use std::time::Instant;
use rayon::prelude::*;

fn main() {
    // 生成一個包含100萬個隨機數的向量
    let mut large_vec: Vec<i32> = (0..1_000_000).map(|_| rand::random::<i32>()).collect();
    // 重新生成一個相同的向量,用於並行排序
    let mut large_vec_copy = large_vec.clone();

    let now = Instant::now();
    large_vec.sort(); // 一般排序
    let elapsed = now.elapsed();
    println!("{:.2?}", elapsed); // 342.72ms

    let now = Instant::now();
    large_vec_copy.par_sort(); // 並行排序
    let elapsed = now.elapsed();
    println!("{:.2?}", elapsed); // 64.13ms
}

效能是預期之內,重點是 Rayon 提供的語法能如此輕鬆實現並行處理,讓開發者可以專注在業務邏輯上,而不用自己去管理和維護並行處理。

SyncSend 特徵

這兩個特徵在並行處理上比較特別,它們不用特別實作什麼功能,反而比較像一種標記,告訴編譯器哪些型別能做哪些操作。

幾乎所有的 Rust 型別都有 Send 特徵,代表它們可以將其數值的所有權在執行緒間轉移。不過像 Rc<T>就沒有,所以我們在誤用 Rc<T> 的時候編譯器就會報錯,然後我們就會知道要改成用執行緒安全的 Arc<T>。同樣的,具有Sync特徵代表能安全從多個執行緒來參考,而 &TSend 特徵的話,代表參考可以安全地傳給其他執行緒,那 T 就必定具有 Sync 特徵,而 Rc<T>RefCell<T>都沒有 Sync,所以我們在多執行緒的情況會用 Arc<T><Mutex<T>

這邊介紹的目的有兩個,一個是從前述就知道自己手動實作這兩個特徵是不安全且有風險的,因為編譯器可以正確識別的前提就是這兩個特徵是正確的,另一個是我們可能會滿常在錯誤訊息看到它們,這有助於我們理解那些錯誤訊息

the trait `Send` is not implemented for `Rc<Mutex<i32>>`

結語

最後介紹了並行在 Rust 有哪些實作以及這些設計和所有權是怎麼協作來迴避並發程式設計會遇到的一些問題。
總結 Rust 在並行的主要優勢:

  • 所有權系統與並行安全:透過所有權機制在編譯的時候就能發現大部分並行設計會有的問題,相對其他語言可能要等到執行時才會發現問題。
  • 靜態類型檢查:
    SendSync 特徵提供了額外的類型安全保障,確保我們使用的型別可以讓我們的資料安全地在執行緒傳遞和共享。
    豐富的標準庫支持:
    從底層的 thread::spawn 到高級的 Mutex<T>Arc<T>,再到 mpsc 通道,Rust 標準庫提供了全面的並行原語。
  • 零成本抽象:
    Rust 的並行工具在提供高層抽象的同時,不會造成額外的運行時開銷

除此之外還有像 Rayon 的第三方庫讓提供更高層級的並行操作抽象,讓 Rust 對並行的支援十分全面。

到了今天把 Rust 的各種基礎都介紹一遍,也已經踩了很多 Rust 開發會遇到的坑,即使如此還是有很多想介紹的來不及介紹。這也是我個人第一次寫這種介紹文章以及刻意去鑽研及嘗試各種細節,畢竟以往學習程式語言通常是應用導向,相較起來這種方式可能比較無聊,但我倒是從其中學到滿多也得到滿多樂趣的。
之後應該就會開始試著用 Rust 開發一些小工具,目前已經有想嘗試的東西了。
最後希望學 Rust 的人越來越多XD,總之感謝看到這邊的人。


上一篇
Day29 - 巨集
系列文
螃蟹幼幼班:Rust 入門指南30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言