iT邦幫忙

2025 iThome 鐵人賽

DAY 27
0
Rust

DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅系列 第 27

Day 27: 記憶體管理與 Spilling - 在有限資源下處理大量數據

  • 分享至 

  • xImage
  •  

前言

在 Day 25 學習並行執行與分區策略時,我們多次提到了記憶體的重要性。當時我們了解到,如果 target_partitions 設定過高,每個分區可用的記憶體就會變少,可能導致頻繁的 Spilling(溢寫),反而降低查詢性能。我們還看到了這樣一個事實:在記憶體受限環境中,並行度並非越高越好,需要在 CPU 利用率和記憶體使用之間找到平衡點。

然而,那時我們只是知道「會發生 Spilling」、「需要考慮記憶體限制」,卻還沒深入理解這背後的機制:DataFusion 如何追蹤記憶體使用?當記憶體不足時會發生什麼?Spilling 的過程是怎樣的?FairSpillPool 如何公平分配記憶體?

今天,我們將深入探討 DataFusion 的記憶體管理與 Spilling 機制。這是一個非常實務的主題,因為在真實的生產環境中,我們總是面臨記憶體受限的情況。透過本篇的學習,你將理解 DataFusion 如何在有限資源下優雅地處理超出記憶體容量的海量數據。

為什麼需要記憶體管理?

在理想世界中,我們希望所有數據都能放在記憶體中處理,這樣速度最快。但現實是:

  1. 數據量可能遠超記憶體:處理 100GB 數據,但只有 8GB 記憶體
  2. 多租戶環境:多個查詢同時執行,需要共享有限的記憶體資源
  3. 避免 OOM(Out of Memory):如果不加控制,程序可能被作業系統強制終止

DataFusion 採取了一個務實的方法來解決這個問題。

DataFusion 的記憶體管理設計哲學

選擇性追蹤(Selective Tracking)

DataFusion 並不追蹤所有的記憶體分配,而是採取了一個實用的策略:

記憶體分類:

┌────────────────────────────────────────────────┐
│ 不追蹤的「小額」記憶體                           │
│ - RecordBatch 在算子間流動的臨時記憶體          │
│ - 表達式求值的暫存空間                          │
│ - 元數據和小型結構                              │
│ 假設:這些都很小,不會造成問題                   │
└────────────────────────────────────────────────┘

┌────────────────────────────────────────────────┐
│ 追蹤的「大額」記憶體 ✓                          │
│ - Hash Table (Join, Aggregate)                │
│ - 排序緩衝區                                    │
│ - 累積的輸入數據                                │
│ 特點:與輸入行數成正比,需要嚴格管理              │
└────────────────────────────────────────────────┘

這種設計的優點是:

  • 降低開銷:不需要追蹤每個小分配,減少管理成本
  • 關注重點:集中管理真正可能耗盡記憶體的大型結構
  • 實用性:在代碼複雜度和資源控制之間取得平衡

官方建議預留 10% 左右的記憶體作為「未追蹤」記憶體的緩衝。例如,如果系統有 10GB 記憶體,可以將 MemoryPool 限制設為 9GB。

MemoryPool 的設計與實現

MemoryPool Trait

MemoryPool 是 DataFusion 記憶體管理的核心抽象。它的 trait 定義揭示了記憶體管理的基本操作:

pub trait MemoryPool: Send + Sync + std::fmt::Debug {
    /// 註冊一個新的 MemoryConsumer(記憶體消費者)
    fn register(&self, _consumer: &MemoryConsumer) {}
    
    /// 註銷 MemoryConsumer
    fn unregister(&self, _consumer: &MemoryConsumer) {}
    
    /// 無條件增加記憶體預留(必定成功)
    fn grow(&self, reservation: &MemoryReservation, additional: usize);
    
    /// 縮減記憶體預留
    fn shrink(&self, reservation: &MemoryReservation, shrink: usize);
    
    /// 嘗試增加記憶體預留(可能失敗)
    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()>;
    
    /// 返回當前已預留的記憶體總量
    fn reserved(&self) -> usize;
    
    /// 返回記憶體池的限制
    fn memory_limit(&self) -> MemoryLimit;
}

三個核心概念

DataFusion 的記憶體管理涉及三個關鍵角色:

