在前面的章節中,我們已經理解 DataFusion 如何將 SQL 解析為 LogicalPlan,並透過 Optimizer 進行優化,最後由 PhysicalPlanner 轉換為可執行的物理計劃。今天,我們要進入執行引擎的核心——ExecutionPlan trait,這是所有物理計劃節點必須實作的介面。
理解 ExecutionPlan 的設計至關重要,因為它定義了查詢執行的基本契約:每個物理算子如何產生數據、如何描述自己的行為特性、以及如何與其他算子協作。這個 trait 的設計使得 DataFusion 能夠建構高效、可擴展的執行引擎。
今天我們將學習:
ExecutionPlan 是 DataFusion 物理計劃中所有節點必須實作的 trait。它定義了一個統一的介面,讓每個物理算子(如 FilterExec、ProjectionExec、HashJoinExec 等)能夠:
讓我們看看 ExecutionPlan trait 的定義:
pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
/// 執行計劃的簡短名稱,如 'FilterExec', 'ProjectionExec'
fn name(&self) -> &str;
/// 返回執行計劃的 Any 類型,用於向下轉型
fn as_any(&self) -> &dyn Any;
/// 獲取此執行計劃的輸出 Schema
fn schema(&self) -> SchemaRef {
Arc::clone(self.properties().schema())
}
/// 返回執行計劃的輸出屬性(排序、分區等)
fn properties(&self) -> &PlanProperties;
/// 獲取子節點列表
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>;
/// 開始執行指定分區,返回 RecordBatch
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream>;
// ... 其他方法
}
schema()
- 輸出 Schemaschema()
方法返回此執行計劃產生的數據的 Schema(結構描述),包含列名、數據類型、是否可為空等資訊。
fn schema(&self) -> SchemaRef {
Arc::clone(self.properties().schema())
}
Schema 是查詢處理的基礎,DataFusion 使用 Apache Arrow 的 Schema 定義。例如,一個 FilterExec 的輸出 Schema 通常與其輸入相同,而 ProjectionExec 則會根據投影的列來調整 Schema。
實際範例:
// 假設我們有一個 FilterExec
let filter_exec: Arc<dyn ExecutionPlan> = create_filter_exec();
let schema = filter_exec.schema();
// Schema 包含了所有欄位的元數據
for field in schema.fields() {
println!("欄位: {}, 類型: {}", field.name(), field.data_type());
}
properties()
- 執行計劃屬性properties()
返回一個 PlanProperties
結構,這是 DataFusion 的關鍵設計:將執行計劃的各種屬性集中管理,避免重複計算。我們稍後會詳細討論。
execute()
- 執行入口execute()
是最核心的方法,它開始執行指定的分區並返回一個異步的 RecordBatch:
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream>;
關鍵設計特點:
partition
參數指定要執行哪個分區,這使得 DataFusion 可以並行處理不同分區execute()
本身不是 async 函數,但它返回一個 async Stream,只有在請求數據時才實際計算SendableRecordBatchStream
,支持漸進式產生結果,不需要一次性載入所有數據到內存執行流程圖:
┌─────────────────────────────────────────────────────────────┐
│ ExecutionPlan::execute(0) │
│ │
│ partition 0 partition 1 partition 2 │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Stream │ │ Stream │ │ Stream │ │
│ │ Batch 1 │ │ Batch 1 │ │ Batch 1 │ │
│ │ Batch 2 │ │ Batch 2 │ │ Batch 2 │ │
│ │ ... │ │ ... │ │ ... │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │ │ │ │
│ └────────────────┴────────────────┘ │
│ │ │
│ ▼ │
│ 並行處理後合併結果 │
└─────────────────────────────────────────────────────────────┘
實際使用範例:
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::ExecutionPlan;
use futures::StreamExt;
async fn execute_plan(plan: Arc<dyn ExecutionPlan>) -> Result<()> {
// 創建執行上下文
let task_ctx = Arc::new(TaskContext::default());
// 獲取分區數量
let partition_count = plan.output_partitioning().partition_count();
// 執行第一個分區
let mut stream = plan.execute(0, task_ctx)?;
// 逐批處理數據
while let Some(batch) = stream.next().await {
let batch = batch?;
println!("收到批次:{} 行", batch.num_rows());
// 處理 RecordBatch...
}
Ok(())
}
ExecutionPlan 還定義了一系列方法來聲明對輸入的要求,這些資訊被優化器用來插入必要的重分區或排序操作:
/// 指定每個子節點的數據分布要求
fn required_input_distribution(&self) -> Vec<Distribution> {
vec![Distribution::UnspecifiedDistribution; self.children().len()]
}
/// 指定每個子節點的排序要求
fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
vec![None; self.children().len()]
}
/// 此算子是否保持輸入的順序
fn maintains_input_order(&self) -> Vec<bool> {
vec![false; self.children().len()]
}
例如,HashJoinExec
可能要求其 build 側的數據為單一分區(Distribution::SinglePartition
),而 SortMergeJoinExec
則要求輸入已按 join key 排序。
在執行計劃優化過程中,優化器需要頻繁查詢各個節點的輸出特性:
如果每次查詢都重新計算這些屬性,會造成巨大的性能開銷。因此 DataFusion 設計了 PlanProperties 結構來緩存這些計算結果:
#[derive(Debug, Clone)]
pub struct PlanProperties {
/// Schema 資訊
pub eq_properties: EquivalenceProperties,
/// 輸出分區策略
pub partitioning: Partitioning,
/// 流的發射類型(增量式或批次式)
pub emission_type: EmissionType,
/// 流的有界性(有界或無界)
pub boundedness: Boundedness,
/// 評估類型
pub evaluation_type: EvaluationType,
/// 調度類型
pub scheduling_type: SchedulingType,
/// 輸出排序
output_ordering: Option<LexOrdering>,
}
所有 ExecutionPlan 都實作了 ExecutionPlanProperties
trait,它提供統一的方式訪問這些屬性:
pub trait ExecutionPlanProperties {
/// 輸出如何分區
fn output_partitioning(&self) -> &Partitioning;
/// 輸出是否有序,以及排序鍵
fn output_ordering(&self) -> Option<&LexOrdering>;
/// 流的有界性
fn boundedness(&self) -> Boundedness;
/// 流的發射行為
fn pipeline_behavior(&self) -> EmissionType;
/// 等價屬性
fn equivalence_properties(&self) -> &EquivalenceProperties;
}
output_partitioning()
描述數據如何分佈在多個分區中,這對於並行執行至關重要:
pub enum Partitioning {
/// 輪詢分區:數據隨機分散
RoundRobinBatch(usize),
/// 雜湊分區:按指定列的 hash 值分區
Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
/// 未知分區
UnknownPartitioning(usize),
}
分區示意圖:
Hash Partitioning (by customer_id)
┌─────────────────────────────────────────────────────────────┐
│ Source Data │
│ customer_id=1, customer_id=5, customer_id=2, customer_id=1 │
└────────────────────────────┬────────────────────────────────┘
│
hash(customer_id) % 3
│
┌─────────────────┼─────────────────┐
▼ ▼ ▼
┌────────────┐ ┌────────────┐ ┌────────────┐
│ Partition 0│ │ Partition 1│ │ Partition 2│
│ │ │ │ │ │
│ id=1, id=1 │ │ id=5 │ │ id=2 │
└────────────┘ └────────────┘ └────────────┘
output_ordering()
告訴優化器數據是否已排序,這能避免不必要的重排序:
// 如果數據按 (timestamp DESC, user_id ASC) 排序
let ordering = vec![
PhysicalSortExpr {
expr: col("timestamp"),
options: SortOptions {
descending: true,
nulls_first: false,
},
},
PhysicalSortExpr {
expr: col("user_id"),
options: SortOptions {
descending: false,
nulls_first: false,
},
},
];
排序資訊非常重要,例如:
PARTITION BY
和 ORDER BY
子句排序SortExec
可以被優化掉在開發和調優過程中,我們需要查看執行計劃的結構來:
DataFusion 提供了 DisplayableExecutionPlan
來將執行計劃以各種格式呈現。
use datafusion::physical_plan::displayable;
let plan: Arc<dyn ExecutionPlan> = ...;
let displayable_plan = displayable(plan.as_ref());
// 單行顯示
println!("{}", displayable_plan.one_line());
// 縮進樹狀顯示
println!("{}", displayable_plan.indent(false));
// Graphviz 格式(可視化)
println!("{}", displayable_plan.graphviz());
1. 縮進格式(Indent Format)
ProjectionExec: expr=[id@0, name@1, total@2]
AggregateExec: mode=FinalPartitioned, gby=[id@0, name@1], aggr=[SUM(amount)]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([id@0, name@1], 4)
AggregateExec: mode=Partial, gby=[id@0, name@1], aggr=[SUM(amount)]
FilterExec: amount@2 > 100
CsvExec: file_groups={4 groups}, projection=[id, name, amount]
這種格式清楚地展示了:
2. Graphviz 格式
可以將輸出複製到 Graphviz Online 生成圖形:
digraph {
0[label="ProjectionExec: expr=[id, name, total]"]
1[label="AggregateExec: mode=FinalPartitioned"]
2[label="RepartitionExec: partitioning=Hash"]
3[label="AggregateExec: mode=Partial"]
4[label="FilterExec"]
5[label="CsvExec"]
0 -> 1
1 -> 2
2 -> 3
3 -> 4
4 -> 5
}
DisplayableExecutionPlan
還支持顯示額外資訊:
// 顯示 Schema 資訊
let display = displayable(plan.as_ref())
.set_show_schema(true);
// 顯示統計資訊
let display = displayable(plan.as_ref())
.set_show_statistics(true);
// 顯示指標(metrics)
let display = DisplayableExecutionPlan::with_metrics(plan.as_ref());
println!("{}", display.indent(false));
顯示的輸出會包含更詳細的資訊:
FilterExec: amount@2 > 100, metrics=[output_rows=1234, elapsed_time=5ms]
schema=[[id:Int32;N, name:Utf8;N, amount:Float64;N]]
statistics=[num_rows=5000, total_byte_size=150000]
讓我們通過一個完整的例子來觀察執行計劃:
use datafusion::prelude::*;
use datafusion::physical_plan::displayable;
#[tokio::main]
async fn main() -> Result<()> {
let ctx = SessionContext::new();
// 註冊測試數據
ctx.register_csv("orders", "orders.csv", CsvReadOptions::new()).await?;
// 構建查詢
let df = ctx.sql(
"SELECT customer_id, SUM(amount) as total
FROM orders
WHERE amount > 100
GROUP BY customer_id
ORDER BY total DESC
LIMIT 10"
).await?;
// 獲取物理計劃
let physical_plan = df.create_physical_plan().await?;
// 顯示執行計劃
let displayable = displayable(physical_plan.as_ref());
println!("執行計劃:\n{}", displayable.indent(false));
// 執行查詢
let results = df.collect().await?;
println!("結果:{:?}", results);
Ok(())
}
這會輸出類似:
執行計劃:
GlobalLimitExec: skip=0, fetch=10
SortPreservingMergeExec: [total@1 DESC]
SortExec: expr=[total@1 DESC], preserve_partitioning=[true]
AggregateExec: mode=FinalPartitioned, gby=[customer_id@0], aggr=[SUM(amount)]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([customer_id@0], 4)
AggregateExec: mode=Partial, gby=[customer_id@0], aggr=[SUM(amount)]
FilterExec: amount@1 > 100
CsvExec: file_groups={1 group}, projection=[customer_id, amount]
執行計劃應該從下往上閱讀:
觀察 RepartitionExec
和 CoalescePartitionsExec
:
過多的重分區會影響性能,應該關注是否有不必要的 repartition。
注意 AggregateExec
的 mode:
這是分佈式聚合的標準模式,可以大幅減少數據傳輸。
在 SQL 中,可以使用 EXPLAIN
直接查看執行計劃:
EXPLAIN
SELECT customer_id, SUM(amount)
FROM orders
GROUP BY customer_id;
這比寫 Rust 代碼更快速,適合快速驗證優化效果。
如果你要實作自己的 ExecutionPlan:
properties()
:確保 PlanProperties 準確反映輸出特性maintains_input_order()
:幫助優化器避免不必要的排序execute()
中不要立即計算,返回 Stream 讓消費者驅動今天我們深入探討了 ExecutionPlan trait 的設計,這是 DataFusion 執行引擎的基石:
核心方法:
schema()
描述輸出結構properties()
提供緩存的屬性資訊execute()
是實際執行入口,返回 RecordBatch屬性系統:
output_partitioning()
描述數據分區方式output_ordering()
描述數據排序狀態可視化工具:
明天我們將繼續深入 Stream 執行模型,理解 DataFusion 如何使用 Pull-based Volcano 模型來實現高效的流式執行。