iT邦幫忙

2025 iThome 鐵人賽

DAY 23
1
Rust

DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅系列 第 23

Day 23: 數據源整合 Part 1 - TableProvider 機制

  • 分享至 

  • xImage
  •  

前言

在 Day 14 我們初步認識了 TableProvider 作為數據源抽象的概念,Day 17 也看到了它在謂詞下推中的實際應用。今天我們將深入探討 TableProvider 的內部機制優化策略,了解 DataFusion 如何實現高效的數據讀取。

所有查詢的終點都需要從實際的數據源讀取數據。TableProvider 機制正是 DataFusion 靈活性的關鍵——透過統一的介面整合各種數據源,並將優化能力下推到數據源層面。

今日學習重點

  • scan() 方法的詳細工作流程與參數解析
  • ListingTable 的內部結構與統計快取機制
  • Partition Pruning 的完整實作流程
  • 多層次 Filter Pushdown 的優化策略

scan() 方法:數據讀取的核心入口

我們在 Day 14 已經認識了 TableProvider trait 的基本結構,其核心是 scan() 方法。今天我們深入探討這個方法如何實現數據源的靈活讀取和優化。

async fn scan(
    &self,
    state: &dyn Session,
    projection: Option<&Vec<usize>>,  // 投影下推
    filters: &[Expr],                  // 謂詞下推  
    limit: Option<usize>,              // 限制下推
) -> Result<Arc<dyn ExecutionPlan>>;

這個方法的設計巧妙之處在於:透過三個可選參數(projection、filters、limit),讓數據源可以在讀取數據之前就進行優化,避免不必要的 I/O 和計算。

從查詢到執行計劃的轉換流程

┌─────────────────────────────────────────────────────────────┐
│                     Physical Planner                         │
│                                                              │
│  需要從 users 表讀取數據:                                     │
│  SELECT id, name FROM users WHERE age > 18 LIMIT 100         │
└─────────────────────────────────────────────────────────────┘
                            │
                            ▼
                    呼叫 scan() 方法
                            │
                            ▼
┌─────────────────────────────────────────────────────────────┐
│                      TableProvider                           │
│                                                              │
│  參數解析:                                                    │
│  • projection: Some([0, 1])  → 只需要第0和第1列             │
│  • filters: [age > 18]       → 過濾條件                      │
│  • limit: Some(100)          → 最多返回100行                 │
└─────────────────────────────────────────────────────────────┘
                            │
                            ▼
                創建對應的 ExecutionPlan
                            │
                            ▼
┌─────────────────────────────────────────────────────────────┐
│                   返回 ExecutionPlan                         │
│                                                              │
│  例如:ParquetExec, CsvExec, DataSourceExec 等              │
└─────────────────────────────────────────────────────────────┘

參數詳解

1. projection(投影下推)

projection 參數是一個列索引的陣列,表示查詢只需要這些列。這是「Projection Pushdown」優化的體現:

// 假設表有 5 列:[id, name, age, email, address]
// SELECT name, age FROM users

projection: Some(&vec![1, 2])  // 只需要第1列(name)和第2列(age)

對於列式存儲格式(如 Parquet),這個優化非常重要。如果表有 100 列但查詢只需要 2 列,我們可以只讀取這 2 列的數據,大幅減少 I/O。

2. filters(謂詞下推)

filters 參數包含過濾條件,這是「Filter Pushdown」(謂詞下推)優化:

// SELECT * FROM users WHERE age > 18 AND city = 'Taipei'

filters: &[
    Expr::BinaryExpr(age > 18),
    Expr::BinaryExpr(city = 'Taipei')
]

不同的數據源可以用不同的方式利用這些過濾條件:

  • Parquet:利用 row group 的統計資訊跳過整個 row group
  • 資料庫:將條件轉換為 SQL WHERE 子句
  • 記憶體表:提前過濾 RecordBatch

3. limit(限制下推)

limit 參數告訴數據源最多需要多少行數據:

// SELECT * FROM users LIMIT 100

limit: Some(100)

這個參數特別適合:

  • 網路數據源:可以提早停止傳輸
  • 分區表:讀取到足夠的數據就停止
  • 流式數據:控制批次大小

ListingTable 的內部機制

在 Day 14 我們知道 ListingTable 用於讀取檔案系統中的數據檔案。現在讓我們深入它的內部結構,了解它如何實現高效的檔案掃描和優化。

核心數據結構

pub struct ListingTable {
    table_paths: Vec<ListingTableUrl>,
    file_schema: SchemaRef,              // 檔案中實際的欄位
    table_schema: SchemaRef,             // file_schema + 分區列
    options: ListingOptions,
    collected_statistics: FileStatisticsCache,  // 統計資訊快取
    constraints: Constraints,
    column_defaults: HashMap<String, Expr>,
    // ...
}

這個結構有幾個關鍵設計:

1. 雙重 Schema 設計

  • file_schema:檔案中實際存儲的欄位
  • table_schema:加上從目錄路徑解析的分區列

例如對於路徑 /data/year=2024/month=01/data.parquet,即使檔案裡沒有 yearmonth 欄位,table_schema 也會包含這些分區列。