┌─────────────────────────────────────────────────────────┐
│                    MemoryPool                           │
│   (全局記憶體資源池,例如:限制 8GB)                     │
└──────────────────┬──────────────────────────────────────┘
                   │
        ┌──────────┴──────────┐
        │                     │
        ▼                     ▼
┌──────────────┐      ┌──────────────┐
│MemoryConsumer│      │MemoryConsumer│
│  (消費者1)   │      │  (消費者2)   │
│   例如:      │      │   例如:      │
│ HashJoin 的  │      │ Aggregate 的 │
│ 一個分區     │      │ 一個分區     │
└──────┬───────┘      └──────┬───────┘
       │                     │
   ┌───┴────┐           ┌────┴────┐
   ▼        ▼           ▼         ▼
 [Res1]  [Res2]      [Res3]    [Res4]
 Hash    Input      Group     Input
 Table   Buffer     Buffer    Buffer
  1. MemoryPool:全局記憶體資源池,管理總體限制
  2. MemoryConsumer:記憶體消費者,通常是一個算子的一個分區
  3. MemoryReservation:記憶體預留,代表消費者內部某個數據結構的記憶體

內建的 MemoryPool 實現

DataFusion 提供了幾種開箱即用的 MemoryPool 實現:

1. UnboundedMemoryPool(預設)

pub struct UnboundedMemoryPool {
    used: AtomicUsize,
}

特性

  • 不限制記憶體使用
  • 僅追蹤使用量,不拒絕任何分配
  • 適合測試或記憶體充足的環境
  • 風險:可能導致 OOM

2. GreedyMemoryPool

特性

  • 設定固定的記憶體限制(例如 8GB)
  • 採用「先來先服務」策略
  • 當總使用量達到限制時,新的 try_grow 會失敗
  • 適合不需要 Spilling 的查詢

工作流程

MemoryPool (限制: 1GB)

時刻 1:Consumer A 請求 400MB  ✓ 批准(已用:400MB)
時刻 2:Consumer B 請求 400MB  ✓ 批准(已用:800MB)
時刻 3:Consumer C 請求 300MB  ✗ 拒絕(800+300 > 1000)
        Consumer C 必須等待或 Spill
時刻 4:Consumer A 釋放 200MB  (已用:600MB)
時刻 5:Consumer C 再次請求 300MB  ✓ 批准(已用:900MB)

這種策略的問題是:先到的消費者可能佔用大量記憶體,導致後來者餓死。

3. FairSpillPool(推薦用於生產環境)

特性

  • 在所有需要 Spilling 的消費者之間公平分配記憶體
  • 動態調整每個消費者的配額
  • 確保所有消費者都能取得進展

FairSpillPool 的公平分配算法

FairSpillPool 是 DataFusion 記憶體管理的核心創新,它解決了 GreedyMemoryPool 的不公平問題。

核心思想

假設有 N 個消費者需要 Spilling,總記憶體限制是 M,那麼:

每個消費者的公平配額 = M / N

但實際情況更複雜,因為:

  1. 消費者會動態註冊和註銷
  2. 不是所有消費者都需要相同的記憶體
  3. 需要處理消費者使用量超過配額的情況

分配策略

場景:總記憶體 1GB,3 個消費者

初始狀態:
┌─────────┬─────────┬─────────┐
│Consumer1│Consumer2│Consumer3│
│  333MB  │  333MB  │  333MB  │  每人配額
└─────────┴─────────┴─────────┘

Consumer1 只需要 100MB:
┌───┬──────────┬──────────┐
│C1 │Consumer2 │Consumer3 │
│100│  450MB   │  450MB   │  剩餘記憶體重新分配
└───┴──────────┴──────────┘

Consumer2 達到配額並請求更多:
┌───┬──────────┬──────────┐
│C1 │Consumer2 │Consumer3 │
│100│ 450MB    │  450MB   │
└───┴──────────┴──────────┘
      ↓ Spill              ← 必須 Spill 才能繼續

關鍵設計點

  1. 動態配額計算

    • 當新消費者註冊時,重新計算配額
    • 當消費者註銷時,釋放的記憶體重新分配
  2. Spilling 觸發條件

    • 消費者使用量達到配額
    • 全局記憶體已達限制
    • try_grow 返回錯誤
  3. 公平性保證

    • 沒有消費者能長期佔用超過公平配額的記憶體
    • 所有消費者都能取得進展

