iT邦幫忙

2025 iThome 鐵人賽

DAY 17
0
Rust

大家一起跟Rust當好朋友吧!系列 第 17

Day 17: 非同步程式設計 (Async/Await) 入門 with Tokio

  • 分享至 

  • xImage
  •  

嗨嗨!大家好!歡迎來到 Rust 三十天挑戰的第十七天!

昨天我們征服了多執行緒併發程式設計,學會了如何讓多個執行緒協同工作。今天我們要探索另一種併發模式:非同步程式設計

如果說多執行緒是「雇用更多工人同時工作」,那麼非同步程式設計就像是「讓一個聰明的工人在等待時去做別的事情」。在處理 I/O 密集型任務(如網路請求、檔案讀寫、資料庫查詢)時,非同步程式設計往往比多執行緒更有效率!

為什麼需要非同步程式設計?

在我學習 C# 的時候,async/await 徹底改變了我對併發的理解。在 Rust 中,非同步程式設計同樣重要,特別是對於:

  • Web 服務器:同時處理數千個Request
  • 資料庫應用:等待查詢結果時不阻塞其他操作
  • 網路爬蟲:同時發送大量 HTTP 請求
  • 即時應用:聊天室、遊戲服務器等

傳統同步 vs 非同步的差異

讓我們先看看傳統同步程式碼的問題:

use std::time::Duration;
use std::thread;

// 模擬同步的網路請求
fn sync_fetch_data(url: &str) -> String {
    println!("開始請求: {}", url);
    thread::sleep(Duration::from_secs(2)); // 模擬網路延遲
    println!("完成請求: {}", url);
    format!("來自 {} 的資料", url)
}

fn main() {
    let start = std::time::Instant::now();
    
    // 同步執行三個請求
    let data1 = sync_fetch_data("https://api1.example.com");
    let data2 = sync_fetch_data("https://api2.example.com");
    let data3 = sync_fetch_data("https://api3.example.com");
    
    println!("所有資料:{}, {}, {}", data1, data2, data3);
    println!("總耗時:{:.2}秒", start.elapsed().as_secs_f64());
}

這個程式會依序執行三個請求,總共需要 6 秒!但實際上,我們可以同時發送這些請求。

認識 Tokio:Rust 的非同步執行環境

在 Rust 中,async/await 語法是語言內建的,但我們需要一個執行環境 (runtime) 來實際執行非同步任務。Tokio 是目前最受歡迎的選擇!

設定 Tokio

首先,讓我們在專案中新增 Tokio:

cargo new async_example
cd async_example
cargo add tokio --features full

或者在 Cargo.toml 中手動新增:

[dependencies]
tokio = { version = "1.0", features = ["full"] }

第一個非同步程式

讓我們重寫剛才的同步範例:

use tokio::time::{sleep, Duration}; 

// 非同步函式使用 async 關鍵字
async fn async_fetch_data(url: &str) -> String {
    println!("開始請求: {}", url);
    sleep(Duration::from_secs(2)).await; // 使用 .await 等待
    println!("完成請求: {}", url);
    format!("來自 {} 的資料", url)
}

#[tokio::main] // Tokio 的 main 函式標記
async fn main() {
    let start = std::time::Instant::now();
    
    // 並行執行三個請求
    let (data1, data2, data3) = tokio::join!(
        async_fetch_data("https://api1.example.com"),
        async_fetch_data("https://api2.example.com"),
        async_fetch_data("https://api3.example.com")
    );
    
    println!("所有資料:{}, {}, {}", data1, data2, data3);
    println!("總耗時:{:.2}秒", start.elapsed().as_secs_f64());
}

執行這個程式,你會發現總耗時約為 2 秒,而不是 6 秒!三個請求幾乎同時執行。

深入理解 async/await

1. async 函式的本質

// async 函式實際上回傳一個 Future
async fn example() -> i32 {
    42
}

// 等價於
fn example_desugared() -> impl std::future::Future<Output = i32> {
    async { 42 }
}

2. .await 的作用

use tokio::time::{sleep, Duration};

async fn step1() -> String {
    sleep(Duration::from_millis(100)).await;
    "步驟一完成".to_string()
}

