iT邦幫忙

2025 iThome 鐵人賽

DAY 26
0
Rust

DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅系列 第 26

Day 26: 並行執行與分區策略 - DataFusion 的性能倍增器

  • 分享至 

  • xImage
  •  

前言

在 Day 14 的文章中,我們初步認識了分區(Partitioning)的概念,了解 DataFusion 支援 RoundRobin、Hash 等分區策略。

今天,我們將專門深入探討並行執行與分區策略這個主題。理解分區是如何工作的?為什麼需要數據重分配?RepartitionExec 扮演什麼角色?如何選擇合適的並行度?這些概念直接影響查詢性能。

為什麼需要並行執行?

在現代數據處理中,單執行緒已經無法滿足性能需求。想像一個場景:你需要處理 1GB 的數據,如果使用單核心處理需要 10 秒,那麼如果能充分利用 8 核心的 CPU,理論上可以將處理時間縮短到約 1.25 秒。

DataFusion 透過**分區(Partitioning)**機制來實現並行執行。分區的核心概念是將數據切分成多個獨立的部分,每個部分可以在不同的執行緒或核心上獨立處理,最後再將結果合併。

分區(Partitioning)的基本概念

ExecutionPlan 的輸出分區

在 DataFusion 中,每個 ExecutionPlan 都會產生一個或多個獨立的數據流(Stream),這些流被稱為分區(Partitions)。當我們調用 execute(partition_id) 方法時,會得到對應分區的數據流。

                  ┌─────────────────┐
                  │  ExecutionPlan  │
                  │  (3 partitions) │
                  └─────────────────┘
                           │
         ┌─────────────────┼─────────────────┐
         │                 │                 │
         ▼                 ▼                 ▼
    Stream 0          Stream 1          Stream 2
   (partition 0)    (partition 1)    (partition 2)
         │                 │                 │
         ▼                 ▼                 ▼
    RecordBatch      RecordBatch      RecordBatch
    RecordBatch      RecordBatch      RecordBatch
       ...               ...               ...

分區策略類型

DataFusion 支援三種主要的分區策略,定義在 datafusion/physical-expr/src/partitioning.rs 中:

#[derive(Debug, Clone)]
pub enum Partitioning {
    /// 使用輪詢演算法分配批次到指定數量的分區
    RoundRobinBatch(usize),
    
    /// 基於一個或多個表達式的雜湊值分配資料列到指定數量的分區
    Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
    
    /// 未知的分區方案,但已知分區數量
    UnknownPartitioning(usize),
}

讓我們詳細了解每種策略:

1. RoundRobinBatch(輪詢分區)

工作原理:將 RecordBatch 按照輪詢方式分配到各個分區,類似於發牌。第一個 batch 給分區 0,第二個給分區 1,以此類推。

適用場景

  • 數據分佈較為均勻,不需要特定的分組邏輯
  • 需要簡單快速的數據重分配
  • 作為初始數據掃描後的均勻分布

範例:假設我們有 5 個 RecordBatch,目標是 3 個分區:

Input Batches:  [Batch0, Batch1, Batch2, Batch3, Batch4]
                   │       │       │       │       │
                   ▼       ▼       ▼       ▼       ▼
Partition 0:    Batch0           Batch3
Partition 1:            Batch1           Batch4
Partition 2:                    Batch2

2. Hash(雜湊分區)

工作原理:基於指定的表達式(通常是欄位)計算雜湊值,然後使用 hash % partition_count 來決定每一行應該分配到哪個分區。

適用場景

  • GROUP BY 聚合:相同分組鍵的資料必須在同一分區
  • JOIN 操作:相同連接鍵的資料必須在同一分區
  • 需要保證相同鍵值的資料聚集在一起

範例:假設我們按 user_id 進行雜湊分區,目標是 3 個分區:

Row: {user_id: 100, name: "Alice"}  → hash(100) % 3 = 1 → Partition 1
Row: {user_id: 201, name: "Bob"}    → hash(201) % 3 = 0 → Partition 0
Row: {user_id: 102, name: "Carol"}  → hash(102) % 3 = 1 → Partition 1
Row: {user_id: 303, name: "David"}  → hash(303) % 3 = 0 → Partition 0

