iT邦幫忙

2025 iThome 鐵人賽

DAY 11
1
Rust

DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅系列 第 11

Day 11: 查詢執行生命週期 Part 3 - 優化與執行

  • 分享至 

  • xImage
  •  

前言

在前兩天的文章中,我們了解了 DataFusion 如何將 SQL 查詢轉換為 logical plan 。今天會繼續探討從 logical plan 到 physical plan 的過程,其中包含優化器如何工作、物理計劃的生成過程以及最終的執行模型。這三個階段是 DataFusion 高效能的關鍵,理解它們能幫助我們更好的優化查詢性能。

快還要更快! - Logical plan 的優化

好不容易從 SQL 轉換成 logical plan 後,你是不是以為直接執行就好了呢? NONONO~ logical plan 只告訴電腦該怎麼做而已,要做更好更有效率,還得先經過優化這道手續呢!

優化器的工作流程

DataFusion 的優化器採用基於規則的優化(Rule-Based Optimization)策略,通過一系列預定義的優化規則來改進查詢計劃。優化器的工作流程可以分為以下幾個階段:

  1. 規則註冊與排序:優化器維護一個規則列表,每個規則都有特定的執行順序
  2. 固定點迭代:優化器會重複應用規則,直到計劃不再改變或達到最大迭代次數
  3. 規則應用:每個規則都會檢查當前的 LogicalPlan 並嘗試進行優化

讓我們看看優化器的核心實作:

// datafusion/optimizer/src/optimizer.rs
impl Optimizer {
    pub fn new() -> Self {
        let rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
            Arc::new(EliminateNestedUnion::new()),
            Arc::new(SimplifyExpressions::new()),
            Arc::new(ReplaceDistinctWithAggregate::new()),
            Arc::new(EliminateJoin::new()),
            // ... 更多規則
            Arc::new(PushDownFilter::new()),
            Arc::new(OptimizeProjections::new()),
        ];
        Self::with_rules(rules)
    }
}

這段程式碼的關鍵點:

  • Vec<Arc<dyn OptimizerRule>>:規則列表,每個規則都包裝在 Arc 中以便多執行緒共享
  • 規則的順序很重要:某些規則必須在其他規則之前執行
  • 每個規則都實作了 OptimizerRule trait,定義了如何優化計劃

常見優化規則

1. Predicate Pushdown(謂詞下推)- 讓過濾更早發生

什麼是謂詞下推?

謂詞下推就像是在圖書館找書時,先告訴圖書管理員你要找什麼類型的書,而不是把所有書都拿出來再慢慢篩選。在資料庫中,這意味著將過濾條件(WHERE 子句)盡可能地推送到數據源附近,減少需要處理的數據量。

為什麼重要?

假設你有一個包含 1000 萬筆記錄的銷售表,但只需要查看 2023 年的資料。如果不過濾,系統需要讀取所有 1000 萬筆記錄;但如果能將 year = 2023 的條件推送到數據源,可能只需要讀取 100 萬筆記錄。

實作原理:

// datafusion/optimizer/src/push_down_filter.rs
pub struct PushDownFilter {}

impl OptimizerRule for PushDownFilter {
    fn name(&self) -> &str {
        "push_down_filter"
    }
    
    fn apply_order(&self) -> Option<ApplyOrder> {
        Some(ApplyOrder::TopDown)  // 從上到下應用
    }
    
    fn rewrite(
        &self,
        plan: LogicalPlan,
        _config: &dyn OptimizerConfig,
    ) -> Result<Transformed<LogicalPlan>> {
        // 檢查是否為 Filter 節點
        let LogicalPlan::Filter(mut filter) = plan else {
            return Ok(Transformed::no(plan));
        };
        
        // 將過濾條件分解為多個條件
        let predicate = split_conjunction_owned(filter.predicate.clone());
        
        // 嘗試將每個條件推送到子節點
        match Arc::unwrap_or_clone(filter.input) {
            LogicalPlan::TableScan(scan) => {
                // 可以推送到表掃描
                self.push_to_table_scan(scan, predicate)
            }
            LogicalPlan::Join(join) => {
                // 可以推送到 JOIN 的某一側
                self.push_to_join(join, predicate)
            }
            _ => {
                // 無法推送,保持原樣
                Ok(Transformed::no(LogicalPlan::Filter(filter)))
            }
        }
    }
}

實際例子:

原始查詢:

