在 Day 6 中,我們已經學習了視窗函數的基本概念、OVER 子句語法(PARTITION BY、ORDER BY、Frame)以及常見函數(ROW_NUMBER、RANK、LAG/LEAD 等)的使用方式。今天我們將深入 DataFusion 的內部實現,從原始碼層面理解視窗函數是如何執行的。
與 Day 6 的入門介紹不同,今天我們將聚焦於:
今日學習重點:
WindowAggExec
是 DataFusion 中負責執行視窗函數的物理算子。讓我們從原始碼結構開始:
// 來自 datafusion/physical-plan/src/windows/window_agg_exec.rs
#[derive(Debug, Clone)]
pub struct WindowAggExec {
/// 輸入執行計劃
pub(crate) input: Arc<dyn ExecutionPlan>,
/// 視窗表達式列表(可能包含多個視窗函數)
window_expr: Vec<Arc<dyn WindowExpr>>,
/// 輸出 Schema(原始列 + 視窗函數結果列)
schema: SchemaRef,
/// 執行指標收集器
metrics: ExecutionPlanMetricsSet,
/// 分區列的有序索引
/// 如果輸入已按某些 PARTITION BY 列排序,可以重用排序
ordered_partition_by_indices: Vec<usize>,
/// 計劃屬性快取(輸出分區、等價性等)
cache: PlanProperties,
/// 是否允許重新分區
can_repartition: bool,
}
關鍵欄位深度解析:
window_expr: Vec<Arc>
WindowExpr
traitROW_NUMBER()
和 SUM()
在同一個 OVER 中)ordered_partition_by_indices: Vec
indices = [0, 1]
get_ordered_partition_by_indices()
計算得出can_repartition: bool
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
// 從輸入計劃獲取數據流
let input = self.input.execute(partition, context)?;
// 創建 WindowAggStream
let stream = Box::pin(WindowAggStream::new(
Arc::clone(&self.schema),
self.window_expr.clone(),
input,
BaselineMetrics::new(&self.metrics, partition),
self.partition_by_sort_keys()?,
self.ordered_partition_by_indices.clone(),
)?);
Ok(stream)
}
核心邏輯:
execute()
方法創建 WindowAggStream
WindowAggStream
實現 Stream
trait,負責實際的計算WindowAggStream
是視窗函數執行的核心,它管理整個計算流程。讓我們逐步拆解:
pub struct WindowAggStream {
schema: SchemaRef,
input: SendableRecordBatchStream,
batches: Vec<RecordBatch>, // 累積的 batch
finished: bool, // 是否已完成輸入
window_expr: Vec<Arc<dyn WindowExpr>>,
partition_by_sort_keys: Vec<PhysicalSortExpr>,
baseline_metrics: BaselineMetrics,
ordered_partition_by_indices: Vec<usize>,
}
為什麼需要累積?
視窗函數的特性決定了必須看到完整的分區才能計算:
-- 例如:計算排名
SELECT
name,
RANK() OVER (PARTITION BY department ORDER BY salary DESC) as rank
FROM employees;
要計算每個員工的排名,必須:
因此,WindowAggStream
會:
input
stream 讀取 RecordBatch
batches: Vec<RecordBatch>
finished = true
)當所有數據累積完畢後,進入核心計算方法 compute_aggregates()
:
fn compute_aggregates(&self) -> Result<Option<RecordBatch>> {
let _timer = self.baseline_metrics.elapsed_compute().timer();
// 步驟 1: 合併所有 batch 為一個大 batch
let batch = concat_batches(&self.input.schema(), &self.batches)?;
if batch.num_rows() == 0 {
return Ok(None);
}
// 步驟 2: 評估分區鍵,找出分區邊界
let partition_by_sort_keys = self
.ordered_partition_by_indices
.iter()
.map(|idx| self.partition_by_sort_keys[*idx]
.evaluate_to_sort_column(&batch))
.collect::<Result<Vec<_>>>()?;
// 關鍵函數:識別分區邊界
let partition_points = evaluate_partition_ranges(
batch.num_rows(),
&partition_by_sort_keys
)?;
// ... 後續步驟
}
合併 Batch 的考量:
concat_batches()
函數evaluate_partition_ranges()
是一個關鍵函數,它通過比較連續行來找出分區邊界:
算法邏輯:
假設數據(已按 PARTITION BY department 排序):
Row 0: department = "Sales", salary = 50000
Row 1: department = "Sales", salary = 48000
Row 2: department = "Sales", salary = 52000
Row 3: department = "IT", salary = 60000 ← 分區邊界!
Row 4: department = "IT", salary = 58000
Row 5: department = "HR", salary = 45000 ← 分區邊界!
識別過程:
1. 從 Row 0 開始,記錄當前分區的起始索引
2. 逐行比較 partition_by_sort_keys 的值
3. 當發現值改變時(Row 2 → Row 3),標記分區結束
4. 記錄 Range { start: 0, end: 3 } 代表第一個分區
5. 開始新分區,重複步驟
6. 最後一個分區在數據末尾結束
輸出:
[
Range { start: 0, end: 3 }, // Sales 部門
Range { start: 3, end: 5 }, // IT 部門
Range { start: 5, end: 6 }, // HR 部門
]
實現細節:
// 簡化的概念代碼
fn evaluate_partition_ranges(
num_rows: usize,
partition_columns: &[SortColumn],
) -> Result<Vec<Range<usize>>> {
if partition_columns.is_empty() {
// 沒有 PARTITION BY,整個數據是一個分區
return Ok(vec![Range { start: 0, end: num_rows }]);
}
let mut ranges = Vec::new();
let mut start = 0;
for current in 1..num_rows {
// 比較當前行與前一行的分區鍵
if !partition_columns_equal(partition_columns, current - 1, current) {
// 分區邊界:前一個分區結束
ranges.push(Range { start, end: current });
start = current;
}
}
// 最後一個分區
ranges.push(Range { start, end: num_rows });
Ok(ranges)
}
性能特點:
找出分區邊界後,對每個分區獨立計算視窗函數:
fn compute_aggregates(&self) -> Result<Option<RecordBatch>> {
// ... 前面的步驟
// 步驟 3: 對每個分區計算視窗函數
let mut partition_results = vec![];
for partition_point in partition_points {
let length = partition_point.end - partition_point.start;
// slice() 提取當前分區的數據(零拷貝)
let partition_batch = batch.slice(partition_point.start, length);
// 計算所有視窗函數
partition_results.push(compute_window_aggregates(
&self.window_expr,
&partition_batch,
)?)
}
// 步驟 4: 合併所有分區的結果
let columns = transpose(partition_results)
.iter()
.map(|elems| concat(&elems.iter()
.map(|x| x.as_ref()).collect::<Vec<_>>()))
.collect::<Result<Vec<ArrayRef>, ArrowError>>()?;
// 步驟 5: 將視窗函數結果附加到原始列
let mut batch_columns = batch.columns().to_vec();
batch_columns.extend_from_slice(&columns);
Ok(Some(RecordBatch::try_new(
Arc::clone(&self.schema),
batch_columns,
)?))
}
關鍵函數:compute_window_aggregates()
fn compute_window_aggregates(
window_expr: &[Arc<dyn WindowExpr>],
batch: &RecordBatch,
) -> Result<Vec<ArrayRef>> {
window_expr
.iter()
.map(|window_expr| window_expr.evaluate(batch))
.collect()
}
這個函數非常簡潔:
evaluate()
Vec<ArrayRef>
,每個元素是一個視窗函數的結果列最後,將所有分區的結果組合並附加到原始列:
原始 Batch:
| employee_id | name | department | salary |
|-------------|-------|------------|--------|
| 1 | Alice | Sales | 48000 |
| 2 | Bob | Sales | 52000 |
| 3 | Carol | IT | 58000 |
經過視窗函數計算(例如 RANK() OVER (PARTITION BY department ORDER BY salary DESC)):
分區 1 (Sales):
partition_results[0] = [2, 1] // Alice 排第2,Bob 排第1
分區 2 (IT):
partition_results[1] = [1] // Carol 排第1
合併分區結果:
window_column = concat([2, 1], [1]) = [2, 1, 1]
最終 Batch:
| employee_id | name | department | salary | rank |
|-------------|-------|------------|--------|------|
| 1 | Alice | Sales | 48000 | 2 |
| 2 | Bob | Sales | 52000 | 1 |
| 3 | Carol | IT | 58000 | 1 |
transpose() 的作用:
當有多個視窗函數時,partition_results
的結構是:
// partition_results: Vec<Vec<ArrayRef>>
[
[rank_col_partition1, sum_col_partition1], // 分區 1
[rank_col_partition2, sum_col_partition2], // 分區 2
]
// 需要轉置為:
[
[rank_col_partition1, rank_col_partition2], // 所有分區的 rank
[sum_col_partition1, sum_col_partition2], // 所有分區的 sum
]
// 然後對每個視窗函數的結果 concat
所有視窗函數都實現 WindowExpr
trait,這提供了統一的計算介面:
// 來自 datafusion/physical-expr/src/window/window_expr.rs
pub trait WindowExpr: Send + Sync {
/// 類型轉換支援
fn as_any(&self) -> &dyn Any;
/// 視窗函數名稱
fn name(&self) -> &str;
/// 返回欄位定義
fn field(&self) -> Result<FieldRef>;
/// 獲取表達式引用
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
/// PARTITION BY 表達式
fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>];
/// ORDER BY 表達式
fn order_by(&self) -> &[PhysicalSortExpr];
/// 核心方法:評估視窗函數
fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef>;
/// 獲取 Window Frame 定義
fn get_window_frame(&self) -> &Arc<WindowFrame> {
&DEFAULT_WINDOW_FRAME
}
}
核心方法:evaluate()
這是視窗函數計算的核心,不同類型的視窗函數有不同的實現策略:
聚合視窗函數(如 SUM、AVG)使用 StandardWindowExpr
實現:
pub struct StandardWindowExpr {
expr: Arc<dyn AggregateWindowExpr>, // 底層聚合函數
partition_by: Vec<Arc<dyn PhysicalExpr>>,
order_by: Vec<PhysicalSortExpr>,
window_frame: Arc<WindowFrame>, // Frame 定義
}
evaluate() 的兩種模式:
模式 1:使用 Window Frame(需要逐行計算)
fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
let mut evaluator = self.expr.create_evaluator()?;
let num_rows = batch.num_rows();
if evaluator.uses_window_frame() {
// 需要為每一行計算 Frame 範圍
let sort_options = self.order_by.iter().map(|o| o.options).collect();
let mut row_wise_results = vec![];
// 準備參數和 ORDER BY 列
let mut values = self.evaluate_args(batch)?;
let order_bys = get_orderby_values(self.order_by_columns(batch)?);
values.extend(order_bys);
// 創建 WindowFrameContext
let mut window_frame_ctx = WindowFrameContext::new(
Arc::clone(&self.window_frame),
sort_options
);
let mut last_range = Range { start: 0, end: 0 };
// 逐行計算
for idx in 0..num_rows {
// 計算當前行的 Frame 範圍
let range = window_frame_ctx.calculate_range(
&order_bys_ref,
&last_range,
num_rows,
idx,
)?;
// 在該範圍內評估聚合函數
let value = evaluator.evaluate(&values, &range)?;
row_wise_results.push(value);
last_range = range;
}
ScalarValue::iter_to_array(row_wise_results)
} else {
// 模式 2:不使用 Frame(後面說明)
}
}
Frame 範圍計算示例:
-- 計算 3 行移動平均
SELECT
date,
value,
AVG(value) OVER (
ORDER BY date
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
) as moving_avg
FROM data;
對於每一行,calculate_range()
會計算:
Row 0: Range { start: 0, end: 1 } // 只有當前行(沒有前 2 行)
Row 1: Range { start: 0, end: 2 } // 當前行和前 1 行
Row 2: Range { start: 0, end: 3 } // 當前行和前 2 行(完整的 3 行)
Row 3: Range { start: 1, end: 4 } // 當前行和前 2 行(滑動視窗)
Row 4: Range { start: 2, end: 5 } // 繼續滑動
然後在每個範圍內調用 evaluator.evaluate()
。
模式 2:不使用 Frame(包括排名函數)
} else if evaluator.include_rank() {
// 排名函數有特殊處理
let columns = self.order_by_columns(batch)?;
let rank_values = evaluator.evaluate_rank(&columns)?;
rank_values.to_array()
} else {
// 整個分區的聚合(例如沒有 ORDER BY 的 SUM)
let values = self.evaluate_args(batch)?;
evaluator.update_batch(&values)?;
let value = evaluator.evaluate()?;
value.to_array_of_size(batch.num_rows())
}
排名函數使用 PartitionEvaluator
模式:
// 來自 datafusion/functions-window/src/rank.rs
pub struct RankEvaluator {
state: RankState,
rank_type: RankType, // Basic, Dense, Percent
}
struct RankState {
current_group_idx: usize,
last_rank_data: Vec<ScalarValue>,
last_rank_boundary: usize,
n_rank: usize,
}
impl PartitionEvaluator for RankEvaluator {
fn evaluate_partition(
&mut self,
partition: &RecordBatch,
order_by_columns: &[ArrayRef],
) -> Result<ArrayRef> {
let num_rows = partition.num_rows();
let mut ranks: Vec<u64> = Vec::with_capacity(num_rows);
// 逐行計算排名
for i in 0..num_rows {
let current_row = get_row_at_idx(order_by_columns, i)?;
// 檢查當前行的 ORDER BY 值是否與前一行不同
if i == 0 || current_row != self.state.last_rank_data {
// 值改變,更新排名
match self.rank_type {
RankType::Basic => {
// RANK(): 跳躍排名
self.state.current_group_idx = i + 1;
}
RankType::Dense => {
// DENSE_RANK(): 密集排名
self.state.current_group_idx += 1;
}
RankType::Percent => {
// PERCENT_RANK(): 相對排名
// (rank - 1) / (total_rows - 1)
}
}
self.state.last_rank_data = current_row;
}
ranks.push(self.state.current_group_idx as u64);
}
Ok(Arc::new(UInt64Array::from(ranks)))
}
}
RANK vs DENSE_RANK 的關鍵差異:
數據(按 score 排序):
| score | RANK | DENSE_RANK |
|-------|------|------------|
| 95 | 1 | 1 |
| 90 | 2 | 2 |
| 90 | 2 | 2 | ← 並列
| 85 | 4 | 3 | ← RANK 跳到 4,DENSE_RANK 到 3
實現差異:
- RANK: current_group_idx = i + 1(使用當前行索引)
- DENSE_RANK: current_group_idx += 1(遞增組計數器)
LAG/LEAD 函數直接訪問其他行的值,實現相對簡單:
// 簡化的概念代碼
fn evaluate_lag(
batch: &RecordBatch,
column: &ArrayRef,
offset: i64,
default_value: &ScalarValue,
) -> Result<ArrayRef> {
let num_rows = batch.num_rows();
let mut result = Vec::with_capacity(num_rows);
for i in 0..num_rows {
let target_idx = (i as i64) - offset;
if target_idx >= 0 && target_idx < num_rows as i64 {
// 目標行存在
result.push(ScalarValue::try_from_array(column, target_idx as usize)?);
} else {
// 目標行不存在,使用預設值
result.push(default_value.clone());
}
}
ScalarValue::iter_to_array(result)
}
Window Frame 是視窗函數中最複雜的部分,它定義了在當前行周圍實際參與計算的行範圍。DataFusion 支援三種 Frame 模式。
// 來自 datafusion/expr/src/window_state.rs
pub enum WindowFrameContext {
Rows(Arc<WindowFrame>),
Range {
window_frame: Arc<WindowFrame>,
state: WindowFrameStateRange,
},
Groups {
window_frame: Arc<WindowFrame>,
state: WindowFrameStateGroups,
},
}
ROWS 模式最簡單,直接基於行數計算:
fn calculate_range_rows(
window_frame: &Arc<WindowFrame>,
length: usize,
idx: usize,
) -> Result<Range<usize>> {
// 計算起始位置
let start = match window_frame.start_bound {
WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => {
// UNBOUNDED PRECEDING
0
}
WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => {
// n PRECEDING
idx.saturating_sub(n as usize)
}
WindowFrameBound::CurrentRow => {
// CURRENT ROW
idx
}
WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => {
// n FOLLOWING
std::cmp::min(idx + n as usize, length)
}
_ => return internal_err!("Invalid frame start"),
};
// 計算結束位置
let end = match window_frame.end_bound {
WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => {
idx.saturating_sub(n as usize).saturating_add(1)
}
WindowFrameBound::CurrentRow => {
idx + 1
}
WindowFrameBound::Following(ScalarValue::UInt64(None)) => {
// UNBOUNDED FOLLOWING
length
}
WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => {
std::cmp::min(idx + n as usize + 1, length)
}
_ => return internal_err!("Invalid frame end"),
};
Ok(Range { start, end })
}
示例:ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING
數據(10 行):
Row 0: Range { start: 0, end: 2 } // 自己 + 後 1 行(沒有前 2 行)
Row 1: Range { start: 0, end: 3 } // 前 1 + 自己 + 後 1 行
Row 2: Range { start: 0, end: 4 } // 前 2 + 自己 + 後 1 行(完整)
Row 3: Range { start: 1, end: 5 } // 視窗滑動
Row 4: Range { start: 2, end: 6 }
...
Row 8: Range { start: 6, end: 10 } // 前 2 + 自己 + 後 1 行
Row 9: Range { start: 7, end: 10 } // 前 2 + 自己(沒有後 1 行)
RANGE 模式基於 ORDER BY 列的值範圍:
fn calculate_range(
&mut self,
window_frame: &Arc<WindowFrame>,
last_range: &Range<usize>,
range_columns: &[ArrayRef],
length: usize,
idx: usize,
) -> Result<Range<usize>> {
let start = match window_frame.start_bound {
WindowFrameBound::Preceding(ref n) => {
if n.is_null() {
// UNBOUNDED PRECEDING
0
} else {
// n PRECEDING:找到值 <= (current_value - n) 的第一行
self.calculate_index_of_row::<true, true>(
range_columns,
idx,
Some(n),
length,
)?
}
}
WindowFrameBound::CurrentRow => {
// 找到與當前行值相等的第一行
self.calculate_index_of_row::<true, true>(
range_columns,
idx,
None,
length,
)?
}
WindowFrameBound::Following(ref n) => {
// n FOLLOWING:找到值 >= (current_value + n) 的第一行
self.calculate_index_of_row::<true, false>(
range_columns,
idx,
Some(n),
length,
)?
}
};
// 計算 end(類似邏輯)
let end = /* ... */;
Ok(Range { start, end })
}
RANGE vs ROWS 的關鍵差異:
-- 數據
| date | value |
|------------|-------|
| 2024-01-01 | 100 |
| 2024-01-02 | 150 |
| 2024-01-02 | 120 | ← 同一天有 2 筆記錄
| 2024-01-03 | 130 |
-- ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
-- 嚴格計算 2 行
Row 0: Range(0, 1) → [100] // 只有自己
Row 1: Range(0, 2) → [100, 150] // 前 1 行 + 自己
Row 2: Range(1, 3) → [150, 120] // 前 1 行 + 自己
Row 3: Range(2, 4) → [120, 130] // 前 1 行 + 自己
-- RANGE BETWEEN 1 DAY PRECEDING AND CURRENT ROW
-- 基於日期範圍
Row 0 (2024-01-01): Range(0, 1) → [100]
Row 1 (2024-01-02): Range(0, 3) → [100, 150, 120] // 包含昨天和今天所有記錄
Row 2 (2024-01-02): Range(0, 3) → [100, 150, 120] // 同樣範圍(相同日期)
Row 3 (2024-01-03): Range(1, 4) → [150, 120, 130] // 包含昨天和今天所有記錄
RANGE 模式的適用場景:
GROUPS 模式將 ORDER BY 相同的行視為一個組:
fn calculate_range(
&mut self,
window_frame: &Arc<WindowFrame>,
range_columns: &[ArrayRef],
length: usize,
idx: usize,
) -> Result<Range<usize>> {
// 邏輯:維護一個組邊界的快取
// 當遇到新行時,檢查它是否與前一組的值相同
// 如果不同,開始新組
let delta = match window_frame.start_bound {
WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => n,
_ => 0,
};
// 找到當前行所在組的開始
let current_group = self.find_group_start(range_columns, idx, length)?;
// 從當前組向前/後數 delta 個組
let target_group = current_group.saturating_sub(delta);
let start = self.group_boundaries[target_group];
// 類似計算 end
let end = /* ... */;
Ok(Range { start, end })
}
GROUPS 示例:
-- 數據(按 score 排序)
| name | score |
|-------|-------|
| Alice | 95 | ← Group 0
| Bob | 90 | ← Group 1
| Carol | 90 | ← Group 1(相同值)
| David | 85 | ← Group 2
-- GROUPS BETWEEN 1 PRECEDING AND CURRENT ROW
Row 0 (Alice, 95):
當前組: Group 0
範圍: Group 0(沒有前一組)
Range(0, 1) → [Alice]
Row 1 (Bob, 90):
當前組: Group 1
範圍: Group 0 + Group 1
Range(0, 3) → [Alice, Bob, Carol]
Row 2 (Carol, 90):
當前組: Group 1(與 Bob 同組)
範圍: Group 0 + Group 1
Range(0, 3) → [Alice, Bob, Carol] // 相同範圍
Row 3 (David, 85):
當前組: Group 2
範圍: Group 1 + Group 2
Range(1, 4) → [Bob, Carol, David]
GROUPS 的關鍵特性:
優化原理:
如果輸入數據已經按照 PARTITION BY + ORDER BY
排序,WindowAggExec 可以直接使用,避免重新排序。
實現機制:
// 在創建 WindowAggExec 時
let ordered_partition_by_indices =
get_ordered_partition_by_indices(
window_expr[0].partition_by(),
&input
)?;
get_ordered_partition_by_indices()
檢查:
input.output_ordering()
)效果:
場景:
SELECT
name,
ROW_NUMBER() OVER (PARTITION BY dept ORDER BY salary DESC) as rn,
RANK() OVER (PARTITION BY dept ORDER BY salary DESC) as rank,
SUM(salary) OVER (PARTITION BY dept ORDER BY salary DESC) as cumsum
FROM employees;
優化策略:
相同視窗合併:
共享 ORDER BY 評估:
ORDER BY salary DESC
只需評估一次向量化計算:
實現:
fn compute_window_aggregates(
window_expr: &[Arc<dyn WindowExpr>],
batch: &RecordBatch,
) -> Result<Vec<ArrayRef>> {
// 一次遍歷評估所有視窗函數
window_expr
.iter()
.map(|expr| expr.evaluate(batch))
.collect()
}
挑戰:視窗函數需要累積完整分區,可能導致記憶體壓力。
DataFusion 的策略:
流式累積:
// WindowAggStream 實現 Stream trait
// 逐批接收數據,累積到內部 Vec
impl Stream for WindowAggStream {
fn poll_next(...) -> Poll<Option<Result<RecordBatch>>> {
if !self.finished {
// 持續讀取輸入
match ready!(self.input.poll_next_unpin(cx)) {
Some(Ok(batch)) => {
self.batches.push(batch);
// 繼續輸入
cx.waker().wake_by_ref();
Poll::Pending
}
None => {
self.finished = true;
// 輸入完成,開始計算
}
}
} else {
// 計算並返回結果
Poll::Ready(self.compute_aggregates()?)
}
}
}
批次處理:
零拷貝操作:
concat_batches()
對於不可變陣列是零拷貝batch.slice()
提取分區數據也是零拷貝分區並行:
增量計算:
Bounded Window:
BoundedWindowAggExec
讓我們透過 EXPLAIN 看看視窗函數的執行計劃:
EXPLAIN SELECT
department,
employee_id,
salary,
RANK() OVER (PARTITION BY department ORDER BY salary DESC) as rank,
AVG(salary) OVER (PARTITION BY department) as dept_avg
FROM employees;
執行計劃:
ProjectionExec: exprs=[department, employee_id, salary, rank, dept_avg]
WindowAggExec: wdw=[
RANK() OVER (PARTITION BY department ORDER BY salary DESC),
AVG(salary) OVER (PARTITION BY department)
]
SortExec: expr=[department ASC, salary DESC]
TableScan: employees
計劃解讀:
TableScan:讀取 employees 表
SortExec:
department ASC, salary DESC
排序WindowAggExec:
ProjectionExec:
性能提示:
今天我們深入探討了 DataFusion 視窗函數的執行機制,從原始碼層面完整剖析了視窗函數的實現:
核心執行流程:
evaluate_partition_ranges()
找出邊界window_expr.evaluate()
Window Frame 計算三種模式:
不同視窗函數的實現策略:
性能優化技術:
實踐價值:
理解這些底層實現後,我們能夠:
視窗函數是 DataFusion 中最複雜的算子之一,但透過深入理解其執行機制,我們可以充分發揮其威力,解決各種複雜的分析問題。
明天我們將探討 並行執行與分區策略,了解 DataFusion 如何利用多核心並行處理數據,以及 RepartitionExec 如何在不同的分區策略(RoundRobin、Hash、Range)之間進行選擇和優化。