iT邦幫忙

2025 iThome 鐵人賽

DAY 25
1
Rust

DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅系列 第 25

Day 25: Window Functions - 視窗函數執行機制深度解析

  • 分享至 

  • xImage
  •  

前言

在 Day 6 中,我們已經學習了視窗函數的基本概念、OVER 子句語法(PARTITION BY、ORDER BY、Frame)以及常見函數(ROW_NUMBER、RANK、LAG/LEAD 等)的使用方式。今天我們將深入 DataFusion 的內部實現,從原始碼層面理解視窗函數是如何執行的。

與 Day 6 的入門介紹不同,今天我們將聚焦於:

  • WindowAggExec 的完整執行流程:從物理計劃到實際計算的每一步
  • 分區識別算法:如何高效找出分區邊界
  • Window Frame 計算機制:ROWS/RANGE/GROUPS 三種模式的底層實現
  • WindowExpr 的評估策略:不同類型視窗函數的計算方法差異
  • 性能優化技術:排序重用、記憶體管理等優化手段

今日學習重點

  • WindowAggExec 和 WindowAggStream 的原始碼結構
  • evaluate_partition_ranges() 分區識別算法
  • WindowFrameContext 的 Frame 邊界計算
  • StandardWindowExpr 和 Rank 的評估策略
  • 視窗函數的記憶體管理與批處理機制

WindowAggExec 原始碼結構解析

核心數據結構

WindowAggExec 是 DataFusion 中負責執行視窗函數的物理算子。讓我們從原始碼結構開始:

// 來自 datafusion/physical-plan/src/windows/window_agg_exec.rs

#[derive(Debug, Clone)]
pub struct WindowAggExec {
    /// 輸入執行計劃
    pub(crate) input: Arc<dyn ExecutionPlan>,
    
    /// 視窗表達式列表(可能包含多個視窗函數)
    window_expr: Vec<Arc<dyn WindowExpr>>,
    
    /// 輸出 Schema(原始列 + 視窗函數結果列)
    schema: SchemaRef,
    
    /// 執行指標收集器
    metrics: ExecutionPlanMetricsSet,
    
    /// 分區列的有序索引
    /// 如果輸入已按某些 PARTITION BY 列排序,可以重用排序
    ordered_partition_by_indices: Vec<usize>,
    
    /// 計劃屬性快取(輸出分區、等價性等)
    cache: PlanProperties,
    
    /// 是否允許重新分區
    can_repartition: bool,
}

關鍵欄位深度解析

  1. window_expr: Vec<Arc>

    • 存儲所有視窗函數表達式
    • 每個表達式實現 WindowExpr trait
    • 支援同時計算多個視窗函數(例如 ROW_NUMBER()SUM() 在同一個 OVER 中)
  2. ordered_partition_by_indices: Vec

    • 這是一個排序優化的核心欄位
    • 記錄哪些 PARTITION BY 列在輸入中已經有序
    • 例如:PARTITION BY (a, b, c),如果輸入已按 (a, b) 排序,則 indices = [0, 1]
    • get_ordered_partition_by_indices() 計算得出
    • 後續計算可以利用已有排序,避免重新排序
  3. can_repartition: bool

    • 控制是否可以重新分區數據
    • 對於大數據集,可能需要先重新分區再計算視窗函數
    • 與並行處理策略相關

執行入口:execute() 方法

fn execute(
    &self,
    partition: usize,
    context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
    // 從輸入計劃獲取數據流
    let input = self.input.execute(partition, context)?;
    
    // 創建 WindowAggStream
    let stream = Box::pin(WindowAggStream::new(
        Arc::clone(&self.schema),
        self.window_expr.clone(),
        input,
        BaselineMetrics::new(&self.metrics, partition),
        self.partition_by_sort_keys()?,
        self.ordered_partition_by_indices.clone(),
    )?);
    
    Ok(stream)
}

核心邏輯

  • execute() 方法創建 WindowAggStream
  • WindowAggStream 實現 Stream trait,負責實際的計算
  • 整個過程是異步流式的,但內部需要累積完整分區才能計算

WindowAggStream:執行的四個階段

WindowAggStream 是視窗函數執行的核心,它管理整個計算流程。讓我們逐步拆解:

階段 1:數據累積(Buffering)