ELECT product_name, price 
FROM products 
WHERE category = 'Electronics' AND price > 100

優化前:

Projection: [product_name, price]
  Filter: category = 'Electronics' AND price > 100
    TableScan: products (讀取所有列和所有行)

優化後:

Projection: [product_name, price]
  TableScan: products 
    filters: [category = 'Electronics', price > 100] (只讀取符合條件的行)

2. Projection Pushdown(投影下推)- 只讀取需要的列

什麼是投影下推?

投影下推就像是去圖書館時,只借你真正需要的書,而不是把整個書架都搬回家。在資料庫中,這意味著只讀取查詢中實際使用的列,而不是讀取整個表的所有列。

為什麼重要?

假設一個表有 100 個列,但你的查詢只需要其中的 3 個列。如果不過濾,系統需要讀取所有 100 個列的數據;但如果能推送到數據源,只需要讀取 3 個列,大大減少 I/O 開銷。

實作原理:

// datafusion/physical-optimizer/src/projection_pushdown.rs
pub struct ProjectionPushdown {}

impl PhysicalOptimizerRule for ProjectionPushdown {
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // 使用 transform_down 從上到下遍歷計劃樹
        plan.transform_down(remove_unnecessary_projections).data()
    }
}

// 移除不必要的投影的具體實作
fn remove_unnecessary_projections(
    plan: Arc<dyn ExecutionPlan>
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
    let projection = plan.as_any().downcast_ref::<ProjectionExec>()?;
    
    // 檢查投影是否只是重新排列列,沒有實際計算
    if is_simple_column_reordering(projection) {
        // 可以將投影合併到子節點
        let child = projection.children()[0].clone();
        let new_child = add_projection_to_child(child, projection.expr())?;
        Ok(Transformed::yes(new_child))
    } else {
        Ok(Transformed::no(plan))
    }
}

實際例子:

原始查詢:

SELECT name, age FROM users WHERE city = 'Taipei'

優化前:

Projection: [name, age]
  Filter: city = 'Taipei'
    TableScan: users (讀取所有列:id, name, age, city, email, phone, ...)

優化後:

Filter: city = 'Taipei'
  TableScan: users 
    projection: [name, age, city] (只讀取需要的列)

3. Constant Folding(常數折疊)- 編譯時計算

什麼是常數折疊?

常數折疊就像是在做數學題時,遇到 2 + 3 就直接寫成 5,而不是等到執行時再計算。在資料庫中,這意味著在查詢編譯階段就計算出常數表達式的結果。

為什麼重要?

避免在每次執行查詢時重複計算相同的常數表達式,減少 CPU 開銷。

實作原理:

// datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
impl TreeNodeRewriter for ConstEvaluator<'_> {
    type Node = Expr;
    
    fn f_down(&mut self, expr: Expr) -> Result<Transformed<Expr>> {
        // 預設認為可以評估這個表達式
        self.can_evaluate.push(true);
        
        // 檢查這個表達式是否可以在編譯時評估
        if !Self::can_evaluate(&expr) {
            // 如果不能評估,標記所有父表達式也不能評估
            let parent_iter = self.can_evaluate.iter_mut().rev();
            for p in parent_iter {
                if !*p { break; }  // 優化:如果已經標記過,就不需要繼續
                *p = false;
            }
        }
        
        Ok(Transformed::no(expr))
    }
    
    fn f_up(&mut self, expr: Expr) -> Result<Transformed<Expr>> {
        // 檢查是否可以評估這個表達式
        if self.can_evaluate.pop().unwrap_or(false) {
            match self.evaluate_const_expr(&expr) {
                ConstSimplifyResult::Simplified(value, metadata) => {
                    // 成功計算出常數值
                    Ok(Transformed::yes(Expr::Literal(value)))
                }
                ConstSimplifyResult::CannotSimplify => {
                    // 無法簡化,保持原樣
                    Ok(Transformed::no(expr))
                }
                ConstSimplifyResult::SimplifyRuntimeError(err, _) => {
                    // 計算時發生錯誤,保持原樣
                    Ok(Transformed::no(expr))
                }
            }
        } else {
            Ok(Transformed::no(expr))
        }
    }
}

實際例子:

原始查詢:

SELECT name, age + 10 as future_age 
FROM users 
WHERE birth_year = 2024 - age

優化前:

Projection: [name, age + 10 as future_age]
  Filter: birth_year = 2024 - age
    TableScan: users