這確保了相同 user_id 的資料會被分配到同一分區。

3. UnknownPartitioning(未知分區)

這種策略表示我們知道有多少個分區,但不知道具體的分區邏輯。這通常用於:

  • 檔案掃描:每個檔案是一個分區
  • 外部數據源:分區邏輯由外部系統決定

RepartitionExec - 數據重分配的核心

RepartitionExec 的角色

RepartitionExec 是 DataFusion 中實現數據重分配的核心算子。它的作用是將 N 個輸入分區重新組織成 M 個輸出分區。這個過程在查詢執行中扮演關鍵角色。

讓我們看一個實際場景:假設我們掃描了 2 個 Parquet 檔案(產生 2 個分區),但我們希望使用 4 個核心進行聚合計算:

           ┌─────────────┐      ┌─────────────┐
           │   File 1    │      │   File 2    │
           │ (Partition) │      │ (Partition) │
           └──────┬──────┘      └──────┬──────┘
                  │                     │
                  └──────────┬──────────┘
                             │
                  ┌──────────▼──────────┐
                  │   RepartitionExec   │
                  │  Hash(user_id, 4)   │
                  └──────────┬──────────┘
                             │
         ┌───────────┬───────┴───────┬───────────┐
         │           │               │           │
         ▼           ▼               ▼           ▼
    Partition 0  Partition 1   Partition 2  Partition 3
         │           │               │           │
         ▼           ▼               ▼           ▼
    AggregateExec AggregateExec AggregateExec AggregateExec

RepartitionExec 的實作特點

從原始碼 datafusion/physical-plan/src/repartition/mod.rs 中,我們可以看到 RepartitionExec 的幾個重要特性:

/// RepartitionExec 將 N 個輸入分區映射到 M 個輸出分區
/// 基於分區方案,可選地保持輸入行的順序
pub struct RepartitionExec {
    /// 輸入執行計劃
    input: Arc<dyn ExecutionPlan>,
    /// 內部狀態
    state: Arc<Mutex<RepartitionExecState>>,
    /// 執行指標
    metrics: ExecutionPlanMetricsSet,
    /// 是否保持順序(SortPreservingRepartitionExec)
    preserve_order: bool,
    cache: PlanProperties,
}

關鍵設計點

  1. 順序保持(Order Preservation)

    • 普通 RepartitionExec:不保證輸出順序
    • SortPreservingRepartitionExecpreserve_order = true):保持排序順序,但開銷更大
  2. 批次分區器(BatchPartitioner)
    負責實際的資料分區邏輯

impl BatchPartitioner {
    pub fn try_new(partitioning: Partitioning, timer: metrics::Time) -> Result<Self> {
        let state = match partitioning {
            Partitioning::RoundRobinBatch(num_partitions) => {
                BatchPartitionerState::RoundRobin {
                    num_partitions,
                    next_idx: 0,
                }
            }
            Partitioning::Hash(exprs, num_partitions) => {
                BatchPartitionerState::Hash {
                    exprs,
                    num_partitions,
                    random_state: ahash::RandomState::with_seeds(0, 0, 0, 0),
                    hash_buffer: vec![],
                }
            }
            // ...
        };
        Ok(Self { state, timer })
    }
}

數據重分配的開銷分析

數據重分配涉及多個開銷:

1. 計算開銷

  • 雜湊計算:對於 Hash 分區,需要計算每一行的雜湊值
  • 行重組:需要將資料行重新組織到不同的 RecordBatch 中

2. 記憶體開銷

  • 緩衝區:需要為每個輸出分區維護緩衝區
  • 複製:可能需要複製資料(雖然 Arrow 盡量避免)

3. 時間開銷

重分配會引入額外的處理時間。根據 DataFusion 文件,對於小數據集(<1MB),重分配的開銷可能超過並行執行帶來的收益。

開銷示意圖

時間軸:
─────────────────────────────────────────────────────────►

單分區執行:
[───────掃描───────][──────處理──────]
總時間: 10 秒

