在 Day 25 學習並行執行與分區策略時,我們多次提到了記憶體的重要性。當時我們了解到,如果 target_partitions
設定過高,每個分區可用的記憶體就會變少,可能導致頻繁的 Spilling(溢寫),反而降低查詢性能。我們還看到了這樣一個事實:在記憶體受限環境中,並行度並非越高越好,需要在 CPU 利用率和記憶體使用之間找到平衡點。
然而,那時我們只是知道「會發生 Spilling」、「需要考慮記憶體限制」,卻還沒深入理解這背後的機制:DataFusion 如何追蹤記憶體使用?當記憶體不足時會發生什麼?Spilling 的過程是怎樣的?FairSpillPool
如何公平分配記憶體?
今天,我們將深入探討 DataFusion 的記憶體管理與 Spilling 機制。這是一個非常實務的主題,因為在真實的生產環境中,我們總是面臨記憶體受限的情況。透過本篇的學習,你將理解 DataFusion 如何在有限資源下優雅地處理超出記憶體容量的海量數據。
在理想世界中,我們希望所有數據都能放在記憶體中處理,這樣速度最快。但現實是:
DataFusion 採取了一個務實的方法來解決這個問題。
DataFusion 並不追蹤所有的記憶體分配,而是採取了一個實用的策略:
記憶體分類:
┌────────────────────────────────────────────────┐
│ 不追蹤的「小額」記憶體 │
│ - RecordBatch 在算子間流動的臨時記憶體 │
│ - 表達式求值的暫存空間 │
│ - 元數據和小型結構 │
│ 假設:這些都很小,不會造成問題 │
└────────────────────────────────────────────────┘
┌────────────────────────────────────────────────┐
│ 追蹤的「大額」記憶體 ✓ │
│ - Hash Table (Join, Aggregate) │
│ - 排序緩衝區 │
│ - 累積的輸入數據 │
│ 特點:與輸入行數成正比,需要嚴格管理 │
└────────────────────────────────────────────────┘
這種設計的優點是:
官方建議預留 10% 左右的記憶體作為「未追蹤」記憶體的緩衝。例如,如果系統有 10GB 記憶體,可以將 MemoryPool 限制設為 9GB。
MemoryPool
是 DataFusion 記憶體管理的核心抽象。它的 trait 定義揭示了記憶體管理的基本操作:
pub trait MemoryPool: Send + Sync + std::fmt::Debug {
/// 註冊一個新的 MemoryConsumer(記憶體消費者)
fn register(&self, _consumer: &MemoryConsumer) {}
/// 註銷 MemoryConsumer
fn unregister(&self, _consumer: &MemoryConsumer) {}
/// 無條件增加記憶體預留(必定成功)
fn grow(&self, reservation: &MemoryReservation, additional: usize);
/// 縮減記憶體預留
fn shrink(&self, reservation: &MemoryReservation, shrink: usize);
/// 嘗試增加記憶體預留(可能失敗)
fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()>;
/// 返回當前已預留的記憶體總量
fn reserved(&self) -> usize;
/// 返回記憶體池的限制
fn memory_limit(&self) -> MemoryLimit;
}
DataFusion 的記憶體管理涉及三個關鍵角色:
┌─────────────────────────────────────────────────────────┐
│ MemoryPool │
│ (全局記憶體資源池,例如:限制 8GB) │
└──────────────────┬──────────────────────────────────────┘
│
┌──────────┴──────────┐
│ │
▼ ▼
┌──────────────┐ ┌──────────────┐
│MemoryConsumer│ │MemoryConsumer│
│ (消費者1) │ │ (消費者2) │
│ 例如: │ │ 例如: │
│ HashJoin 的 │ │ Aggregate 的 │
│ 一個分區 │ │ 一個分區 │
└──────┬───────┘ └──────┬───────┘
│ │
┌───┴────┐ ┌────┴────┐
▼ ▼ ▼ ▼
[Res1] [Res2] [Res3] [Res4]
Hash Input Group Input
Table Buffer Buffer Buffer
DataFusion 提供了幾種開箱即用的 MemoryPool 實現:
pub struct UnboundedMemoryPool {
used: AtomicUsize,
}
特性:
特性:
try_grow
會失敗工作流程:
MemoryPool (限制: 1GB)
時刻 1:Consumer A 請求 400MB ✓ 批准(已用:400MB)
時刻 2:Consumer B 請求 400MB ✓ 批准(已用:800MB)
時刻 3:Consumer C 請求 300MB ✗ 拒絕(800+300 > 1000)
Consumer C 必須等待或 Spill
時刻 4:Consumer A 釋放 200MB (已用:600MB)
時刻 5:Consumer C 再次請求 300MB ✓ 批准(已用:900MB)
這種策略的問題是:先到的消費者可能佔用大量記憶體,導致後來者餓死。
特性:
FairSpillPool
是 DataFusion 記憶體管理的核心創新,它解決了 GreedyMemoryPool
的不公平問題。
假設有 N 個消費者需要 Spilling,總記憶體限制是 M,那麼:
每個消費者的公平配額 = M / N
但實際情況更複雜,因為:
場景:總記憶體 1GB,3 個消費者
初始狀態:
┌─────────┬─────────┬─────────┐
│Consumer1│Consumer2│Consumer3│
│ 333MB │ 333MB │ 333MB │ 每人配額
└─────────┴─────────┴─────────┘
Consumer1 只需要 100MB:
┌───┬──────────┬──────────┐
│C1 │Consumer2 │Consumer3 │
│100│ 450MB │ 450MB │ 剩餘記憶體重新分配
└───┴──────────┴──────────┘
Consumer2 達到配額並請求更多:
┌───┬──────────┬──────────┐
│C1 │Consumer2 │Consumer3 │
│100│ 450MB │ 450MB │
└───┴──────────┴──────────┘
↓ Spill ← 必須 Spill 才能繼續
動態配額計算:
Spilling 觸發條件:
try_grow
返回錯誤公平性保證:
當記憶體不足時,DataFusion 會將部分數據「溢寫」到磁碟,以釋放記憶體空間。
┌─────────────────────────────────────────────────┐
│ 1. 算子需要更多記憶體 │
│ 例如:HashJoin 要擴大 Hash Table │
└─────────────┬───────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────┐
│ 2. 調用 reservation.try_grow(size) │
└─────────────┬───────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────┐
│ 3. MemoryPool 檢查是否有足夠記憶體 │
└─────────────┬───────────────────────────────────┘
│
┌────┴────┐
│ 有空間? │
└────┬────┘
│
┌─────────┴─────────┐
│YES NO│
▼ ▼
┌───────┐ ┌──────────────────────┐
│ 批准 │ │ 4. 返回錯誤 │
│ 分配 │ │ MemoryExhausted │
└───────┘ └──────────┬───────────┘
│
▼
┌──────────────────────┐
│ 5. 算子執行 Spill │
│ - 選擇要溢寫的數據 │
│ - 寫入臨時檔案 │
│ - 釋放記憶體 │
└──────────┬───────────┘
│
▼
┌──────────────────────┐
│ 6. 重試 try_grow() │
└──────────────────────┘
以 HashJoinExec
為例,當記憶體不足時:
Hash Table 分區:
┌────────┬────────┬────────┬────────┐
│Partition│Partition│Partition│Partition│
│ 0 │ 1 │ 2 │ 3 │
│ 200MB │ 180MB │ 190MB │ 170MB │
└────────┴────────┴────────┴────────┘
↓ ↓
最大 最小
選擇策略:通常選擇最大的分區溢寫
// 簡化的 Spilling 邏輯
async fn spill_partition(&mut self, partition_id: usize) -> Result<()> {
// 1. 創建臨時檔案
let spill_file = create_temp_file()?;
// 2. 將分區數據寫入 IPC 格式
let mut writer = FileWriter::try_new(spill_file, &schema)?;
for batch in &self.partitions[partition_id] {
writer.write(batch)?;
}
writer.finish()?;
// 3. 記錄 Spill 檔案位置
self.spilled_files[partition_id].push(spill_file);
// 4. 清空記憶體中的數據
self.partitions[partition_id].clear();
// 5. 釋放記憶體預留
self.reservation.shrink(spilled_size);
Ok(())
}
後續查詢需要該分區時:
1. 檢測到分區已被 Spill
↓
2. 從磁碟讀取 Spill 檔案
↓
3. 使用 Arrow IPC Reader 解析
↓
4. 流式處理(不一次性載入全部)
Spilling 並非免費,它帶來多個開銷:
性能比較(處理 10GB 數據):
全記憶體處理:
記憶體使用:10GB
處理時間:10 秒
I/O:0
部分 Spilling(5GB 記憶體):
記憶體使用:5GB
處理時間:30 秒
I/O:寫入 5GB + 讀取 5GB
開銷來源:
- 序列化/反序列化
- 磁碟 I/O(遠慢於記憶體)
- 額外的 CPU 時間
頻繁 Spilling(1GB 記憶體):
記憶體使用:1GB
處理時間:90 秒
I/O:多次寫入/讀取,總量 > 20GB
問題:磁碟 I/O 成為瓶頸
這就是為什麼在 Day 25 我們強調:過高的 target_partitions
會導致每個分區記憶體不足,頻繁 Spilling,反而降低性能。
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::prelude::*;
#[tokio::main]
async fn main() -> Result<()> {
// 創建 8GB 的 FairSpillPool
let memory_pool = Arc::new(FairSpillPool::new(8 * 1024 * 1024 * 1024));
let config = SessionConfig::new()
.with_target_partitions(4); // 每個分區約 2GB
let runtime = RuntimeEnv::new(
RuntimeConfig::new().with_memory_pool(memory_pool)
)?;
let ctx = SessionContext::new_with_config_rt(config, Arc::new(runtime));
// 執行查詢...
Ok(())
}
-- 需要大量記憶體建立 Hash Table
SET datafusion.execution.target_partitions = 4; -- 減少分區,給每個更多記憶體
SET datafusion.optimizer.prefer_hash_join = true;
-- 需要記憶體累積聚合狀態
SET datafusion.execution.target_partitions = 4;
SET datafusion.optimizer.hash_join_single_partition_threshold = 1048576;
SET datafusion.execution.target_partitions = 16; -- 最大化並行度
-- 使用 UnboundedMemoryPool 或設定較大的限制
use datafusion::execution::memory_pool::TrackConsumersPool;
// 包裝 FairSpillPool,追蹤每個消費者的使用情況
let base_pool = Arc::new(FairSpillPool::new(8 * 1024 * 1024 * 1024));
let tracked_pool = Arc::new(TrackConsumersPool::new(base_pool));
// 查詢執行後
if let Some(top_consumers) = tracked_pool.top_consumers(5) {
for (name, size) in top_consumers {
println!("{}: {} bytes", name, size);
}
}
輸出可能是:
HashJoinExec[partition=2]: 1,200,000,000 bytes
AggregateExec[partition=1]: 800,000,000 bytes
HashJoinExec[partition=0]: 750,000,000 bytes
...
這幫助我們識別哪些算子消耗了最多記憶體。
假設我們需要在一台記憶體 16GB 的機器上處理 100GB 的日誌數據,進行用戶行為分析:
SELECT
user_id,
DATE(timestamp) as date,
COUNT(*) as event_count,
COUNT(DISTINCT session_id) as session_count,
AVG(duration) as avg_duration
FROM events
GROUP BY user_id, DATE(timestamp)
ORDER BY user_id, date;
GROUP BY
需要累積大量中間狀態COUNT(DISTINCT)
進一步增加記憶體需求ORDER BY
也需要排序緩衝區let config = SessionConfig::new()
.with_target_partitions(4); // 不要太高!
// 每個分區約 3GB 記憶體(12GB / 4)
// 保留 4GB 給系統和未追蹤記憶體
如果設為 16 個分區,每個只有 750MB,會頻繁 Spilling。
let memory_pool = Arc::new(FairSpillPool::new(12 * 1024 * 1024 * 1024));
let tracked_pool = Arc::new(TrackConsumersPool::new(memory_pool));
let runtime = RuntimeEnv::new(
RuntimeConfig::new().with_memory_pool(tracked_pool)
)?;
let ctx = SessionContext::new_with_config_rt(config, Arc::new(runtime));
// 執行查詢
let df = ctx.sql(&query).await?;
let start = Instant::now();
let results = df.collect().await?;
let duration = start.elapsed();
println!("Query completed in {:?}", duration);
println!("Peak memory: {} bytes", tracked_pool.reserved());
如果單個查詢仍然太重,可以分階段處理:
-- 階段 1:先按天聚合,寫入中間表
CREATE TABLE daily_stats AS
SELECT
DATE(timestamp) as date,
user_id,
session_id,
SUM(duration) as total_duration,
COUNT(*) as event_count
FROM events
GROUP BY DATE(timestamp), user_id, session_id;
-- 階段 2:在中間表上進行最終聚合
SELECT
user_id,
date,
SUM(event_count) as event_count,
COUNT(DISTINCT session_id) as session_count,
AVG(total_duration) as avg_duration
FROM daily_stats
GROUP BY user_id, date
ORDER BY user_id, date;
配置:16GB 記憶體,4 個分區,12GB MemoryPool
查詢執行:
階段 1:Table Scan(流式,記憶體使用低)
階段 2:Partial Aggregate(每分區約 3GB)
- Partition 0: Spilled 2 次
- Partition 1: Spilled 1 次
- Partition 2: Spilled 3 次
- Partition 3: Spilled 2 次
階段 3:Repartition(重新分配)
階段 4:Final Aggregate(每分區約 2GB)
- Partition 0: No spilling
- Partition 1: Spilled 1 次
...
階段 5:Sort(全局排序)
- Spilled 4 次
總執行時間:~8 分鐘
磁碟 I/O:寫入 35GB,讀取 35GB
峰值記憶體:11.8GB
對比如果 target_partitions=16:
總執行時間:~25 分鐘(頻繁 Spilling)
磁碟 I/O:寫入 120GB,讀取 120GB
如果內建的 MemoryPool 不滿足需求,可以實現自定義的分配策略:
use datafusion::execution::memory_pool::{MemoryPool, MemoryReservation};
use std::sync::atomic::{AtomicUsize, Ordering};
#[derive(Debug)]
struct CustomMemoryPool {
limit: usize,
used: AtomicUsize,
// 自定義的策略字段...
}
impl MemoryPool for CustomMemoryPool {
fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
loop {
let current = self.used.load(Ordering::SeqCst);
if current + additional > self.limit {
// 自定義的記憶體不足處理邏輯
return Err(DataFusionError::ResourcesExhausted(
format!("Memory limit exceeded")
));
}
// 使用 CAS (Compare-And-Swap) 原子操作
if self.used.compare_exchange(
current,
current + additional,
Ordering::SeqCst,
Ordering::SeqCst
).is_ok() {
return Ok(());
}
}
}
// 實現其他必需方法...
}
從 Day 25 關注並行執行中記憶體的影響,到今天深入理解記憶體管理的完整機制,我們建立了對 DataFusion 資源管理的全面認知。今天我們深入探討了:
DataFusion 的記憶體管理哲學:選擇性追蹤大型數據結構,而非追蹤所有分配,在代碼複雜度和資源控制之間取得平衡
MemoryPool 體系架構:理解了 MemoryPool、MemoryConsumer、MemoryReservation 三層結構,以及 UnboundedMemoryPool、GreedyMemoryPool、FairSpillPool 三種內建實現
FairSpillPool 的公平分配:確保所有需要 Spilling 的消費者都能公平獲得記憶體配額,避免資源餓死
Spilling 機制的完整流程:從觸發條件、選擇策略、磁碟寫入到後續讀取的全過程,理解了 Spilling 的成本和必要性
實戰調優技巧:學會根據查詢特性和資源限制,選擇合適的 target_partitions
、記憶體限制和 MemoryPool 實現
真實案例分析:透過處理超大數據集的案例,綜合應用並行度調整、Spilling 監控和分階段處理等技巧
記憶體管理是 DataFusion 性能調優的關鍵。理解了這個主題,你就能在有限資源下處理海量數據,避免 OOM,並在記憶體使用和查詢性能之間找到最佳平衡點。
明天我們將進行性能優化總結與未來方向的探討,回顧 DataFusion 的核心性能優勢,總結調優技巧,並展望未來的發展方向。這將是第二階段到第四週內容的完美總結。