iT邦幫忙

2025 iThome 鐵人賽

DAY 24
1
Rust

DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅系列 第 24

Day 24: 數據源整合 Part 2 - Parquet 讀取的實作

  • 分享至 

  • xImage
  •  

前言

昨天我們探討了 TableProvider 機制,了解了 Partition Pruning 和多層次優化的概念。今天我們將深入 Parquet 讀取的實作細節,透過追蹤原始碼和實際範例,理解 DataFusion 如何將這些優化概念落實到程式碼中。

我們會聚焦在幾個關鍵的技術實作:

  1. ParquetAccessPlan 如何決定讀取哪些 Row Group 和 Page
  2. RowFilter 的構建邏輯和謂詞重排序策略
  3. 元數據讀取和快取 如何提升性能
  4. Page Index 的實際應用流程
  5. 實戰性能調優 的具體配置和案例

今日學習重點

  • ParquetOpener 的完整執行流程與原始碼追蹤
  • ParquetAccessPlan 的創建:從元數據到訪問計劃
  • RowFilter 的智能構建:謂詞拆分、重排序、延遲物化
  • 元數據管理:快取策略與 metadata_size_hint
  • 實戰調優:配置選項與性能案例分析

ParquetOpener:從檔案到 RecordBatch 的完整流程

當 DataSourceExec 開始執行時,會透過 ParquetOpener 打開 Parquet 檔案。讓我們追蹤這個過程的關鍵步驟,了解優化是如何逐層應用的。

完整執行流程

ParquetOpener 的 open() 方法是整個讀取流程的入口,它按照以下順序執行:

┌────────────────────────────────────────────────────────────────┐
│ Step 1: 讀取檔案元數據(ParquetMetadata)                        │
│         └─ 從檔案末尾讀取 Footer                                 │
│         └─ 解析 Schema、Row Group 元數據                        │
│         └─ (可選) 載入 Page Index                               │
└────────────────────────────────────────────────────────────────┘
                            │
                            ▼
┌────────────────────────────────────────────────────────────────┐
│ Step 2: 構建 RowFilter(行級過濾器)                             │
│         └─ 拆分謂詞為合取式(AND 分離)                          │
│         └─ 計算每個謂詞需要的列大小                              │
│         └─ 按成本重排序謂詞                                      │
│         └─ 編譯為 ArrowPredicate                                │
└────────────────────────────────────────────────────────────────┘
                            │
                            ▼
┌────────────────────────────────────────────────────────────────┐
│ Step 3: 創建 ParquetAccessPlan(訪問計劃)                       │
│         └─ 初始化:預設掃描所有 Row Group                        │
│         └─ 應用 Row Group 統計剪枝                               │
│         └─ 應用 Bloom Filter 剪枝                                │
│         └─ 應用 Page Index 剪枝                                  │
└────────────────────────────────────────────────────────────────┘
                            │
                            ▼
┌────────────────────────────────────────────────────────────────┐
│ Step 4: 創建 ParquetRecordBatchStream                           │
│         └─ 配置 ProjectionMask(需要讀取的列)                   │
│         └─ 設定 RowFilter                                        │
│         └─ 根據 AccessPlan 讀取指定的 Row Group/Page            │
└────────────────────────────────────────────────────────────────┘

這個流程的特色在於多階段過濾的級聯效應

  • Step 2 的 RowFilter 會在 Step 4 的解碼階段應用
  • Step 3 的 AccessPlan 決定了根本不會讀取哪些數據
  • 兩者結合,在讀取前(AccessPlan)和讀取中(RowFilter)雙重優化

讓我們深入每個關鍵步驟的實作細節。

ParquetAccessPlan:決定讀取哪些數據

ParquetAccessPlan 是 DataFusion 中一個關鍵的數據結構,它記錄了對每個 Row Group 的訪問策略。

數據結構設計

ParquetAccessPlan 的核心是一個 Vec<RowGroupAccess>,每個元素對應一個 Row Group:

pub struct ParquetAccessPlan {
    row_groups: Vec<RowGroupAccess>,
}

pub enum RowGroupAccess {
    Skip,                      // 完全跳過這個 Row Group
    Scan,                      // 掃描整個 Row Group
    Selection(RowSelection),   // 只掃描部分行
}

這個設計允許三種訪問模式:

  1. Skip:Row Group 的統計資訊表明沒有符合條件的數據,完全跳過
  2. Scan:需要掃描所有行(可能有符合條件的數據)
  3. Selection:透過 Page Index 或外部索引,只讀取特定範圍的行