重分配後並行執行(4核心):
[──掃描──][重分配][──並行處理──]
          ↑ 開銷    ↑ 4倍加速
總時間: 2.5 + 0.5 + 2.5 = 5.5 秒

對於小數據集,重分配的 0.5 秒開銷可能佔比過大,反而降低了整體性能。

target_partitions 配置的影響

target_partitions 是 DataFusion 中最重要的並行度控制參數,定義在 datafusion/common/src/config.rs

/// 查詢執行的分區數量。增加分區可以提高並行度。
/// 預設值為系統的 CPU 核心數
pub target_partitions: usize, 
    default = get_available_parallelism()

配置策略

場景 1:大數據集(> 100MB)

建議:使用預設值(CPU 核心數)或稍高

let config = SessionConfig::new()
    .with_target_partitions(8);  // 假設 8 核心 CPU

原因

  • 數據量大,重分配開銷相對較小
  • 能充分利用多核心並行處理
  • 每個分區的數據量仍然足夠大,保持向量化效率

場景 2:小數據集(< 1MB)

建議:設定為 1,避免重分配

SET datafusion.execution.target_partitions = 1;

原因

  • 重分配開銷可能超過並行收益
  • 單分區可以減少協調開銷
  • 小數據集本身處理就很快

場景 3:記憶體受限環境

建議:減少 target_partitionsbatch_size

-- 減少並行度,讓每個分區有更多記憶體
SET datafusion.execution.target_partitions = 4;
-- 減少批次大小,降低記憶體峰值
SET datafusion.execution.batch_size = 1024;

原因
根據 DataFusion 文件,使用 FairSpillPool 時,記憶體會平均分配給所有分區。如果 target_partitions = 16,每個分區只能獲得 1/16 的記憶體,更容易觸發 Spilling。

target_partitions 的影響示意圖

記憶體限制: 1GB

target_partitions = 1:
┌────────────────────────────────────┐
│     Partition 0 (1GB 可用)         │
│  ▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓  │
└────────────────────────────────────┘
處理時間: 10秒,無 Spilling

target_partitions = 8:
┌────┬────┬────┬────┬────┬────┬────┬────┐
│ P0 │ P1 │ P2 │ P3 │ P4 │ P5 │ P6 │ P7 │
│128M│128M│128M│128M│128M│128M│128M│128M│
│▓▓▓ │▓▓▓ │▓▓▓ │▓▓▓ │▓▓▓ │▓▓▓ │▓▓▓ │▓▓▓ │
└────┴────┴────┴────┴────┴────┴────┴────┘
處理時間: 3秒,可能 Spilling (如果單個分區需要 >128M)

target_partitions = 16:
┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
│P0│P1│...                            │P14│P15│
│64M each                             │   │   │
│▓ │▓ │▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓│▓  │▓  │
└──┴──┴──────────────────────────────┴───┴───┘
處理時間: 2秒,但頻繁 Spilling (如果單個分區需要 >64M)

並行度的權衡與調優

選擇正確的並行度需要在多個因素間取得平衡:

1. CPU 利用率 vs 開銷

過低的並行度

  • ❌ CPU 核心閒置,浪費計算資源
  • ❌ 單個分區資料過多,處理時間長

過高的並行度

  • ❌ 重分配開銷增加
  • ❌ 執行緒切換和協調開銷
  • ❌ 每個分區資料過少,無法充分利用向量化

最佳點:通常在 CPU 核心數附近

2. 記憶體使用 vs 性能

記憶體壓力隨並行度增加:

並行度 ──────────────────────────────►
        低                        高
        
記憶體   │             ╱╲
壓力    │          ╱      ╲ (Spilling 增加)
        │       ╱            ╲
        │    ╱                  ╲
        └─────────────────────────────
        
最佳   │    ╱╲
性能    │   ╱  ╲
        │  ╱    ╲_____________
        │ ╱       (Spilling 降低性能)
        └─────────────────────────────
               ↑
            最佳點

3. 實際案例調優

案例 1:OLAP 查詢(大表掃描 + 聚合)

