iT邦幫忙

2025 iThome 鐵人賽

DAY 16
1
Rust

大家一起跟Rust當好朋友吧!系列 第 16

Day 16: 併發 (Concurrency):不再害怕多執行緒

  • 分享至 

  • xImage
  •  

嗨嗨!大家好!歡迎來到 Rust 三十天挑戰的第十六天!

昨天我們學習了閉包和迭代器這些函數式程式設計的強大工具,今天我們要踏入另一個讓 Rust 引以為傲的領域:併發程式設計

老實說,多執行緒一直是我在寫程式時既愛又怕的領域。愛的是它能讓程式跑得更快,怕的是那些詭異的競態條件(race condition)和難以除錯的死鎖問題。在 C# 中,雖然有 lockTask 這些工具,但還是經常會遇到一些讓人頭痛的併發問題。

但 Rust 不一樣!它透過所有權系統和型別系統,在編譯時期就能防止大部分的併發問題,實現了「無懼併發」(Fearless Concurrency)。這聽起來很神奇對吧?讓我們一起來看看 Rust 是如何做到的!

為什麼併發程式設計很重要?

在現代多核心處理器當道的時代,如果我們的程式只能使用一個 CPU 核心,那就太浪費了。併發程式設計讓我們可以:

  • 提升效能:同時利用多個 CPU 核心
  • 改善響應性:I/O 操作不會阻塞主執行緒
  • 處理更多請求:Web 服務器能同時處理多個使用者請求
  • 更好的資源利用:等待一個任務時可以執行其他任務

Rust 的併發哲學

在深入程式碼之前,讓我們先理解 Rust 對併發的設計哲學:

  1. 所有權防止資料競爭:透過所有權規則,確保資料不會被多個執行緒同時修改
  2. 型別系統保證執行緒安全:只有實作了特定 trait 的型別才能在執行緒間傳遞
  3. 編譯時檢查:大部分併發錯誤在編譯時就能被發現
  4. 零成本抽象:併發抽象不會帶來額外的執行時開銷

建立執行緒:thread::spawn

讓我們從最基本的執行緒建立開始:

use std::thread;
use std::time::Duration;

fn main() {
    println!("主執行緒開始");
    
    // 建立一個新的執行緒
    thread::spawn(|| {
        for i in 1..=5 {
            println!("子執行緒:計數 {}", i);
            thread::sleep(Duration::from_millis(500));
        }
        println!("子執行緒結束");
    });
    
    // 主執行緒的工作
    for i in 1..=3 {
        println!("主執行緒:計數 {}", i);
        thread::sleep(Duration::from_millis(300));
    }
    
    println!("主執行緒結束");
}

咦?如果你執行這個程式,你可能會發現子執行緒還沒執行完,程式就結束了!這是因為當主執行緒結束時,整個程式就會終止,不管子執行緒是否還在執行。

等待執行緒完成:JoinHandle

要解決這個問題,我們需要使用 join() 方法:

use std::thread;
use std::time::Duration;

fn main() {
    println!("主執行緒開始");
    
    // spawn 會回傳一個 JoinHandle
    let handle = thread::spawn(|| {
        for i in 1..=5 {
            println!("子執行緒:計數 {}", i);
            thread::sleep(Duration::from_millis(500));
        }
        println!("子執行緒結束");
    });
    
    // 主執行緒的工作
    for i in 1..=3 {
        println!("主執行緒:計數 {}", i);
        thread::sleep(Duration::from_millis(300));
    }
    
    // 等待子執行緒完成
    handle.join().unwrap();
    
    println!("主執行緒結束");
}

執行緒間的資料傳遞

1. 移動所有權

use std::thread;

fn main() {
    let data = "Hello from main thread!".to_string();
    
    // 使用 move 關鍵字將 data 的所有權移動到執行緒中
    let handle = thread::spawn(move || {
        println!("子執行緒收到:{}", data);
        data.len() // 回傳資料長度
    });
    
    // data 已經被移動,這裡無法再使用
    // println!("{}", data); // 這會編譯錯誤
    
    // 從執行緒取得回傳值
    let result = handle.join().unwrap();
    println!("字串長度:{}", result);
}

2. 使用通道 (Channels) 通訊

Rust 提供了強大的通道機制來讓執行緒間安全地傳遞訊息:

use std::sync::mpsc; // multiple producer, single consumer
use std::thread;
use std::time::Duration;

fn main() {
    // 建立通道,取得發送者 (tx) 和接收者 (rx)
    let (tx, rx) = mpsc::channel();
    
    // 發送者執行緒
    thread::spawn(move || {
        let messages = vec![
            "第一個訊息",
            "第二個訊息",
            "第三個訊息",
        ];
        
        for message in messages {
            tx.send(message).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
        
        // 發送完畢,關閉通道
        println!("發送者:所有訊息已發送");
    });
    
    // 接收者(主執行緒)
    for received in rx {
        println!("接收到:{}", received);
    }
    
    println!("所有訊息接收完畢");
}

3. 多個發送者

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();
    
    // 複製發送者來建立多個發送者
    for id in 1..=3 {
        let sender = tx.clone();
        thread::spawn(move || {
            for i in 1..=3 {
                let message = format!("執行緒 {} 的訊息 {}", id, i);
                sender.send(message).unwrap();
            }
        });
    }
    
    // 關閉原始發送者
    drop(tx);
    
    // 接收所有訊息
    for received in rx {
        println!("收到:{}", received);
    }
}

共享狀態:Mutex 和 Arc

有時候我們需要多個執行緒共享同一份資料,這時就需要使用互斥鎖(Mutex):

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // Arc:原子參考計數,允許多個執行緒擁有同一份資料
    // Mutex:互斥鎖,確保同時只有一個執行緒能修改資料
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];
    
    for i in 0..10 {
        let counter_clone = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            for _ in 0..1000 {
                // 取得鎖並修改資料
                let mut num = counter_clone.lock().unwrap();
                *num += 1;
            }
            println!("執行緒 {} 完成", i);
        });
        handles.push(handle);
    }
    
    // 等待所有執行緒完成
    for handle in handles {
        handle.join().unwrap();
    }
    
    println!("最終計數:{}", *counter.lock().unwrap());
}

實際應用:並行資料處理

讓我們結合昨天學到的迭代器和今天的併發知識,來處理一個實際問題:

use std::sync::{Arc, Mutex};
use std::thread;

// 模擬一個計算密集的函式
fn expensive_calculation(n: u64) -> u64 {
    // 計算費波那契數列
    if n <= 1 {
        n
    } else {
        expensive_calculation(n - 1) + expensive_calculation(n - 2)
    }
}

fn main() {
    let numbers = vec![30, 31, 32, 33, 34];
    let results = Arc::new(Mutex::new(Vec::new()));
    let mut handles = vec![];
    
    println!("開始並行計算...");
    
    for number in numbers {
        let results_clone = Arc::clone(&results);
        let handle = thread::spawn(move || {
            let result = expensive_calculation(number);
            println!("fibonacci({}) = {}", number, result);
            
            // 將結果加入共享的向量
            let mut results_guard = results_clone.lock().unwrap();
            results_guard.push((number, result));
        });
        handles.push(handle);
    }
    
    // 等待所有計算完成
    for handle in handles {
        handle.join().unwrap();
    }
    
    // 顯示所有結果
    let final_results = results.lock().unwrap();
    println!("\n所有結果:");
    for (n, result) in final_results.iter() {
        println!("fibonacci({}) = {}", n, result);
    }
}

工作竊取模式 (Work Stealing)

讓我們實作一個簡單的工作竊取模式,這是現代併發系統常用的技術:

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

struct TaskQueue {
    tasks: Mutex<Vec<Box<dyn Fn() + Send>>>,
}

impl TaskQueue {
    fn new() -> Self {
        TaskQueue {
            tasks: Mutex::new(Vec::new()),
        }
    }
    
    fn add_task<F>(&self, task: F)
    where
        F: Fn() + Send + 'static,
    {
        let mut tasks = self.tasks.lock().unwrap();
        tasks.push(Box::new(task));
    }
    
    fn execute_next_task(&self) -> bool {
        let mut tasks = self.tasks.lock().unwrap();
        if let Some(task) = tasks.pop() {
            drop(tasks); // 釋放鎖
            task();
            true
        } else {
            false
        }
    }
}