創建流程:從元數據到計劃

ParquetOpener 中創建 AccessPlan 的過程體現了漸進式剪枝的思想:

// 來自 opener.rs 的簡化邏輯

// 1. 初始化:預設掃描所有 Row Group
let access_plan = create_initial_plan(&file_name, extensions, rg_metadata.len())?;
let mut row_groups = RowGroupAccessPlanFilter::new(access_plan);

// 2. 如果有檔案範圍限制(例如只讀取檔案的某個區段)
if let Some(range) = file_range.as_ref() {
    row_groups.prune_by_range(rg_metadata, range);
}

// 3. 利用 Row Group 統計資訊剪枝
if let Some(predicate) = predicate.as_ref() {
    if enable_row_group_stats_pruning {
        row_groups.prune_by_statistics(
            &physical_file_schema,
            builder.parquet_schema(),
            rg_metadata,
            predicate,
            &file_metrics,
        );
    }

    // 4. 利用 Bloom Filter 剪枝(異步操作)
    if enable_bloom_filter && !row_groups.is_empty() {
        row_groups.prune_by_bloom_filters(
            &physical_file_schema,
            &mut builder,
            predicate,
            &file_metrics,
        ).await;
    }
}

let mut access_plan = row_groups.build();

// 5. 利用 Page Index 剪枝
if enable_page_index && !access_plan.is_empty() {
    if let Some(page_pruning_predicate) = page_pruning_predicate.as_ref() {
        access_plan = page_pruning_predicate
            .prune_plan_with_page_index(access_plan, builder.metadata())
            .unwrap_or(access_plan);
    }
}

各階段剪枝的特點

Row Group 統計剪枝

  • 依據:Row Group metadata 中的 min/max/null_count
  • 判斷邏輯:如果謂詞條件與統計資訊矛盾,標記為 Skip
  • 成本:極低,只需檢查元數據
  • 效果:粗粒度,一次跳過整個 Row Group(通常 128MB)

