在 Day 27,我們探討了 StringView 如何提升字串處理效能。這是一種「靜態」的優化——透過更好的數據表示來加速操作。今天,我們將探討另一個層面的優化:動態決策優化,也就是讓查詢優化器根據實際數據特徵來做出最佳選擇。
回顧前幾週的學習,我們在 Day 13 認識了優化器框架,知道 DataFusion 有許多優化規則。在 Day 20-21 學習 Join 算子時,我們也提到過一個關鍵問題:如何選擇 Build 側和 Probe 側?當時我們知道「應該讓小表作為 Build 側」,但優化器怎麼知道哪個表更小呢?
答案就在今天的主題:統計資訊(Statistics)與基於成本的優化(Cost-Based Optimization, CBO)。
在傳統數據庫(如 PostgreSQL、MySQL)中,統計資訊是查詢優化的核心。透過收集表的行數、欄位分佈、唯一值數量等資訊,優化器能夠估算不同執行計劃的成本,選擇最優方案。DataFusion 作為現代化的查詢引擎,同樣具備這樣的能力,並且在設計上更加靈活和可擴展。
今天我們將深入理解:
想像一個簡單的 JOIN 查詢:
SELECT *
FROM orders o
JOIN customers c ON o.customer_id = c.id;
優化器需要決定:
如果沒有統計資訊,優化器只能「猜測」:
盲目決策場景:
假設 orders 有 1000 萬行,customers 有 1 萬行
錯誤決策 1:用 orders 建立 Hash Table
結果:需要 2GB 記憶體,可能觸發 Spilling
正確決策:用 customers 建立 Hash Table
結果:只需 20MB 記憶體,完全在記憶體中完成
性能差異:3-5 倍!
有了統計資訊,優化器可以做出數據驅動的決策:
統計資訊驅動決策:
orders 表統計:
- 行數:10,000,000
- 平均行大小:200 bytes
- 預估大小:2GB
customers 表統計:
- 行數:10,000
- 平均行大小:150 bytes
- 預估大小:1.5MB
優化器計算:
方案 A(orders 為 Build 側):
- Build 階段記憶體:2GB(可能 Spill)
- Probe 階段 I/O:讀取 1.5MB
- 預估成本:高(Spilling 開銷)
方案 B(customers 為 Build 側):✓
- Build 階段記憶體:1.5MB(無需 Spill)
- Probe 階段 I/O:讀取 2GB
- 預估成本:低(全記憶體操作)
選擇:方案 B
統計資訊使優化器能夠:
DataFusion 的統計資訊定義在 datafusion-common/src/stats.rs
中:
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Statistics {
/// 表的總行數(精確值或估算值)
pub num_rows: Precision<usize>,
/// 輸出數據的總字節數
pub total_byte_size: Precision<usize>,
/// 每個欄位的統計資訊
pub column_statistics: Vec<ColumnStatistics>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ColumnStatistics {
/// 空值數量
pub null_count: Precision<usize>,
/// 最小值
pub min_value: Precision<ScalarValue>,
/// 最大值
pub max_value: Precision<ScalarValue>,
/// 總和值(用於估算 AVG 等聚合)
pub sum_value: Precision<ScalarValue>,
/// 唯一值數量(基數,Cardinality)
pub distinct_count: Precision<usize>,
}
DataFusion 用 Precision
枚舉來表達統計資訊的精確程度:
pub enum Precision<T> {
/// 精確值(例如從 Parquet 元數據讀取)
Exact(T),
/// 估算值(例如通過採樣或傳播計算)
Inexact(T),
/// 缺失值(沒有統計資訊)
Absent,
}
設計理念:
統計資訊的精確度層次:
Exact(精確):
來源:Parquet 元數據、實際計數
可信度:100%
例子:num_rows = Exact(1000000)
Inexact(估算):
來源:統計傳播、基於規則的估算
可信度:70-90%
例子:過濾後行數 = Inexact(500000) // 根據選擇性估算
Absent(缺失):
來源:無法獲取統計
可信度:0%
例子:CSV 文件沒有內建統計
處理:優化器使用預設假設
這種設計讓優化器能夠區分「確定的事實」和「有根據的猜測」,做出更穩健的決策。
Parquet 是最「統計友好」的格式,內建豐富的統計資訊:
Parquet 檔案結構:
┌─────────────────────────────────────┐
│ File Metadata │
│ - 總行數:10,000,000 │
│ - Row Group 數量:100 │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ Row Group 0 Metadata │
│ - 行數:100,000 │
│ - Column Chunks: │
│ ┌─────────────────────────────┐ │
│ │ Column "user_id" │ │
│ │ - min: 1 │ │
│ │ - max: 50000 │ │
│ │ - null_count: 0 │ │
│ │ - distinct_count: 45231 (*)│ │
│ └─────────────────────────────┘ │
│ ┌─────────────────────────────┐ │
│ │ Column "age" │ │
│ │ - min: 18 │ │
│ │ - max: 95 │ │
│ │ - null_count: 123 │ │
│ └─────────────────────────────┘ │
└─────────────────────────────────────┘
(*) distinct_count 需要額外計算,不是所有 Parquet 文件都有
DataFusion 如何利用:
// 從 Parquet 元數據提取統計資訊的實際實現
// 來源:datafusion/datasource-parquet/src/metadata.rs
pub fn statistics_from_parquet_metadata(
metadata: &ParquetMetaData,
table_schema: &SchemaRef,
) -> Result<Statistics> {
let row_groups_metadata = metadata.row_groups();
let mut statistics = Statistics::new_unknown(table_schema);
let mut has_statistics = false;
let mut num_rows = 0_usize;
let mut total_byte_size = 0_usize;
// 遍歷所有 Row Group
for row_group_meta in row_groups_metadata {
num_rows += row_group_meta.num_rows() as usize;
total_byte_size += row_group_meta.total_byte_size() as usize;
// 檢查是否有任何欄位包含統計資訊
if !has_statistics {
has_statistics = row_group_meta
.columns()
.iter()
.any(|column| column.statistics().is_some());
}
}
// 行數和字節數總是精確的
statistics.num_rows = Precision::Exact(num_rows);
statistics.total_byte_size = Precision::Exact(total_byte_size);
// 如果有統計資訊,提取欄位級別的 min/max/null_count
if has_statistics {
// 聚合所有 Row Group 的欄位統計
// 對於 min:取所有 Row Group 中的最小值
// 對於 max:取所有 Row Group 中的最大值
// 對於 null_count:累加所有 Row Group 的空值數量
statistics.column_statistics = aggregate_column_stats(
row_groups_metadata,
table_schema
);
} else {
// 沒有統計資訊時,返回未知狀態
statistics.column_statistics = Statistics::unknown_column(table_schema);
}
Ok(statistics)
}
關鍵處理邏輯:
Precision::Exact
),直接從 Row Group 元數據累加min_value
:取所有 Row Group 的最小值中的最小值max_value
:取所有 Row Group 的最大值中的最大值null_count
:累加所有 Row Group 的空值數量null_count = Precision::Exact(num_rows)
)自定義數據源可以實現 TableProvider::statistics()
方法:
#[async_trait]
impl TableProvider for MyCustomTable {
async fn scan(&self, ...) -> Result<Arc<dyn ExecutionPlan>> {
// ... 掃描邏輯
}
fn statistics(&self) -> Option<Statistics> {
Some(Statistics {
num_rows: Precision::Exact(self.row_count),
total_byte_size: Precision::Inexact(self.estimated_size),
column_statistics: self.build_column_stats(),
})
}
}
實例:記憶體表(MemTable)
// MemTable 直接計算精確統計
impl TableProvider for MemTable {
fn statistics(&self) -> Option<Statistics> {
let num_rows = self.batches.iter()
.map(|batch| batch.num_rows())
.sum();
Some(Statistics {
num_rows: Precision::Exact(num_rows),
// ... 其他統計
})
}
}
CSV 等格式沒有內建統計,DataFusion 會返回 Absent
:
impl ExecutionPlan for CsvExec {
fn statistics(&self) -> Result<Statistics> {
Ok(Statistics {
num_rows: Precision::Absent, // 不知道行數
total_byte_size: Precision::Inexact(file_size), // 只知道文件大小
column_statistics: vec![],
})
}
}
統計資訊不僅來自數據源,還會在查詢計劃樹中向上傳播和估算。
查詢:
SELECT user_id, COUNT(*)
FROM orders
WHERE amount > 100
GROUP BY user_id;
計劃樹與統計傳播:
┌─────────────────────────────────────┐
│ AggregateExec │
│ 統計估算: │
│ num_rows = Inexact(50,000) │ ← 基於 distinct(user_id) 估算
│ (GROUP BY 的輸出行數 ≈ 分組數) │
└──────────────┬──────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ FilterExec (amount > 100) │
│ 統計估算: │
│ num_rows = Inexact(3,000,000) │ ← 根據選擇性估算
│ (輸入 10M * 選擇性 0.3) │
└──────────────┬──────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ ParquetExec │
│ 統計來源:文件元數據(精確) │
│ num_rows = Exact(10,000,000) │
│ min(amount) = 10 │
│ max(amount) = 10000 │
└─────────────────────────────────────┘
選擇性(Selectivity):過濾條件通過的行數比例
估算公式示例:
條件:amount > 100
已知統計:
- min(amount) = 10
- max(amount) = 10000
- 假設均勻分佈
選擇性估算:
selectivity = (max - 100) / (max - min)
= (10000 - 100) / (10000 - 10)
= 9900 / 9990
≈ 0.99
輸出行數 = 10,000,000 * 0.99 = 9,900,000
注意:這是簡化的估算。實際上 DataFusion 使用更複雜的模型,考慮數據傾斜、直方圖(Histogram)等。
Join 的輸出行數估算更複雜:
場景:orders JOIN customers ON orders.customer_id = customers.id
已知統計:
orders.num_rows = 10,000,000
customers.num_rows = 100,000
distinct(orders.customer_id) = 80,000 ← 關鍵!
distinct(customers.id) = 100,000
估算邏輯:
- 如果是 INNER JOIN,輸出行數取決於外鍵關係
- 假設每個 customer 平均有:10,000,000 / 80,000 ≈ 125 個訂單
- 估算輸出:10,000,000 行(每個訂單對應一個客戶)
如果沒有 distinct_count:
- 退化為簡單估算:min(orders.num_rows, customers.num_rows)
- 或使用預設選擇性(如 0.1)
當有多個表 JOIN 時,順序很重要:
SELECT *
FROM large_table l
JOIN medium_table m ON l.m_id = m.id
JOIN small_table s ON m.s_id = s.id;
優化器決策過程:
原始順序:large → medium → small
統計資訊:
large_table: 10,000,000 rows
medium_table: 100,000 rows
small_table: 1,000 rows
成本計算:
方案 A(原始順序):
Step 1: large JOIN medium
- Build: medium (100K rows → 20MB)
- Probe: large (10M rows)
- 中間結果:~8,000,000 rows(假設選擇性 0.8)
Step 2: 中間結果 JOIN small
- Build: small (1K rows → 200KB)
- Probe: 中間結果 (8M rows)
- 最終結果:6,000,000 rows
總成本:Build(20MB) + Probe(10M) + Build(200KB) + Probe(8M) = 高
方案 B(優化後):medium → small → large
Step 1: medium JOIN small
- Build: small (1K rows → 200KB)
- Probe: medium (100K rows)
- 中間結果:80,000 rows
Step 2: large JOIN 中間結果
- Build: 中間結果 (80K rows → 16MB)
- Probe: large (10M rows)
- 最終結果:6,000,000 rows
總成本:Build(200KB) + Probe(100K) + Build(16MB) + Probe(10M) = 低
選擇:方案 B(先 JOIN 小表)
實現提示:DataFusion 的 JoinSelection
優化規則會利用統計資訊來重新排列 JOIN 順序。
對於單個 Hash Join,DataFusion 的 JoinSelection
優化規則會智能選擇 Build 和 Probe 側。
實際邏輯(來自 datafusion/physical-optimizer/src/join_selection.rs
):
/// 檢查是否應該交換 Join 的左右順序
/// 返回 true 表示應該交換(讓右側作為 Build 側)
pub(crate) fn should_swap_join_order(
left: &dyn ExecutionPlan,
right: &dyn ExecutionPlan,
) -> Result<bool> {
let left_stats = left.partition_statistics(None)?;
let right_stats = right.partition_statistics(None)?;
// 優先使用 total_byte_size 來比較
match (
left_stats.total_byte_size.get_value(),
right_stats.total_byte_size.get_value(),
) {
(Some(l), Some(r)) => {
// 如果左側字節數 > 右側,交換(讓小的右側作為 Build)
return Ok(l > r);
}
_ => {
// total_byte_size 不可用時,退化使用 num_rows
}
}
// Fallback:使用行數比較
match (
left_stats.num_rows.get_value(),
right_stats.num_rows.get_value(),
) {
(Some(l), Some(r)) => {
// 如果左側行數 > 右側,交換
Ok(l > r)
}
_ => {
// 完全沒有統計資訊,不交換
Ok(false)
}
}
}
決策優先級:
1. total_byte_size(優先)
↓ 理由:更準確反映記憶體使用
↓ 例如:100 萬行 × 每行 1KB = 1GB
↓ vs 1000 萬行 × 每行 10B = 100MB
↓ 行數多不代表佔用大
2. num_rows(備選)
↓ 理由:當沒有字節數統計時使用
↓ 假設:行大小相似
3. 保持原樣(無統計)
↓ 理由:避免盲目交換導致性能下降
配置閾值:
DataFusion 還考慮了「單分區收集」的閾值:
// 判斷是否可以將右側完全收集到單個分區(CollectLeft 模式)
fn supports_collect_by_thresholds(
plan: &dyn ExecutionPlan,
threshold_byte_size: usize, // 預設: hash_join_single_partition_threshold
threshold_num_rows: usize, // 預設: hash_join_single_partition_threshold_rows
) -> bool {
let stats = plan.partition_statistics(None);
// 優先檢查字節數
if let Some(byte_size) = stats.total_byte_size.get_value() {
*byte_size != 0 && *byte_size < threshold_byte_size
}
// 退化檢查行數
else if let Some(num_rows) = stats.num_rows.get_value() {
*num_rows != 0 && *num_rows < threshold_num_rows
}
else {
false
}
}
這確保了只有當右側足夠小時,才會使用 CollectLeft
模式(將右側收集到單個分區)。
聚合的輸出行數取決於 GROUP BY
的基數:
SELECT user_id, COUNT(*)
FROM orders
GROUP BY user_id;
估算:
已知:distinct(user_id) = 50,000
輸出行數 = distinct(user_id) = 50,000
如果沒有 distinct_count:
- 使用啟發式:min(num_rows, num_rows / 10)
- 假設平均每組 10 行
這個估算影響下游算子的記憶體分配和並行度決策。
在某些情況下,你可能比引擎更了解數據特徵,可以手動提供統計資訊。
假設你實現了一個連接外部 API 的 TableProvider:
struct ApiTableProvider {
schema: SchemaRef,
api_client: ApiClient,
// 你知道 API 大約返回多少行
estimated_row_count: usize,
}
#[async_trait]
impl TableProvider for ApiTableProvider {
async fn scan(...) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(ApiExec {
// ...
}))
}
fn statistics(&self) -> Option<Statistics> {
Some(Statistics {
// 提供估算的行數,幫助優化器
num_rows: Precision::Inexact(self.estimated_row_count),
total_byte_size: Precision::Inexact(
self.estimated_row_count * 200 // 假設每行 200 字節
),
column_statistics: vec![
// 如果知道某些欄位的範圍,也可以提供
ColumnStatistics {
null_count: Precision::Absent,
min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
max_value: Precision::Exact(ScalarValue::Int32(Some(1000000))),
distinct_count: Precision::Inexact(50000),
},
// ... 其他欄位
],
})
}
}
使用 EXPLAIN
可以看到統計資訊如何影響計劃:
-- 假設我們有兩個表,一個有統計,一個沒有
CREATE EXTERNAL TABLE orders_with_stats
STORED AS PARQUET
LOCATION '/path/to/orders.parquet'; -- Parquet 有統計
CREATE EXTERNAL TABLE users_no_stats
STORED AS CSV
LOCATION '/path/to/users.csv'; -- CSV 無統計
-- 查看 JOIN 計劃
EXPLAIN SELECT *
FROM orders_with_stats o
JOIN users_no_stats u ON o.user_id = u.id;
可能的輸出:
PhysicalPlan:
HashJoinExec: mode=CollectLeft, on=[(user_id, id)]
├── CsvExec: file=/path/to/users.csv ← Build 側(默認選擇)
└── ParquetExec: file=/path/to/orders.parquet ← Probe 側
說明:由於 users 沒有統計,優化器無法確定大小,
可能做出次優選擇(應該 orders 更大,但引擎不確定)
如果為 CSV 提供統計:
// 包裝 CsvExec,提供統計
struct CsvWithStats {
inner: Arc<CsvExec>,
stats: Statistics,
}
impl ExecutionPlan for CsvWithStats {
fn statistics(&self) -> Result<Statistics> {
Ok(self.stats.clone()) // 返回我們提供的統計
}
// 其他方法委託給 inner...
}
再次 EXPLAIN 可能會看到優化後的計劃:
PhysicalPlan:
HashJoinExec: mode=CollectLeft, on=[(user_id, id)]
├── CsvExec: file=/path/to/users.csv (stats: 100K rows) ← Build 側(小表)
└── ParquetExec: file=/path/to/orders.parquet (stats: 10M rows) ← Probe 側(大表)
優化器現在知道 users 只有 10 萬行,orders 有 1000 萬行,
正確選擇 users 作為 Build 側!
DataFusion 提供了多個與統計資訊相關的配置選項:
-- 設定單分區 Hash Join 的字節數閾值(預設 128MB)
SET datafusion.optimizer.hash_join_single_partition_threshold = 134217728;
-- 設定單分區 Hash Join 的行數閾值(預設 131072)
SET datafusion.optimizer.hash_join_single_partition_threshold_rows = 131072;
-- 是否優先使用 Hash Join(預設 true)
SET datafusion.optimizer.prefer_hash_join = true;
作用:當右側表的大小(字節數或行數)小於閾值時,使用 CollectLeft
模式將右側收集到單個分區,避免分區間的數據重分配開銷。
-- 是否啟用 Parquet 統計資訊(預設 true)
SET datafusion.execution.parquet.pruning = true;
-- 是否啟用 Page Index 進行更精細的過濾(預設 true)
SET datafusion.execution.parquet.enable_page_index = true;
-- 是否啟用 Bloom Filter 進行點查詢優化(預設 true)
SET datafusion.execution.parquet.bloom_filter_on_read = true;
作用:這些配置控制 DataFusion 如何利用 Parquet 檔案的統計資訊進行謂詞下推和數據剪枝。
使用 EXPLAIN VERBOSE
可以看到統計資訊的影響:
-- 顯示詳細的執行計劃和統計資訊
EXPLAIN VERBOSE SELECT * FROM orders o JOIN customers c ON o.customer_id = c.id;
輸出可能包含:
PhysicalPlan:
HashJoinExec: mode=CollectLeft, on=[(customer_id, id)]
stats=[num_rows=10000000, total_byte_size=2000000000]
├── ParquetExec: file=customers.parquet
│ stats=[num_rows=100000, total_byte_size=15000000] ← 右側較小
└── ParquetExec: file=orders.parquet
stats=[num_rows=10000000, total_byte_size=2000000000]
今天我們深入探討了 DataFusion 的統計資訊與基於成本的優化:
統計資訊的價值:讓優化器從「盲目猜測」變為「數據驅動決策」,在 Join 策略選擇、Join 順序優化、記憶體管理等方面帶來顯著性能提升
Statistics 結構設計:透過 num_rows
、total_byte_size
、column_statistics
描述數據特徵,用 Precision
枚舉區分精確值、估算值和缺失值
統計資訊來源:
Cost-Based 決策:
實戰技巧:
統計資訊是現代查詢引擎的「眼睛」,讓優化器能夠「看見」數據的真實面貌,做出明智的選擇。隨著 DataFusion 的演進,統計能力將更加豐富和智能,為查詢性能帶來持續的提升。
前面介紹了這麼多內容,對這個開源專案應該有更深的認識了,明天最後一天了,就來介紹如何貢獻 DataFusion 吧!