2. 分區表支援

ListingTable 支援 Hive 風格的分區表結構:

/data/
  ├── year=2023/
  │   ├── month=01/
  │   │   ├── data1.parquet
  │   │   └── data2.parquet
  │   └── month=02/
  │       └── data3.parquet
  └── year=2024/
      └── month=01/
          └── data4.parquet

當查詢包含分區列的條件時(如 WHERE year = 2024),ListingTable 可以直接跳過不相關的分區目錄,這就是 Partition Pruning(分區剪枝),我們稍後會詳細探討。

3. 統計資訊快取機制

FileStatisticsCache 是 ListingTable 的關鍵優化:

// 快取內容示例
FileStatistics {
    num_rows: Some(1_000_000),           // 總行數
    total_byte_size: Some(45_000_000),   // 檔案大小
    column_statistics: [
        ColumnStatistics {
            null_count: Some(0),
            max_value: Some(ScalarValue::Int32(100)),
            min_value: Some(ScalarValue::Int32(1)),
            // ...
        },
        // ... 其他列的統計
    ],
}

這個快取的作用:

  • 成本估算:優化器根據行數和大小選擇執行策略(Hash Join vs Sort-Merge Join)
  • 統計剪枝:利用 min/max 值跳過不符合條件的檔案
  • 避免重複掃描:檔案元數據只讀取一次,後續查詢直接使用快取

其他內建實作簡介

MemTable:將 Vec<RecordBatch> 直接存儲在記憶體中。它的 scan() 實作非常直接,主要特點是忽略 filters 參數(因為數據已在記憶體,過濾成本低),並且支援預先定義的 sort_order 來告訴執行引擎數據已排序。

StreamingTable:用於無界流式數據源,不支援大多數下推優化(因為流式數據無法預先統計),更多作為與外部流處理系統的適配層。

Partition Pruning:分區剪枝

分區剪枝是一個非常重要的優化技術。讓我們透過一個具體的例子來理解。

剪枝原理

假設我們有一個按日期分區的銷售數據表:

/sales/
  ├── year=2023/month=01/  → 包含 100 個檔案
  ├── year=2023/month=02/  → 包含 100 個檔案
  ├── ...
  └── year=2024/month=12/  → 包含 100 個檔案

當執行查詢:

SELECT * FROM sales 
WHERE year = 2024 AND month >= 10
LIMIT 100

不使用剪枝的情況下,需要:

  1. 列出所有 24 個分區目錄(2年 × 12個月)
  2. 掃描所有 2,400 個檔案
  3. 讀取每個檔案的元數據
  4. 應用過濾條件

使用剪枝後,只需要:

  1. 根據分區條件 year = 2024 AND month >= 10 計算出符合的分區
  2. 只列出 3 個分區目錄:2024/102024/112024/12
  3. 只掃描 300 個檔案
  4. 效能提升 8 倍

剪枝實作流程

ListingTable 的剪枝邏輯大致如下:

┌─────────────────────────────────────────────────────────────┐
│ 1. 解析查詢中的分區列過濾條件                                  │
│    WHERE year = 2024 AND month >= 10                         │
│    → 提取出 partition_filters                                │
└─────────────────────────────────────────────────────────────┘
                            │
                            ▼
┌─────────────────────────────────────────────────────────────┐
│ 2. 列舉所有分區路徑                                            │
│    掃描目錄結構,找到所有分區                                  │
└─────────────────────────────────────────────────────────────┘
                            │
                            ▼
┌─────────────────────────────────────────────────────────────┐
│ 3. 對每個分區套用過濾條件                                      │
│    year=2023/month=01 → 不符合(year ≠ 2024)                │
│    year=2024/month=09 → 不符合(month < 10)                 │
│    year=2024/month=10 → 符合 ✓                               │
└─────────────────────────────────────────────────────────────┘
                            │
                            ▼
┌─────────────────────────────────────────────────────────────┐
│ 4. 只對保留的分區列舉檔案                                      │
│    減少檔案系統操作,提升性能                                  │
└─────────────────────────────────────────────────────────────┘

在 DataFusion 的實作中,這個邏輯主要在 pruned_partition_list() 函數中:

pub async fn pruned_partition_list<'a>(
    ctx: &'a dyn Session,
    store: &'a dyn ObjectStore,
    table_path: &'a ListingTableUrl,
    filters: &'a [Expr],
    file_extension: &'a str,
    partition_cols: &'a [(String, DataType)],
) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
    // 如果沒有分區列,直接列出所有檔案
    if partition_cols.is_empty() {
        return Ok(/* 列出所有檔案 */);
    }
    
    // 列舉所有分區
    let partitions = list_partitions(
        store, 
        table_path, 
        partition_cols.len(), 
        partition_prefix
    ).await?;
    
    // 套用過濾條件進行剪枝
    let pruned = prune_partitions(
        table_path, 
        partitions, 
        filters, 
        partition_cols
    ).await?;
    
    // 返回剪枝後的檔案列表
    Ok(/* stream of files from pruned partitions */)
}

