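Hi everyone, another week is almost over before we knew it. Over the past six days we have covered the basics of DataFusion, from setting up the environment and reading data to writing complex queries. In real-world use, though, queries will inevitably fail or run slowly. These problems are just as common with traditional databases, so today we will learn how to diagnose and fix them when working with DataFusion. As usual, we create a new cargo project and configure Cargo.toml, which I won't go over again here. Let's get started!
First, add the sales data to data/sales.csv: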
product_id,customer_id,quantity,amount,status,category,order_date
1,101,2,150.50,completed,Electronics,2025-09-15
2,102,1,89.99,completed,Clothing,2025-09-14
3,103,5,45.00,pending,Food,2025-09-13
1,104,1,150.50,completed,Electronics,2025-09-12
4,105,3,120.75,cancelled,Books,2025-09-11
5,101,2,200.00,completed,Sports,2025-09-10
6,106,1,75.50,refunded,Home,2025-09-09
2,107,4,359.96,completed,Clothing,2025-09-08
7,108,1,25.99,completed,Toys,2025-09-07
3,109,10,90.00,pending,Food,2025-09-06
8,110,2,499.98,completed,Electronics,2025-09-05
4,111,1,60.50,completed,Books,2025-09-04
9,112,3,150.00,completed,Sports,2025-09-03
5,113,2,200.00,cancelled,Sports,2025-09-02
10,114,1,45.00,completed,Home,2025-09-01
1,115,3,451.50,completed,Electronics,2025-08-31
2,116,2,179.98,pending,Clothing,2025-08-30
11,117,5,125.00,completed,Toys,2025-08-29
12,118,1,35.99,completed,Food,2025-08-28
13,119,4,180.00,refunded,Books,2025-08-27
8,120,1,249.99,completed,Electronics,2025-08-26
14,121,2,90.00,completed,Sports,2025-08-25
15,122,3,135.00,pending,Home,2025-08-24
16,123,1,55.50,completed,Clothing,2025-08-23
17,124,2,99.98,completed,Toys,2025-08-22
3,125,8,72.00,completed,Food,2025-08-21
18,126,1,299.99,cancelled,Electronics,2025-08-20
4,127,2,121.00,completed,Books,2025-08-19
19,128,5,250.00,completed,Sports,2025-08-18
20,129,1,80.00,completed,Home,2025-08-17
2,130,3,269.97,completed,Clothing,2025-08-16
21,131,1,45.99,pending,Toys,2025-08-15
22,132,4,160.00,completed,Food,2025-08-14
23,133,2,120.00,completed,Books,2025-08-13
8,134,1,249.99,refunded,Electronics,2025-08-12
24,135,3,180.00,completed,Sports,2025-08-11
25,136,2,150.00,completed,Home,2025-08-10
26,137,1,95.00,completed,Clothing,2025-08-09
27,138,5,199.95,completed,Toys,2025-08-08
3,139,6,54.00,pending,Food,2025-08-07
28,140,1,399.99,completed,Electronics,2025-08-06
29,141,2,85.00,completed,Books,2025-08-05
30,142,4,220.00,completed,Sports,2025-08-04
31,143,1,65.00,cancelled,Home,2025-08-03
2,144,2,179.98,completed,Clothing,2025-08-02
32,145,3,89.97,completed,Toys,2025-08-01
33,146,1,29.99,completed,Food,2025-07-31
34,147,2,140.00,completed,Books,2025-07-30
8,148,1,249.99,completed,Electronics,2025-07-29
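Schema errors are the most common type of error. They usually occur when a column name does not exist or when types do not match. The example below deliberately uses incorrect queries to trigger DataFusion's schema and planning errors.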
use datafusion::error::Result;
use datafusion::prelude::*;
#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_csv("sales", "data/sales.csv", CsvReadOptions::new()).await?;
    // Error 1: the column name does not exist (note the typo in produt_id)
    let df = ctx.sql("SELECT produt_id FROM sales").await;
    // Error 2: adding an integer and a string
    // (swap this in for the statement above to reproduce it)
    // let df = ctx.sql(
    //     "SELECT product_id + '100' FROM sales"
    // ).await;
    match df {
        Ok(_) => println!("Query succeeded"),
        Err(e) => {
            eprintln!("Query error: {}", e);
        }
    }
    Ok(())
}
Running each variant produces the following error messages, respectively:
Error 1
Query error: Schema error: No field named produt_id. Did you mean 'sales.product_id'?.
Error 2
Query error: Error during planning: Cannot coerce arithmetic expression Int64 + Utf8 to valid types
Honestly, the error messages are detailed and intuitive, so these should not be hard to debug XD
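Another common error is running out of memory, which usually happens when a large amount of data is processed at once. The example below simulates a resource-constrained environment by setting a memory limit so small that even this query cannot fit.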
use datafusion::execution::context::SessionConfig;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::prelude::*;
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Set a tiny memory limit (256 KB) to trigger the error
    let memory_limit = 256 * 1024;
    let runtime = RuntimeEnvBuilder::new()
        .with_memory_pool(Arc::new(FairSpillPool::new(memory_limit)))
        .build_arc()?;
    let config = SessionConfig::new()
        .with_batch_size(8192) // larger batches add memory pressure
        .with_target_partitions(1600); // more partitions create more intermediate state
    let ctx = SessionContext::new_with_config_rt(config, runtime);
    ctx.register_csv("sales", "data/sales.csv", CsvReadOptions::new())
        .await?;
    // This query fails because of the memory limit
    let query = "
        SELECT product_id, customer_id, order_date,
               COUNT(*) as cnt, SUM(amount) as total
        FROM sales
        GROUP BY product_id, customer_id, order_date
    ";
    match ctx.sql(query).await?.collect().await {
        Ok(batches) => {
            println!(
                "✅ Query succeeded, result rows: {}",
                batches.iter().map(|b| b.num_rows()).sum::<usize>()
            );
        }
        Err(e) => {
            eprintln!("❌ Resource exhaustion error: {}", e);
            println!("\n💡 Suggestions:");
            println!(" - Raise the memory limit");
            println!(" - Reduce the number of GROUP BY columns");
            println!(" - Add WHERE conditions to filter the data");
        }
    }
    Ok(())
}
The error message after running it:
❌ Resource exhaustion error: Resources exhausted: Failed to allocate additional 3.6 KB for GroupedHashAggregateStream[0] (count(1), sum(sales.amount)) with 0.0 B already allocated for this reservation - 84.0 B remain available for the total pool
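The three messages shown so far each start with a recognizable prefix ("Schema error:", "Error during planning:", "Resources exhausted:"). As a minimal sketch, and assuming you only have the message text (for example from a log file), a helper could sort errors into rough categories and suggest a first step. The names `ErrorKind`, `classify` and `hint` are hypothetical and not part of DataFusion; in application code, matching on the error value itself is likely more robust than matching on strings.

```rust
/// Rough error categories, named after the message prefixes shown above.
/// (Hypothetical type for illustration; not part of DataFusion.)
#[derive(Debug, PartialEq)]
enum ErrorKind {
    Schema,
    Planning,
    ResourcesExhausted,
    Other,
}

/// Classify an error message by its leading text.
fn classify(message: &str) -> ErrorKind {
    if message.starts_with("Schema error:") {
        ErrorKind::Schema
    } else if message.starts_with("Error during planning:") {
        ErrorKind::Planning
    } else if message.starts_with("Resources exhausted:") {
        ErrorKind::ResourcesExhausted
    } else {
        ErrorKind::Other
    }
}

/// Suggest a first debugging step for each category.
fn hint(kind: &ErrorKind) -> &'static str {
    match kind {
        ErrorKind::Schema => "check column names against the table schema",
        ErrorKind::Planning => "check operand types; consider an explicit CAST",
        ErrorKind::ResourcesExhausted => {
            "raise the memory limit, filter earlier, or group by fewer columns"
        }
        ErrorKind::Other => "read the full message and the query plan",
    }
}
```

For instance, `hint(&classify(&e.to_string()))` inside the `Err(e)` branch would print a suggestion that matches the failure.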
EXPLAIN is the key tool for understanding and optimizing queries. Let's learn how to read an execution plan through an example.
use datafusion::error::Result;
use datafusion::prelude::*;
#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Same file as in the earlier examples
    ctx.register_csv("sales", "data/sales.csv", CsvReadOptions::new()).await?;
    // Use EXPLAIN to inspect the execution plan
    println!("📊 Query execution plan analysis\n");
    let explain_df = ctx.sql(
        "EXPLAIN SELECT category, SUM(amount) as total_sales
         FROM sales
         WHERE status = 'completed'
         GROUP BY category
         ORDER BY total_sales DESC"
    ).await?;
    explain_df.show().await?;
    Ok(())
}
Sample output:
logical_plan
Sort: total_sales DESC
  Projection: category, SUM(sales.amount) AS total_sales
    Aggregate: groupBy=[[category]], aggr=[[SUM(amount)]]
      Filter: status = Utf8("completed")
        TableScan: sales projection=[category, amount, status]
physical_plan
SortPreservingMergeExec: [total_sales@1 DESC]
  SortExec: expr=[total_sales@1 DESC]
    ProjectionExec: expr=[category@0, SUM(sales.amount)@1 as total_sales]
      AggregateExec: mode=FinalPartitioned, gby=[category@0], aggr=[SUM(amount)]
        CoalesceBatchesExec: target_batch_size=8192
          RepartitionExec: partitioning=Hash([category@0], 16)
            AggregateExec: mode=Partial, gby=[category@0], aggr=[SUM(amount)]
              FilterExec: status@2 = completed
                CsvExec: file_groups={1 group: [[sales.csv]]}
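Read the execution plan from the bottom up:
1. CsvExec reads the CSV file.
2. FilterExec keeps only the rows where status is 'completed'.
3. AggregateExec (mode=Partial) computes a partial SUM(amount) per category within each partition.
4. RepartitionExec redistributes rows by the hash of category (16 partitions in this output).
5. CoalesceBatchesExec combines small batches up to the target batch size of 8192.
6. AggregateExec (mode=FinalPartitioned) merges the partial results into the final sum per category.
7. ProjectionExec exposes the aggregate under the name total_sales.
8. SortExec sorts each partition by total_sales in descending order, and SortPreservingMergeExec merges the sorted partitions into one ordered result.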
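Reading bottom-up can be sketched in code as well. The helper below is a hypothetical illustration (`operators_bottom_up` is not a DataFusion function) that takes plan text like the physical plan above and returns the operator names in the order data flows through them. It assumes a single-branch plan with one operator per line; a plan with joins has several branches and would need real tree handling.

```rust
/// Return operator names in data-flow order (bottom of the plan first).
/// Assumes a single-branch plan: one operator per line, no joins.
fn operators_bottom_up(plan: &str) -> Vec<&str> {
    plan.lines()
        // Drop the indentation that encodes nesting depth.
        .map(str::trim)
        .filter(|line| !line.is_empty())
        // The operator name is the text before the first ':'.
        .map(|line| line.split(':').next().unwrap_or(line).trim())
        // The last line of the plan is the first step executed.
        .rev()
        .collect()
}
```

Applied to the last three lines of the physical plan above, this yields `CsvExec`, `FilterExec`, then the partial `AggregateExec`, which is the order in which the data is actually processed.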
Interpreting performance metrics:
EXPLAIN ANALYZE shows the statistics collected while the query actually runs:
let analyze_df = ctx.sql(
    "EXPLAIN ANALYZE SELECT customer_id, SUM(amount)
     FROM orders
     WHERE status = 'completed'
     GROUP BY customer_id"
).await?;
// Print the annotated plan
analyze_df.show().await?;
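In the EXPLAIN ANALYZE output, each operator line typically carries its statistics in a trailing `metrics=[...]` list of `key=value` pairs. As a rough sketch for pulling one value out of such a line, the helper below uses plain string handling. `metric_value` is a hypothetical name, the sample line in the usage note is made up for illustration, and the exact text format is an assumption that can vary between DataFusion versions (verbose output, for example, may add per-partition labels).

```rust
/// Extract the value of one metric from a plan line such as
/// `FilterExec: ..., metrics=[output_rows=35, elapsed_compute=12.3µs]`.
/// Returns None if the line has no metrics list or the metric is absent.
fn metric_value<'a>(line: &'a str, name: &str) -> Option<&'a str> {
    // Isolate the text between `metrics=[` and the last closing `]`.
    let marker = "metrics=[";
    let start = line.find(marker)? + marker.len();
    let end = line[start..].rfind(']')? + start;
    // The list holds comma-separated `key=value` pairs.
    line[start..end]
        .split(',')
        .filter_map(|pair| pair.trim().split_once('='))
        .find(|(key, _)| *key == name)
        .map(|(_, value)| value)
}
```

For a line like `FilterExec: status@2 = completed, metrics=[output_rows=35, elapsed_compute=12.3µs]`, calling `metric_value(line, "output_rows")` gives `Some("35")`. Comparing `output_rows` between adjacent operators is a quick way to see where most rows are being discarded.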
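Key metrics:
- output_rows: the number of rows emitted by that stage
- elapsed_compute: CPU time spent computing
- bytes_scanned: the amount of data scanned
DataFusion uses all CPU cores by default, but sometimes you need to tune this manually: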
use datafusion::execution::context::SessionConfig;
let config = SessionConfig::new()
    .with_target_partitions(4) // use 4 partitions
    .with_batch_size(8192); // process 8192 rows per batch
let ctx = SessionContext::new_with_config(config);
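One way to choose the number instead of hard-coding it is to start from the detected core count and cap it. The sketch below uses only the standard library; `choose_target_partitions` and `detected_cores` are hypothetical helper names, and the idea of capping (for example because the machine is shared with other services) is an assumption for illustration rather than a DataFusion recommendation.

```rust
use std::thread;

/// Pick a partition count: use the available cores, but never more than `cap`
/// and never fewer than 1.
fn choose_target_partitions(available_cores: usize, cap: usize) -> usize {
    // `cap.max(1)` keeps the upper bound valid even if a caller passes 0.
    available_cores.clamp(1, cap.max(1))
}

/// Detect the number of cores, falling back to 1 if it cannot be determined.
fn detected_cores() -> usize {
    thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1)
}
```

The result of `choose_target_partitions(detected_cores(), 4)` could then be passed to `with_target_partitions` in place of the literal 4.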
When to adjust:
DataFusion automatically pushes filter conditions down to the data source whenever it can:
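// Good: the filter conditions are pushed down to the Parquet reading stage
let df = ctx.sql(
    "SELECT * FROM large_table
     WHERE date >= '2025-01-01' AND category = 'A'"
).await?;
// Less direct: the filter sits outside a subquery. The optimizer can often
// still push it through a simple subquery like this one, but more complex
// subqueries (for example, ones containing LIMIT) can block the pushdown.
let df = ctx.sql(
    "SELECT * FROM (SELECT * FROM large_table) t
     WHERE date >= '2025-01-01'"
).await?;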
Check the EXPLAIN output to confirm that the filter was actually pushed down to the data source level.
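If you want to automate that check, a rough text-based sketch is shown below. It assumes the logical plan prints pushed-down predicates on the `TableScan` line as `full_filters=[...]` or `partial_filters=[...]`; that format is an assumption about the plan text and may differ between DataFusion versions and data sources, and `scan_has_filters` is a hypothetical helper name.

```rust
/// Returns true if any `TableScan` line in the plan text lists filters,
/// which suggests the predicate reached the scan.
/// Assumes filters appear as `full_filters=[...]` or `partial_filters=[...]`.
fn scan_has_filters(plan: &str) -> bool {
    plan.lines()
        .filter(|line| line.contains("TableScan:"))
        .any(|line| line.contains("filters=["))
}
```

A plan where the `Filter` node still sits above a `TableScan` that lists no filters makes this return false, which is the signal to look at the query or the file format more closely.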
Selecting only the columns you need can significantly reduce memory usage:
// Good: name the columns you need explicitly
let df = ctx.sql(
    "SELECT customer_id, total_amount FROM orders"
).await?;
// Avoid: selecting every column and picking what you need afterwards
let df = ctx.sql(
    "SELECT * FROM orders"
).await?;
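Different file formats have different performance characteristics:
Format | Strengths | Best suited for |
---|---|---|
Parquet | Columnar storage, high compression ratio, supports predicate pushdown | Large-scale analytical queries |
CSV | Human-readable, widely supported | Small datasets, data exchange |
JSON | Flexible structure | Semi-structured data |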
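Enabling detailed logging lets you see what DataFusion is doing internally:
use datafusion::error::Result;
use datafusion::prelude::*;
#[tokio::main]
async fn main() -> Result<()> {
    // Initialize the logging system at DEBUG level
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .init();
    let ctx = SessionContext::new();
    ctx.register_csv("sales", "data/sales.csv", CsvReadOptions::new()).await?;
    // DataFusion's detailed internal logs are now visible
    let _df = ctx.sql("SELECT * FROM sales").await?;
    Ok(())
}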
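To measure how long a query takes:
use datafusion::error::Result;
use datafusion::prelude::*;
use std::time::Instant;
async fn execute_with_timing(ctx: &SessionContext, query: &str) -> Result<()> {
    let start = Instant::now();
    // Planning and execution are both included in the measured time
    let df = ctx.sql(query).await?;
    let result = df.collect().await?;
    let duration = start.elapsed();
    println!("Query execution time: {:?}", duration);
    println!("Result rows: {}", result.iter().map(|b| b.num_rows()).sum::<usize>());
    Ok(())
}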
Suppose we have a query that runs slowly:
// The original slow query
let slow_query = "
    SELECT p.category, AVG(o.amount) as avg_amount
    FROM orders o
    JOIN products p ON o.product_id = p.id
    WHERE o.order_date >= '2025-01-01'
    GROUP BY p.category
";
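Optimization steps:
First, inspect the execution plan:
let explain = ctx.sql(&format!("EXPLAIN ANALYZE {}", slow_query)).await?;
explain.show().await?;
Then check whether the filter order_date >= '2025-01-01' was pushed down to the TableScan.
// If the filter was not pushed down, try rewriting the query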
let optimized_query = "
SELECT p.category, AVG(o.amount) as avg_amount
FROM (
SELECT product_id, amount
FROM orders
WHERE order_date >= '2025-01-01'
) o
JOIN products p ON o.product_id = p.id
GROUP BY p.category
";
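Today we covered the basics of debugging and performance optimization in DataFusion:
Debugging techniques:
- Reading schema, planning and resource-exhaustion error messages
- Enabling detailed logging and timing query execution
Performance optimization:
- Using EXPLAIN and EXPLAIN ANALYZE to understand query execution
- Tuning partitions and batch size, relying on predicate pushdown, selecting only the needed columns, and choosing a suitable file format
These skills will help you locate and fix problems quickly in real projects, and they lay the groundwork for a deeper look at the optimizer and the execution engine.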
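Coming up tomorrow: we start the second week by digging into techniques for aggregate queries and taking a first look at how DataFusion is implemented internally.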