iT邦幫忙

2025 iThome 鐵人賽

DAY 18
1
Rust

DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅系列 第 18

Day 18: 基礎執行算子 - Projection 和 Filter

  • 分享至 

  • xImage
  •  

前言

在昨天的文章中,我們探討了 ExecutionPlan 的 Stream 執行模型,了解了 SendableRecordBatchStream 如何透過 Pull-based Volcano 模型實現高效的數據流動。今天我們要深入探討兩個最基礎但也最重要的執行算子:ProjectionExecFilterExec

這兩個算子雖然功能簡單,卻是幾乎所有查詢都會用到的核心元件:

  • ProjectionExec:負責選擇和計算需要的欄位(對應 SQL 的 SELECT 子句)
  • FilterExec:負責根據條件過濾資料列(對應 SQL 的 WHERE 子句)

這些基礎算子的理解,將為後續學習更複雜的聚合和 Join 算子建立基礎。

ProjectionExec - 表達式求值與欄位投影

核心概念與設計

ProjectionExec 的職責看似簡單:為輸入的每一行計算一組表達式,產生輸出行。但這個「計算表達式」的過程卻隱藏許多細節。

讓我們從一個簡單的 SQL 查詢來理解:

SELECT 
    id,
    price * quantity as total,
    upper(name) as product_name
FROM orders;

這個查詢包含三種投影類型:

  1. 簡單列引用id 直接引用輸入的欄位
  2. 計算表達式price * quantity 需要進行數學運算
  3. 函數調用upper(name) 需要執行字串函數

ProjectionExec 的設計必須能夠統一處理這些不同類型的表達式。

核心資料結構

// datafusion/physical-plan/src/projection.rs
pub struct ProjectionExec {
    /// 投影表達式列表(表達式 + 輸出欄位名稱)
    pub(crate) expr: Vec<ProjectionExpr>,
    /// 輸出 Schema
    schema: SchemaRef,
    /// 輸入執行計劃
    input: Arc<dyn ExecutionPlan>,
    /// 執行指標
    metrics: ExecutionPlanMetricsSet,
    /// 快取的計劃屬性
    cache: PlanProperties,
}

pub struct ProjectionExpr {
    /// 物理表達式
    pub expr: Arc<dyn PhysicalExpr>,
    /// 輸出欄位的別名
    pub alias: String,
}

關鍵設計點:

  • expr 列表:每個元素對應輸出 Schema 中的一個欄位
  • PhysicalExpr trait:統一的表達式介面,無論是簡單的列引用還是複雜的函數調用
  • 預計算 Schema:在創建 ProjectionExec 時就確定輸出 Schema,而不是在執行時動態推導

Schema 轉換流程

ProjectionExec 在創建時會進行 Schema 轉換:

pub fn try_new<I, E>(expr: I, input: Arc<dyn ExecutionPlan>) -> Result<Self>
where
    I: IntoIterator<Item = E>,
    E: Into<ProjectionExpr>,
{
    let input_schema = input.schema();
    let expr = expr.into_iter().map(Into::into).collect::<Vec<_>>();

    // 為每個表達式計算輸出 Field
    let fields: Result<Vec<Field>> = expr
        .iter()
        .map(|proj_expr| {
            // 獲取表達式的資料類型
            let data_type = proj_expr.expr.data_type(&input_schema)?;
            // 判斷是否可為 null
            let nullable = proj_expr.expr.nullable(&input_schema)?;
            
            Ok(Field::new(&proj_expr.alias, data_type, nullable))
        })
        .collect();

    // 構建輸出 Schema
    let schema = Arc::new(Schema::new(fields?));
    
    // ... 創建 ProjectionExec
}

這個過程展示了幾個重要特性:

  1. 型別推導:通過 data_type() 方法,根據輸入 Schema 推導輸出型別
  2. 可空性分析:通過 nullable() 方法判斷結果是否可能為 null
  3. 提前驗證:在查詢執行前就發現型別錯誤,而不是運行時才報錯

表達式求值過程

ProjectionExec 的核心邏輯在 ProjectionStream 中實現:

struct ProjectionStream {
    schema: SchemaRef,
    expr: Vec<Arc<dyn PhysicalExpr>>,
    input: SendableRecordBatchStream,
    baseline_metrics: BaselineMetrics,
}