fn main() {
    let task_queue = Arc::new(TaskQueue::new());
    
    // 添加一些任務
    for i in 1..=10 {
        let task = move || {
            println!("執行任務 {}", i);
            thread::sleep(Duration::from_millis(100));
        };
        task_queue.add_task(task);
    }
    
    // 建立工作執行緒
    let mut handles = vec![];
    for worker_id in 1..=3 {
        let queue_clone = Arc::clone(&task_queue);
        let handle = thread::spawn(move || {
            loop {
                if !queue_clone.execute_next_task() {
                    println!("工作者 {} 沒有更多任務", worker_id);
                    break;
                }
            }
        });
        handles.push(handle);
    }
    
    // 等待所有工作完成
    for handle in handles {
        handle.join().unwrap();
    }
    
    println!("所有任務執行完畢");
}

併發程式設計的最佳實務

1. 優先使用訊息傳遞而非共享狀態

// 推薦:使用通道
let (tx, rx) = mpsc::channel();

// 較少使用:共享狀態
let shared_data = Arc::new(Mutex::new(data));

2. 最小化鎖的持有時間

// 不好的例子
let data = mutex.lock().unwrap();
expensive_operation(); // 長時間持有鎖
*data += 1;

// 好的例子
let value = {
    let data = mutex.lock().unwrap();
    *data
}; // 鎖在這裡釋放

expensive_operation();

{
    let mut data = mutex.lock().unwrap();
    *data = value + 1;
} // 鎖快速釋放

3. 避免巢狀鎖

// 可能導致死鎖
let _lock1 = mutex1.lock().unwrap();
let _lock2 = mutex2.lock().unwrap(); // 危險!

// 更安全的方式:確保鎖的順序一致
// 或使用更高階的併發原語

Rust 的併發優勢

1. 編譯時安全檢查

use std::thread;

fn main() {
    let data = vec![1, 2, 3];
    
    thread::spawn(|| {
        // println!("{:?}", data); // 編譯錯誤!沒有 move
    });
    
    // Rust 會強制你明確處理所有權:
    thread::spawn(move || {
        println!("{:?}", data); // 正確!
    });
}

2. 防止資料競爭

use std::sync::Mutex;

fn main() {
    let mutex = Mutex::new(5);
    
    {
        let data1 = mutex.lock().unwrap();
        // let data2 = mutex.lock().unwrap(); // 這會阻塞,不是編譯錯誤
        // 但 Rust 的型別系統防止了更危險的情況
    }
}

3. Send 和 Sync Traits

// Send:型別可以在執行緒間轉移所有權
// Sync:型別可以被多個執行緒同時參考

use std::rc::Rc;
use std::thread;

fn main() {
    let rc = Rc::new(5);
    
    // 這會編譯錯誤,因為 Rc<T> 沒有實作 Send
    // thread::spawn(move || {
    //     println!("{}", rc);
    // });
    
    // 應該使用 Arc<T>
    use std::sync::Arc;
    let arc = Arc::new(5);
    thread::spawn(move || {
        println!("{}", arc);
    });
}

效能比較:單執行緒 vs 多執行緒

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Instant;

fn cpu_intensive_task(n: u32) -> u32 {
    (0..n).map(|i| i * i).sum()
}