pub struct WindowAggStream {
    schema: SchemaRef,
    input: SendableRecordBatchStream,
    batches: Vec<RecordBatch>,      // 累積的 batch
    finished: bool,                  // 是否已完成輸入
    window_expr: Vec<Arc<dyn WindowExpr>>,
    partition_by_sort_keys: Vec<PhysicalSortExpr>,
    baseline_metrics: BaselineMetrics,
    ordered_partition_by_indices: Vec<usize>,
}

為什麼需要累積?

視窗函數的特性決定了必須看到完整的分區才能計算:

-- 例如:計算排名
SELECT 
    name,
    RANK() OVER (PARTITION BY department ORDER BY salary DESC) as rank
FROM employees;

要計算每個員工的排名,必須:

  1. 看到該部門的所有員工(完整分區)
  2. 按薪資排序
  3. 然後才能分配排名

因此,WindowAggStream 會:

  • 持續從 input stream 讀取 RecordBatch
  • 將它們累積到 batches: Vec<RecordBatch>
  • 直到輸入流結束(finished = true
  • 然後才開始計算

階段 2:Batch 合併與分區識別

當所有數據累積完畢後,進入核心計算方法 compute_aggregates()

fn compute_aggregates(&self) -> Result<Option<RecordBatch>> {
    let _timer = self.baseline_metrics.elapsed_compute().timer();

    // 步驟 1: 合併所有 batch 為一個大 batch
    let batch = concat_batches(&self.input.schema(), &self.batches)?;
    if batch.num_rows() == 0 {
        return Ok(None);
    }

    // 步驟 2: 評估分區鍵,找出分區邊界
    let partition_by_sort_keys = self
        .ordered_partition_by_indices
        .iter()
        .map(|idx| self.partition_by_sort_keys[*idx]
            .evaluate_to_sort_column(&batch))
        .collect::<Result<Vec<_>>>()?;
    
    // 關鍵函數:識別分區邊界
    let partition_points = evaluate_partition_ranges(
        batch.num_rows(), 
        &partition_by_sort_keys
    )?;

    // ... 後續步驟
}

合併 Batch 的考量

  • 使用 Arrow 的 concat_batches() 函數
  • 這是一個零拷貝操作(對於不可變數組)
  • 合併後的單一 batch 便於後續處理

分區邊界識別算法

evaluate_partition_ranges() 是一個關鍵函數,它通過比較連續行來找出分區邊界:

算法邏輯

假設數據(已按 PARTITION BY department 排序):
Row 0: department = "Sales",  salary = 50000
Row 1: department = "Sales",  salary = 48000  
Row 2: department = "Sales",  salary = 52000
Row 3: department = "IT",     salary = 60000  ← 分區邊界!
Row 4: department = "IT",     salary = 58000
Row 5: department = "HR",     salary = 45000  ← 分區邊界!

識別過程:
1. 從 Row 0 開始,記錄當前分區的起始索引
2. 逐行比較 partition_by_sort_keys 的值
3. 當發現值改變時(Row 2 → Row 3),標記分區結束
4. 記錄 Range { start: 0, end: 3 } 代表第一個分區
5. 開始新分區,重複步驟
6. 最後一個分區在數據末尾結束

輸出:
[
    Range { start: 0, end: 3 },   // Sales 部門
    Range { start: 3, end: 5 },   // IT 部門  
    Range { start: 5, end: 6 },   // HR 部門
]

實現細節

// 簡化的概念代碼
fn evaluate_partition_ranges(
    num_rows: usize,
    partition_columns: &[SortColumn],
) -> Result<Vec<Range<usize>>> {
    if partition_columns.is_empty() {
        // 沒有 PARTITION BY,整個數據是一個分區
        return Ok(vec![Range { start: 0, end: num_rows }]);
    }

    let mut ranges = Vec::new();
    let mut start = 0;

    for current in 1..num_rows {
        // 比較當前行與前一行的分區鍵
        if !partition_columns_equal(partition_columns, current - 1, current) {
            // 分區邊界:前一個分區結束
            ranges.push(Range { start, end: current });
            start = current;
        }
    }
    
    // 最後一個分區
    ranges.push(Range { start, end: num_rows });
    
    Ok(ranges)
}

性能特點

  • 時間複雜度:O(n),只需一次遍歷
  • 前提:輸入必須已按 PARTITION BY 列排序
  • 比較操作使用 Arrow 的向量化比較

階段 3:分區內計算

找出分區邊界後,對每個分區獨立計算視窗函數:

fn compute_aggregates(&self) -> Result<Option<RecordBatch>> {
    // ... 前面的步驟

    // 步驟 3: 對每個分區計算視窗函數
    let mut partition_results = vec![];
    
    for partition_point in partition_points {
        let length = partition_point.end - partition_point.start;
        
        // slice() 提取當前分區的數據(零拷貝)
        let partition_batch = batch.slice(partition_point.start, length);
        
        // 計算所有視窗函數
        partition_results.push(compute_window_aggregates(
            &self.window_expr,
            &partition_batch,
        )?)
    }

    // 步驟 4: 合併所有分區的結果
    let columns = transpose(partition_results)
        .iter()
        .map(|elems| concat(&elems.iter()
            .map(|x| x.as_ref()).collect::<Vec<_>>()))
        .collect::<Result<Vec<ArrayRef>, ArrowError>>()?;

    // 步驟 5: 將視窗函數結果附加到原始列
    let mut batch_columns = batch.columns().to_vec();
    batch_columns.extend_from_slice(&columns);
    
    Ok(Some(RecordBatch::try_new(
        Arc::clone(&self.schema),
        batch_columns,
    )?))
}

關鍵函數:compute_window_aggregates()

fn compute_window_aggregates(
    window_expr: &[Arc<dyn WindowExpr>],
    batch: &RecordBatch,
) -> Result<Vec<ArrayRef>> {
    window_expr
        .iter()
        .map(|window_expr| window_expr.evaluate(batch))
        .collect()
}

這個函數非常簡潔:

  • 對每個視窗函數調用 evaluate()
  • 返回一個 Vec<ArrayRef>,每個元素是一個視窗函數的結果列
  • 結果列的長度 = 輸入分區的行數(保持行數不變)

階段 4:結果組合

最後,將所有分區的結果組合並附加到原始列:

原始 Batch:
| employee_id | name  | department | salary |
|-------------|-------|------------|--------|
| 1           | Alice | Sales      | 48000  |
| 2           | Bob   | Sales      | 52000  |
| 3           | Carol | IT         | 58000  |

經過視窗函數計算(例如 RANK() OVER (PARTITION BY department ORDER BY salary DESC)):

分區 1 (Sales):
partition_results[0] = [2, 1]  // Alice 排第2,Bob 排第1

分區 2 (IT):  
partition_results[1] = [1]     // Carol 排第1

合併分區結果:
window_column = concat([2, 1], [1]) = [2, 1, 1]

最終 Batch:
| employee_id | name  | department | salary | rank |
|-------------|-------|------------|--------|------|
| 1           | Alice | Sales      | 48000  | 2    |
| 2           | Bob   | Sales      | 52000  | 1    |
| 3           | Carol | IT         | 58000  | 1    |

transpose() 的作用

當有多個視窗函數時,partition_results 的結構是:

// partition_results: Vec<Vec<ArrayRef>>
[
    [rank_col_partition1, sum_col_partition1],  // 分區 1
    [rank_col_partition2, sum_col_partition2],  // 分區 2
]

// 需要轉置為:
[
    [rank_col_partition1, rank_col_partition2],  // 所有分區的 rank
    [sum_col_partition1, sum_col_partition2],    // 所有分區的 sum
]

// 然後對每個視窗函數的結果 concat

WindowExpr Trait:視窗函數的統一介面

所有視窗函數都實現 WindowExpr trait,這提供了統一的計算介面:

// 來自 datafusion/physical-expr/src/window/window_expr.rs

pub trait WindowExpr: Send + Sync {
    /// 類型轉換支援
    fn as_any(&self) -> &dyn Any;
    
    /// 視窗函數名稱
    fn name(&self) -> &str;
    
    /// 返回欄位定義
    fn field(&self) -> Result<FieldRef>;
    
    /// 獲取表達式引用
    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
    
    /// PARTITION BY 表達式
    fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>];
    
    /// ORDER BY 表達式  
    fn order_by(&self) -> &[PhysicalSortExpr];
    
    /// 核心方法:評估視窗函數
    fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef>;
    
    /// 獲取 Window Frame 定義
    fn get_window_frame(&self) -> &Arc<WindowFrame> {
        &DEFAULT_WINDOW_FRAME
    }
}

