在昨天的文章中,我們深入探討了 DataFusion 聚合算子的核心機制並理解聚合操作如何透過狀態管理和 Hash Table 來處理分組數據。然而在特定場景中,純粹的 Hash Aggregation 可能面臨記憶體壓力、無法利用已有排序等問題。今天,我們將探討:
Hash Aggregation 是昨天我們探討的方法,它的核心思想是使用 Hash Table 來管理分組狀態:
輸入數據流:
department | salary
-----------+--------
IT | 80000
Sales | 60000
IT | 90000
HR | 70000
Sales | 65000
Hash Aggregation 過程:
1. 為每行計算 hash(department)
IT → hash = 12345 → bucket 0
Sales → hash = 67890 → bucket 1
HR → hash = 11111 → bucket 2
2. 在 Hash Table 中查找或創建分組
Hash Table:
bucket 0: IT → {sum: 0, count: 0}
bucket 1: Sales → {sum: 0, count: 0}
bucket 2: HR → {sum: 0, count: 0}
3. 逐行更新對應分組的累積器
處理 "IT, 80000": IT.sum = 80000, IT.count = 1
處理 "Sales, 60000": Sales.sum = 60000, Sales.count = 1
處理 "IT, 90000": IT.sum = 170000, IT.count = 2
...
4. 輸出所有分組的結果
IT → AVG = 170000 / 2 = 85000
Sales → AVG = 125000 / 2 = 62500
HR → AVG = 70000 / 1 = 70000
1. 單次掃描(Single-Pass)
Hash Aggregation 最大的優勢是只需一次遍歷輸入數據:
// 偽代碼展示單次掃描的特性
let mut hash_table = HashMap::new();
for batch in input_stream {
for row in batch {
let group_key = evaluate_group_by(row);
let hash_value = hash(group_key);
// 直接在 Hash Table 中查找或創建分組
let accumulator = hash_table
.entry(hash_value)
.or_insert_with(|| create_accumulator());
// 立即更新累積器
accumulator.update(row.value);
}
}
// 輸出結果
for (group_key, accumulator) in hash_table {
output(group_key, accumulator.evaluate());
}
這種單次掃描的特性意味著:
2. 對輸入順序無要求
無論數據以何種順序到達,Hash Aggregation 都能正確處理:
場景 A - 數據集中到達:
IT, IT, IT, Sales, Sales, HR
✓ 正確處理
場景 B - 數據交錯到達:
IT, Sales, IT, HR, Sales, IT
✓ 同樣正確處理
場景 C - 完全隨機:
Sales, IT, HR, IT, Sales, IT
✓ 依然正確處理
這個特性使得 Hash Aggregation 成為默認首選策略。
3. 並行友好
Hash Aggregation 非常適合並行處理。在多核 CPU 環境下:
輸入數據(分成 4 個分區):
分區 0: IT, Sales, IT ┐
分區 1: HR, IT, Sales ├─→ 各自獨立進行 Hash Aggregation (Partial)
分區 2: Sales, IT, HR │
分區 3: IT, Sales, Sales ┘
Partial 結果:
分區 0: IT → 2, Sales → 1
分區 1: IT → 1, Sales → 1, HR → 1
分區 2: IT → 1, Sales → 1, HR → 1
分區 3: IT → 1, Sales → 2
重分區(按 GROUP BY 鍵 Hash):
IT → 分區 0: 收集所有 IT 的部分結果
Sales → 分區 1: 收集所有 Sales 的部分結果
HR → 分區 2: 收集所有 HR 的部分結果
Final Aggregation:
並行合併各分組的結果
1. 記憶體壓力
Hash Table 必須將所有分組的狀態保留在記憶體中:
查詢: SELECT user_id, COUNT(*) FROM logs GROUP BY user_id;
假設:
- 1000 萬個不同的 user_id
- 每個分組狀態約 100 bytes(包括 key + accumulator)
記憶體需求:
10,000,000 * 100 bytes = 1 GB
如果有 1 億個不同的 user_id:
100,000,000 * 100 bytes = 10 GB!
當分組數量(group cardinality)非常高時,記憶體可能不足以容納所有分組。
2. Hash Table 性能衰減
隨著 Hash Table 的增長,性能會逐漸下降:
分組數量與查找時間的關係:
10,000 個分組:
Hash Table 負載因子 < 0.7
查找時間: ~O(1),快速
1,000,000 個分組:
Hash Table 需要多次 resize
Cache miss 增加
查找時間: 變慢
10,000,000 個分組:
Hash Table 可能超過 CPU Cache
每次查找都會導致 memory access
查找時間: 顯著變慢
3. Spilling 開銷
當記憶體不足時,DataFusion 會將部分數據 spill 到磁碟:
// 簡化的 Spilling 邏輯
fn group_aggregate_batch(&mut self, batch: RecordBatch) -> Result<()> {
// 檢查記憶體使用量
if self.memory_usage > self.memory_limit {
// 需要 spill 部分數據到磁碟
self.spill_to_disk()?;
}
// 正常的聚合邏輯
self.aggregate_batch(batch)?;
Ok(())
}
fn spill_to_disk(&mut self) -> Result<()> {
// 1. 選擇要 spill 的分組(通常是最大的或最冷的)
let groups_to_spill = self.select_groups_to_spill();
// 2. 將這些分組的狀態序列化到磁碟
let spill_file = self.spill_manager.create_spill_file()?;
for group in groups_to_spill {
spill_file.write(group.key, group.accumulator.state())?;
}
// 3. 從記憶體中移除這些分組
self.hash_table.remove_groups(groups_to_spill);
Ok(())
}
Spilling 帶來的開銷:
讓我們看看 DataFusion 如何處理 Hash Aggregation:
// datafusion/physical-plan/src/aggregates/row_hash.rs
impl GroupedHashAggregateStream {
fn group_aggregate_batch(&mut self, batch: RecordBatch) -> Result<()> {
// 1. 評估 GROUP BY 表達式
let group_by_values = evaluate_group_by(&self.group_by, &batch)?;
// 2. 評估聚合函數的輸入
let input_values = evaluate_many(&self.aggregate_arguments, &batch)?;
// 3. 使用 GroupValues 確定每行的分組索引
let starting_num_groups = self.group_values.len();
self.group_values.intern(
&group_by_values,
&mut self.current_group_indices,
)?;
// 4. 如果分組數量增長,擴展累積器
if self.group_values.len() > starting_num_groups {
self.accumulators
.iter_mut()
.try_for_each(|accumulator| {
accumulator.resize(self.group_values.len())
})?;
}
// 5. 更新每個累積器
for (accumulator, values) in self.accumulators.iter_mut()
.zip(input_values.iter())
{
accumulator.update_batch(
values,
&self.current_group_indices,
opt_filter,
self.group_values.len(),
)?;
}
// 6. 檢查是否需要 emit 或 spill
self.emit_if_necessary()?;
Ok(())
}
}
這個實現體現了 Hash Aggregation 的核心特點:
GroupValues
trait 的高效實現來管理分組GroupsAccumulator
進行批次化更新Sort-based Aggregation(基於排序的聚合)是另一種聚合策略,它依賴於輸入數據按 GROUP BY 鍵排序這一前提:
前提: 輸入數據已按 department 排序
輸入數據流:
department | salary
-----------+--------
HR | 70000 ┐
HR | 72000 ├─ HR 組
HR | 68000 ┘
IT | 80000 ┐
IT | 90000 ├─ IT 組
IT | 85000 │
IT | 95000 ┘
Sales | 60000 ┐
Sales | 65000 ├─ Sales 組
Sales | 63000 ┘
Sort-based Aggregation 過程:
1. 初始化當前分組: current_group = null
2. 讀取第一行: department = HR
- 當前分組為空,創建新分組 HR
- 累積: HR.sum = 70000, HR.count = 1
3. 讀取第二行: department = HR (相同)
- 繼續累積到當前分組
- 累積: HR.sum = 142000, HR.count = 2
4. 讀取第三行: department = HR (相同)
- 繼續累積
- 累積: HR.sum = 210000, HR.count = 3
5. 讀取第四行: department = IT (不同!)
- 當前分組結束,輸出結果: HR → AVG = 210000/3 = 70000
- 創建新分組 IT
- 累積: IT.sum = 80000, IT.count = 1
6. 持續處理...
1. 極低的記憶體佔用
與 Hash Aggregation 最大的不同:Sort-based Aggregation 只需要保留當前分組的狀態:
Hash Aggregation 記憶體需求:
記憶體 = 分組數量 × 每個分組狀態大小
1000 萬個分組 → 1 GB+ 記憶體
Sort-based Aggregation 記憶體需求:
記憶體 = 1 × 單個分組狀態大小
無論多少分組 → 只需幾 KB 記憶體!
實際案例:
查詢: SELECT user_id, COUNT(*) FROM logs GROUP BY user_id;
Hash:
- 1 億個用戶 → 需要 10 GB 記憶體
- 可能需要 spill 到磁碟
Sort-based (假設已排序):
- 1 億個用戶 → 只需 ~1 KB 記憶體
- 不需要 spill
這個特性使得 Sort-based Aggregation 在高基數分組和記憶體受限環境中非常有吸引力。
2. 無需 Hash Table 維護
不需要 Hash Table 意味著:
3. Cache 友好
由於只處理當前分組,數據局部性極好:
記憶體訪問模式:
Hash Aggregation:
處理 IT 員工 #1 → 訪問 hash_table[IT]
處理 Sales 員工 #1 → 訪問 hash_table[Sales] (Cache miss)
處理 HR 員工 #1 → 訪問 hash_table[HR] (Cache miss)
處理 IT 員工 #2 → 訪問 hash_table[IT] (Cache miss)
...
隨機訪問模式,Cache 命中率低
Sort-based Aggregation:
處理 HR 員工 #1 → 訪問 current_group
處理 HR 員工 #2 → 訪問 current_group (Cache hit)
處理 HR 員工 #3 → 訪問 current_group (Cache hit)
...
處理 IT 員工 #1 → 訪問 current_group
處理 IT 員工 #2 → 訪問 current_group (Cache hit)
...
順序訪問模式,Cache 命中率極高
1. 必須預先排序
這是最大的限制:輸入數據必須按 GROUP BY 鍵排序。
如果數據未排序,需要先進行排序:
場景 A: 數據已經排序(例如從索引掃描獲得)
無需額外操作 → Sort-based Aggregation 免費獲得!
場景 B: 數據未排序
需要先排序:
SortExec → AggregateExec
↑
排序開銷可能很大!
排序成本:
- 時間複雜度: O(N log N)
- 空間複雜度: 可能需要額外的 O(N) 空間
- 如果數據無法全部放入記憶體,需要外部排序(更慢)
2. 需要完整數據(或分區內完整)
Sort-based Aggregation 要求同一分組的所有數據連續出現:
正確的輸入(可以處理):
HR, HR, HR, IT, IT, Sales, Sales
錯誤的輸入(無法處理):
HR, IT, HR, Sales, IT, HR, Sales
↑ ↑
HR 分組分散在多處,無法正確聚合
這意味著在分散式環境中,必須先進行 shuffle(重分區),確保同一分組的數據在同一個分區。
3. 不適合流式處理
因為需要排序,Sort-based Aggregation 不適合真正的流式場景:
流式數據:
時間 0s: IT, Sales, HR
時間 1s: IT, HR, Sales
時間 2s: Sales, IT, HR
...
問題:
- 無法保證數據按分組鍵有序到達
- 如果等待排序,會引入延遲
- 與流式處理的低延遲目標衝突
需要注意的是,DataFusion 並沒有實現獨立的 Sort-based Aggregation 算子。相反,它採用了一種混合策略:
// DataFusion 的做法:
// 1. 主要使用 Hash Aggregation(GroupedHashAggregateStream)
// 2. 但如果輸入已經部分排序,會利用這個特性來優化
// 來自 update_aggr_exprs 優化規則
fn optimize_aggregate_with_ordering(
aggr_exec: &AggregateExec,
) -> Result<AggregateExec> {
let groupby_exprs = aggr_exec.group_expr().input_exprs();
let input = aggr_exec.input();
// 檢查輸入是否按 GROUP BY 鍵的前綴排序
let indices = get_ordered_partition_by_indices(&groupby_exprs, input)?;
if !indices.is_empty() {
// 輸入有序!可以利用這個特性
// 調整聚合表達式以利用順序
let requirement = indices
.iter()
.map(|&idx| PhysicalSortRequirement::new(
Arc::clone(&groupby_exprs[idx]),
None,
))
.collect();
// 某些聚合函數(如 FIRST_VALUE)可以在有序輸入下更高效
let optimized_aggr_exprs = try_convert_aggregate_if_better(
aggr_exec.aggr_expr(),
&requirement,
input.equivalence_properties(),
)?;
return Ok(aggr_exec.with_new_aggr_exprs(optimized_aggr_exprs));
}
Ok(aggr_exec.clone())
}
這種方法的優勢:
FIRST_VALUE
、LAST_VALUE
)在有序輸入下可以更高效地實現讓我們通過具體案例來比較兩種策略:
SELECT department, AVG(salary), COUNT(*)
FROM employees -- 100 萬行
GROUP BY department; -- 只有 10 個部門
Hash Aggregation:
Sort-based Aggregation:
性能對比:
Hash Aggregation: 100 ms(單次掃描)
Sort + Aggregate: 500 ms(400 ms 排序 + 100 ms 聚合)
結論: Hash 快 5 倍
SELECT user_id, SUM(purchase_amount)
FROM purchases -- 10 億行,已按 user_id 排序(索引掃描)
GROUP BY user_id; -- 1 億個不同用戶
Hash Aggregation:
Sort-based Aggregation:
性能對比:
Hash Aggregation: 300 秒(大量 spill I/O)
Sort-based: 30 秒(流式處理,無 spill)
結論: Sort-based 快 10 倍
DataFusion 採用以 Hash 為主,智能利用排序的策略。讓我們看看優化器如何工作:
物理規劃器 (PhysicalPlanner
) 默認將 LogicalPlan::Aggregate
轉換為 AggregateExec
,使用 Hash-based 實現:
// 簡化的物理規劃邏輯
fn create_physical_plan_for_aggregate(
logical_agg: &Aggregate,
) -> Result<Arc<dyn ExecutionPlan>> {
// 默認策略:使用 Hash Aggregation
let aggregate_mode = if needs_two_phase {
AggregateMode::Partial // 第一階段
} else {
AggregateMode::Single // 單階段
};
let aggr_exec = AggregateExec::try_new(
aggregate_mode,
group_by_exprs,
aggr_exprs,
input_plan,
input_schema,
)?;
Ok(Arc::new(aggr_exec))
}
UpdateAggregateExprWithOrdering
規則這個優化規則檢查輸入是否按 GROUP BY 鍵排序,並據此優化:
// datafusion/physical-optimizer/src/update_aggr_exprs.rs
impl PhysicalOptimizerRule for UpdateAggregateExprWithOrdering {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
plan.transform_up(|plan| {
if let Some(aggr_exec) = plan.as_any().downcast_ref::<AggregateExec>() {
// 只優化第一階段(Partial)
if !aggr_exec.mode().is_first_stage() {
return Ok(Transformed::no(plan));
}
let input = aggr_exec.input();
let groupby_exprs = aggr_exec.group_expr().input_exprs();
// 檢查輸入是否按 GROUP BY 的前綴排序
let ordered_indices = get_ordered_partition_by_indices(
&groupby_exprs,
input,
)?;
if ordered_indices.is_empty() {
// 沒有排序,無法優化
return Ok(Transformed::no(plan));
}
// 有排序!可以優化
let sort_requirement: Vec<_> = ordered_indices
.iter()
.map(|&idx| PhysicalSortRequirement::new(
Arc::clone(&groupby_exprs[idx]),
None,
))
.collect();
// 嘗試將聚合函數轉換為更高效的版本
let new_aggr_exprs = try_convert_aggregate_if_better(
aggr_exec.aggr_expr().to_vec(),
&sort_requirement,
input.equivalence_properties(),
)?;
let new_aggr_exec = aggr_exec.with_new_aggr_exprs(new_aggr_exprs);
Ok(Transformed::yes(Arc::new(new_aggr_exec) as _))
} else {
Ok(Transformed::no(plan))
}
})
.data()
}
}
這個規則的關鍵作用:
讓我們通過 EXPLAIN 來觀察優化器的決策:
-- 場景 1: 無序數據
EXPLAIN SELECT department, AVG(salary)
FROM employees
GROUP BY department;
Physical Plan:
AggregateExec: mode=Single, gby=[department@0], aggr=[AVG(salary)]
ParquetExec: file=employees.parquet, partitions={4}
分析:
- 使用 Hash Aggregation(GroupedHashAggregateStream)
- 單階段聚合(因為數據量不大)
-- 場景 2: 數據已按 department 排序
EXPLAIN SELECT department, AVG(salary)
FROM employees_sorted -- 已按 department 排序
GROUP BY department;
Physical Plan:
AggregateExec: mode=Single, gby=[department@0], aggr=[AVG(salary)]
-- 注意: 保留了輸入的排序
ParquetExec: file=employees_sorted.parquet,
output_ordering=[department ASC]
分析:
- 仍使用 AggregateExec(基於 Hash)
- 但優化器知道輸入有序,可以利用這個特性
- 聚合函數可能被優化為更高效的版本
在某些情況下,你可以手動在聚合前插入排序:
-- 方法 1: 顯式 ORDER BY(如果後面有需要排序的操作)
SELECT user_id, SUM(amount) as total
FROM transactions
GROUP BY user_id
ORDER BY user_id; -- 這會在聚合後排序
-- 方法 2: 使用子查詢先排序(強制策略,通常不推薦)
SELECT user_id, SUM(amount) as total
FROM (
SELECT * FROM transactions ORDER BY user_id
)
GROUP BY user_id;
但通常不推薦手動插入排序,因為:
今天我們深入探討了聚合策略的選擇:Hash Aggregation 與 Sort-based Aggregation。
UpdateAggregateExprWithOrdering
規則利用已有排序聚合策略的選擇沒有絕對的"最佳"策略,只有"最適合"的策略。理解兩種策略的權衡,能幫助我們更好地設計查詢和優化系統。
明天,我們將探討 Join 算子 Part 1 - Hash Join 原理,了解另一個重要的有狀態算子如何高效執行。