Spilling 到磁碟的機制

當記憶體不足時,DataFusion 會將部分數據「溢寫」到磁碟,以釋放記憶體空間。

Spilling 的觸發流程

┌─────────────────────────────────────────────────┐
│  1. 算子需要更多記憶體                           │
│     例如:HashJoin 要擴大 Hash Table            │
└─────────────┬───────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────┐
│  2. 調用 reservation.try_grow(size)             │
└─────────────┬───────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────┐
│  3. MemoryPool 檢查是否有足夠記憶體              │
└─────────────┬───────────────────────────────────┘
              │
         ┌────┴────┐
         │ 有空間? │
         └────┬────┘
              │
    ┌─────────┴─────────┐
    │YES               NO│
    ▼                   ▼
┌───────┐      ┌──────────────────────┐
│ 批准  │      │ 4. 返回錯誤           │
│ 分配  │      │    MemoryExhausted   │
└───────┘      └──────────┬───────────┘
                          │
                          ▼
              ┌──────────────────────┐
              │ 5. 算子執行 Spill     │
              │    - 選擇要溢寫的數據 │
              │    - 寫入臨時檔案     │
              │    - 釋放記憶體       │
              └──────────┬───────────┘
                          │
                          ▼
              ┌──────────────────────┐
              │ 6. 重試 try_grow()    │
              └──────────────────────┘

Spilling 的實際過程

HashJoinExec 為例,當記憶體不足時:

第一階段:選擇溢寫目標

Hash Table 分區:
┌────────┬────────┬────────┬────────┐
│Partition│Partition│Partition│Partition│
│   0    │   1    │   2    │   3    │
│ 200MB  │ 180MB  │ 190MB  │ 170MB  │
└────────┴────────┴────────┴────────┘
    ↓                               ↓
  最大                            最小

選擇策略:通常選擇最大的分區溢寫

第二階段:寫入磁碟

// 簡化的 Spilling 邏輯
async fn spill_partition(&mut self, partition_id: usize) -> Result<()> {
    // 1. 創建臨時檔案
    let spill_file = create_temp_file()?;
    
    // 2. 將分區數據寫入 IPC 格式
    let mut writer = FileWriter::try_new(spill_file, &schema)?;
    for batch in &self.partitions[partition_id] {
        writer.write(batch)?;
    }
    writer.finish()?;
    
    // 3. 記錄 Spill 檔案位置
    self.spilled_files[partition_id].push(spill_file);
    
    // 4. 清空記憶體中的數據
    self.partitions[partition_id].clear();
    
    // 5. 釋放記憶體預留
    self.reservation.shrink(spilled_size);
    
    Ok(())
}

第三階段:後續處理

後續查詢需要該分區時:

1. 檢測到分區已被 Spill
   ↓
2. 從磁碟讀取 Spill 檔案
   ↓
3. 使用 Arrow IPC Reader 解析
   ↓
4. 流式處理(不一次性載入全部)

Spilling 的成本

Spilling 並非免費,它帶來多個開銷:

性能比較(處理 10GB 數據):

全記憶體處理:
  記憶體使用:10GB
  處理時間:10 秒
  I/O:0

部分 Spilling(5GB 記憶體):
  記憶體使用:5GB
  處理時間:30 秒
  I/O:寫入 5GB + 讀取 5GB
  開銷來源:
    - 序列化/反序列化
    - 磁碟 I/O(遠慢於記憶體)
    - 額外的 CPU 時間

頻繁 Spilling(1GB 記憶體):
  記憶體使用:1GB
  處理時間:90 秒
  I/O:多次寫入/讀取,總量 > 20GB
  問題:磁碟 I/O 成為瓶頸

這就是為什麼在 Day 25 我們強調:過高的 target_partitions 會導致每個分區記憶體不足,頻繁 Spilling,反而降低性能。

記憶體配置調優技巧

1. 設定合適的記憶體限制