fn main() {
    let data_size = 1_000_000;
    let thread_count = 4;
    let chunk_size = data_size / thread_count;
    
    // 單執行緒版本
    let start = Instant::now();
    let single_result = cpu_intensive_task(data_size);
    let single_duration = start.elapsed();
    
    // 多執行緒版本
    let start = Instant::now();
    let results = Arc::new(Mutex::new(Vec::new()));
    let mut handles = vec![];
    
    for i in 0..thread_count {
        let start_idx = i * chunk_size;
        let end_idx = if i == thread_count - 1 {
            data_size
        } else {
            (i + 1) * chunk_size
        };
        
        let results_clone = Arc::clone(&results);
        let handle = thread::spawn(move || {
            let partial_result = cpu_intensive_task(end_idx - start_idx);
            let mut results_guard = results_clone.lock().unwrap();
            results_guard.push(partial_result);
        });
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
    
    let multi_result: u32 = results.lock().unwrap().iter().sum();
    let multi_duration = start.elapsed();
    
    println!("單執行緒結果:{}", single_result);
    println!("多執行緒結果:{}", multi_result);
    println!("單執行緒耗時:{:?}", single_duration);
    println!("多執行緒耗時:{:?}", multi_duration);
    
    if single_duration > multi_duration {
        let speedup = single_duration.as_secs_f64() / multi_duration.as_secs_f64();
        println!("加速比:{:.2}x", speedup);
    }
}

今天的收穫

今天我們深入探討了 Rust 併發程式設計的核心概念:

執行緒管理

  • thread::spawn:建立新執行緒
  • JoinHandle:等待執行緒完成並取得結果
  • move 閉包:將資料所有權轉移到執行緒

通訊機制

  • Channels (mpsc):執行緒間安全的訊息傳遞
  • 多發送者模式:複製發送者處理複雜通訊

共享狀態

  • Arc:原子參考計數,實現共享所有權
  • Mutex:互斥鎖,確保執行緒安全的可變性
  • Send/Sync Traits:型別系統保證的執行緒安全

設計原則

  • 優先使用訊息傳遞而非共享狀態
  • 最小化鎖的持有時間
  • 利用型別系統防止併發錯誤

為什麼重要?

  • 無懼併發:編譯時期就能防止大部分併發問題
  • 效能提升:充分利用多核心處理器
  • 安全保證:所有權系統確保記憶體安全
  • 可擴展性:為高效能應用打下基礎

Rust 的併發模型真的讓人印象深刻。它讓我們能夠享受併發程式設計帶來的效能提升,同時又不用擔心那些傳統併發程式設計中的陷阱。

今天的小挑戰

為了鞏固今天的學習,嘗試實作一個並行網頁爬蟲

功能需求

  1. URL 管理:維護一個待爬取的 URL 佇列
  2. 並行爬取:同時處理多個 URL
  3. 結果收集:收集每個頁面的標題和連結數量
  4. 錯誤處理:優雅地處理無效 URL 或網路錯誤
  5. 統計報告:顯示爬取統計和結果

技術要求

  • 使用多個工作執行緒進行並行爬取
  • 使用通道進行任務分發和結果收集
  • 實作適當的錯誤處理機制
  • 限制同時進行的請求數量

技術提示

use std::sync::{Arc, Mutex, mpsc};
use std::thread;
use std::time::Duration;

struct CrawlResult {
    url: String,
    title: Option<String>,
    link_count: usize,
    success: bool,
}

struct WebCrawler {
    max_workers: usize,
    request_delay: Duration,
}

impl WebCrawler {
    fn new(max_workers: usize) -> Self {
        WebCrawler {
            max_workers,
            request_delay: Duration::from_millis(100),
        }
    }
    
    fn crawl_urls(&self, urls: Vec<String>) -> Vec<CrawlResult> {
        // 實作並行爬蟲邏輯
        // 1. 建立工作執行緒池
        // 2. 使用通道分發 URL 任務
        // 3. 收集爬取結果
        // 4. 回傳完整結果
        vec![]
    }
    
    fn crawl_page(&self, url: String) -> CrawlResult {
        // 模擬網頁爬取(實際專案中會使用 HTTP 客戶端)
        // 這裡可以使用 thread::sleep 模擬網路延遲
        thread::sleep(self.request_delay);
        
        CrawlResult {
            url,
            title: Some("模擬標題".to_string()),
            link_count: 10,
            success: true,
        }
    }
}

fn main() {
    let urls = vec![
        "https://example.com".to_string(),
        "https://rust-lang.org".to_string(),
        "https://github.com".to_string(),
        // 添加更多 URL...
    ];
    
    let crawler = WebCrawler::new(3);
    let results = crawler.crawl_urls(urls);
    
    // 顯示統計
    let successful = results.iter().filter(|r| r.success).count();
    println!("爬取完成:{}/{} 成功", successful, results.len());
}

這個挑戰將讓你綜合運用執行緒、通道和共享狀態來解決實際問題。重點是要設計一個既高效又安全的併發架構。

明天我們將學習 非同步程式設計 (Async/Await),探討 Rust 如何處理 I/O 密集型任務。結合今天學到的併發知識,我們將能夠建立真正高效能的網路服務!

如果在實作過程中遇到任何問題,歡迎在留言區討論。併發程式設計確實需要一些時間來掌握,但 Rust 的安全保證讓這個過程變得更加愉快!

我們明天見!


上一篇
Day 15: 閉包 (Closures) 與迭代器 (Iterators):函數式程式設計的優雅之道
下一篇
Day 17: 非同步程式設計 (Async/Await) 入門 with Tokio
系列文
大家一起跟Rust當好朋友吧!18
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言