核心方法:evaluate()

這是視窗函數計算的核心,不同類型的視窗函數有不同的實現策略:

1. 聚合視窗函數(StandardWindowExpr)

聚合視窗函數(如 SUM、AVG)使用 StandardWindowExpr 實現:

pub struct StandardWindowExpr {
    expr: Arc<dyn AggregateWindowExpr>,  // 底層聚合函數
    partition_by: Vec<Arc<dyn PhysicalExpr>>,
    order_by: Vec<PhysicalSortExpr>,
    window_frame: Arc<WindowFrame>,      // Frame 定義
}

evaluate() 的兩種模式

模式 1:使用 Window Frame(需要逐行計算)

fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
    let mut evaluator = self.expr.create_evaluator()?;
    let num_rows = batch.num_rows();
    
    if evaluator.uses_window_frame() {
        // 需要為每一行計算 Frame 範圍
        let sort_options = self.order_by.iter().map(|o| o.options).collect();
        let mut row_wise_results = vec![];

        // 準備參數和 ORDER BY 列
        let mut values = self.evaluate_args(batch)?;
        let order_bys = get_orderby_values(self.order_by_columns(batch)?);
        values.extend(order_bys);

        // 創建 WindowFrameContext
        let mut window_frame_ctx = WindowFrameContext::new(
            Arc::clone(&self.window_frame), 
            sort_options
        );
        let mut last_range = Range { start: 0, end: 0 };
        
        // 逐行計算
        for idx in 0..num_rows {
            // 計算當前行的 Frame 範圍
            let range = window_frame_ctx.calculate_range(
                &order_bys_ref,
                &last_range,
                num_rows,
                idx,
            )?;
            
            // 在該範圍內評估聚合函數
            let value = evaluator.evaluate(&values, &range)?;
            row_wise_results.push(value);
            last_range = range;
        }
        
        ScalarValue::iter_to_array(row_wise_results)
    } else {
        // 模式 2:不使用 Frame(後面說明)
    }
}

