iT邦幫忙

2025 iThome 鐵人賽

DAY 7
0
Rust

DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅系列 第 7

Day 7: 除錯與性能優化入門 - 讓查詢跑得又快又穩

  • 分享至 

  • xImage
  •  

前言

大家好,不知不覺一個禮拜又快過完了,前六天我們已經學習了 DataFusion 的基本使用,從環境建置、數據讀取到複雜查詢的撰寫。但在實際應用中,難免會遇到查詢出錯或查詢速度不佳的問題。當然這些問題也經常出現在傳統資料庫的查詢過程中,因此今天我們將學習在使用 DataFusion 的場景下診斷和解決這些問題。照慣例我們會創建新的 cargo 專案並配置 Cargo.toml ,這邊就不多做說明了,那我就開始吧!

常見錯誤類型與診斷

首先在 data/sales.csv 中新增銷售數據

product_id,customer_id,quantity,amount,status,category,order_date
1,101,2,150.50,completed,Electronics,2025-09-15
2,102,1,89.99,completed,Clothing,2025-09-14
3,103,5,45.00,pending,Food,2025-09-13
1,104,1,150.50,completed,Electronics,2025-09-12
4,105,3,120.75,cancelled,Books,2025-09-11
5,101,2,200.00,completed,Sports,2025-09-10
6,106,1,75.50,refunded,Home,2025-09-09
2,107,4,359.96,completed,Clothing,2025-09-08
7,108,1,25.99,completed,Toys,2025-09-07
3,109,10,90.00,pending,Food,2025-09-06
8,110,2,499.98,completed,Electronics,2025-09-05
4,111,1,60.50,completed,Books,2025-09-04
9,112,3,150.00,completed,Sports,2025-09-03
5,113,2,200.00,cancelled,Sports,2025-09-02
10,114,1,45.00,completed,Home,2025-09-01
1,115,3,451.50,completed,Electronics,2025-08-31
2,116,2,179.98,pending,Clothing,2025-08-30
11,117,5,125.00,completed,Toys,2025-08-29
12,118,1,35.99,completed,Food,2025-08-28
13,119,4,180.00,refunded,Books,2025-08-27
8,120,1,249.99,completed,Electronics,2025-08-26
14,121,2,90.00,completed,Sports,2025-08-25
15,122,3,135.00,pending,Home,2025-08-24
16,123,1,55.50,completed,Clothing,2025-08-23
17,124,2,99.98,completed,Toys,2025-08-22
3,125,8,72.00,completed,Food,2025-08-21
18,126,1,299.99,cancelled,Electronics,2025-08-20
4,127,2,121.00,completed,Books,2025-08-19
19,128,5,250.00,completed,Sports,2025-08-18
20,129,1,80.00,completed,Home,2025-08-17
2,130,3,269.97,completed,Clothing,2025-08-16
21,131,1,45.99,pending,Toys,2025-08-15
22,132,4,160.00,completed,Food,2025-08-14
23,133,2,120.00,completed,Books,2025-08-13
8,134,1,249.99,refunded,Electronics,2025-08-12
24,135,3,180.00,completed,Sports,2025-08-11
25,136,2,150.00,completed,Home,2025-08-10
26,137,1,95.00,completed,Clothing,2025-08-09
27,138,5,199.95,completed,Toys,2025-08-08
3,139,6,54.00,pending,Food,2025-08-07
28,140,1,399.99,completed,Electronics,2025-08-06
29,141,2,85.00,completed,Books,2025-08-05
30,142,4,220.00,completed,Sports,2025-08-04
31,143,1,65.00,cancelled,Home,2025-08-03
2,144,2,179.98,completed,Clothing,2025-08-02
32,145,3,89.97,completed,Toys,2025-08-01
33,146,1,29.99,completed,Food,2025-07-31
34,147,2,140.00,completed,Books,2025-07-30
8,148,1,249.99,completed,Electronics,2025-07-29

1. Schema 相關錯誤

這是最常見的錯誤類型,通常發生在欄位名稱不存在或型別不匹配時。以下範例則是嘗試透過不正確的查詢語法來觸發 DataFusion 的 Schema error。

use datafusion::prelude::*;
use datafusion::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_csv("sales", "data/sales.csv", CsvReadOptions::new()).await?;

    // 錯誤一: 欄位名稱不存在 
    let df = ctx.sql("SELECT produt_id FROM sales").await;
    
    // 錯誤二: 嘗試將整數和字串相加
    // let df = ctx.sql(
    //    "SELECT product_id + '100' FROM sales" 
		// ).await?;

    match df {
        Ok(_) => println!("查詢成功"),
        Err(e) => {
            eprintln!("Schema 錯誤: {}", e);
        }
    }

    Ok(())
}

執行後分會得到以下錯誤訊息:

錯誤一

Schema 錯誤: Schema error: No field named produt_id. Did you mean 'sales.product_id'?.

錯誤二

