在 Day 14 的文章中,我們初步認識了分區(Partitioning)的概念,了解 DataFusion 支援 RoundRobin、Hash 等分區策略。
今天,我們將專門深入探討並行執行與分區策略這個主題。理解分區是如何工作的?為什麼需要數據重分配?RepartitionExec
扮演什麼角色?如何選擇合適的並行度?這些概念直接影響查詢性能。
在現代數據處理中,單執行緒已經無法滿足性能需求。想像一個場景:你需要處理 1GB 的數據,如果使用單核心處理需要 10 秒,那麼如果能充分利用 8 核心的 CPU,理論上可以將處理時間縮短到約 1.25 秒。
DataFusion 透過**分區(Partitioning)**機制來實現並行執行。分區的核心概念是將數據切分成多個獨立的部分,每個部分可以在不同的執行緒或核心上獨立處理,最後再將結果合併。
在 DataFusion 中,每個 ExecutionPlan
都會產生一個或多個獨立的數據流(Stream),這些流被稱為分區(Partitions)。當我們調用 execute(partition_id)
方法時,會得到對應分區的數據流。
┌─────────────────┐
│ ExecutionPlan │
│ (3 partitions) │
└─────────────────┘
│
┌─────────────────┼─────────────────┐
│ │ │
▼ ▼ ▼
Stream 0 Stream 1 Stream 2
(partition 0) (partition 1) (partition 2)
│ │ │
▼ ▼ ▼
RecordBatch RecordBatch RecordBatch
RecordBatch RecordBatch RecordBatch
... ... ...
DataFusion 支援三種主要的分區策略,定義在 datafusion/physical-expr/src/partitioning.rs
中:
#[derive(Debug, Clone)]
pub enum Partitioning {
/// 使用輪詢演算法分配批次到指定數量的分區
RoundRobinBatch(usize),
/// 基於一個或多個表達式的雜湊值分配資料列到指定數量的分區
Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
/// 未知的分區方案,但已知分區數量
UnknownPartitioning(usize),
}
讓我們詳細了解每種策略:
工作原理:將 RecordBatch 按照輪詢方式分配到各個分區,類似於發牌。第一個 batch 給分區 0,第二個給分區 1,以此類推。
適用場景:
範例:假設我們有 5 個 RecordBatch,目標是 3 個分區:
Input Batches: [Batch0, Batch1, Batch2, Batch3, Batch4]
│ │ │ │ │
▼ ▼ ▼ ▼ ▼
Partition 0: Batch0 Batch3
Partition 1: Batch1 Batch4
Partition 2: Batch2
工作原理:基於指定的表達式(通常是欄位)計算雜湊值,然後使用 hash % partition_count
來決定每一行應該分配到哪個分區。
適用場景:
範例:假設我們按 user_id
進行雜湊分區,目標是 3 個分區:
Row: {user_id: 100, name: "Alice"} → hash(100) % 3 = 1 → Partition 1
Row: {user_id: 201, name: "Bob"} → hash(201) % 3 = 0 → Partition 0
Row: {user_id: 102, name: "Carol"} → hash(102) % 3 = 1 → Partition 1
Row: {user_id: 303, name: "David"} → hash(303) % 3 = 0 → Partition 0
這確保了相同 user_id
的資料會被分配到同一分區。
這種策略表示我們知道有多少個分區,但不知道具體的分區邏輯。這通常用於:
RepartitionExec
是 DataFusion 中實現數據重分配的核心算子。它的作用是將 N 個輸入分區重新組織成 M 個輸出分區。這個過程在查詢執行中扮演關鍵角色。
讓我們看一個實際場景:假設我們掃描了 2 個 Parquet 檔案(產生 2 個分區),但我們希望使用 4 個核心進行聚合計算:
┌─────────────┐ ┌─────────────┐
│ File 1 │ │ File 2 │
│ (Partition) │ │ (Partition) │
└──────┬──────┘ └──────┬──────┘
│ │
└──────────┬──────────┘
│
┌──────────▼──────────┐
│ RepartitionExec │
│ Hash(user_id, 4) │
└──────────┬──────────┘
│
┌───────────┬───────┴───────┬───────────┐
│ │ │ │
▼ ▼ ▼ ▼
Partition 0 Partition 1 Partition 2 Partition 3
│ │ │ │
▼ ▼ ▼ ▼
AggregateExec AggregateExec AggregateExec AggregateExec
從原始碼 datafusion/physical-plan/src/repartition/mod.rs
中,我們可以看到 RepartitionExec
的幾個重要特性:
/// RepartitionExec 將 N 個輸入分區映射到 M 個輸出分區
/// 基於分區方案,可選地保持輸入行的順序
pub struct RepartitionExec {
/// 輸入執行計劃
input: Arc<dyn ExecutionPlan>,
/// 內部狀態
state: Arc<Mutex<RepartitionExecState>>,
/// 執行指標
metrics: ExecutionPlanMetricsSet,
/// 是否保持順序(SortPreservingRepartitionExec)
preserve_order: bool,
cache: PlanProperties,
}
關鍵設計點:
順序保持(Order Preservation):
RepartitionExec
:不保證輸出順序SortPreservingRepartitionExec
(preserve_order = true
):保持排序順序,但開銷更大批次分區器(BatchPartitioner):
負責實際的資料分區邏輯
impl BatchPartitioner {
pub fn try_new(partitioning: Partitioning, timer: metrics::Time) -> Result<Self> {
let state = match partitioning {
Partitioning::RoundRobinBatch(num_partitions) => {
BatchPartitionerState::RoundRobin {
num_partitions,
next_idx: 0,
}
}
Partitioning::Hash(exprs, num_partitions) => {
BatchPartitionerState::Hash {
exprs,
num_partitions,
random_state: ahash::RandomState::with_seeds(0, 0, 0, 0),
hash_buffer: vec![],
}
}
// ...
};
Ok(Self { state, timer })
}
}
數據重分配涉及多個開銷:
重分配會引入額外的處理時間。根據 DataFusion 文件,對於小數據集(<1MB),重分配的開銷可能超過並行執行帶來的收益。
時間軸:
─────────────────────────────────────────────────────────►
單分區執行:
[───────掃描───────][──────處理──────]
總時間: 10 秒
重分配後並行執行(4核心):
[──掃描──][重分配][──並行處理──]
↑ 開銷 ↑ 4倍加速
總時間: 2.5 + 0.5 + 2.5 = 5.5 秒
對於小數據集,重分配的 0.5 秒開銷可能佔比過大,反而降低了整體性能。
target_partitions
是 DataFusion 中最重要的並行度控制參數,定義在 datafusion/common/src/config.rs
:
/// 查詢執行的分區數量。增加分區可以提高並行度。
/// 預設值為系統的 CPU 核心數
pub target_partitions: usize,
default = get_available_parallelism()
建議:使用預設值(CPU 核心數)或稍高
let config = SessionConfig::new()
.with_target_partitions(8); // 假設 8 核心 CPU
原因:
建議:設定為 1,避免重分配
SET datafusion.execution.target_partitions = 1;
原因:
建議:減少 target_partitions
和 batch_size
-- 減少並行度,讓每個分區有更多記憶體
SET datafusion.execution.target_partitions = 4;
-- 減少批次大小,降低記憶體峰值
SET datafusion.execution.batch_size = 1024;
原因:
根據 DataFusion 文件,使用 FairSpillPool
時,記憶體會平均分配給所有分區。如果 target_partitions = 16
,每個分區只能獲得 1/16 的記憶體,更容易觸發 Spilling。
記憶體限制: 1GB
target_partitions = 1:
┌────────────────────────────────────┐
│ Partition 0 (1GB 可用) │
│ ▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓ │
└────────────────────────────────────┘
處理時間: 10秒,無 Spilling
target_partitions = 8:
┌────┬────┬────┬────┬────┬────┬────┬────┐
│ P0 │ P1 │ P2 │ P3 │ P4 │ P5 │ P6 │ P7 │
│128M│128M│128M│128M│128M│128M│128M│128M│
│▓▓▓ │▓▓▓ │▓▓▓ │▓▓▓ │▓▓▓ │▓▓▓ │▓▓▓ │▓▓▓ │
└────┴────┴────┴────┴────┴────┴────┴────┘
處理時間: 3秒,可能 Spilling (如果單個分區需要 >128M)
target_partitions = 16:
┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
│P0│P1│... │P14│P15│
│64M each │ │ │
│▓ │▓ │▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓│▓ │▓ │
└──┴──┴──────────────────────────────┴───┴───┘
處理時間: 2秒,但頻繁 Spilling (如果單個分區需要 >64M)
選擇正確的並行度需要在多個因素間取得平衡:
過低的並行度:
過高的並行度:
最佳點:通常在 CPU 核心數附近
記憶體壓力隨並行度增加:
並行度 ──────────────────────────────►
低 高
記憶體 │ ╱╲
壓力 │ ╱ ╲ (Spilling 增加)
│ ╱ ╲
│ ╱ ╲
└─────────────────────────────
最佳 │ ╱╲
性能 │ ╱ ╲
│ ╱ ╲_____________
│ ╱ (Spilling 降低性能)
└─────────────────────────────
↑
最佳點
案例 1:OLAP 查詢(大表掃描 + 聚合)
-- 查詢:統計每個用戶的訂單總額
SELECT user_id, SUM(amount)
FROM orders -- 10GB 數據
GROUP BY user_id;
執行計劃:
AggregateExec: mode=FinalPartitioned, gby=[user_id]
RepartitionExec: partitioning=Hash([user_id], 8)
AggregateExec: mode=Partial, gby=[user_id]
ParquetExec: file_groups={4 groups}
調優策略:
target_partitions = 8
(假設 8 核心)repartition_aggregations = true
)案例 2:小表 JOIN 大表
-- 小表:users (10MB)
-- 大表:events (5GB)
SELECT u.name, COUNT(*)
FROM users u
JOIN events e ON u.user_id = e.user_id
GROUP BY u.name;
執行計劃:
AggregateExec
HashJoinExec: mode=CollectLeft
MemoryExec: partitions=1 -- 小表不需要分區
ParquetExec: partitions=8 -- 大表使用 8 個分區
調優策略:
CollectLeft
模式,避免重分配DataFusion 的優化器會根據數據大小動態選擇分區策略。例如,在 datafusion/physical-optimizer/src/enforce_distribution.rs
中:
// 如果 Build 側小於閾值,使用 CollectLeft 模式
if left_size < hash_join_single_partition_threshold {
return PartitionMode::CollectLeft;
}
// 否則使用分區模式
PartitionMode::Partitioned
這個機制確保了:
EXPLAIN SELECT user_id, COUNT(*)
FROM large_table
GROUP BY user_id;
輸出會顯示:
AggregateExec: mode=FinalPartitioned, gby=[user_id]
RepartitionExec: partitioning=Hash([user_id], 8)
...
注意觀察:
RepartitionExec
?// 測試腳本
for partitions in [1, 2, 4, 8, 16] {
let config = SessionConfig::new()
.with_target_partitions(partitions);
let ctx = SessionContext::new_with_config(config);
let start = Instant::now();
// 執行查詢
let duration = start.elapsed();
println!("Partitions: {}, Time: {:?}", partitions, duration);
}
如果發現性能下降,可能是 Spilling 過於頻繁。可以透過:
target_partitions
batch_size
今天我們探討了:
合理的分區策略能夠成倍提升查詢性能,但錯誤的配置反而可能降低性能。透過今天的學習,你應該能夠:
target_partitions
配置