impl ProjectionStream {
    fn batch_project(&self, batch: &RecordBatch) -> Result<RecordBatch> {
        // 記錄計算時間
        let _timer = self.baseline_metrics.elapsed_compute().timer();
        
        // 對每個表達式求值
        let arrays = self
            .expr
            .iter()
            .map(|expr| {
                // 1. 對整個 batch 求值
                expr.evaluate(batch)
                    // 2. 將結果轉換為 Array
                    .and_then(|v| v.into_array(batch.num_rows()))
            })
            .collect::<Result<Vec<_>>>()?;

        // 3. 用新的 arrays 和 schema 構建輸出 RecordBatch
        RecordBatch::try_new(Arc::clone(&self.schema), arrays)
            .map_err(Into::into)
    }
}

這裡的關鍵是 向量化求值expr.evaluate(batch) 一次性對整個 RecordBatch(通常包含數千行)進行計算,而不是逐行計算。這是 Arrow 和 DataFusion 高效能的核心所在。

實際案例:追蹤表達式求值

讓我們通過一個實例來理解:

use datafusion::prelude::*;
use datafusion::arrow::array::Int32Array;
use datafusion::arrow::record_batch::RecordBatch;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // 創建測試數據
    ctx.sql("CREATE TABLE products (id INT, price DOUBLE, quantity INT)")
        .await?;
    
    // 執行投影查詢
    let df = ctx
        .sql("SELECT id, price * quantity as total FROM products")
        .await?;

    // 查看物理計劃
    let physical_plan = df.create_physical_plan().await?;
    println!("{}", displayable(physical_plan.as_ref()).indent(true));

    Ok(())
}

輸出的計劃會顯示:

ProjectionExec: expr=[id@0 as id, price@1 * quantity@2 as total]
  TableScan: products

這裡 ProjectionExec 包含兩個表達式:

  1. id@0:簡單的列引用,@0 表示輸入 Schema 的第 0 個欄位
  2. price@1 * quantity@2:二元運算表達式,涉及兩個欄位的乘法

性能優化特性

ProjectionExec 有一個重要的優化判斷邏輯:

fn benefits_from_input_partitioning(&self) -> Vec<bool> {
    let all_simple_exprs = self.expr.iter().all(|proj_expr| {
        proj_expr.expr.as_any().is::<Column>()
            || proj_expr.expr.as_any().is::<Literal>()
    });
    
    // 如果只是列引用或常量,不需要重新分區
    vec![!all_simple_exprs]
}

這個方法告訴優化器:

  • 如果投影只是簡單的列重排或重命名(如 SELECT id, name FROM t),不需要並行化處理
  • 如果包含複雜計算(如 SELECT price * quantity FROM t),可以從並行化中獲益

FilterExec - 高效的條件過濾

核心概念

FilterExec 的任務是根據一個布林謂詞(predicate)過濾輸入的資料列。只有謂詞評估為 true 的行才會被保留在輸出中。

對應的 SQL 查詢:

SELECT * FROM orders WHERE price > 100 AND status = 'completed';

這裡的 WHERE price > 100 AND status = 'completed' 就會被轉換為 FilterExec 的謂詞。

資料結構設計

// datafusion/physical-plan/src/filter.rs
pub struct FilterExec {
    /// 過濾謂詞(必須評估為布林值)
    predicate: Arc<dyn PhysicalExpr>,
    /// 輸入執行計劃
    input: Arc<dyn ExecutionPlan>,
    /// 執行指標
    metrics: ExecutionPlanMetricsSet,
    /// 預設選擇性(用於統計估算)
    default_selectivity: u8,
    /// 快取屬性
    cache: PlanProperties,
}

選擇性(Selectivity) 是一個重要概念:

  • 表示過濾後保留的資料比例
  • 範圍是 0-100,其中 20 表示預期保留 20% 的資料
  • 用於統計資訊估算,幫助優化器做出更好的決策

過濾執行流程

FilterExec 的核心邏輯相當直接:

struct FilterExecStream {
    schema: SchemaRef,
    predicate: Arc<dyn PhysicalExpr>,
    input: SendableRecordBatchStream,
    baseline_metrics: BaselineMetrics,
}

fn filter_and_project(
    batch: &RecordBatch,
    predicate: &Arc<dyn PhysicalExpr>,
) -> Result<RecordBatch> {
    // 1. 評估謂詞表達式,得到布林陣列
    let filter_array = predicate
        .evaluate(batch)?
        .into_array(batch.num_rows())?;
    
    // 2. 轉換為 BooleanArray
    let filter_array = as_boolean_array(&filter_array)?;
    
    // 3. 使用 Arrow 的 filter 函數過濾 RecordBatch
    filter_record_batch(batch, filter_array)
}

過濾過程分為三步:

1. 謂詞評估

predicate.evaluate(batch)?