Schema 錯誤: Error during planning: Cannot coerce arithmetic expression Int64 + Utf8 to valid types

老實說 error message 非常的詳細也很直觀,所以應該不難除錯啦XD

2. 資源耗盡錯誤

另一種常見的錯誤則是過度使用記憶體,通常發生在一次性讀取大量數據時。以下範例模擬資源不足的情況下讀取超過記憶體大小的資料。

use datafusion::execution::context::SessionConfig;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::prelude::*;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 設定極小的記憶體限制 (256KB) 來觸發錯誤
    let memory_limit = 256 * 1024; 

    let runtime = RuntimeEnvBuilder::new()
        .with_memory_pool(Arc::new(FairSpillPool::new(memory_limit)))
        .build_arc()?;

    let config = SessionConfig::new()
        .with_batch_size(8192) // 較大批次增加記憶體壓力
        .with_target_partitions(1600); // 更多分區產生更多中間狀態

    let ctx = SessionContext::new_with_config_rt(config, runtime);
    ctx.register_csv("sales", "data/sales.csv", CsvReadOptions::new())
        .await?;

    // 這個查詢會因為記憶體限制而失敗
    let query = "
        SELECT product_id, customer_id, order_date,
               COUNT(*) as cnt, SUM(amount) as total
        FROM sales 
        GROUP BY product_id, customer_id, order_date
    ";

    match ctx.sql(query).await?.collect().await {
        Ok(batches) => {
            println!(
                "✅ 查詢成功,結果筆數: {}",
                batches.iter().map(|b| b.num_rows()).sum::<usize>()
            );
        }
        Err(e) => {
            eprintln!("❌ 資源耗盡錯誤: {}", e);
            println!("\n💡 建議:");
            println!("   - 增加記憶體限制");
            println!("   - 減少 GROUP BY 欄位");
            println!("   - 增加 WHERE 條件過濾資料");
        }
    }

    Ok(())
}

執行後的錯誤訊息:

❌ 資源耗盡錯誤: Resources exhausted: Failed to allocate additional 3.6 KB for GroupedHashAggregateStream[0] (count(1), sum(sales.amount)) with 0.0 B already allocated for this reservation - 84.0 B remain available for the total pool

EXPLAIN 計劃解讀實戰

EXPLAIN 是理解和優化查詢的關鍵工具。讓我們通過實例學習如何解讀執行計劃。

基本 EXPLAIN 使用

use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_csv("sales", "sales.csv", CsvReadOptions::new()).await?;
    
    // 使用 EXPLAIN 查看執行計劃
    println!("📊 查詢執行計劃分析\n");
    
    let explain_df = ctx.sql(
        "EXPLAIN SELECT category, SUM(amount) as total_sales
         FROM sales 
         WHERE status = 'completed' 
         GROUP BY category
         ORDER BY total_sales DESC"
    ).await?;
    
    explain_df.show().await?;
    
    Ok(())
}

輸出範例:

logical_plan
  Sort: total_sales DESC
    Projection: category, SUM(sales.amount) AS total_sales
      Aggregate: groupBy=[[category]], aggr=[[SUM(amount)]]
        Filter: status = Utf8("completed")
          TableScan: sales projection=[category, amount, status]

physical_plan
  SortPreservingMergeExec: [total_sales@1 DESC]
    SortExec: expr=[total_sales@1 DESC]
      ProjectionExec: expr=[category@0, SUM(sales.amount)@1 as total_sales]
        AggregateExec: mode=FinalPartitioned, gby=[category@0], aggr=[SUM(amount)]
          CoalesceBatchesExec: target_batch_size=8192
            RepartitionExec: partitioning=Hash([category@0], 16)
              AggregateExec: mode=Partial, gby=[category@0], aggr=[SUM(amount)]
                FilterExec: status@2 = completed
                  CsvExec: file_groups={1 group: [[sales.csv]]}

解讀重點

由下往上閱讀執行計劃:

  1. CsvExec: 從 CSV 檔案讀取資料
  2. FilterExec: 過濾 status = 'completed' 的記錄
  3. AggregateExec (Partial): 在每個分區進行部分聚合
  4. RepartitionExec: 根據 customer_id 的 hash 值重新分配資料
  5. AggregateExec (Final): 合併所有分區的聚合結果
  6. ProjectionExec: 選擇最終需要的欄位

性能指標解讀:

使用 EXPLAIN ANALYZE 可以看到實際執行的統計資訊:

let analyze_df = ctx.sql(
    "EXPLAIN ANALYZE SELECT customer_id, SUM(amount)
     FROM orders
     WHERE status = 'completed'
     GROUP BY customer_id"
).await?;

關鍵指標:

  • output_rows: 該階段輸出的資料筆數
  • elapsed_compute: CPU 計算時間
  • bytes_scanned: 掃描的資料量

基礎性能調優技巧

1. 調整並行度

DataFusion 預設會使用所有 CPU 核心,但有時需要手動調整:

use datafusion::execution::context::SessionConfig;