async fn step2() -> String {
    sleep(Duration::from_millis(200)).await;
    "步驟二完成".to_string()
}

async fn sequential_example() {
    println!("開始依序執行");
    
    let result1 = step1().await; // 等待步驟一
    println!("{}", result1);
    
    let result2 = step2().await; // 等待步驟二
    println!("{}", result2);
    
    println!("依序執行完成");
}

async fn concurrent_example() {
    println!("開始並行執行");
    
    // 同時啟動兩個任務
    let task1 = step1();
    let task2 = step2();
    
    // 等待兩個任務都完成
    let (result1, result2) = tokio::join!(task1, task2);
    
    println!("{}", result1);
    println!("{}", result2);
    println!("並行執行完成");
}

#[tokio::main]
async fn main() {
    sequential_example().await;
    println!("---");
    concurrent_example().await;
}

Tokio 的核心工具

1. tokio::spawn - 創建獨立任務

use tokio::time::{sleep, Duration};

async fn background_task(id: u32) {
    for i in 1..=3 {
        println!("背景任務 {} - 步驟 {}", id, i);
        sleep(Duration::from_millis(500)).await;
    }
    println!("背景任務 {} 完成", id);
}

#[tokio::main]
async fn main() {
    println!("啟動背景任務");
    
    // 啟動三個獨立的背景任務
    let handle1 = tokio::spawn(background_task(1));
    let handle2 = tokio::spawn(background_task(2));
    let handle3 = tokio::spawn(background_task(3));
    
    // 主任務的工作
    for i in 1..=5 {
        println!("主任務 - 步驟 {}", i);
        sleep(Duration::from_millis(300)).await;
    }
    
    // 等待所有背景任務完成
    let _ = tokio::try_join!(handle1, handle2, handle3);
    println!("所有任務完成");
}

2. tokio::select! - 等待第一個完成的任務

use tokio::time::{sleep, Duration};

async fn fast_task() -> &'static str {
    sleep(Duration::from_millis(100)).await;
    "快速任務完成"
}

async fn slow_task() -> &'static str {
    sleep(Duration::from_millis(500)).await;
    "慢速任務完成"
}

#[tokio::main]
async fn main() {
    let result = tokio::select! {
        res = fast_task() => {
            println!("快速任務先完成: {}", res);
            res
        }
        res = slow_task() => {
            println!("慢速任務先完成: {}", res);
            res
        }
    };
    
    println!("選擇的結果: {}", result);
}

實際應用:HTTP 客戶端

讓我們建立一個實際的 HTTP 客戶端來抓取網頁內容:

cargo add reqwest --features json
use reqwest;
use tokio::time::{timeout, Duration};

async fn fetch_url(url: &str) -> Result<String, Box<dyn std::error::Error>> {
    println!("正在請求: {}", url);
    
    // 設定 5 秒超時
    let response = timeout(
        Duration::from_secs(5),
        reqwest::get(url)
    ).await??;
    
    let content = response.text().await?;
    println!("完成請求: {} (長度: {})", url, content.len());
    
    Ok(content)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let urls = vec![
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2", 
        "https://httpbin.org/json",
    ];
    
    let start = std::time::Instant::now();
    
    // 並行發送所有請求
    let mut tasks = Vec::new();
    for url in urls {
        tasks.push(tokio::spawn(fetch_url(url)));
    }
    
    // 等待所有請求完成
    for task in tasks {
        match task.await? {
            Ok(content) => println!("成功獲取內容 ({}字元)", content.len()),
            Err(e) => println!("請求失敗: {}", e),
        }
    }
    
    println!("總耗時: {:.2}秒", start.elapsed().as_secs_f64());
    Ok(())
}

非同步中的錯誤處理

非同步程式設計中的錯誤處理需要特別注意:

use tokio::time::{sleep, Duration};

#[derive(Debug)]
enum ApiError {
    NetworkError(String),
    ParseError(String),
}

impl std::fmt::Display for ApiError {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            ApiError::NetworkError(msg) => write!(f, "網路錯誤: {}", msg),
            ApiError::ParseError(msg) => write!(f, "解析錯誤: {}", msg),
        }
    }
}