-- 查詢:統計每個用戶的訂單總額
SELECT user_id, SUM(amount) 
FROM orders  -- 10GB 數據
GROUP BY user_id;

執行計劃

AggregateExec: mode=FinalPartitioned, gby=[user_id]
  RepartitionExec: partitioning=Hash([user_id], 8)
    AggregateExec: mode=Partial, gby=[user_id]
      ParquetExec: file_groups={4 groups}

調優策略

  • target_partitions = 8(假設 8 核心)
  • 允許重分配(repartition_aggregations = true
  • Partial Aggregation 先在 4 個分區上執行
  • 重分配後在 8 個分區上並行執行 Final Aggregation

案例 2:小表 JOIN 大表

-- 小表:users (10MB)
-- 大表:events (5GB)
SELECT u.name, COUNT(*) 
FROM users u
JOIN events e ON u.user_id = e.user_id
GROUP BY u.name;

執行計劃

AggregateExec
  HashJoinExec: mode=CollectLeft
    MemoryExec: partitions=1  -- 小表不需要分區
    ParquetExec: partitions=8 -- 大表使用 8 個分區

調優策略

  • 小表使用 CollectLeft 模式,避免重分配
  • 大表保持多分區並行掃描
  • 節省了一次重分配的開銷

動態分區選擇

DataFusion 的優化器會根據數據大小動態選擇分區策略。例如,在 datafusion/physical-optimizer/src/enforce_distribution.rs 中:

// 如果 Build 側小於閾值,使用 CollectLeft 模式
if left_size < hash_join_single_partition_threshold {
    return PartitionMode::CollectLeft;
}

// 否則使用分區模式
PartitionMode::Partitioned

這個機制確保了:

  • 小表不會被過度分區
  • 大表能充分利用並行執行
  • 自動避免不必要的重分配開銷

實戰建議

1. 使用 EXPLAIN 觀察分區策略

EXPLAIN SELECT user_id, COUNT(*) 
FROM large_table 
GROUP BY user_id;

輸出會顯示:

AggregateExec: mode=FinalPartitioned, gby=[user_id]
  RepartitionExec: partitioning=Hash([user_id], 8)
    ...

注意觀察:

  • 是否有 RepartitionExec
  • 分區數是多少?
  • 是否符合預期?

2. 性能測試不同配置

// 測試腳本
for partitions in [1, 2, 4, 8, 16] {
    let config = SessionConfig::new()
        .with_target_partitions(partitions);
    
    let ctx = SessionContext::new_with_config(config);
    
    let start = Instant::now();
    // 執行查詢
    let duration = start.elapsed();
    
    println!("Partitions: {}, Time: {:?}", partitions, duration);
}

3. 監控記憶體使用

如果發現性能下降,可能是 Spilling 過於頻繁。可以透過:

  • 減少 target_partitions
  • 增加記憶體限制
  • 減少 batch_size

小結

今天我們探討了:

  1. 分區的工作原理:不只是知道 RoundRobinBatch、Hash、UnknownPartitioning 三種類型,更理解了它們各自的工作機制和適用場景
  2. RepartitionExec 的核心作用:負責將 N 個輸入分區重組為 M 個輸出分區,是實現並行執行的關鍵算子
  3. 重分配的代價:深入分析了計算、記憶體和時間開銷,理解為什麼小數據集不適合過度並行化
  4. target_partitions 的調優藝術:根據數據大小、記憶體限制和 CPU 核心數做出權衡,找到最佳配置
  5. 並行度的平衡點:在 CPU 利用率、記憶體使用和執行開銷之間取得最佳平衡

合理的分區策略能夠成倍提升查詢性能,但錯誤的配置反而可能降低性能。透過今天的學習,你應該能夠:

  • 使用 EXPLAIN 分析查詢的分區策略
  • 針對不同場景選擇合適的 target_partitions 配置
  • 理解何時應該避免重分配,何時應該充分並行化

上一篇
Day 25: Window Functions - 視窗函數執行機制深度解析
下一篇
Day 27: 記憶體管理與 Spilling - 在有限資源下處理大量數據
系列文
DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅30
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言