Frame 範圍計算示例

-- 計算 3 行移動平均
SELECT 
    date,
    value,
    AVG(value) OVER (
        ORDER BY date 
        ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
    ) as moving_avg
FROM data;

對於每一行,calculate_range() 會計算:

Row 0: Range { start: 0, end: 1 }   // 只有當前行(沒有前 2 行)
Row 1: Range { start: 0, end: 2 }   // 當前行和前 1 行
Row 2: Range { start: 0, end: 3 }   // 當前行和前 2 行(完整的 3 行)
Row 3: Range { start: 1, end: 4 }   // 當前行和前 2 行(滑動視窗)
Row 4: Range { start: 2, end: 5 }   // 繼續滑動

然後在每個範圍內調用 evaluator.evaluate()

模式 2:不使用 Frame(包括排名函數)

} else if evaluator.include_rank() {
    // 排名函數有特殊處理
    let columns = self.order_by_columns(batch)?;
    let rank_values = evaluator.evaluate_rank(&columns)?;
    rank_values.to_array()
} else {
    // 整個分區的聚合(例如沒有 ORDER BY 的 SUM)
    let values = self.evaluate_args(batch)?;
    evaluator.update_batch(&values)?;
    let value = evaluator.evaluate()?;
    value.to_array_of_size(batch.num_rows())
}

2. 排名函數(Rank、DenseRank)

排名函數使用 PartitionEvaluator 模式:

// 來自 datafusion/functions-window/src/rank.rs

pub struct RankEvaluator {
    state: RankState,
    rank_type: RankType,  // Basic, Dense, Percent
}

struct RankState {
    current_group_idx: usize,
    last_rank_data: Vec<ScalarValue>,
    last_rank_boundary: usize,
    n_rank: usize,
}