impl std::error::Error for ApiError {}

async fn risky_operation(success: bool) -> Result<String, ApiError> {
    sleep(Duration::from_millis(100)).await;
    
    if success {
        Ok("操作成功".to_string())
    } else {
        Err(ApiError::NetworkError("模擬網路失敗".to_string()))
    }
}

async fn handle_multiple_operations() {
    let operations = vec![
        tokio::spawn(risky_operation(true)),
        tokio::spawn(risky_operation(false)),
        tokio::spawn(risky_operation(true)),
    ];
    
    for (i, operation) in operations.into_iter().enumerate() {
        match operation.await {
            Ok(Ok(result)) => println!("操作 {} 成功: {}", i, result),
            Ok(Err(e)) => println!("操作 {} 失敗: {}", i, e),
            Err(e) => println!("操作 {} 任務錯誤: {}", i, e),
        }
    }
}

#[tokio::main]
async fn main() {
    handle_multiple_operations().await;
}

非同步與多執行緒的選擇

何時使用非同步?

適合非同步的情況

  • I/O 密集型任務(檔案讀寫、網路請求、資料庫查詢)
  • 需要處理大量併發連線的服務器
  • 響應時間敏感的應用程式
  • 資源有限的環境

何時使用多執行緒?

適合多執行緒的情況

  • CPU 密集型計算任務
  • 需要真正並行處理的算法
  • 阻塞式的第三方函式庫
  • 簡單的工作分割場景

混合使用

use tokio::task;
use std::time::Duration;

// CPU 密集型任務
fn cpu_intensive_work(n: u64) -> u64 {
    (1..=n).sum()
}

// 在非同步上下文中執行 CPU 密集型任務
async fn hybrid_example() {
    // I/O 任務(非同步)
    let io_task = tokio::time::sleep(Duration::from_millis(100));
    
    // CPU 任務(在執行緒池中執行)
    let cpu_task = task::spawn_blocking(|| {
        cpu_intensive_work(1_000_000)
    });
    
    // 等待兩個任務都完成
    let (_, cpu_result) = tokio::join!(io_task, cpu_task);
    
    match cpu_result {
        Ok(sum) => println!("CPU 計算結果: {}", sum),
        Err(e) => println!("CPU 計算錯誤: {}", e),
    }
}

#[tokio::main]
async fn main() {
    hybrid_example().await;
}

實戰演練:簡單的網頁爬蟲

讓我們結合今天學到的知識,建立一個簡單的並行網頁爬蟲:

use reqwest;
use tokio::time::{timeout, Duration};
use std::collections::HashSet;

#[derive(Debug)]
struct CrawlResult {
    url: String,
    title: Option<String>,
    status_code: u16,
    response_time: Duration,
}

async fn crawl_page(url: String) -> Result<CrawlResult, Box<dyn std::error::Error + Send + Sync>> {
    let start = std::time::Instant::now();
    
    let response = timeout(
        Duration::from_secs(10),
        reqwest::get(&url)
    ).await??;
    
    let status_code = response.status().as_u16();
    let html = response.text().await?;
    
    // 簡單的標題提取(實際專案中應該使用 HTML 解析器)
    let title = extract_title(&html);
    
    Ok(CrawlResult {
        url,
        title,
        status_code,
        response_time: start.elapsed(),
    })
}

fn extract_title(html: &str) -> Option<String> {
    // 簡化的標題提取
    if let Some(start) = html.find("<title>") {
        if let Some(end) = html[start + 7..].find("</title>") {
            return Some(html[start + 7..start + 7 + end].trim().to_string());
        }
    }
    None
}