優化後:

Projection: [name, age + 10 as future_age]
  Filter: birth_year = 2024 - age  (age + 10 在執行時計算)
    TableScan: users

注意:在這個例子中,age + 10 無法在編譯時計算,因為 age 是變數。但如果查詢是 SELECT 2 + 3 as five,那麼 2 + 3 會被折疊為 5。

PhysicalPlanner 的轉換過程

優化後的 LogicalPlan 需要轉換為可執行的 ExecutionPlan。PhysicalPlanner 負責這個轉換過程,就像將建築藍圖轉換為實際的施工計劃。

轉換過程的關鍵步驟:

  1. 樹扁平化:將邏輯計劃樹轉換為線性結構,便於並行處理
  2. 節點映射:將每個邏輯節點映射到對應的物理執行算子
  3. 並行構建:多個執行緒同時構建物理計劃的不同部分
// datafusion/core/src/physical_planner.rs
impl DefaultPhysicalPlanner {
    async fn create_initial_plan(
        &self,
        logical_plan: &LogicalPlan,
        session_state: &SessionState,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // 步驟 1: 使用深度優先搜尋將樹扁平化
        let mut flat_tree = vec![];
        let mut dfs_visit_stack = vec![(None, logical_plan)];
        
        while let Some((parent_index, node)) = dfs_visit_stack.pop() {
            let current_index = flat_tree.len();
            
            // 將子節點加入搜尋堆疊(注意:順序是反的)
            dfs_visit_stack.extend(
                node.inputs().iter().map(|&n| (Some(current_index), n))
            );
            
            // 根據子節點數量決定節點狀態
            let state = match node.inputs().len() {
                0 => NodeState::ZeroOrOneChild,  // 葉節點
                1 => NodeState::ZeroOrOneChild,  // 單子節點
                _ => NodeState::TwoOrMoreChildren(  // 多子節點
                    Mutex::new(Vec::new())
                ),
            };
            
            let node = LogicalNode {
                node,
                parent_index,
                state,
            };
            flat_tree.push(node);
        }
        
        // 步驟 2: 並行構建物理計劃
        let flat_tree = Arc::new(flat_tree);
        let planning_concurrency = session_state
            .config_options()
            .execution
            .planning_concurrency;
            
        // 從葉節點開始,並行構建物理計劃
        // ... 並行構建邏輯
    }
}

這段程式碼的關鍵點:

  • 深度優先搜尋:確保子節點在父節點之前被處理
  • 節點狀態管理:不同數量的子節點需要不同的同步機制
  • 並行構建:多個執行緒可以同時處理不同的子樹

ExecutionPlan 的執行模型

DataFusion 採用基於流的執行模型,使用 SendableRecordBatchStream 來實現拉式(Pull-based)執行。這就像是一個流水線,每個工人都從上游拿取材料,處理後傳給下游。

流式執行的優勢:

  1. 記憶體效率:一次只處理一個 RecordBatch,不需要將整個結果集載入記憶體
  2. 並行性:多個分區可以並行執行
  3. 背壓處理:當下游處理較慢時,上游會自動減速
// datafusion/execution/src/stream.rs
pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;

pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
    /// 返回這個流的 Schema
    fn schema(&self) -> SchemaRef;
}

執行流程示意:

消費者調用 next() 
    ↓
ProjectionExec 調用子節點的 next()
    ↓
FilterExec 調用子節點的 next()
    ↓
TableScanExec 從數據源讀取 RecordBatch
    ↓
返回處理後的 RecordBatch

小結

今天我們深入了解了 DataFusion 查詢執行的三個關鍵階段:

  1. 優化器:通過規則引擎改進查詢計劃,包括謂詞下推、投影下推、常數折疊等
  2. PhysicalPlanner:將優化後的邏輯計劃轉換為可執行的物理計劃
  3. 執行模型:基於流的拉式執行,提供高效的記憶體使用和並行處理

這些機制共同構成了 DataFusion 高效能查詢執行的基礎。明天我們將深入探討 LogicalPlan 的設計模式,了解為什麼它採用 Enum 設計以及各個變體的具體作用。

參考資料


上一篇
Day 10: 查詢執行生命週期 Part 2 - 從 SQL 到 logical plan
下一篇
Day 12: LogicalPlan 深入理解 Part 1 - 設計模式
系列文
DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅14
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言