let config = SessionConfig::new()
    .with_target_partitions(4)  // 設定為 4 個分區
    .with_batch_size(8192);      // 每批次處理 8192 筆

let ctx = SessionContext::new_with_config(config);

何時調整:

  • 小數據集:減少分區數避免過度切割
  • 記憶體受限:減少分區數讓每個分區有更多記憶體
  • 大數據集:增加分區數提升並行度

2. 謂詞下推(Predicate Pushdown)

DataFusion 會自動將過濾條件盡可能推到資料源:

// 好的寫法:過濾條件會被下推到 Parquet 讀取階段
let df = ctx.sql(
    "SELECT * FROM large_table
     WHERE date >= '2025-01-01' AND category = 'A'"
).await?;

// 較差的寫法:在子查詢外過濾,無法充分利用下推
let df = ctx.sql(
    "SELECT * FROM (SELECT * FROM large_table) t
     WHERE date >= '2025-01-01'"
).await?;

查看 EXPLAIN 確認過濾條件是否成功下推到資料源層級。

3. 欄位投影最小化

只選擇需要的欄位可以大幅減少記憶體使用:

// 好的寫法:明確指定需要的欄位
let df = ctx.sql(
    "SELECT customer_id, total_amount FROM orders"
).await?;

// 避免的寫法:選擇所有欄位後再使用
let df = ctx.sql(
    "SELECT * FROM orders"
).await?;

4. 使用適當的檔案格式

不同格式有不同的性能特性:

格式 優勢 適用場景
Parquet 列式儲存,壓縮率高,支援謂詞下推 大規模分析查詢
CSV 人類可讀,通用性高 小數據集,資料交換
JSON 靈活的結構 半結構化資料

監控和日誌記錄策略

啟用詳細日誌

use tracing_subscriber;

#[tokio::main]
async fn main() -> Result<()> {
    // 初始化日誌系統
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .init();

    let ctx = SessionContext::new();

    // 現在可以看到 DataFusion 內部的詳細日誌
    let df = ctx.sql("SELECT * FROM table").await?;

    Ok(())
}

記錄查詢執行時間

use std::time::Instant;

async fn execute_with_timing(ctx: &SessionContext, query: &str) -> Result<()> {
    let start = Instant::now();

    let df = ctx.sql(query).await?;
    let result = df.collect().await?;

    let duration = start.elapsed();
    println!("查詢執行時間: {:?}", duration);
    println!("結果筆數: {}", result.iter().map(|b| b.num_rows()).sum::<usize>());

    Ok(())
}

實戰案例:優化慢查詢

假設我們有一個執行緩慢的查詢:

// 原始慢查詢
let slow_query = "
    SELECT p.category, AVG(o.amount) as avg_amount
    FROM orders o
    JOIN products p ON o.product_id = p.id
    WHERE o.order_date >= '2025-01-01'
    GROUP BY p.category
";

優化步驟:

  1. 使用 EXPLAIN ANALYZE 診斷瓶頸
let explain = ctx.sql(&format!("EXPLAIN ANALYZE {}", slow_query)).await?;
explain.show().await?;

  1. 檢查是否有謂詞下推
  • 確認 order_date >= '2025-01-01' 是否下推到 TableScan
  1. 確認 JOIN 策略
  • 查看是否使用了 Hash Join
  • 小表是否在 build 側
  1. 優化後的查詢:
// 如果發現沒有下推,可以嘗試重寫查詢
let optimized_query = "
    SELECT p.category, AVG(o.amount) as avg_amount
    FROM (
        SELECT product_id, amount
        FROM orders
        WHERE order_date >= '2025-01-01'
    ) o
    JOIN products p ON o.product_id = p.id
    GROUP BY p.category
";

小結

今天我們學習了 DataFusion 的除錯和性能優化基礎:

除錯技巧:

  • 認識常見錯誤類型:Schema 錯誤、型別不匹配、資源耗盡
  • 善用錯誤訊息中的詳細資訊
  • 使用日誌系統追蹤問題

性能優化:

  • 使用 EXPLAINEXPLAIN ANALYZE 理解查詢執行
  • 調整並行度和批次大小適應不同工作負載
  • 確保謂詞下推和投影下推生效
  • 選擇適當的檔案格式

這些技能將幫助你在實際專案中快速定位和解決問題,為接下來深入學習優化器和執行引擎打下基礎。

明日預告: 我們將開始第二週的學習,深入探討聚合查詢的使用技巧,並開始接觸 DataFusion 的內部實現機制。

參考資料

  1. DataFusion EXPLAIN Usage Guide
  2. DataFusion Configuration Settings
  3. DataFusion Query Optimizer Documentation
  4. Optimizing SQL in DataFusion - Part 1
  5. DataFusion Error Handling (GitHub Issue)

上一篇
Day 6: SQL 查詢進階技巧 - 解決實際分析問題
下一篇
Day 8: DataFusion 專案結構 - 認識各個 Crate
系列文
DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅9
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言