use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // 創建 8GB 的 FairSpillPool
    let memory_pool = Arc::new(FairSpillPool::new(8 * 1024 * 1024 * 1024));
    
    let config = SessionConfig::new()
        .with_target_partitions(4);  // 每個分區約 2GB
    
    let runtime = RuntimeEnv::new(
        RuntimeConfig::new().with_memory_pool(memory_pool)
    )?;
    
    let ctx = SessionContext::new_with_config_rt(config, Arc::new(runtime));
    
    // 執行查詢...
    Ok(())
}

2. 根據查詢類型調整配置

場景 A:大量 JOIN 操作

-- 需要大量記憶體建立 Hash Table
SET datafusion.execution.target_partitions = 4;  -- 減少分區,給每個更多記憶體
SET datafusion.optimizer.prefer_hash_join = true;

場景 B:大量聚合操作

-- 需要記憶體累積聚合狀態
SET datafusion.execution.target_partitions = 4;
SET datafusion.optimizer.hash_join_single_partition_threshold = 1048576;

場景 C:記憶體充足,追求速度

SET datafusion.execution.target_partitions = 16;  -- 最大化並行度
-- 使用 UnboundedMemoryPool 或設定較大的限制

3. 使用 TrackConsumersPool 調試

use datafusion::execution::memory_pool::TrackConsumersPool;

// 包裝 FairSpillPool,追蹤每個消費者的使用情況
let base_pool = Arc::new(FairSpillPool::new(8 * 1024 * 1024 * 1024));
let tracked_pool = Arc::new(TrackConsumersPool::new(base_pool));

// 查詢執行後
if let Some(top_consumers) = tracked_pool.top_consumers(5) {
    for (name, size) in top_consumers {
        println!("{}: {} bytes", name, size);
    }
}

輸出可能是:

HashJoinExec[partition=2]: 1,200,000,000 bytes
AggregateExec[partition=1]: 800,000,000 bytes
HashJoinExec[partition=0]: 750,000,000 bytes
...

這幫助我們識別哪些算子消耗了最多記憶體。

實戰案例:處理超大數據集

問題場景

假設我們需要在一台記憶體 16GB 的機器上處理 100GB 的日誌數據,進行用戶行為分析:

SELECT 
    user_id,
    DATE(timestamp) as date,
    COUNT(*) as event_count,
    COUNT(DISTINCT session_id) as session_count,
    AVG(duration) as avg_duration
FROM events
GROUP BY user_id, DATE(timestamp)
ORDER BY user_id, date;

挑戰

  1. 數據量遠超記憶體(100GB vs 16GB)
  2. GROUP BY 需要累積大量中間狀態
  3. COUNT(DISTINCT) 進一步增加記憶體需求
  4. 最後的 ORDER BY 也需要排序緩衝區

優化策略

策略 1:合理的並行度

let config = SessionConfig::new()
    .with_target_partitions(4);  // 不要太高!
    // 每個分區約 3GB 記憶體(12GB / 4)
    // 保留 4GB 給系統和未追蹤記憶體

如果設為 16 個分區,每個只有 750MB,會頻繁 Spilling。

策略 2:啟用 Spilling 並監控

let memory_pool = Arc::new(FairSpillPool::new(12 * 1024 * 1024 * 1024));
let tracked_pool = Arc::new(TrackConsumersPool::new(memory_pool));

let runtime = RuntimeEnv::new(
    RuntimeConfig::new().with_memory_pool(tracked_pool)
)?;

let ctx = SessionContext::new_with_config_rt(config, Arc::new(runtime));

// 執行查詢
let df = ctx.sql(&query).await?;
let start = Instant::now();
let results = df.collect().await?;
let duration = start.elapsed();

println!("Query completed in {:?}", duration);
println!("Peak memory: {} bytes", tracked_pool.reserved());

策略 3:分階段處理

如果單個查詢仍然太重,可以分階段處理:

-- 階段 1:先按天聚合,寫入中間表
CREATE TABLE daily_stats AS
SELECT 
    DATE(timestamp) as date,
    user_id,
    session_id,
    SUM(duration) as total_duration,
    COUNT(*) as event_count
FROM events
GROUP BY DATE(timestamp), user_id, session_id;

-- 階段 2:在中間表上進行最終聚合
SELECT 
    user_id,
    date,
    SUM(event_count) as event_count,
    COUNT(DISTINCT session_id) as session_count,
    AVG(total_duration) as avg_duration
