iT邦幫忙

2025 iThome 鐵人賽

DAY 16
0
Rust

DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅系列 第 16

Day 16: ExecutionPlan 體系架構 Part 1 - Trait 設計

  • 分享至 

  • xImage
  •  

前言

在前面的章節中,我們已經理解 DataFusion 如何將 SQL 解析為 LogicalPlan,並透過 Optimizer 進行優化,最後由 PhysicalPlanner 轉換為可執行的物理計劃。今天,我們要進入執行引擎的核心——ExecutionPlan trait,這是所有物理計劃節點必須實作的介面。

理解 ExecutionPlan 的設計至關重要,因為它定義了查詢執行的基本契約:每個物理算子如何產生數據、如何描述自己的行為特性、以及如何與其他算子協作。這個 trait 的設計使得 DataFusion 能夠建構高效、可擴展的執行引擎。

今天我們將學習:

  • ExecutionPlan trait 的核心方法及其設計理念
  • 執行計劃的屬性系統(PlanProperties)
  • 如何透過 DisplayableExecutionPlan 視覺化執行計劃

ExecutionPlan Trait:執行引擎的核心介面

什麼是 ExecutionPlan?

ExecutionPlan 是 DataFusion 物理計劃中所有節點必須實作的 trait。它定義了一個統一的介面,讓每個物理算子(如 FilterExec、ProjectionExec、HashJoinExec 等)能夠:

  1. 描述自己的輸出特性:Schema、分區方式、排序順序
  2. 聲明對輸入的要求:數據分布要求、排序要求
  3. 執行查詢邏輯:產生 RecordBatch

讓我們看看 ExecutionPlan trait 的定義:

pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
    /// 執行計劃的簡短名稱,如 'FilterExec', 'ProjectionExec'
    fn name(&self) -> &str;

    /// 返回執行計劃的 Any 類型,用於向下轉型
    fn as_any(&self) -> &dyn Any;

    /// 獲取此執行計劃的輸出 Schema
    fn schema(&self) -> SchemaRef {
        Arc::clone(self.properties().schema())
    }

    /// 返回執行計劃的輸出屬性(排序、分區等)
    fn properties(&self) -> &PlanProperties;

    /// 獲取子節點列表
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>;

    /// 開始執行指定分區,返回 RecordBatch
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream>;

    // ... 其他方法
}

核心方法詳解

1. schema() - 輸出 Schema

schema() 方法返回此執行計劃產生的數據的 Schema(結構描述),包含列名、數據類型、是否可為空等資訊。

fn schema(&self) -> SchemaRef {
    Arc::clone(self.properties().schema())
}

Schema 是查詢處理的基礎,DataFusion 使用 Apache Arrow 的 Schema 定義。例如,一個 FilterExec 的輸出 Schema 通常與其輸入相同,而 ProjectionExec 則會根據投影的列來調整 Schema。

實際範例

// 假設我們有一個 FilterExec
let filter_exec: Arc<dyn ExecutionPlan> = create_filter_exec();
let schema = filter_exec.schema();

// Schema 包含了所有欄位的元數據
for field in schema.fields() {
    println!("欄位: {}, 類型: {}", field.name(), field.data_type());
}

2. properties() - 執行計劃屬性

properties() 返回一個 PlanProperties 結構,這是 DataFusion 的關鍵設計:將執行計劃的各種屬性集中管理,避免重複計算。我們稍後會詳細討論。

3. execute() - 執行入口

execute() 是最核心的方法,它開始執行指定的分區並返回一個異步的 RecordBatch:

fn execute(
    &self,
    partition: usize,
    context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream>;

關鍵設計特點

  1. 分區執行partition 參數指定要執行哪個分區,這使得 DataFusion 可以並行處理不同分區
  2. 惰性計算execute() 本身不是 async 函數,但它返回一個 async Stream,只有在請求數據時才實際計算
  3. 流式處理:返回的是 SendableRecordBatchStream,支持漸進式產生結果,不需要一次性載入所有數據到內存

執行流程圖

┌─────────────────────────────────────────────────────────────┐
│                     ExecutionPlan::execute(0)               │
│                                                             │
│  partition 0      partition 1      partition 2              │
│       │                │                │                   │
│       ▼                ▼                ▼                   │
│  ┌─────────┐      ┌─────────┐      ┌─────────┐              │
│  │ Stream  │      │ Stream  │      │ Stream  │              │
│  │ Batch 1 │      │ Batch 1 │      │ Batch 1 │              │
│  │ Batch 2 │      │ Batch 2 │      │ Batch 2 │              │
│  │   ...   │      │   ...   │      │   ...   │              │
│  └─────────┘      └─────────┘      └─────────┘              │
│       │                │                │                   │
│       └────────────────┴────────────────┘                   │
│                        │                                    │
│                        ▼                                    │
│               並行處理後合併結果                              │
└─────────────────────────────────────────────────────────────┘

實際使用範例

use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::ExecutionPlan;
use futures::StreamExt;

async fn execute_plan(plan: Arc<dyn ExecutionPlan>) -> Result<()> {
    // 創建執行上下文
    let task_ctx = Arc::new(TaskContext::default());
    
    // 獲取分區數量
    let partition_count = plan.output_partitioning().partition_count();
    
    // 執行第一個分區
    let mut stream = plan.execute(0, task_ctx)?;
    
    // 逐批處理數據
    while let Some(batch) = stream.next().await {
        let batch = batch?;
        println!("收到批次:{} 行", batch.num_rows());
        // 處理 RecordBatch...
    }
    
    Ok(())
}

4. 輸入要求方法

ExecutionPlan 還定義了一系列方法來聲明對輸入的要求,這些資訊被優化器用來插入必要的重分區或排序操作:

/// 指定每個子節點的數據分布要求
fn required_input_distribution(&self) -> Vec<Distribution> {
    vec![Distribution::UnspecifiedDistribution; self.children().len()]
}

/// 指定每個子節點的排序要求
fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
    vec![None; self.children().len()]
}

/// 此算子是否保持輸入的順序
fn maintains_input_order(&self) -> Vec<bool> {
    vec![false; self.children().len()]
}

例如,HashJoinExec 可能要求其 build 側的數據為單一分區(Distribution::SinglePartition),而 SortMergeJoinExec 則要求輸入已按 join key 排序。

PlanProperties:執行計劃的屬性系統

為什麼需要 PlanProperties?

在執行計劃優化過程中,優化器需要頻繁查詢各個節點的輸出特性:

  • 數據如何分區?(RoundRobin、Hash、Range)
  • 輸出是否有序?按哪些列排序?
  • 哪些列被認為是等價的?

如果每次查詢都重新計算這些屬性,會造成巨大的性能開銷。因此 DataFusion 設計了 PlanProperties 結構來緩存這些計算結果:

#[derive(Debug, Clone)]
pub struct PlanProperties {
    /// Schema 資訊
    pub eq_properties: EquivalenceProperties,
    /// 輸出分區策略
    pub partitioning: Partitioning,
    /// 流的發射類型(增量式或批次式)
    pub emission_type: EmissionType,
    /// 流的有界性(有界或無界)
    pub boundedness: Boundedness,
    /// 評估類型
    pub evaluation_type: EvaluationType,
    /// 調度類型
    pub scheduling_type: SchedulingType,
    /// 輸出排序
    output_ordering: Option<LexOrdering>,
}

ExecutionPlanProperties Trait

所有 ExecutionPlan 都實作了 ExecutionPlanProperties trait,它提供統一的方式訪問這些屬性:

pub trait ExecutionPlanProperties {
    /// 輸出如何分區
    fn output_partitioning(&self) -> &Partitioning;

    /// 輸出是否有序,以及排序鍵
    fn output_ordering(&self) -> Option<&LexOrdering>;

    /// 流的有界性
    fn boundedness(&self) -> Boundedness;

    /// 流的發射行為
    fn pipeline_behavior(&self) -> EmissionType;

    /// 等價屬性
    fn equivalence_properties(&self) -> &EquivalenceProperties;
}

輸出分區(Output Partitioning)

output_partitioning() 描述數據如何分佈在多個分區中,這對於並行執行至關重要:

pub enum Partitioning {
    /// 輪詢分區:數據隨機分散
    RoundRobinBatch(usize),
    
    /// 雜湊分區:按指定列的 hash 值分區
    Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
    
    /// 未知分區
    UnknownPartitioning(usize),
}

分區示意圖