impl PartitionEvaluator for RankEvaluator {
    fn evaluate_partition(
        &mut self,
        partition: &RecordBatch,
        order_by_columns: &[ArrayRef],
    ) -> Result<ArrayRef> {
        let num_rows = partition.num_rows();
        let mut ranks: Vec<u64> = Vec::with_capacity(num_rows);

        // 逐行計算排名
        for i in 0..num_rows {
            let current_row = get_row_at_idx(order_by_columns, i)?;
            
            // 檢查當前行的 ORDER BY 值是否與前一行不同
            if i == 0 || current_row != self.state.last_rank_data {
                // 值改變,更新排名
                match self.rank_type {
                    RankType::Basic => {
                        // RANK(): 跳躍排名
                        self.state.current_group_idx = i + 1;
                    }
                    RankType::Dense => {
                        // DENSE_RANK(): 密集排名
                        self.state.current_group_idx += 1;
                    }
                    RankType::Percent => {
                        // PERCENT_RANK(): 相對排名
                        // (rank - 1) / (total_rows - 1)
                    }
                }
                self.state.last_rank_data = current_row;
            }
            
            ranks.push(self.state.current_group_idx as u64);
        }

        Ok(Arc::new(UInt64Array::from(ranks)))
    }
}

RANK vs DENSE_RANK 的關鍵差異

數據(按 score 排序):
| score | RANK | DENSE_RANK |
|-------|------|------------|
| 95    | 1    | 1          |
| 90    | 2    | 2          |
| 90    | 2    | 2          | ← 並列
| 85    | 4    | 3          | ← RANK 跳到 4,DENSE_RANK 到 3

實現差異:
- RANK: current_group_idx = i + 1(使用當前行索引)
- DENSE_RANK: current_group_idx += 1(遞增組計數器)

3. LAG/LEAD 函數

LAG/LEAD 函數直接訪問其他行的值,實現相對簡單:

// 簡化的概念代碼
fn evaluate_lag(
    batch: &RecordBatch,
    column: &ArrayRef,
    offset: i64,
    default_value: &ScalarValue,
) -> Result<ArrayRef> {
    let num_rows = batch.num_rows();
    let mut result = Vec::with_capacity(num_rows);

    for i in 0..num_rows {
        let target_idx = (i as i64) - offset;
        
        if target_idx >= 0 && target_idx < num_rows as i64 {
            // 目標行存在
            result.push(ScalarValue::try_from_array(column, target_idx as usize)?);
        } else {
            // 目標行不存在,使用預設值
            result.push(default_value.clone());
        }
    }

    ScalarValue::iter_to_array(result)
}

Window Frame 計算機制深度解析

Window Frame 是視窗函數中最複雜的部分,它定義了在當前行周圍實際參與計算的行範圍。DataFusion 支援三種 Frame 模式。

WindowFrameContext 結構

// 來自 datafusion/expr/src/window_state.rs

pub enum WindowFrameContext {
    Rows(Arc<WindowFrame>),
    Range {
        window_frame: Arc<WindowFrame>,
        state: WindowFrameStateRange,
    },
    Groups {
        window_frame: Arc<WindowFrame>,
        state: WindowFrameStateGroups,
    },
}

模式 1:ROWS(基於物理行)

ROWS 模式最簡單,直接基於行數計算:

fn calculate_range_rows(
    window_frame: &Arc<WindowFrame>,
    length: usize,
    idx: usize,
) -> Result<Range<usize>> {
    // 計算起始位置
    let start = match window_frame.start_bound {
        WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => {
            // UNBOUNDED PRECEDING
            0
        }
        WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => {
            // n PRECEDING
            idx.saturating_sub(n as usize)
        }
        WindowFrameBound::CurrentRow => {
            // CURRENT ROW
            idx
        }
        WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => {
            // n FOLLOWING
            std::cmp::min(idx + n as usize, length)
        }
        _ => return internal_err!("Invalid frame start"),
    };

    // 計算結束位置
    let end = match window_frame.end_bound {
        WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => {
            idx.saturating_sub(n as usize).saturating_add(1)
        }
        WindowFrameBound::CurrentRow => {
            idx + 1
        }
        WindowFrameBound::Following(ScalarValue::UInt64(None)) => {
            // UNBOUNDED FOLLOWING
            length
        }
        WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => {
            std::cmp::min(idx + n as usize + 1, length)
        }
        _ => return internal_err!("Invalid frame end"),
    };

    Ok(Range { start, end })
}

示例:ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING

數據(10 行):
Row 0:  Range { start: 0, end: 2 }   // 自己 + 後 1 行(沒有前 2 行)
Row 1:  Range { start: 0, end: 3 }   // 前 1 + 自己 + 後 1 行
Row 2:  Range { start: 0, end: 4 }   // 前 2 + 自己 + 後 1 行(完整)
Row 3:  Range { start: 1, end: 5 }   // 視窗滑動
Row 4:  Range { start: 2, end: 6 }
...
Row 8:  Range { start: 6, end: 10 }  // 前 2 + 自己 + 後 1 行
Row 9:  Range { start: 7, end: 10 }  // 前 2 + 自己(沒有後 1 行)

模式 2:RANGE(基於值範圍)

RANGE 模式基於 ORDER BY 列的值範圍:

fn calculate_range(
    &mut self,
    window_frame: &Arc<WindowFrame>,
    last_range: &Range<usize>,
    range_columns: &[ArrayRef],
    length: usize,
    idx: usize,
) -> Result<Range<usize>> {
    let start = match window_frame.start_bound {
        WindowFrameBound::Preceding(ref n) => {
            if n.is_null() {
                // UNBOUNDED PRECEDING
                0
            } else {
                // n PRECEDING:找到值 <= (current_value - n) 的第一行
                self.calculate_index_of_row::<true, true>(
                    range_columns,
                    idx,
                    Some(n),
                    length,
                )?
            }
        }
        WindowFrameBound::CurrentRow => {
            // 找到與當前行值相等的第一行
            self.calculate_index_of_row::<true, true>(
                range_columns,
                idx,
                None,
                length,
            )?
        }
        WindowFrameBound::Following(ref n) => {
            // n FOLLOWING:找到值 >= (current_value + n) 的第一行
            self.calculate_index_of_row::<true, false>(
                range_columns,
                idx,
                Some(n),
                length,
            )?
        }
    };

    // 計算 end(類似邏輯)
    let end = /* ... */;

    Ok(Range { start, end })
}

RANGE vs ROWS 的關鍵差異

-- 數據
| date       | value |
|------------|-------|
| 2024-01-01 | 100   |
| 2024-01-02 | 150   |
| 2024-01-02 | 120   |  ← 同一天有 2 筆記錄
| 2024-01-03 | 130   |

-- ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
-- 嚴格計算 2 行
Row 0: Range(0, 1) → [100]           // 只有自己
Row 1: Range(0, 2) → [100, 150]      // 前 1 行 + 自己
Row 2: Range(1, 3) → [150, 120]      // 前 1 行 + 自己
Row 3: Range(2, 4) → [120, 130]      // 前 1 行 + 自己

-- RANGE BETWEEN 1 DAY PRECEDING AND CURRENT ROW
-- 基於日期範圍
Row 0 (2024-01-01): Range(0, 1) → [100]
Row 1 (2024-01-02): Range(0, 3) → [100, 150, 120]  // 包含昨天和今天所有記錄
Row 2 (2024-01-02): Range(0, 3) → [100, 150, 120]  // 同樣範圍(相同日期)
Row 3 (2024-01-03): Range(1, 4) → [150, 120, 130]  // 包含昨天和今天所有記錄

RANGE 模式的適用場景

  • 時間序列分析(過去 N 天/小時的數據)
  • 價格範圍查詢(價格相差不超過 X 的記錄)
  • 需要處理並列值的情況

模式 3:GROUPS(基於分組)

GROUPS 模式將 ORDER BY 相同的行視為一個組:

fn calculate_range(
    &mut self,
    window_frame: &Arc<WindowFrame>,
    range_columns: &[ArrayRef],
    length: usize,
    idx: usize,
) -> Result<Range<usize>> {
    // 邏輯:維護一個組邊界的快取
    // 當遇到新行時,檢查它是否與前一組的值相同
    // 如果不同,開始新組
    
    let delta = match window_frame.start_bound {
        WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => n,
        _ => 0,
    };

    // 找到當前行所在組的開始
    let current_group = self.find_group_start(range_columns, idx, length)?;
    
    // 從當前組向前/後數 delta 個組
    let target_group = current_group.saturating_sub(delta);
    let start = self.group_boundaries[target_group];

    // 類似計算 end
    let end = /* ... */;

    Ok(Range { start, end })
}

GROUPS 示例

-- 數據(按 score 排序)
| name  | score |
|-------|-------|
| Alice | 95    |  ← Group 0
| Bob   | 90    |  ← Group 1
| Carol | 90    |  ← Group 1(相同值)
| David | 85    |  ← Group 2

-- GROUPS BETWEEN 1 PRECEDING AND CURRENT ROW

