在昨天的文章中,我們探討了 ExecutionPlan 的 Stream 執行模型,了解了 SendableRecordBatchStream
如何透過 Pull-based Volcano 模型實現高效的數據流動。今天我們要深入探討兩個最基礎但也最重要的執行算子:ProjectionExec 和 FilterExec。
這兩個算子雖然功能簡單,卻是幾乎所有查詢都會用到的核心元件:
這些基礎算子的理解,將為後續學習更複雜的聚合和 Join 算子建立基礎。
ProjectionExec 的職責看似簡單:為輸入的每一行計算一組表達式,產生輸出行。但這個「計算表達式」的過程卻隱藏許多細節。
讓我們從一個簡單的 SQL 查詢來理解:
SELECT
id,
price * quantity as total,
upper(name) as product_name
FROM orders;
這個查詢包含三種投影類型:
id
直接引用輸入的欄位price * quantity
需要進行數學運算upper(name)
需要執行字串函數ProjectionExec 的設計必須能夠統一處理這些不同類型的表達式。
// datafusion/physical-plan/src/projection.rs
pub struct ProjectionExec {
/// 投影表達式列表(表達式 + 輸出欄位名稱)
pub(crate) expr: Vec<ProjectionExpr>,
/// 輸出 Schema
schema: SchemaRef,
/// 輸入執行計劃
input: Arc<dyn ExecutionPlan>,
/// 執行指標
metrics: ExecutionPlanMetricsSet,
/// 快取的計劃屬性
cache: PlanProperties,
}
pub struct ProjectionExpr {
/// 物理表達式
pub expr: Arc<dyn PhysicalExpr>,
/// 輸出欄位的別名
pub alias: String,
}
關鍵設計點:
ProjectionExec 在創建時會進行 Schema 轉換:
pub fn try_new<I, E>(expr: I, input: Arc<dyn ExecutionPlan>) -> Result<Self>
where
I: IntoIterator<Item = E>,
E: Into<ProjectionExpr>,
{
let input_schema = input.schema();
let expr = expr.into_iter().map(Into::into).collect::<Vec<_>>();
// 為每個表達式計算輸出 Field
let fields: Result<Vec<Field>> = expr
.iter()
.map(|proj_expr| {
// 獲取表達式的資料類型
let data_type = proj_expr.expr.data_type(&input_schema)?;
// 判斷是否可為 null
let nullable = proj_expr.expr.nullable(&input_schema)?;
Ok(Field::new(&proj_expr.alias, data_type, nullable))
})
.collect();
// 構建輸出 Schema
let schema = Arc::new(Schema::new(fields?));
// ... 創建 ProjectionExec
}
這個過程展示了幾個重要特性:
data_type()
方法,根據輸入 Schema 推導輸出型別nullable()
方法判斷結果是否可能為 nullProjectionExec 的核心邏輯在 ProjectionStream
中實現:
struct ProjectionStream {
schema: SchemaRef,
expr: Vec<Arc<dyn PhysicalExpr>>,
input: SendableRecordBatchStream,
baseline_metrics: BaselineMetrics,
}
impl ProjectionStream {
fn batch_project(&self, batch: &RecordBatch) -> Result<RecordBatch> {
// 記錄計算時間
let _timer = self.baseline_metrics.elapsed_compute().timer();
// 對每個表達式求值
let arrays = self
.expr
.iter()
.map(|expr| {
// 1. 對整個 batch 求值
expr.evaluate(batch)
// 2. 將結果轉換為 Array
.and_then(|v| v.into_array(batch.num_rows()))
})
.collect::<Result<Vec<_>>>()?;
// 3. 用新的 arrays 和 schema 構建輸出 RecordBatch
RecordBatch::try_new(Arc::clone(&self.schema), arrays)
.map_err(Into::into)
}
}
這裡的關鍵是 向量化求值:expr.evaluate(batch)
一次性對整個 RecordBatch
(通常包含數千行)進行計算,而不是逐行計算。這是 Arrow 和 DataFusion 高效能的核心所在。
讓我們通過一個實例來理解:
use datafusion::prelude::*;
use datafusion::arrow::array::Int32Array;
use datafusion::arrow::record_batch::RecordBatch;
#[tokio::main]
async fn main() -> Result<()> {
let ctx = SessionContext::new();
// 創建測試數據
ctx.sql("CREATE TABLE products (id INT, price DOUBLE, quantity INT)")
.await?;
// 執行投影查詢
let df = ctx
.sql("SELECT id, price * quantity as total FROM products")
.await?;
// 查看物理計劃
let physical_plan = df.create_physical_plan().await?;
println!("{}", displayable(physical_plan.as_ref()).indent(true));
Ok(())
}
輸出的計劃會顯示:
ProjectionExec: expr=[id@0 as id, price@1 * quantity@2 as total]
TableScan: products
這裡 ProjectionExec
包含兩個表達式:
id@0
:簡單的列引用,@0
表示輸入 Schema 的第 0 個欄位price@1 * quantity@2
:二元運算表達式,涉及兩個欄位的乘法ProjectionExec 有一個重要的優化判斷邏輯:
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
let all_simple_exprs = self.expr.iter().all(|proj_expr| {
proj_expr.expr.as_any().is::<Column>()
|| proj_expr.expr.as_any().is::<Literal>()
});
// 如果只是列引用或常量,不需要重新分區
vec![!all_simple_exprs]
}
這個方法告訴優化器:
SELECT id, name FROM t
),不需要並行化處理SELECT price * quantity FROM t
),可以從並行化中獲益FilterExec 的任務是根據一個布林謂詞(predicate)過濾輸入的資料列。只有謂詞評估為 true
的行才會被保留在輸出中。
對應的 SQL 查詢:
SELECT * FROM orders WHERE price > 100 AND status = 'completed';
這裡的 WHERE price > 100 AND status = 'completed'
就會被轉換為 FilterExec 的謂詞。
// datafusion/physical-plan/src/filter.rs
pub struct FilterExec {
/// 過濾謂詞(必須評估為布林值)
predicate: Arc<dyn PhysicalExpr>,
/// 輸入執行計劃
input: Arc<dyn ExecutionPlan>,
/// 執行指標
metrics: ExecutionPlanMetricsSet,
/// 預設選擇性(用於統計估算)
default_selectivity: u8,
/// 快取屬性
cache: PlanProperties,
}
選擇性(Selectivity) 是一個重要概念:
FilterExec 的核心邏輯相當直接:
struct FilterExecStream {
schema: SchemaRef,
predicate: Arc<dyn PhysicalExpr>,
input: SendableRecordBatchStream,
baseline_metrics: BaselineMetrics,
}
fn filter_and_project(
batch: &RecordBatch,
predicate: &Arc<dyn PhysicalExpr>,
) -> Result<RecordBatch> {
// 1. 評估謂詞表達式,得到布林陣列
let filter_array = predicate
.evaluate(batch)?
.into_array(batch.num_rows())?;
// 2. 轉換為 BooleanArray
let filter_array = as_boolean_array(&filter_array)?;
// 3. 使用 Arrow 的 filter 函數過濾 RecordBatch
filter_record_batch(batch, filter_array)
}
過濾過程分為三步:
1. 謂詞評估
predicate.evaluate(batch)?
這會對整個 RecordBatch 求值,返回一個 ColumnarValue
。例如:
price > 100
[true, false, true, true, ...]
2. 型別轉換
.into_array(batch.num_rows())?
將 ColumnarValue
轉換為具體的 Arrow Array
,並確保長度正確。
3. 應用過濾
filter_record_batch(batch, filter_array)?
Arrow 的 filter
函數會根據布林陣列,只保留對應位置為 true
的行。這是一個高度優化的操作,利用了 SIMD 指令集。
FilterExec 的 Stream 實現有一個重要的優化:
impl Stream for FilterExecStream {
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<RecordBatch>>> {
loop {
match ready!(self.input.poll_next_unpin(cx)) {
Some(Ok(batch)) => {
let timer = self.baseline_metrics.elapsed_compute().timer();
let filtered_batch = filter_and_project(&batch, &self.predicate)?;
timer.done();
// 關鍵優化:跳過空的 batch
if filtered_batch.num_rows() == 0 {
continue; // 繼續處理下一個 batch
}
return Poll::Ready(Some(Ok(filtered_batch)));
}
value => return Poll::Ready(value),
}
}
}
}
跳過空 Batch 的設計:
num_rows() == 0
),不返回給下游continue
)這個設計在選擇性很低的查詢中特別重要。例如 WHERE user_id = 12345
可能會過濾掉 99.9% 的資料,產生大量空 batch。
FilterExec 的一個重要特性是支援 謂詞下推(Predicate Pushdown)。雖然具體的優化邏輯在 Optimizer 中,但 FilterExec 的設計使其成為可能。
考慮這個查詢:
SELECT * FROM parquet_file WHERE date = '2024-01-01';
物理計劃可能是:
FilterExec: date = '2024-01-01'
ParquetExec: file=data.parquet
但經過優化後:
ParquetExec: file=data.parquet, predicate_pushdown=[date = '2024-01-01']
過濾條件被下推到 ParquetExec,可以利用 Parquet 的:
這可能將掃描的資料量從 GB 級別降到 MB 級別。
FilterExec 有一個潛在的效能問題:它可能產生很多小的 RecordBatch。
考慮這個場景:
小 batch 會導致:
pub struct CoalesceBatchesExec {
/// 輸入執行計劃
input: Arc<dyn ExecutionPlan>,
/// 目標 batch 大小(行數)
target_batch_size: usize,
/// 最大獲取行數(用於 LIMIT 優化)
fetch: Option<usize>,
metrics: ExecutionPlanMetricsSet,
cache: PlanProperties,
}
其核心邏輯在 BatchCoalescer
中:
pub struct BatchCoalescer {
/// 目標行數
target_batch_size: usize,
/// 緩衝的 batches
buffer: Vec<RecordBatch>,
/// 當前緩衝的總行數
buffered_rows: usize,
/// 限制(如果有 LIMIT 子句)
fetch: Option<usize>,
}
工作流程:
輸入 Batch: [410行] [395行] [420行] [405行] [430行] ...
| | | | |
v v v v v
緩衝與合併: [410] -> [805] -> [1225] -> [1630] -> [2060]
| |
v v
輸出 Batch: [1225行] [2060行] ...
當緩衝的行數達到或超過 target_batch_size
時,使用 Arrow 的 concat_batches
將多個小 batch 合併成一個大 batch。
DataFusion 的物理優化器會自動在可能產生小 batch 的算子後插入 CoalesceBatchesExec:
// datafusion/physical-optimizer/src/coalesce_batches.rs
fn optimize(&self, plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
plan.transform_up(|plan| {
let wrap_in_coalesce =
plan.as_any().is::<FilterExec>() || // Filter 可能產生小 batch
plan.as_any().is::<HashJoinExec>() || // Join 可能產生小 batch
// ...
if wrap_in_coalesce {
Ok(Transformed::yes(Arc::new(
CoalesceBatchesExec::new(plan, target_batch_size)
)))
} else {
Ok(Transformed::no(plan))
}
})
}
這意味著你在大多數情況下不需要手動處理 batch 大小問題,優化器會自動處理。
傳統的查詢引擎採用 tuple-at-a-time 模型,每次處理一行:
// 傳統方式(偽代碼)
for row in input_rows {
if predicate(row) { // 每行一次函數調用
output.push(project(row)); // 每行一次計算
}
}
向量化執行採用 batch-at-a-time 模型,一次處理一批行:
// 向量化方式
for batch in input_batches { // batch 通常包含數千行
let filter_mask = predicate(batch); // 一次評估數千行
let filtered = apply_filter(batch, filter_mask); // 一次過濾數千行
let projected = project(filtered); // 一次投影數千行
output.push(projected);
}
1. CPU 快取友好
傳統方式:
Row 1 -> [Load] [Compute] [Store]
Row 2 -> [Load] [Compute] [Store] // 頻繁的記憶體訪問
...
向量化:
Batch -> [Load Block] [Compute Block] [Store Block] // 連續記憶體訪問
2. SIMD 指令利用
現代 CPU 的 SIMD(Single Instruction Multiple Data)指令可以一條指令處理多個資料:
標量操作:ADD r1, r2 // 一次加法
SIMD 操作:VPADD v1, v2 // 一次 4-8 個加法(取決於資料型別)
Arrow 的計算核心大量使用 SIMD,例如在過濾操作中:
// Arrow 內部實現(簡化)
fn filter_primitive<T>(values: &[T], mask: &[bool]) -> Vec<T> {
// 使用 SIMD 指令一次處理 8 個或更多元素
values.iter()
.zip(mask.iter())
.filter_map(|(v, &m)| if m { Some(*v) } else { None })
.collect()
}
3. 分支預測優化
批次處理減少了分支判斷的次數:
傳統方式:每行一次 if 判斷,CPU 分支預測器效率低
向量化: 批次級別的操作,分支預測更準確
讓我們看一個具體例子:
// 表達式:price * quantity
pub struct BinaryExpr {
left: Arc<dyn PhysicalExpr>, // price
op: Operator, // *
right: Arc<dyn PhysicalExpr>, // quantity
}
impl PhysicalExpr for BinaryExpr {
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
// 1. 遞迴求值左右子表達式(向量化)
let left_value = self.left.evaluate(batch)?;
let right_value = self.right.evaluate(batch)?;
// 2. 一次性對整個陣列執行運算(向量化)
let result = match self.op {
Operator::Multiply => multiply(&left_value, &right_value)?,
// ...
};
Ok(ColumnarValue::Array(result))
}
}
multiply(&left_value, &right_value)
會調用 Arrow 的向量化乘法,一次處理數千個值。
向量化並非沒有代價:
但在分析型查詢(OLAP)中,吞吐量(throughput)通常比延遲(latency)更重要,因此向量化的優勢遠超其代價。
讓我們通過一個完整的例子來整合今天學到的知識:
use datafusion::prelude::*;
use datafusion::arrow::array::{Int32Array, Float64Array, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use std::sync::Arc;
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
let ctx = SessionContext::new();
// 1. 創建測試數據
let schema = Arc::new(Schema::new(vec![
Field::new("product_id", DataType::Int32, false),
Field::new("product_name", DataType::Utf8, false),
Field::new("price", DataType::Float64, false),
Field::new("quantity", DataType::Int32, false),
]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
Arc::new(StringArray::from(vec!["Apple", "Banana", "Orange", "Grape", "Mango"])),
Arc::new(Float64Array::from(vec![1.5, 0.8, 2.0, 3.5, 2.5])),
Arc::new(Int32Array::from(vec![100, 200, 50, 30, 80])),
],
)?;
// 2. 註冊為表
ctx.register_batch("products", batch)?;
// 3. 執行包含 Filter 和 Projection 的查詢
let df = ctx.sql(
"SELECT
product_name,
price,
quantity,
price * quantity as total_value
FROM products
WHERE price > 1.0 AND quantity > 50
ORDER BY total_value DESC"
).await?;
// 4. 查看物理計劃
let physical_plan = df.clone().create_physical_plan().await?;
println!("Physical Plan:");
println!("{}\n", displayable(physical_plan.as_ref()).indent(true));
// 5. 執行並顯示結果
println!("Results:");
df.show().await?;
Ok(())
}
輸出的物理計劃(簡化版):
SortExec: expr=[total_value@3 DESC]
ProjectionExec: expr=[product_name@1, price@2, quantity@3, price@2 * quantity@3 as total_value]
CoalesceBatchesExec: target_batch_size=8192
FilterExec: price@2 > 1.0 AND quantity@3 > 50
MemoryExec: partitions=1
執行流程說明:
price > 1.0 AND quantity > 50
(保留 4 行)price * quantity
,並選擇需要的欄位total_value
降序排序結果:
+--------------+-------+----------+-------------+
| product_name | price | quantity | total_value |
+--------------+-------+----------+-------------+
| Grape | 3.5 | 30 | 105.0 |
| Mango | 2.5 | 80 | 200.0 |
| Orange | 2.0 | 50 | 100.0 |
| Apple | 1.5 | 100 | 150.0 |
+--------------+-------+----------+-------------+
今天我們深入探討了 DataFusion 中兩個最基礎的執行算子:
PhysicalExpr
trait 統一處理列引用、運算和函數這些基礎算子雖然簡單,卻展現了 DataFusion 的核心設計哲學:將複雜的操作分解為簡單、可組合的算子,透過向量化執行實現高效能。
明天我們將進入更複雜的領域:聚合算子(AggregateExec)。聚合涉及狀態管理、多階段執行和記憶體優化,是理解 DataFusion 執行引擎的進階主題。我們將探討 GROUP BY
、SUM
、AVG
等聚合操作的內部實現機制。