在前兩天的文章中,我們了解了 DataFusion 如何將 SQL 查詢轉換為 logical plan 。今天會繼續探討從 logical plan 到 physical plan 的過程,其中包含優化器如何工作、物理計劃的生成過程以及最終的執行模型。這三個階段是 DataFusion 高效能的關鍵,理解它們能幫助我們更好的優化查詢性能。
好不容易從 SQL 轉換成 logical plan 後,你是不是以為直接執行就好了呢? NONONO~ logical plan 只告訴電腦該怎麼做而已,要做更好更有效率,還得先經過優化這道手續呢!
DataFusion 的優化器採用基於規則的優化(Rule-Based Optimization)策略,通過一系列預定義的優化規則來改進查詢計劃。優化器的工作流程可以分為以下幾個階段:
LogicalPlan
並嘗試進行優化讓我們看看優化器的核心實作:
// datafusion/optimizer/src/optimizer.rs
impl Optimizer {
pub fn new() -> Self {
let rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
Arc::new(EliminateNestedUnion::new()),
Arc::new(SimplifyExpressions::new()),
Arc::new(ReplaceDistinctWithAggregate::new()),
Arc::new(EliminateJoin::new()),
// ... 更多規則
Arc::new(PushDownFilter::new()),
Arc::new(OptimizeProjections::new()),
];
Self::with_rules(rules)
}
}
這段程式碼的關鍵點:
Vec<Arc<dyn OptimizerRule>>
:規則列表,每個規則都包裝在 Arc 中以便多執行緒共享OptimizerRule
trait,定義了如何優化計劃1. Predicate Pushdown(謂詞下推)- 讓過濾更早發生
什麼是謂詞下推?
謂詞下推就像是在圖書館找書時,先告訴圖書管理員你要找什麼類型的書,而不是把所有書都拿出來再慢慢篩選。在資料庫中,這意味著將過濾條件(WHERE 子句)盡可能地推送到數據源附近,減少需要處理的數據量。
為什麼重要?
假設你有一個包含 1000 萬筆記錄的銷售表,但只需要查看 2023 年的資料。如果不過濾,系統需要讀取所有 1000 萬筆記錄;但如果能將 year = 2023 的條件推送到數據源,可能只需要讀取 100 萬筆記錄。
實作原理:
// datafusion/optimizer/src/push_down_filter.rs
pub struct PushDownFilter {}
impl OptimizerRule for PushDownFilter {
fn name(&self) -> &str {
"push_down_filter"
}
fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::TopDown) // 從上到下應用
}
fn rewrite(
&self,
plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
// 檢查是否為 Filter 節點
let LogicalPlan::Filter(mut filter) = plan else {
return Ok(Transformed::no(plan));
};
// 將過濾條件分解為多個條件
let predicate = split_conjunction_owned(filter.predicate.clone());
// 嘗試將每個條件推送到子節點
match Arc::unwrap_or_clone(filter.input) {
LogicalPlan::TableScan(scan) => {
// 可以推送到表掃描
self.push_to_table_scan(scan, predicate)
}
LogicalPlan::Join(join) => {
// 可以推送到 JOIN 的某一側
self.push_to_join(join, predicate)
}
_ => {
// 無法推送,保持原樣
Ok(Transformed::no(LogicalPlan::Filter(filter)))
}
}
}
}
實際例子:
原始查詢:
ELECT product_name, price
FROM products
WHERE category = 'Electronics' AND price > 100
優化前:
Projection: [product_name, price]
Filter: category = 'Electronics' AND price > 100
TableScan: products (讀取所有列和所有行)
優化後:
Projection: [product_name, price]
TableScan: products
filters: [category = 'Electronics', price > 100] (只讀取符合條件的行)
2. Projection Pushdown(投影下推)- 只讀取需要的列
什麼是投影下推?
投影下推就像是去圖書館時,只借你真正需要的書,而不是把整個書架都搬回家。在資料庫中,這意味著只讀取查詢中實際使用的列,而不是讀取整個表的所有列。
為什麼重要?
假設一個表有 100 個列,但你的查詢只需要其中的 3 個列。如果不過濾,系統需要讀取所有 100 個列的數據;但如果能推送到數據源,只需要讀取 3 個列,大大減少 I/O 開銷。
實作原理:
// datafusion/physical-optimizer/src/projection_pushdown.rs
pub struct ProjectionPushdown {}
impl PhysicalOptimizerRule for ProjectionPushdown {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
// 使用 transform_down 從上到下遍歷計劃樹
plan.transform_down(remove_unnecessary_projections).data()
}
}
// 移除不必要的投影的具體實作
fn remove_unnecessary_projections(
plan: Arc<dyn ExecutionPlan>
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
let projection = plan.as_any().downcast_ref::<ProjectionExec>()?;
// 檢查投影是否只是重新排列列,沒有實際計算
if is_simple_column_reordering(projection) {
// 可以將投影合併到子節點
let child = projection.children()[0].clone();
let new_child = add_projection_to_child(child, projection.expr())?;
Ok(Transformed::yes(new_child))
} else {
Ok(Transformed::no(plan))
}
}
實際例子:
原始查詢:
SELECT name, age FROM users WHERE city = 'Taipei'
優化前:
Projection: [name, age]
Filter: city = 'Taipei'
TableScan: users (讀取所有列:id, name, age, city, email, phone, ...)
優化後:
Filter: city = 'Taipei'
TableScan: users
projection: [name, age, city] (只讀取需要的列)
3. Constant Folding(常數折疊)- 編譯時計算
什麼是常數折疊?
常數折疊就像是在做數學題時,遇到 2 + 3 就直接寫成 5,而不是等到執行時再計算。在資料庫中,這意味著在查詢編譯階段就計算出常數表達式的結果。
為什麼重要?
避免在每次執行查詢時重複計算相同的常數表達式,減少 CPU 開銷。
實作原理:
// datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
impl TreeNodeRewriter for ConstEvaluator<'_> {
type Node = Expr;
fn f_down(&mut self, expr: Expr) -> Result<Transformed<Expr>> {
// 預設認為可以評估這個表達式
self.can_evaluate.push(true);
// 檢查這個表達式是否可以在編譯時評估
if !Self::can_evaluate(&expr) {
// 如果不能評估,標記所有父表達式也不能評估
let parent_iter = self.can_evaluate.iter_mut().rev();
for p in parent_iter {
if !*p { break; } // 優化:如果已經標記過,就不需要繼續
*p = false;
}
}
Ok(Transformed::no(expr))
}
fn f_up(&mut self, expr: Expr) -> Result<Transformed<Expr>> {
// 檢查是否可以評估這個表達式
if self.can_evaluate.pop().unwrap_or(false) {
match self.evaluate_const_expr(&expr) {
ConstSimplifyResult::Simplified(value, metadata) => {
// 成功計算出常數值
Ok(Transformed::yes(Expr::Literal(value)))
}
ConstSimplifyResult::CannotSimplify => {
// 無法簡化,保持原樣
Ok(Transformed::no(expr))
}
ConstSimplifyResult::SimplifyRuntimeError(err, _) => {
// 計算時發生錯誤,保持原樣
Ok(Transformed::no(expr))
}
}
} else {
Ok(Transformed::no(expr))
}
}
}
實際例子:
原始查詢:
SELECT name, age + 10 as future_age
FROM users
WHERE birth_year = 2024 - age
優化前:
Projection: [name, age + 10 as future_age]
Filter: birth_year = 2024 - age
TableScan: users
優化後:
Projection: [name, age + 10 as future_age]
Filter: birth_year = 2024 - age (age + 10 在執行時計算)
TableScan: users
注意:在這個例子中,age + 10 無法在編譯時計算,因為 age 是變數。但如果查詢是 SELECT 2 + 3 as five,那麼 2 + 3 會被折疊為 5。
PhysicalPlanner 的轉換過程
優化後的 LogicalPlan 需要轉換為可執行的 ExecutionPlan。PhysicalPlanner 負責這個轉換過程,就像將建築藍圖轉換為實際的施工計劃。
轉換過程的關鍵步驟:
// datafusion/core/src/physical_planner.rs
impl DefaultPhysicalPlanner {
async fn create_initial_plan(
&self,
logical_plan: &LogicalPlan,
session_state: &SessionState,
) -> Result<Arc<dyn ExecutionPlan>> {
// 步驟 1: 使用深度優先搜尋將樹扁平化
let mut flat_tree = vec![];
let mut dfs_visit_stack = vec![(None, logical_plan)];
while let Some((parent_index, node)) = dfs_visit_stack.pop() {
let current_index = flat_tree.len();
// 將子節點加入搜尋堆疊(注意:順序是反的)
dfs_visit_stack.extend(
node.inputs().iter().map(|&n| (Some(current_index), n))
);
// 根據子節點數量決定節點狀態
let state = match node.inputs().len() {
0 => NodeState::ZeroOrOneChild, // 葉節點
1 => NodeState::ZeroOrOneChild, // 單子節點
_ => NodeState::TwoOrMoreChildren( // 多子節點
Mutex::new(Vec::new())
),
};
let node = LogicalNode {
node,
parent_index,
state,
};
flat_tree.push(node);
}
// 步驟 2: 並行構建物理計劃
let flat_tree = Arc::new(flat_tree);
let planning_concurrency = session_state
.config_options()
.execution
.planning_concurrency;
// 從葉節點開始,並行構建物理計劃
// ... 並行構建邏輯
}
}
這段程式碼的關鍵點:
ExecutionPlan 的執行模型
DataFusion 採用基於流的執行模型,使用 SendableRecordBatchStream 來實現拉式(Pull-based)執行。這就像是一個流水線,每個工人都從上游拿取材料,處理後傳給下游。
流式執行的優勢:
// datafusion/execution/src/stream.rs
pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;
pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
/// 返回這個流的 Schema
fn schema(&self) -> SchemaRef;
}
執行流程示意:
消費者調用 next()
↓
ProjectionExec 調用子節點的 next()
↓
FilterExec 調用子節點的 next()
↓
TableScanExec 從數據源讀取 RecordBatch
↓
返回處理後的 RecordBatch
今天我們深入了解了 DataFusion 查詢執行的三個關鍵階段:
這些機制共同構成了 DataFusion 高效能查詢執行的基礎。明天我們將深入探討 LogicalPlan 的設計模式,了解為什麼它採用 Enum 設計以及各個變體的具體作用。