FROM daily_stats
GROUP BY user_id, date
ORDER BY user_id, date;

預期結果

配置:16GB 記憶體,4 個分區,12GB MemoryPool

查詢執行:
  階段 1:Table Scan(流式,記憶體使用低)
  階段 2:Partial Aggregate(每分區約 3GB)
    - Partition 0: Spilled 2 次
    - Partition 1: Spilled 1 次
    - Partition 2: Spilled 3 次
    - Partition 3: Spilled 2 次
  階段 3:Repartition(重新分配)
  階段 4:Final Aggregate(每分區約 2GB)
    - Partition 0: No spilling
    - Partition 1: Spilled 1 次
    ...
  階段 5:Sort(全局排序)
    - Spilled 4 次

總執行時間:~8 分鐘
磁碟 I/O:寫入 35GB,讀取 35GB
峰值記憶體:11.8GB

對比如果 target_partitions=16:
  總執行時間:~25 分鐘(頻繁 Spilling)
  磁碟 I/O:寫入 120GB,讀取 120GB

進階主題:自定義 MemoryPool

如果內建的 MemoryPool 不滿足需求,可以實現自定義的分配策略:

use datafusion::execution::memory_pool::{MemoryPool, MemoryReservation};
use std::sync::atomic::{AtomicUsize, Ordering};

#[derive(Debug)]
struct CustomMemoryPool {
    limit: usize,
    used: AtomicUsize,
    // 自定義的策略字段...
}

impl MemoryPool for CustomMemoryPool {
    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
        loop {
            let current = self.used.load(Ordering::SeqCst);
            if current + additional > self.limit {
                // 自定義的記憶體不足處理邏輯
                return Err(DataFusionError::ResourcesExhausted(
                    format!("Memory limit exceeded")
                ));
            }
            
            // 使用 CAS (Compare-And-Swap) 原子操作
            if self.used.compare_exchange(
                current,
                current + additional,
                Ordering::SeqCst,
                Ordering::SeqCst
            ).is_ok() {
                return Ok(());
            }
        }
    }
    
    // 實現其他必需方法...
}

小結

從 Day 25 關注並行執行中記憶體的影響,到今天深入理解記憶體管理的完整機制,我們建立了對 DataFusion 資源管理的全面認知。今天我們深入探討了:

  1. DataFusion 的記憶體管理哲學:選擇性追蹤大型數據結構,而非追蹤所有分配,在代碼複雜度和資源控制之間取得平衡

  2. MemoryPool 體系架構:理解了 MemoryPool、MemoryConsumer、MemoryReservation 三層結構,以及 UnboundedMemoryPool、GreedyMemoryPool、FairSpillPool 三種內建實現

  3. FairSpillPool 的公平分配:確保所有需要 Spilling 的消費者都能公平獲得記憶體配額,避免資源餓死

  4. Spilling 機制的完整流程:從觸發條件、選擇策略、磁碟寫入到後續讀取的全過程,理解了 Spilling 的成本和必要性

  5. 實戰調優技巧:學會根據查詢特性和資源限制,選擇合適的 target_partitions、記憶體限制和 MemoryPool 實現

  6. 真實案例分析:透過處理超大數據集的案例,綜合應用並行度調整、Spilling 監控和分階段處理等技巧

記憶體管理是 DataFusion 性能調優的關鍵。理解了這個主題,你就能在有限資源下處理海量數據,避免 OOM,並在記憶體使用和查詢性能之間找到最佳平衡點。

明天我們將進行性能優化總結與未來方向的探討,回顧 DataFusion 的核心性能優勢,總結調優技巧,並展望未來的發展方向。這將是第二階段到第四週內容的完美總結。

參考資料

  1. DataFusion MemoryPool 原始碼
  2. Memory Pool 實現
  3. Memory Pool 使用範例
  4. DataFusion Memory Management Design Doc
  5. Apache Arrow Memory Pool
  6. FairSpillPool Pull Request Discussion

上一篇
Day 26: 並行執行與分區策略 - DataFusion 的性能倍增器
下一篇
Day 28: StringView 深入解析 - Apache Arrow 字串處理的革命性改進
系列文
DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅30
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言