async fn crawl_urls(urls: Vec<String>, max_concurrent: usize) -> Vec<CrawlResult> {
    let mut results = Vec::new();
    let mut tasks = Vec::new();
    
    println!("開始爬取 {} 個網址,最大並行數: {}", urls.len(), max_concurrent);
    
    for url in urls {
        tasks.push(tokio::spawn(crawl_page(url)));
        
        // 控制並行數量
        if tasks.len() >= max_concurrent {
            let completed = futures::future::join_all(tasks).await;
            for result in completed {
                match result {
                    Ok(Ok(crawl_result)) => {
                        println!("✓ {}: {} ({:.2}秒)", 
                               crawl_result.url, 
                               crawl_result.status_code,
                               crawl_result.response_time.as_secs_f64());
                        results.push(crawl_result);
                    }
                    Ok(Err(e)) => println!("✗ 爬取錯誤: {}", e),
                    Err(e) => println!("✗ 任務錯誤: {}", e),
                }
            }
            tasks.clear();
        }
    }
    
    // 處理剩餘的任務
    if !tasks.is_empty() {
        let completed = futures::future::join_all(tasks).await;
        for result in completed {
            match result {
                Ok(Ok(crawl_result)) => {
                    println!("✓ {}: {} ({:.2}秒)", 
                           crawl_result.url, 
                           crawl_result.status_code,
                           crawl_result.response_time.as_secs_f64());
                    results.push(crawl_result);
                }
                Ok(Err(e)) => println!("✗ 爬取錯誤: {}", e),
                Err(e) => println!("✗ 任務錯誤: {}", e),
            }
        }
    }
    
    results
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 需要先添加 futures 依賴:cargo add futures
    let urls = vec![
        "https://httpbin.org/delay/1".to_string(),
        "https://httpbin.org/html".to_string(),
        "https://httpbin.org/json".to_string(),
        "https://example.com".to_string(),
    ];
    
    let start = std::time::Instant::now();
    let results = crawl_urls(urls, 3).await;
    
    println!("\n爬取摘要:");
    println!("成功爬取: {} 個網頁", results.len());
    println!("總耗時: {:.2}秒", start.elapsed().as_secs_f64());
    
    // 統計結果
    let avg_response_time: f64 = results.iter()
        .map(|r| r.response_time.as_secs_f64())
        .sum::<f64>() / results.len() as f64;
    
    println!("平均響應時間: {:.2}秒", avg_response_time);
    
    Ok(())
}

別忘了新增必要的依賴:

cargo add futures

今天的收穫

今天我們深入探索了 Rust 的非同步程式設計:

核心概念

  • ✅ 理解非同步 vs 同步的差異與適用場景
  • ✅ 掌握 async/await 語法與 Future 概念
  • ✅ 學會使用 Tokio 作為非同步執行環境

實用技巧

  • tokio::join!tokio::select! 的並行控制
  • tokio::spawn 創建獨立的背景任務
  • timeout 處理超時情況
  • ✅ 非同步錯誤處理的最佳實務

實戰能力

  • ✅ 建立並行的 HTTP 客戶端
  • ✅ 實作簡單的網頁爬蟲
  • ✅ 混合使用非同步與多執行緒

效能優勢

  • ✅ I/O 密集型任務的效能提升顯著
  • ✅ 資源使用更有效率
  • ✅ 可擴展性大幅改善

明天預告

明天我們將學習 模組系統 (Module System),探討如何組織和管理大型 Rust 專案的程式碼結構。這對於我們即將在第四週開始的部落格後端專案非常重要!

本日挑戰

為了練習今天學到的非同步程式設計,試著完成以下挑戰:

挑戰目標:建立一個多功能的網路工具

功能需求

  1. 批次 URL 檢查器:檢查多個網址的可用性
  2. 響應時間統計:計算平均、最快、最慢的響應時間
  3. 狀態碼分類:統計不同 HTTP 狀態碼的數量
  4. 並行控制:支援設定最大並行請求數
  5. 超時處理:可配置的請求超時時間
  6. 進度顯示:即時顯示檢查進度

進階挑戰

  • 實作重試機制
  • 支援從檔案讀取 URL 列表
  • 將結果輸出到 JSON 或 CSV 檔案
  • 添加基本的 HTML 標題提取功能

這個挑戰將讓你綜合運用今天學到的所有非同步程式設計技巧!

我們明天見!


上一篇
Day 16: 併發 (Concurrency):不再害怕多執行緒
下一篇
Day 18: 模組系統 (Module System):整理你的專案
系列文
大家一起跟Rust當好朋友吧!18
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言