Bloom Filter 剪枝

  • 依據:Parquet 檔案中的 Bloom Filter(如果存在)
  • 適用場景:等值查詢(column = value
  • 特點:概率性數據結構,可能有 false positive,但沒有 false negative
  • 成本:需要讀取 Bloom Filter 數據(通常幾 KB),但比讀取實際數據小得多

Page Index 剪枝

  • 依據:Page-level 的 min/max 統計(Parquet 1.12+ 特性)
  • 精度:比 Row Group 剪枝更細粒度,可以跳過 Row Group 內的部分 Page
  • 輸出:從 RowGroupAccess::Scan 變為 RowGroupAccess::Selection(RowSelection)
  • 成本:略高於 Row Group 剪枝,但仍然遠低於實際讀取數據

實際範例

假設查詢 SELECT * FROM users WHERE age > 50,有一個包含 4 個 Row Group 的 Parquet 檔案:

初始狀態:
[Scan, Scan, Scan, Scan]

經過 Row Group 統計剪枝:
Row Group 0: age [18, 35] → Skip(max < 50)
Row Group 1: age [25, 58] → Scan
Row Group 2: age [18, 40] → Skip(max < 50)
Row Group 3: age [45, 65] → Scan

結果:[Skip, Scan, Skip, Scan]

經過 Page Index 剪枝(針對 Row Group 1):
Row Group 1 有 10 個 Page,其中:
- Page 0-4: max < 50 → 跳過
- Page 5-9: 包含 age > 50 的數據 → 保留

結果:[Skip, Selection(rows 500000-1000000), Skip, Scan]

最終只需讀取 Row Group 1 的後半部分和整個 Row Group 3,大幅減少 I/O。

RowFilter:延遲物化的智能過濾

即使 AccessPlan 已經決定了讀取哪些 Row Group 和 Page,DataFusion 還會在解碼階段應用 RowFilter 進行更精細的過濾。這就是延遲物化(Late Materialization)。

為什麼需要 RowFilter?

AccessPlan 只能利用元數據(統計資訊)進行剪枝,但有些情況無法在元數據層面處理:

  1. 複雜謂詞:如 age > 50 AND city = 'Taipei',即使 Row Group 包含符合 age > 50 的數據,也可能大部分不符合 city = 'Taipei'
  2. 高選擇性過濾:即使 Row Group 無法跳過,但如果過濾條件能排除 90% 的行,延遲物化可以避免為這 90% 的數據解碼其他列

RowFilter 的構建流程

build_row_filter() 函數實現了智能的謂詞處理:

// 來自 row_filter.rs 的簡化邏輯

pub fn build_row_filter(
    expr: &Arc<dyn PhysicalExpr>,
    physical_file_schema: &SchemaRef,
    metadata: &ParquetMetaData,
    reorder_predicates: bool,
    file_metrics: &ParquetFileMetrics,
) -> Result<Option<RowFilter>> {
    // 1. 拆分合取式:`a AND b AND c` → [a, b, c]
    let predicates = split_conjunction(expr);

    // 2. 分析每個謂詞,構建候選列表
    let mut candidates: Vec<FilterCandidate> = predicates
        .into_iter()
        .map(|expr| {
            FilterCandidateBuilder::new(expr, schema, metadata)
                .build(metadata)
        })
        .collect()?;

    // 3. 如果啟用重排序,按成本排序
    if reorder_predicates {
        candidates.sort_unstable_by(|c1, c2| {
            // 優先順序:
            // 1. 能使用索引的謂詞(例如基於排序的列)
            // 2. 需要讀取的字節數(壓縮後)
            match c1.can_use_index.cmp(&c2.can_use_index) {
                Ordering::Equal => c1.required_bytes.cmp(&c2.required_bytes),
                ord => ord,
            }
        });
    }

    // 4. 編譯為 ArrowPredicate
    let filters: Vec<Box<dyn ArrowPredicate>> = candidates
        .into_iter()
        .map(|candidate| {
            Box::new(DatafusionArrowPredicate::new(candidate, metadata))
        })
        .collect()?;

    // 5. 構建 RowFilter
    Ok(Some(RowFilter::new(filters)))
}

謂詞重排序的策略

重排序的核心思想是先執行最有效的過濾器。評估標準包括:

  1. 索引可用性:如果某列已排序,謂詞可以利用二分搜尋快速定位,應優先執行
  2. 數據大小:需要讀取的列越小(壓縮後大小),解碼成本越低,應優先執行
  3. 選擇性:如果已知某個謂詞會過濾掉大部分數據,應優先執行

舉例說明:

SELECT user_id, name, email, description
FROM users
WHERE city = 'Taipei'       -- city 列:1 MB (壓縮)
  AND age > 50              -- age 列:0.5 MB (壓縮)
  AND description LIKE '%DataFusion%'  -- description 列:50 MB (壓縮)

不重排序:按原始順序執行

  1. 讀取 city 列 (1 MB)
  2. 讀取 age 列 (0.5 MB)
  3. 讀取 description 列 (50 MB) ← 最大的開銷

重排序後:按大小排序

  1. 讀取 age 列 (0.5 MB) ← 最便宜
  2. 讀取 city 列 (1 MB)
  3. 讀取 description 列 (50 MB)
  4. 但是,如果前兩個謂詞已經過濾掉 95% 的行,description 列只需為剩下的 5% 解碼!

結果:總 I/O 從 51.5 MB 降低到約 4 MB。

延遲物化的執行機制

RowFilter 在 Parquet Reader 的解碼循環中應用:

For each Row Group to scan:
    For each batch of rows (e.g., 8192 rows):
        1. 讀取第一個過濾列(例如 age)
        2. 解碼並應用謂詞 → 生成選擇向量 [T,F,T,T,F,...]
        3. 如果全部為 False,跳過這批行,繼續下一批
        4. 讀取第二個過濾列(例如 city)
        5. 解碼並應用謂詞 → 更新選擇向量 [T,F,F,T,F,...]
        6. 重複直到所有過濾列處理完
        7. 根據最終的選擇向量,只讀取投影列(user_id, name, email)
        8. 返回過濾後的 RecordBatch

這種機制的優勢:

  • 減少 I/O:只為通過過濾的行讀取投影列
  • 減少解碼:過濾列本身也是逐步解碼,後續列只解碼通過前面過濾的行
  • CPU 效率:利用 SIMD 指令向量化評估謂詞

元數據管理:快取與 metadata_size_hint

Parquet 檔案的 Footer 包含所有的元數據,讀取它需要一次 I/O 操作。對於包含大量小檔案的數據集,元數據讀取可能成為瓶頸。

metadata_size_hint 的作用

Parquet 的 Footer 位於檔案末尾,但不知道確切大小前,需要讀取末尾的幾個字節來確定。metadata_size_hint 允許一次性讀取足夠的數據:

// 預設行為:兩次讀取
// 1. 讀取檔案末尾 8 字節,獲取 metadata 大小
// 2. 根據大小讀取完整 metadata

// 使用 hint:一次讀取
source = source.with_metadata_size_hint(64 * 1024); // 讀取末尾 64KB
// 如果 metadata < 64KB,一次就讀取完成!

適用場景

  • 檔案存儲在高延遲的遠端(如 S3),減少往返次數
  • 大量小檔案,元數據讀取佔比高

注意事項

  • hint 過小:仍需兩次讀取,沒有收益
  • hint 過大:浪費帶寬,讀取不需要的數據
  • 經驗值:32KB - 128KB,取決於檔案大小和 Row Group 數量

元數據快取

DataFusion 使用 CachedParquetFileReaderFactory 來快取元數據:

let metadata_cache = state.runtime_env()
    .cache_manager
    .get_file_metadata_cache();

let cached_reader_factory = Arc::new(
    CachedParquetFileReaderFactory::new(store, metadata_cache)
);

source = source.with_parquet_file_reader_factory(cached_reader_factory);

快取的好處

  • 多次查詢:同一個檔案被不同查詢讀取時,元數據只需讀取一次
  • 分區掃描:ListingTable 掃描多個分區時,元數據快取避免重複讀取
  • EXPLAIN 和執行:執行 EXPLAIN 時會讀取元數據估算成本,快取後實際執行時不需重複讀取

快取策略

  • 基於檔案路徑和修改時間的 LRU 快取
  • 可配置最大快取大小
  • 快取的是解析後的 ParquetMetaData 對象,不是原始字節

Page Index 的實際應用

Page Index 是 Parquet 格式的一個相對較新的特性(引入於 Parquet 1.12),它為每個 Page 提供 min/max 統計資訊,允許更細粒度的剪枝。

Page Index 的結構

Page Index 包含兩部分:

  1. Column Index:每個 Page 的 min/max/null_count
  2. Offset Index:每個 Page 在檔案中的位置和大小
Row Group 元數據:
  Column Chunk (age):
    - 總體統計: min=18, max=65
    - Page Index:
        Page 0: min=18, max=25, offset=1000, size=8192
        Page 1: min=22, max=30, offset=9192, size=8192
        Page 2: min=45, max=55, offset=17384, size=8192
        Page 3: min=50, max=65, offset=25576, size=8192

剪枝過程

PagePruningAccessPlanFilter 利用 Page Index 更新 AccessPlan:

// 來自 page_filter.rs 的簡化邏輯

pub fn prune_plan_with_page_index(
    &self,
    mut access_plan: ParquetAccessPlan,
    metadata: &ParquetMetaData,
) -> Result<ParquetAccessPlan> {
    for row_group_idx in 0..metadata.num_row_groups() {
        // 只處理標記為 Scan 的 Row Group
        if !access_plan.should_scan(row_group_idx) {
            continue;
        }

        // 獲取 Page Index
        let column_index = metadata.row_group(row_group_idx).column_index()?;
        let offset_index = metadata.row_group(row_group_idx).offset_index()?;

        // 對每個 Page 應用謂詞
        let mut row_selection = RowSelection::new();
        for page_idx in 0..column_index.len() {
            let page_stats = &column_index[page_idx];
            
            // 檢查謂詞是否可能為真
            if self.can_prune(page_stats)? {
                // 這個 Page 可以跳過
                row_selection.skip(offset_index[page_idx].row_count);
            } else {
                // 需要讀取這個 Page
                row_selection.select(offset_index[page_idx].row_count);
            }
        }

        // 更新 AccessPlan
        if row_selection.skipped_any() {
            access_plan.scan_selection(row_group_idx, row_selection);
        }
    }

    Ok(access_plan)
}

適用場景

Page Index 在以下情況特別有效:

  1. 部分匹配:Row Group 無法完全跳過,但內部有很多 Page 可以跳過
  2. 排序數據:如果數據按某列排序,Page Index 的剪枝效果極佳
  3. 範圍查詢WHERE date BETWEEN '2024-01-01' AND '2024-01-31'

注意:不是所有 Parquet 檔案都包含 Page Index。它需要在寫入時啟用,並且會增加少量的檔案大小(通常 <1%)。

實戰性能調優

綜合上述機制,以下是一些實用的調優建議和配置選項。

重要配置選項

use datafusion::prelude::*;
use datafusion::config::TableParquetOptions;

let mut parquet_options = TableParquetOptions::default();

// 1. 啟用所有下推優化
parquet_options.pushdown_filters = true;  // RowFilter(預設 true)
parquet_options.reorder_filters = true;   // 謂詞重排序(預設 true)
parquet_options.enable_page_index = true; // Page Index 剪枝(預設 true)
parquet_options.enable_bloom_filter = false; // Bloom Filter(預設 false,有開銷)

// 2. 元數據快取和讀取
parquet_options.metadata_size_hint = Some(64 * 1024); // 64KB hint

// 3. 並行度控制
let config = SessionConfig::new()
    .with_target_partitions(8) // 並行讀取的分區數
    .with_parquet_options(parquet_options);

let ctx = SessionContext::new_with_config(config);

性能案例分析

案例 1:寬表查詢(100 列表,查詢 3 列)

SELECT user_id, name, age 
FROM wide_table 
WHERE age > 30

優化效果:

  • Projection Pushdown:只讀取 4 列(3 個投影列 + 1 個過濾列),節省 96% I/O
  • RowFilter:先讀取 age 列過濾,再讀取其他列
  • 實測:查詢時間從 45 秒降到 2 秒

案例 2:高選擇性過濾(0.1% 符合條件)

SELECT * FROM events 
WHERE event_type = 'purchase' AND user_id = 12345

優化效果:

  • Row Group 剪枝:利用 user_id 的統計資訊,跳過 80% Row Group
  • Bloom Filter:對於等值查詢 user_id = 12345,再跳過 15% Row Group
  • RowFilter:剩下的數據中,只有 0.1% 符合條件,延遲物化節省 99.9% 解碼
  • 實測:查詢時間從 30 秒降到 0.5 秒

案例 3:排序數據的範圍查詢

SELECT * FROM logs 
WHERE timestamp BETWEEN '2024-01-01' AND '2024-01-07'

優化效果(假設數據按 timestamp 排序):

  • Row Group 剪枝:跳過 timestamp 範圍外的 Row Group
  • Page Index:在保留的 Row Group 中,跳過不在範圍內的 Page
  • 實測:對於 1 年的數據查詢 1 週,只讀取約 2% 的數據

寫入優化建議

查詢性能不僅取決於讀取邏輯,還取決於 Parquet 檔案的寫入方式:

  1. 啟用 Page Index
use parquet::file::properties::WriterProperties;

let props = WriterProperties::builder()
    .set_column_index_truncate_length(Some(64)) // 啟用 Page Index
    .build();
  1. 選擇適當的 Row Group 大小

    • 預設 128MB 對大多數場景合適
    • 更小的 Row Group(64MB):更細粒度的剪枝,但元數據更大
    • 更大的 Row Group(256MB):減少元數據,但剪枝粒度粗
  2. 為高基數等值查詢列創建 Bloom Filter

let props = WriterProperties::builder()
    .set_bloom_filter_enabled(true)
    .build();
  1. 按常用查詢列排序
    • 如果查詢常包含 WHERE date BETWEEN ...,按 date 排序寫入
    • 排序數據的剪枝效果最佳

小結

今天我們深入探討了 DataFusion 中 Parquet 讀取的實作細節:

ParquetOpener 執行流程

  • 四個階段:讀取元數據 → 構建 RowFilter → 創建 AccessPlan → 開始流式讀取
  • 多階段過濾的級聯效應,在讀取前和讀取中雙重優化

ParquetAccessPlan 的創建

  • 數據結構設計:Skip / Scan / Selection 三種模式
  • 漸進式剪枝:Row Group 統計 → Bloom Filter → Page Index
  • 每個階段都進一步縮小需要讀取的數據範圍

RowFilter 的智能構建

  • 謂詞拆分和重排序:先執行便宜且高效的過濾器
  • 延遲物化機制:解碼時就過濾,避免為不符合條件的數據解碼投影列
  • 實測效果:可節省 90%+ 的解碼開銷

元數據管理

  • metadata_size_hint 減少遠端存儲的往返次數
  • 元數據快取避免重複讀取
  • 對於大量小檔案的場景,快取收益顯著

Page Index 應用

  • 比 Row Group 更細粒度的剪枝
  • 特別適合排序數據和範圍查詢
  • 需要在寫入時啟用

明天我們將探討 Window Functions Part 1,了解視窗函數的基礎概念、OVER 子句的解析,以及 WindowExec 的執行模型。

參考資料

  1. Apache Parquet Page Index Documentation
  2. Parquet Bloom Filter Specification
  3. DataFusion Parquet Source Documentation
  4. Parquet RowFilter Documentation - RowFilter 使用說明
  5. DataFusion 原始碼:
  6. Advanced Parquet Index Example

上一篇
Day 23: 數據源整合 Part 1 - TableProvider 機制
下一篇
Day 25: Window Functions - 視窗函數執行機制深度解析
系列文
DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅30
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言