昨天我們探討了 TableProvider 機制,了解了 Partition Pruning 和多層次優化的概念。今天我們將深入 Parquet 讀取的實作細節,透過追蹤原始碼和實際範例,理解 DataFusion 如何將這些優化概念落實到程式碼中。
我們會聚焦在幾個關鍵的技術實作:
今日學習重點:
當 DataSourceExec 開始執行時,會透過 ParquetOpener
打開 Parquet 檔案。讓我們追蹤這個過程的關鍵步驟,了解優化是如何逐層應用的。
ParquetOpener 的 open()
方法是整個讀取流程的入口,它按照以下順序執行:
┌────────────────────────────────────────────────────────────────┐
│ Step 1: 讀取檔案元數據(ParquetMetadata) │
│ └─ 從檔案末尾讀取 Footer │
│ └─ 解析 Schema、Row Group 元數據 │
│ └─ (可選) 載入 Page Index │
└────────────────────────────────────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────────┐
│ Step 2: 構建 RowFilter(行級過濾器) │
│ └─ 拆分謂詞為合取式(AND 分離) │
│ └─ 計算每個謂詞需要的列大小 │
│ └─ 按成本重排序謂詞 │
│ └─ 編譯為 ArrowPredicate │
└────────────────────────────────────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────────┐
│ Step 3: 創建 ParquetAccessPlan(訪問計劃) │
│ └─ 初始化:預設掃描所有 Row Group │
│ └─ 應用 Row Group 統計剪枝 │
│ └─ 應用 Bloom Filter 剪枝 │
│ └─ 應用 Page Index 剪枝 │
└────────────────────────────────────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────────┐
│ Step 4: 創建 ParquetRecordBatchStream │
│ └─ 配置 ProjectionMask(需要讀取的列) │
│ └─ 設定 RowFilter │
│ └─ 根據 AccessPlan 讀取指定的 Row Group/Page │
└────────────────────────────────────────────────────────────────┘
這個流程的特色在於多階段過濾的級聯效應:
讓我們深入每個關鍵步驟的實作細節。
ParquetAccessPlan
是 DataFusion 中一個關鍵的數據結構,它記錄了對每個 Row Group 的訪問策略。
ParquetAccessPlan 的核心是一個 Vec<RowGroupAccess>
,每個元素對應一個 Row Group:
pub struct ParquetAccessPlan {
row_groups: Vec<RowGroupAccess>,
}
pub enum RowGroupAccess {
Skip, // 完全跳過這個 Row Group
Scan, // 掃描整個 Row Group
Selection(RowSelection), // 只掃描部分行
}
這個設計允許三種訪問模式:
ParquetOpener 中創建 AccessPlan 的過程體現了漸進式剪枝的思想:
// 來自 opener.rs 的簡化邏輯
// 1. 初始化:預設掃描所有 Row Group
let access_plan = create_initial_plan(&file_name, extensions, rg_metadata.len())?;
let mut row_groups = RowGroupAccessPlanFilter::new(access_plan);
// 2. 如果有檔案範圍限制(例如只讀取檔案的某個區段)
if let Some(range) = file_range.as_ref() {
row_groups.prune_by_range(rg_metadata, range);
}
// 3. 利用 Row Group 統計資訊剪枝
if let Some(predicate) = predicate.as_ref() {
if enable_row_group_stats_pruning {
row_groups.prune_by_statistics(
&physical_file_schema,
builder.parquet_schema(),
rg_metadata,
predicate,
&file_metrics,
);
}
// 4. 利用 Bloom Filter 剪枝(異步操作)
if enable_bloom_filter && !row_groups.is_empty() {
row_groups.prune_by_bloom_filters(
&physical_file_schema,
&mut builder,
predicate,
&file_metrics,
).await;
}
}
let mut access_plan = row_groups.build();
// 5. 利用 Page Index 剪枝
if enable_page_index && !access_plan.is_empty() {
if let Some(page_pruning_predicate) = page_pruning_predicate.as_ref() {
access_plan = page_pruning_predicate
.prune_plan_with_page_index(access_plan, builder.metadata())
.unwrap_or(access_plan);
}
}
Row Group 統計剪枝:
Bloom Filter 剪枝:
column = value
)Page Index 剪枝:
RowGroupAccess::Scan
變為 RowGroupAccess::Selection(RowSelection)
假設查詢 SELECT * FROM users WHERE age > 50
,有一個包含 4 個 Row Group 的 Parquet 檔案:
初始狀態:
[Scan, Scan, Scan, Scan]
經過 Row Group 統計剪枝:
Row Group 0: age [18, 35] → Skip(max < 50)
Row Group 1: age [25, 58] → Scan
Row Group 2: age [18, 40] → Skip(max < 50)
Row Group 3: age [45, 65] → Scan
結果:[Skip, Scan, Skip, Scan]
經過 Page Index 剪枝(針對 Row Group 1):
Row Group 1 有 10 個 Page,其中:
- Page 0-4: max < 50 → 跳過
- Page 5-9: 包含 age > 50 的數據 → 保留
結果:[Skip, Selection(rows 500000-1000000), Skip, Scan]
最終只需讀取 Row Group 1 的後半部分和整個 Row Group 3,大幅減少 I/O。
即使 AccessPlan 已經決定了讀取哪些 Row Group 和 Page,DataFusion 還會在解碼階段應用 RowFilter
進行更精細的過濾。這就是延遲物化(Late Materialization)。
AccessPlan 只能利用元數據(統計資訊)進行剪枝,但有些情況無法在元數據層面處理:
age > 50 AND city = 'Taipei'
,即使 Row Group 包含符合 age > 50
的數據,也可能大部分不符合 city = 'Taipei'
build_row_filter()
函數實現了智能的謂詞處理:
// 來自 row_filter.rs 的簡化邏輯
pub fn build_row_filter(
expr: &Arc<dyn PhysicalExpr>,
physical_file_schema: &SchemaRef,
metadata: &ParquetMetaData,
reorder_predicates: bool,
file_metrics: &ParquetFileMetrics,
) -> Result<Option<RowFilter>> {
// 1. 拆分合取式:`a AND b AND c` → [a, b, c]
let predicates = split_conjunction(expr);
// 2. 分析每個謂詞,構建候選列表
let mut candidates: Vec<FilterCandidate> = predicates
.into_iter()
.map(|expr| {
FilterCandidateBuilder::new(expr, schema, metadata)
.build(metadata)
})
.collect()?;
// 3. 如果啟用重排序,按成本排序
if reorder_predicates {
candidates.sort_unstable_by(|c1, c2| {
// 優先順序:
// 1. 能使用索引的謂詞(例如基於排序的列)
// 2. 需要讀取的字節數(壓縮後)
match c1.can_use_index.cmp(&c2.can_use_index) {
Ordering::Equal => c1.required_bytes.cmp(&c2.required_bytes),
ord => ord,
}
});
}
// 4. 編譯為 ArrowPredicate
let filters: Vec<Box<dyn ArrowPredicate>> = candidates
.into_iter()
.map(|candidate| {
Box::new(DatafusionArrowPredicate::new(candidate, metadata))
})
.collect()?;
// 5. 構建 RowFilter
Ok(Some(RowFilter::new(filters)))
}
重排序的核心思想是先執行最有效的過濾器。評估標準包括:
舉例說明:
SELECT user_id, name, email, description
FROM users
WHERE city = 'Taipei' -- city 列:1 MB (壓縮)
AND age > 50 -- age 列:0.5 MB (壓縮)
AND description LIKE '%DataFusion%' -- description 列:50 MB (壓縮)
不重排序:按原始順序執行
重排序後:按大小排序
結果:總 I/O 從 51.5 MB 降低到約 4 MB。
RowFilter 在 Parquet Reader 的解碼循環中應用:
For each Row Group to scan:
For each batch of rows (e.g., 8192 rows):
1. 讀取第一個過濾列(例如 age)
2. 解碼並應用謂詞 → 生成選擇向量 [T,F,T,T,F,...]
3. 如果全部為 False,跳過這批行,繼續下一批
4. 讀取第二個過濾列(例如 city)
5. 解碼並應用謂詞 → 更新選擇向量 [T,F,F,T,F,...]
6. 重複直到所有過濾列處理完
7. 根據最終的選擇向量,只讀取投影列(user_id, name, email)
8. 返回過濾後的 RecordBatch
這種機制的優勢:
Parquet 檔案的 Footer 包含所有的元數據,讀取它需要一次 I/O 操作。對於包含大量小檔案的數據集,元數據讀取可能成為瓶頸。
Parquet 的 Footer 位於檔案末尾,但不知道確切大小前,需要讀取末尾的幾個字節來確定。metadata_size_hint
允許一次性讀取足夠的數據:
// 預設行為:兩次讀取
// 1. 讀取檔案末尾 8 字節,獲取 metadata 大小
// 2. 根據大小讀取完整 metadata
// 使用 hint:一次讀取
source = source.with_metadata_size_hint(64 * 1024); // 讀取末尾 64KB
// 如果 metadata < 64KB,一次就讀取完成!
適用場景:
注意事項:
DataFusion 使用 CachedParquetFileReaderFactory
來快取元數據:
let metadata_cache = state.runtime_env()
.cache_manager
.get_file_metadata_cache();
let cached_reader_factory = Arc::new(
CachedParquetFileReaderFactory::new(store, metadata_cache)
);
source = source.with_parquet_file_reader_factory(cached_reader_factory);
快取的好處:
快取策略:
ParquetMetaData
對象,不是原始字節Page Index 是 Parquet 格式的一個相對較新的特性(引入於 Parquet 1.12),它為每個 Page 提供 min/max 統計資訊,允許更細粒度的剪枝。
Page Index 包含兩部分:
Row Group 元數據:
Column Chunk (age):
- 總體統計: min=18, max=65
- Page Index:
Page 0: min=18, max=25, offset=1000, size=8192
Page 1: min=22, max=30, offset=9192, size=8192
Page 2: min=45, max=55, offset=17384, size=8192
Page 3: min=50, max=65, offset=25576, size=8192
PagePruningAccessPlanFilter
利用 Page Index 更新 AccessPlan:
// 來自 page_filter.rs 的簡化邏輯
pub fn prune_plan_with_page_index(
&self,
mut access_plan: ParquetAccessPlan,
metadata: &ParquetMetaData,
) -> Result<ParquetAccessPlan> {
for row_group_idx in 0..metadata.num_row_groups() {
// 只處理標記為 Scan 的 Row Group
if !access_plan.should_scan(row_group_idx) {
continue;
}
// 獲取 Page Index
let column_index = metadata.row_group(row_group_idx).column_index()?;
let offset_index = metadata.row_group(row_group_idx).offset_index()?;
// 對每個 Page 應用謂詞
let mut row_selection = RowSelection::new();
for page_idx in 0..column_index.len() {
let page_stats = &column_index[page_idx];
// 檢查謂詞是否可能為真
if self.can_prune(page_stats)? {
// 這個 Page 可以跳過
row_selection.skip(offset_index[page_idx].row_count);
} else {
// 需要讀取這個 Page
row_selection.select(offset_index[page_idx].row_count);
}
}
// 更新 AccessPlan
if row_selection.skipped_any() {
access_plan.scan_selection(row_group_idx, row_selection);
}
}
Ok(access_plan)
}
Page Index 在以下情況特別有效:
WHERE date BETWEEN '2024-01-01' AND '2024-01-31'
注意:不是所有 Parquet 檔案都包含 Page Index。它需要在寫入時啟用,並且會增加少量的檔案大小(通常 <1%)。
綜合上述機制,以下是一些實用的調優建議和配置選項。
use datafusion::prelude::*;
use datafusion::config::TableParquetOptions;
let mut parquet_options = TableParquetOptions::default();
// 1. 啟用所有下推優化
parquet_options.pushdown_filters = true; // RowFilter(預設 true)
parquet_options.reorder_filters = true; // 謂詞重排序(預設 true)
parquet_options.enable_page_index = true; // Page Index 剪枝(預設 true)
parquet_options.enable_bloom_filter = false; // Bloom Filter(預設 false,有開銷)
// 2. 元數據快取和讀取
parquet_options.metadata_size_hint = Some(64 * 1024); // 64KB hint
// 3. 並行度控制
let config = SessionConfig::new()
.with_target_partitions(8) // 並行讀取的分區數
.with_parquet_options(parquet_options);
let ctx = SessionContext::new_with_config(config);
案例 1:寬表查詢(100 列表,查詢 3 列)
SELECT user_id, name, age
FROM wide_table
WHERE age > 30
優化效果:
案例 2:高選擇性過濾(0.1% 符合條件)
SELECT * FROM events
WHERE event_type = 'purchase' AND user_id = 12345
優化效果:
user_id = 12345
,再跳過 15% Row Group案例 3:排序數據的範圍查詢
SELECT * FROM logs
WHERE timestamp BETWEEN '2024-01-01' AND '2024-01-07'
優化效果(假設數據按 timestamp 排序):
查詢性能不僅取決於讀取邏輯,還取決於 Parquet 檔案的寫入方式:
use parquet::file::properties::WriterProperties;
let props = WriterProperties::builder()
.set_column_index_truncate_length(Some(64)) // 啟用 Page Index
.build();
選擇適當的 Row Group 大小:
為高基數等值查詢列創建 Bloom Filter:
let props = WriterProperties::builder()
.set_bloom_filter_enabled(true)
.build();
WHERE date BETWEEN ...
,按 date 排序寫入今天我們深入探討了 DataFusion 中 Parquet 讀取的實作細節:
ParquetOpener 執行流程:
ParquetAccessPlan 的創建:
RowFilter 的智能構建:
元數據管理:
Page Index 應用:
明天我們將探討 Window Functions Part 1,了解視窗函數的基礎概念、OVER 子句的解析,以及 WindowExec 的執行模型。