Row 0 (Alice, 95):
  當前組: Group 0
  範圍: Group 0(沒有前一組)
  Range(0, 1) → [Alice]

Row 1 (Bob, 90):
  當前組: Group 1
  範圍: Group 0 + Group 1
  Range(0, 3) → [Alice, Bob, Carol]

Row 2 (Carol, 90):
  當前組: Group 1(與 Bob 同組)
  範圍: Group 0 + Group 1
  Range(0, 3) → [Alice, Bob, Carol]  // 相同範圍

Row 3 (David, 85):
  當前組: Group 2
  範圍: Group 1 + Group 2
  Range(1, 4) → [Bob, Carol, David]

GROUPS 的關鍵特性

  • 相同 ORDER BY 值的行屬於同一組
  • Frame 邊界以組為單位
  • 適用於處理並列情況的排名分析

性能優化技術

1. 排序重用(Sort Reuse)

優化原理

如果輸入數據已經按照 PARTITION BY + ORDER BY 排序,WindowAggExec 可以直接使用,避免重新排序。

實現機制

// 在創建 WindowAggExec 時
let ordered_partition_by_indices = 
    get_ordered_partition_by_indices(
        window_expr[0].partition_by(), 
        &input
    )?;

get_ordered_partition_by_indices() 檢查:

  1. 輸入的排序屬性(input.output_ordering()
  2. 視窗函數的 PARTITION BY 列
  3. 返回哪些列已經有序

效果

  • 如果全部有序:跳過排序步驟,直接進入分區識別
  • 如果部分有序:可以利用部分排序,減少排序開銷
  • 對於大數據集,可節省 20-40% 的執行時間

2. 多視窗函數優化

場景

SELECT 
    name,
    ROW_NUMBER() OVER (PARTITION BY dept ORDER BY salary DESC) as rn,
    RANK() OVER (PARTITION BY dept ORDER BY salary DESC) as rank,
    SUM(salary) OVER (PARTITION BY dept ORDER BY salary DESC) as cumsum
FROM employees;

優化策略

  1. 相同視窗合併

    • 三個函數有相同的 PARTITION BY 和 ORDER BY
    • 只需一次排序和分區識別
    • 在同一次遍歷中計算所有函數
  2. 共享 ORDER BY 評估

    • ORDER BY salary DESC 只需評估一次
    • 所有函數共享排序列的結果
  3. 向量化計算

    • 對於每個分區,批次評估所有函數
    • 減少函數調用開銷

實現

fn compute_window_aggregates(
    window_expr: &[Arc<dyn WindowExpr>],
    batch: &RecordBatch,
) -> Result<Vec<ArrayRef>> {
    // 一次遍歷評估所有視窗函數
    window_expr
        .iter()
        .map(|expr| expr.evaluate(batch))
        .collect()
}

3. 記憶體管理

挑戰:視窗函數需要累積完整分區,可能導致記憶體壓力。

DataFusion 的策略

  1. 流式累積

    // WindowAggStream 實現 Stream trait
    // 逐批接收數據,累積到內部 Vec
    impl Stream for WindowAggStream {
        fn poll_next(...) -> Poll<Option<Result<RecordBatch>>> {
            if !self.finished {
                // 持續讀取輸入
                match ready!(self.input.poll_next_unpin(cx)) {
                    Some(Ok(batch)) => {
                        self.batches.push(batch);
                        // 繼續輸入
                        cx.waker().wake_by_ref();
                        Poll::Pending
                    }
                    None => {
                        self.finished = true;
                        // 輸入完成,開始計算
                    }
                }
            } else {
                // 計算並返回結果
                Poll::Ready(self.compute_aggregates()?)
            }
        }
    }
    
  2. 批次處理

    • 輸入以 RecordBatch 為單位
    • 預設 batch 大小 8192 行
    • 平衡記憶體使用和計算效率
  3. 零拷貝操作

    • concat_batches() 對於不可變陣列是零拷貝
    • batch.slice() 提取分區數據也是零拷貝
    • 減少記憶體分配和拷貝

4. 未來優化方向

分區並行

  • 當前實現:分區串行計算
  • 潛在優化:不同分區可以並行處理
  • 需要考慮記憶體和 CPU 資源平衡

增量計算

  • 當前:需要完整分區
  • 潛在優化:對於某些函數(如 ROW_NUMBER),可以流式計算
  • 需要複雜的狀態管理

Bounded Window

  • DataFusion 正在開發 BoundedWindowAggExec
  • 對於有限 Frame 的視窗函數,可以流式處理
  • 不需要累積完整分區

實戰案例:EXPLAIN 分析視窗函數

讓我們透過 EXPLAIN 看看視窗函數的執行計劃:

EXPLAIN SELECT 
    department,
    employee_id,
    salary,
    RANK() OVER (PARTITION BY department ORDER BY salary DESC) as rank,
    AVG(salary) OVER (PARTITION BY department) as dept_avg
FROM employees;

執行計劃

ProjectionExec: exprs=[department, employee_id, salary, rank, dept_avg]
  WindowAggExec: wdw=[
    RANK() OVER (PARTITION BY department ORDER BY salary DESC),
    AVG(salary) OVER (PARTITION BY department)
  ]
    SortExec: expr=[department ASC, salary DESC]
      TableScan: employees

計劃解讀

  1. TableScan:讀取 employees 表

  2. SortExec

    • department ASC, salary DESC 排序
    • 這是為 WindowAggExec 準備的
    • PARTITION BY department 需要分組
    • ORDER BY salary DESC 需要排序
  3. WindowAggExec

    • 計算兩個視窗函數
    • 第一個函數需要 ORDER BY,第二個不需要
    • 但它們共享相同的 PARTITION BY
  4. ProjectionExec

    • 選擇需要的列
    • 視窗函數結果已附加到 batch

性能提示

  • 如果源數據已按 department 分區,可以跳過部分排序
  • 如果有索引支援排序,可以進一步優化

小結

今天我們深入探討了 DataFusion 視窗函數的執行機制,從原始碼層面完整剖析了視窗函數的實現:

核心執行流程

  1. 數據累積:WindowAggStream 累積完整分區數據
  2. 分區識別:通過 evaluate_partition_ranges() 找出邊界
  3. 分區內計算:對每個分區調用 window_expr.evaluate()
  4. 結果組合:將視窗函數結果附加到原始列

Window Frame 計算三種模式

  • ROWS:基於物理行位置,計算簡單高效,適合固定行數的移動視窗
  • RANGE:基於值範圍,適合時間序列分析和處理並列值
  • GROUPS:基於 ORDER BY 分組,適合處理並列情況的排名分析

不同視窗函數的實現策略

  • 聚合函數(StandardWindowExpr):使用 WindowFrameContext 計算 Frame 範圍,逐行評估
  • 排名函數(Rank/DenseRank):通過比較連續行的 ORDER BY 值分配排名,RANK 跳號而 DENSE_RANK 不跳號
  • 偏移函數(LAG/LEAD):直接訪問指定偏移量的行,實現簡單直接

性能優化技術

  • 排序重用:利用已有排序,避免重複排序,可節省 20-40% 執行時間
  • 多視窗函數合併:相同視窗定義的函數一次計算,共享 ORDER BY 評估
  • 零拷貝操作:批次合併和分區提取都盡量避免數據拷貝,降低記憶體開銷

實踐價值

理解這些底層實現後,我們能夠:

  • 編寫更高效的視窗函數查詢,選擇合適的 Frame 模式
  • 透過 EXPLAIN 理解查詢計劃,診斷性能瓶頸
  • 針對性地優化視窗函數性能,例如利用已有排序或合併相同視窗
  • 在設計數據模型時考慮視窗函數的執行特性

視窗函數是 DataFusion 中最複雜的算子之一,但透過深入理解其執行機制,我們可以充分發揮其威力,解決各種複雜的分析問題。

明天我們將探討 並行執行與分區策略,了解 DataFusion 如何利用多核心並行處理數據,以及 RepartitionExec 如何在不同的分區策略(RoundRobin、Hash、Range)之間進行選擇和優化。

參考資料

  1. DataFusion Window Functions 文件
  2. User Defined Window Functions in DataFusion
  3. DataFusion 原始碼:
  4. Window Functions in SQL:2011
  5. PostgreSQL Window Functions Internals

上一篇
Day 24: 數據源整合 Part 2 - Parquet 讀取的實作
下一篇
Day 26: 並行執行與分區策略 - DataFusion 的性能倍增器
系列文
DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅30
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言