這會對整個 RecordBatch 求值,返回一個 ColumnarValue。例如:

  • 輸入:1000 行的 RecordBatch
  • 謂詞:price > 100
  • 輸出:1000 個布林值的陣列,[true, false, true, true, ...]

2. 型別轉換

.into_array(batch.num_rows())?

ColumnarValue 轉換為具體的 Arrow Array,並確保長度正確。

3. 應用過濾

filter_record_batch(batch, filter_array)?

Arrow 的 filter 函數會根據布林陣列,只保留對應位置為 true 的行。這是一個高度優化的操作,利用了 SIMD 指令集。

Stream 實現的關鍵細節

FilterExec 的 Stream 實現有一個重要的優化:

impl Stream for FilterExecStream {
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<RecordBatch>>> {
        loop {
            match ready!(self.input.poll_next_unpin(cx)) {
                Some(Ok(batch)) => {
                    let timer = self.baseline_metrics.elapsed_compute().timer();
                    let filtered_batch = filter_and_project(&batch, &self.predicate)?;
                    timer.done();
                    
                    // 關鍵優化:跳過空的 batch
                    if filtered_batch.num_rows() == 0 {
                        continue;  // 繼續處理下一個 batch
                    }
                    
                    return Poll::Ready(Some(Ok(filtered_batch)));
                }
                value => return Poll::Ready(value),
            }
        }
    }
}