Filter 和 Projection Pushdown 到數據源

除了分區剪枝,TableProvider 還可以利用 filtersprojection 參數進行更細緻的優化。

Projection Pushdown 的價值

以 Parquet 格式為例,假設我們有一個包含 50 列的用戶行為表:

-- 表結構:user_id, session_id, timestamp, event_type, ... (共50列)
SELECT user_id, event_type FROM user_events

不使用 Projection Pushdown

  1. 讀取所有 50 列的數據
  2. 在記憶體中建立完整的 RecordBatch
  3. 再透過 ProjectionExec 選出需要的 2 列
  4. 浪費了 48 列的 I/O 和記憶體

使用 Projection Pushdown

  1. TableProvider 收到 projection: Some([0, 3])
  2. 直接告訴 Parquet reader 只讀取這 2 列
  3. 節省 96% 的 I/O

Filter Pushdown 的層次

Filter Pushdown 在不同層次都可以發揮作用:

1. 邏輯層面(Optimizer)

在查詢優化階段,PushDownFilter 規則會嘗試將 Filter 下推到 TableScan:

Before:                          After:
┌─────────────┐                 ┌─────────────┐
│   Filter    │                 │  TableScan  │
│  age > 18   │                 │  filters =  │
└─────────────┘                 │  [age > 18] │
       ▲                        └─────────────┘
       │
┌─────────────┐
│  TableScan  │
└─────────────┘

2. 物理層面(ExecutionPlan)

在物理執行時,DataSourceExec 可以利用 filters:

  • 統計資訊剪枝:檢查 row group 的 min/max 統計值
  • Row group 跳過:對於 Parquet,跳過整個 row group
  • Page 跳過:利用 Page Index 跳過更細粒度的數據塊

3. 檔案格式層面

以 Parquet 為例,filter 可以在多個層次生效:

Parquet 檔案結構:
┌────────────────────────────────────────┐
│ Footer (File Metadata)                 │ ← 檢查整個檔案的統計資訊
│  - 所有 column 的 min/max              │
│  - Row group 數量                      │
└────────────────────────────────────────┘
         │
         ▼
┌────────────────────────────────────────┐
│ Row Group 1 (Metadata)                 │ ← 檢查 row group 統計
│  - 每個 column 的 min/max              │
│  - 行數:1,000,000                     │
└────────────────────────────────────────┘
         │
         ▼
┌────────────────────────────────────────┐
│ Column Chunk (age column)              │
│  ┌──────────────────────────────────┐  │
│  │ Page 1: min=18, max=25           │  │ ← 可能包含符合條件的數據
│  │ Page 2: min=5,  max=15           │  │ ← 全部 < 18,跳過!
│  │ Page 3: min=30, max=45           │  │ ← 可能包含符合條件的數據
│  └──────────────────────────────────┘  │
└────────────────────────────────────────┘

對於查詢 WHERE age > 18

  • File level:檢查整個檔案是否可能包含 age > 18 的數據
  • Row group level:跳過 max(age) ≤ 18 的 row group
  • Page level:跳過 max(age) ≤ 18 的 page
  • Row level:在讀取的數據上應用過濾條件

這種多層次的過濾大幅減少了實際讀取和處理的數據量。

supports_filters_pushdown 方法

TableProvider 可以透過 supports_filters_pushdown 方法告訴優化器哪些過濾條件可以下推:

fn supports_filters_pushdown(
    &self,
    filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
    filters.iter().map(|filter| {
        // 檢查這個過濾條件是否可以下推
        if can_pushdown(filter) {
            TableProviderFilterPushDown::Exact  // 精確過濾
        } else {
            TableProviderFilterPushDown::Unsupported  // 不支援
        }
    }).collect()
}

返回值有三種可能:

  • Exact:數據源可以精確執行這個過濾,DataFusion 不需要再過濾
  • Inexact:數據源可以部分過濾,但 DataFusion 還需要再次過濾(例如布隆過濾器)
  • Unsupported:數據源不支援這個過濾,必須由 DataFusion 執行

小結

今天我們深入探討了 TableProvider 的實作細節和優化機制:

實作機制

  • scan() 方法的三個關鍵參數:projection、filters、limit 實現了下推優化的基礎
  • ListingTable 的內部結構:雙重 Schema 設計、統計資訊快取、分區支援
  • Partition Pruning 的完整流程:從解析條件、列舉分區、套用過濾到返回檔案列表

多層次優化策略

  • 邏輯層:優化器將 Filter 下推到 TableScan
  • 物理層:ExecutionPlan 利用統計資訊進行剪枝
  • 格式層:Parquet 在 file/row group/page 層次過濾數據

明天我們將探討 Parquet 讀取優化的細節,深入了解列式存儲格式如何在多個層次實現過濾和投影優化。

參考資料

  1. DataFusion TableProvider 官方文檔
  2. Custom Table Providers 使用指南
  3. Apache Parquet 格式規範
  4. Filter Pushdown 優化文件
  5. DataFusion 原始碼:

上一篇
Day 22: Join 算子 Part 2 - Sort-Merge Join 和策略選擇
系列文
DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅23
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言