Hash Partitioning (by customer_id)
┌─────────────────────────────────────────────────────────────┐
│                         Source Data                         │
│  customer_id=1, customer_id=5, customer_id=2, customer_id=1 │
└────────────────────────────┬────────────────────────────────┘
                             │
                    hash(customer_id) % 3
                             │
           ┌─────────────────┼─────────────────┐
           ▼                 ▼                 ▼
    ┌────────────┐    ┌────────────┐    ┌────────────┐
    │ Partition 0│    │ Partition 1│    │ Partition 2│
    │            │    │            │    │            │
    │ id=1, id=1 │    │   id=5     │    │   id=2     │
    └────────────┘    └────────────┘    └────────────┘

輸出排序(Output Ordering)

output_ordering() 告訴優化器數據是否已排序,這能避免不必要的重排序:

// 如果數據按 (timestamp DESC, user_id ASC) 排序
let ordering = vec![
    PhysicalSortExpr {
        expr: col("timestamp"),
        options: SortOptions {
            descending: true,
            nulls_first: false,
        },
    },
    PhysicalSortExpr {
        expr: col("user_id"),
        options: SortOptions {
            descending: false,
            nulls_first: false,
        },
    },
];

排序資訊非常重要,例如:

  • SortMergeJoin 需要兩邊輸入都已排序
  • 窗口函數可能需要按 PARTITION BYORDER BY 子句排序
  • 如果數據已經有序,SortExec 可以被優化掉

DisplayableExecutionPlan:執行計劃可視化

為什麼需要可視化?

在開發和調優過程中,我們需要查看執行計劃的結構來:

  • 驗證計劃是否符合預期
  • 找出性能瓶頸
  • 理解優化器的決策

DataFusion 提供了 DisplayableExecutionPlan 來將執行計劃以各種格式呈現。

基本使用

use datafusion::physical_plan::displayable;

let plan: Arc<dyn ExecutionPlan> = ...;
let displayable_plan = displayable(plan.as_ref());

// 單行顯示
println!("{}", displayable_plan.one_line());

// 縮進樹狀顯示
println!("{}", displayable_plan.indent(false));

// Graphviz 格式(可視化)
println!("{}", displayable_plan.graphviz());

顯示格式範例

1. 縮進格式(Indent Format)

ProjectionExec: expr=[id@0, name@1, total@2]
  AggregateExec: mode=FinalPartitioned, gby=[id@0, name@1], aggr=[SUM(amount)]
    CoalesceBatchesExec: target_batch_size=8192
      RepartitionExec: partitioning=Hash([id@0, name@1], 4)
        AggregateExec: mode=Partial, gby=[id@0, name@1], aggr=[SUM(amount)]
          FilterExec: amount@2 > 100
            CsvExec: file_groups={4 groups}, projection=[id, name, amount]

這種格式清楚地展示了:

  • 執行計劃的樹狀結構
  • 每個算子的關鍵參數
  • 數據流動的方向(從下往上)

2. Graphviz 格式

可以將輸出複製到 Graphviz Online 生成圖形:

digraph {
    0[label="ProjectionExec: expr=[id, name, total]"]
    1[label="AggregateExec: mode=FinalPartitioned"]
    2[label="RepartitionExec: partitioning=Hash"]
    3[label="AggregateExec: mode=Partial"]
    4[label="FilterExec"]
    5[label="CsvExec"]
    0 -> 1
    1 -> 2
    2 -> 3
    3 -> 4
    4 -> 5
}

顯示更多資訊

DisplayableExecutionPlan 還支持顯示額外資訊:

// 顯示 Schema 資訊
let display = displayable(plan.as_ref())
    .set_show_schema(true);

// 顯示統計資訊
let display = displayable(plan.as_ref())
    .set_show_statistics(true);

// 顯示指標(metrics)
let display = DisplayableExecutionPlan::with_metrics(plan.as_ref());

println!("{}", display.indent(false));

顯示的輸出會包含更詳細的資訊:

FilterExec: amount@2 > 100, metrics=[output_rows=1234, elapsed_time=5ms]
  schema=[[id:Int32;N, name:Utf8;N, amount:Float64;N]]
  statistics=[num_rows=5000, total_byte_size=150000]

實際應用:觀察執行計劃

讓我們通過一個完整的例子來觀察執行計劃:

use datafusion::prelude::*;
use datafusion::physical_plan::displayable;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    
    // 註冊測試數據
    ctx.register_csv("orders", "orders.csv", CsvReadOptions::new()).await?;
    
    // 構建查詢
    let df = ctx.sql(
        "SELECT customer_id, SUM(amount) as total 
         FROM orders 
         WHERE amount > 100 
         GROUP BY customer_id 
         ORDER BY total DESC 
         LIMIT 10"
    ).await?;
    
    // 獲取物理計劃
    let physical_plan = df.create_physical_plan().await?;
    
    // 顯示執行計劃
    let displayable = displayable(physical_plan.as_ref());
    println!("執行計劃:\n{}", displayable.indent(false));
    
    // 執行查詢
    let results = df.collect().await?;
    println!("結果:{:?}", results);
    
    Ok(())
}

這會輸出類似:

執行計劃:
GlobalLimitExec: skip=0, fetch=10
  SortPreservingMergeExec: [total@1 DESC]
    SortExec: expr=[total@1 DESC], preserve_partitioning=[true]
      AggregateExec: mode=FinalPartitioned, gby=[customer_id@0], aggr=[SUM(amount)]
        CoalesceBatchesExec: target_batch_size=8192
          RepartitionExec: partitioning=Hash([customer_id@0], 4)
            AggregateExec: mode=Partial, gby=[customer_id@0], aggr=[SUM(amount)]
              FilterExec: amount@1 > 100
                CsvExec: file_groups={1 group}, projection=[customer_id, amount]

進階技巧/最佳實務

1. 理解執行計劃的閱讀順序

執行計劃應該從下往上閱讀:

  • 最底層是數據源(CsvExec、ParquetExec)
  • 向上是各種轉換算子
  • 頂層是最終輸出

2. 注意分區數量的變化

觀察 RepartitionExecCoalescePartitionsExec

  • RepartitionExec:增加分區數以提高並行度
  • CoalescePartitionsExec:減少分區數以降低開銷

過多的重分區會影響性能,應該關注是否有不必要的 repartition。

3. 識別兩階段聚合

注意 AggregateExec 的 mode:

  • Partial:在各分區本地先聚合
  • FinalPartitionedFinal:合併部分聚合結果

這是分佈式聚合的標準模式,可以大幅減少數據傳輸。

4. 利用 EXPLAIN 快速查看計劃

在 SQL 中,可以使用 EXPLAIN 直接查看執行計劃:

EXPLAIN 
SELECT customer_id, SUM(amount) 
FROM orders 
GROUP BY customer_id;

這比寫 Rust 代碼更快速,適合快速驗證優化效果。

5. 自定義 ExecutionPlan 的最佳實踐

如果你要實作自己的 ExecutionPlan:

  1. 正確實作 properties():確保 PlanProperties 準確反映輸出特性
  2. 實作 maintains_input_order():幫助優化器避免不必要的排序
  3. 遵循惰性計算原則:在 execute() 中不要立即計算,返回 Stream 讓消費者驅動
  4. 處理取消操作:確保 Stream 被 drop 時能正確釋放資源

小結

今天我們深入探討了 ExecutionPlan trait 的設計,這是 DataFusion 執行引擎的基石:

  1. 核心方法

    • schema() 描述輸出結構
    • properties() 提供緩存的屬性資訊
    • execute() 是實際執行入口,返回 RecordBatch
  2. 屬性系統

    • PlanProperties 統一管理輸出特性
    • output_partitioning() 描述數據分區方式
    • output_ordering() 描述數據排序狀態
  3. 可視化工具

    • DisplayableExecutionPlan 提供多種顯示格式
    • 支持顯示 metrics、statistics、schema 等額外資訊
    • 幫助理解和調優執行計劃

明天我們將繼續深入 Stream 執行模型,理解 DataFusion 如何使用 Pull-based Volcano 模型來實現高效的流式執行。

參考資料

  1. DataFusion ExecutionPlan 源碼
  2. DataFusion Display 模組
  3. Apache Arrow Schema 文檔
  4. Volcano Iterator Model 論文 - 理解 pull-based 執行模型的經典論文
  5. DataFusion Architecture Guide - 官方架構文檔

上一篇
Day 15: 物理計劃生成 - 從邏輯到物理的橋樑
下一篇
Day 17: ExecutionPlan 體系架構 Part 2 - Stream 執行模型
系列文
DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅19
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言