跳過空 Batch 的設計

  • 如果一個 batch 的所有行都被過濾掉了(num_rows() == 0),不返回給下游
  • 直接繼續處理下一個輸入 batch(continue
  • 這避免了下游算子處理空 batch 的開銷

這個設計在選擇性很低的查詢中特別重要。例如 WHERE user_id = 12345 可能會過濾掉 99.9% 的資料,產生大量空 batch。

謂詞下推的威力

FilterExec 的一個重要特性是支援 謂詞下推(Predicate Pushdown)。雖然具體的優化邏輯在 Optimizer 中,但 FilterExec 的設計使其成為可能。

考慮這個查詢:

SELECT * FROM parquet_file WHERE date = '2024-01-01';

物理計劃可能是:

FilterExec: date = '2024-01-01'
  ParquetExec: file=data.parquet

但經過優化後:

ParquetExec: file=data.parquet, predicate_pushdown=[date = '2024-01-01']

過濾條件被下推到 ParquetExec,可以利用 Parquet 的:

  • Row Group Statistics:直接跳過整個 Row Group
  • Page Index:在 Page 層級過濾
  • Dictionary Encoding:加速等值比較

這可能將掃描的資料量從 GB 級別降到 MB 級別。

CoalesceBatchesExec - 批次合併優化

為什麼需要 Batch 合併?

FilterExec 有一個潛在的效能問題:它可能產生很多小的 RecordBatch

考慮這個場景:

  • 輸入:每個 batch 有 8192 行(DataFusion 的預設 batch size)
  • 過濾條件:選擇性 5%(只保留 5% 的資料)
  • 輸出:每個 batch 約 410 行

小 batch 會導致:

  1. 向量化效率降低:SIMD 指令無法充分利用
  2. 函數調用開銷增加:處理 1000 個小 batch 比處理 100 個大 batch 開銷更大
  3. 快取局部性變差:頻繁的上下文切換

CoalesceBatchesExec 的設計

pub struct CoalesceBatchesExec {
    /// 輸入執行計劃
    input: Arc<dyn ExecutionPlan>,
    /// 目標 batch 大小(行數)
    target_batch_size: usize,
    /// 最大獲取行數(用於 LIMIT 優化)
    fetch: Option<usize>,
    metrics: ExecutionPlanMetricsSet,
    cache: PlanProperties,
}

其核心邏輯在 BatchCoalescer 中:

pub struct BatchCoalescer {
    /// 目標行數
    target_batch_size: usize,
    /// 緩衝的 batches
    buffer: Vec<RecordBatch>,
    /// 當前緩衝的總行數
    buffered_rows: usize,
    /// 限制(如果有 LIMIT 子句)
    fetch: Option<usize>,
}

工作流程

輸入 Batch:    [410行] [395行] [420行] [405行] [430行] ...
                  |       |       |       |       |
                  v       v       v       v       v
緩衝與合併:    [410] -> [805] -> [1225] -> [1630] -> [2060]
                                    |                   |
                                    v                   v
輸出 Batch:                     [1225行]           [2060行] ...

當緩衝的行數達到或超過 target_batch_size 時,使用 Arrow 的 concat_batches 將多個小 batch 合併成一個大 batch。

自動插入機制

DataFusion 的物理優化器會自動在可能產生小 batch 的算子後插入 CoalesceBatchesExec:

// datafusion/physical-optimizer/src/coalesce_batches.rs
fn optimize(&self, plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
    plan.transform_up(|plan| {
        let wrap_in_coalesce = 
            plan.as_any().is::<FilterExec>() ||      // Filter 可能產生小 batch
            plan.as_any().is::<HashJoinExec>() ||    // Join 可能產生小 batch
            // ...
        
        if wrap_in_coalesce {
            Ok(Transformed::yes(Arc::new(
                CoalesceBatchesExec::new(plan, target_batch_size)
            )))
        } else {
            Ok(Transformed::no(plan))
        }
    })
}

這意味著你在大多數情況下不需要手動處理 batch 大小問題,優化器會自動處理。

向量化執行的初步認識

什麼是向量化執行?

傳統的查詢引擎採用 tuple-at-a-time 模型,每次處理一行:

// 傳統方式(偽代碼)
for row in input_rows {
    if predicate(row) {  // 每行一次函數調用
        output.push(project(row));  // 每行一次計算
    }
}

向量化執行採用 batch-at-a-time 模型,一次處理一批行:

// 向量化方式
for batch in input_batches {  // batch 通常包含數千行
    let filter_mask = predicate(batch);  // 一次評估數千行
    let filtered = apply_filter(batch, filter_mask);  // 一次過濾數千行
    let projected = project(filtered);  // 一次投影數千行
    output.push(projected);
}

向量化的效能優勢

1. CPU 快取友好

傳統方式:
Row 1 -> [Load] [Compute] [Store]
Row 2 -> [Load] [Compute] [Store]  // 頻繁的記憶體訪問
...

向量化:
Batch -> [Load Block] [Compute Block] [Store Block]  // 連續記憶體訪問

2. SIMD 指令利用

現代 CPU 的 SIMD(Single Instruction Multiple Data)指令可以一條指令處理多個資料:

標量操作:ADD r1, r2    // 一次加法
SIMD 操作:VPADD v1, v2  // 一次 4-8 個加法(取決於資料型別)

Arrow 的計算核心大量使用 SIMD,例如在過濾操作中:

// Arrow 內部實現(簡化)
fn filter_primitive<T>(values: &[T], mask: &[bool]) -> Vec<T> {
    // 使用 SIMD 指令一次處理 8 個或更多元素
    values.iter()
        .zip(mask.iter())
        .filter_map(|(v, &m)| if m { Some(*v) } else { None })
        .collect()
}

3. 分支預測優化

批次處理減少了分支判斷的次數:

傳統方式:每行一次 if 判斷,CPU 分支預測器效率低
向量化:  批次級別的操作,分支預測更準確

DataFusion 的向量化實踐

讓我們看一個具體例子:

// 表達式:price * quantity
pub struct BinaryExpr {
    left: Arc<dyn PhysicalExpr>,   // price
    op: Operator,                   // *
    right: Arc<dyn PhysicalExpr>,  // quantity
}

impl PhysicalExpr for BinaryExpr {
    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
        // 1. 遞迴求值左右子表達式(向量化)
        let left_value = self.left.evaluate(batch)?;
        let right_value = self.right.evaluate(batch)?;
        
        // 2. 一次性對整個陣列執行運算(向量化)
        let result = match self.op {
            Operator::Multiply => multiply(&left_value, &right_value)?,
            // ...
        };
        
        Ok(ColumnarValue::Array(result))
    }
}

multiply(&left_value, &right_value) 會調用 Arrow 的向量化乘法,一次處理數千個值。

向量化的代價

向量化並非沒有代價:

  1. 記憶體開銷:需要緩衝整個 batch 的資料
  2. 延遲增加:必須等待足夠的資料累積成一個 batch
  3. 實現複雜度:向量化的程式碼比標量程式碼複雜

但在分析型查詢(OLAP)中,吞吐量(throughput)通常比延遲(latency)更重要,因此向量化的優勢遠超其代價。

整合範例:組合 Projection 和 Filter

讓我們通過一個完整的例子來整合今天學到的知識:

use datafusion::prelude::*;
use datafusion::arrow::array::{Int32Array, Float64Array, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use std::sync::Arc;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // 1. 創建測試數據
    let schema = Arc::new(Schema::new(vec![
        Field::new("product_id", DataType::Int32, false),
        Field::new("product_name", DataType::Utf8, false),
        Field::new("price", DataType::Float64, false),
        Field::new("quantity", DataType::Int32, false),
    ]));

    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
            Arc::new(StringArray::from(vec!["Apple", "Banana", "Orange", "Grape", "Mango"])),
            Arc::new(Float64Array::from(vec![1.5, 0.8, 2.0, 3.5, 2.5])),
            Arc::new(Int32Array::from(vec![100, 200, 50, 30, 80])),
        ],
    )?;

    // 2. 註冊為表
    ctx.register_batch("products", batch)?;

    // 3. 執行包含 Filter 和 Projection 的查詢
    let df = ctx.sql(
        "SELECT 
            product_name,
            price,
            quantity,
            price * quantity as total_value
         FROM products
         WHERE price > 1.0 AND quantity > 50
         ORDER BY total_value DESC"
    ).await?;

    // 4. 查看物理計劃
    let physical_plan = df.clone().create_physical_plan().await?;
    println!("Physical Plan:");
    println!("{}\n", displayable(physical_plan.as_ref()).indent(true));

    // 5. 執行並顯示結果
    println!("Results:");
    df.show().await?;

    Ok(())
}

輸出的物理計劃(簡化版):

SortExec: expr=[total_value@3 DESC]
  ProjectionExec: expr=[product_name@1, price@2, quantity@3, price@2 * quantity@3 as total_value]
    CoalesceBatchesExec: target_batch_size=8192
      FilterExec: price@2 > 1.0 AND quantity@3 > 50
        MemoryExec: partitions=1

執行流程說明:

  1. MemoryExec:從記憶體讀取 RecordBatch
  2. FilterExec:過濾 price > 1.0 AND quantity > 50(保留 4 行)
  3. CoalesceBatchesExec:合併可能的小 batch(本例資料少,實際不會觸發)
  4. ProjectionExec:計算 price * quantity,並選擇需要的欄位
  5. SortExec:按 total_value 降序排序

結果:

+--------------+-------+----------+-------------+
| product_name | price | quantity | total_value |
+--------------+-------+----------+-------------+
| Grape        | 3.5   | 30       | 105.0       |
| Mango        | 2.5   | 80       | 200.0       |
| Orange       | 2.0   | 50       | 100.0       |
| Apple        | 1.5   | 100      | 150.0       |
+--------------+-------+----------+-------------+

小結

今天我們深入探討了 DataFusion 中兩個最基礎的執行算子:

ProjectionExec 的核心要點

  1. 統一的表達式介面:透過 PhysicalExpr trait 統一處理列引用、運算和函數
  2. 預先 Schema 推導:在執行前確定輸出結構,實現提前驗證
  3. 向量化求值:一次處理整個 RecordBatch,充分利用 SIMD 和快取
  4. 智能優化判斷:根據表達式複雜度決定是否需要並行化

FilterExec 的核心要點

  1. 三步過濾流程:謂詞評估 → 型別轉換 → 應用過濾
  2. 空 Batch 優化:自動跳過完全被過濾的 batch,減少下游開銷
  3. 選擇性估算:為優化器提供統計資訊,輔助決策
  4. 謂詞下推支援:設計上支援將過濾條件下推到資料源

CoalesceBatchesExec 的角色

  1. 解決小 Batch 問題:將過濾等操作產生的小 batch 合併
  2. 提升向量化效率:確保下游算子處理足夠大的資料塊
  3. 自動插入機制:優化器自動在需要的位置插入,無需手動處理

向量化執行的認識

  1. 批次處理模型:一次處理數千行,而非逐行處理
  2. 硬體友好:充分利用 CPU 快取、SIMD 指令和分支預測
  3. Arrow 的支撐:列式記憶體格式是向量化執行的基礎

這些基礎算子雖然簡單,卻展現了 DataFusion 的核心設計哲學:將複雜的操作分解為簡單、可組合的算子,透過向量化執行實現高效能

明天我們將進入更複雜的領域:聚合算子(AggregateExec)。聚合涉及狀態管理、多階段執行和記憶體優化,是理解 DataFusion 執行引擎的進階主題。我們將探討 GROUP BYSUMAVG 等聚合操作的內部實現機制。

參考資料

  1. DataFusion ProjectionExec 原始碼
  2. DataFusion FilterExec 原始碼
  3. DataFusion CoalesceBatchesExec 原始碼
  4. Apache Arrow Compute Kernels
  5. MonetDB/X100: Hyper-Pipelining Query Execution

上一篇
Day 17: ExecutionPlan 體系架構 Part 2 - Stream 執行模型
下一篇
Day 19: 聚合算子 Part 1 - Aggregate 執行流程
系列文
DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅19
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言