在前兩天的文章中,我們探討了聚合算子的兩種策略:Hash Aggregation 與 Sort-based Aggregation 並觀察 DataFusion 如何選擇最佳的聚合策略,提升分散式處理效能。
不過聚合操作是針對對單個表的數據進行分組和計算,而今天我們要探討的主題——Join(連接)操作,則是關於如何高效地組合多個表的數據。Join 是 SQL 中最常見也最重要的操作之一,幾乎每個複雜查詢都會涉及到它:
-- 電商場景:組合訂單、用戶和產品資訊
SELECT
o.order_id,
u.user_name,
p.product_name,
o.quantity,
o.amount
FROM orders o
JOIN users u ON o.user_id = u.user_id
JOIN products p ON o.product_id = p.product_id
WHERE o.order_date >= '2024-01-01';
這個查詢需要將三個表的數據關聯起來。如果 orders 表有 1000 萬行、users 表有 100 萬行、products 表有 10 萬行,如何高效地完成這個連接操作?
答案就在於 Hash Join——現代資料庫系統中最常用的 Join 算法之一。
今天的學習目標:
在深入 Hash Join 之前,讓我們先回顧 Join 的本質。Join 操作是根據**連接條件(Join Condition)**來組合兩個表的行:
左表(Left Table): 右表(Right Table):
user_id | name user_id | city
--------+------- --------+----------
1 | Alice 1 | Taipei
2 | Bob 2 | Taichung
3 | Carol 4 | Kaohsiung
JOIN 條件: left.user_id = right.user_id
INNER JOIN 結果:
user_id | name | city
--------+-------+----------
1 | Alice | Taipei
2 | Bob | Taichung
LEFT OUTER JOIN 結果:
user_id | name | city
--------+-------+----------
1 | Alice | Taipei
2 | Bob | Taichung
3 | Carol | NULL
Join 的挑戰在於如何高效地找到匹配的行。如果使用最簡單的嵌套循環(Nested Loop):
// 嵌套循環 Join(效率低)
for left_row in left_table {
for right_row in right_table {
if left_row.join_key == right_row.join_key {
output(combine(left_row, right_row));
}
}
}
// 時間複雜度: O(N × M)
// 如果左表 100 萬行,右表 100 萬行 → 需要 1 萬億次比較!
對於大型數據集,這種方法顯然不可行。Hash Join 提供了一個幾乎線性時間複雜度 O(N + M) 的解決方案。
Hash Join 的核心思想是:使用 Hash Table 來加速查找。基本流程如下:
階段 1 - Build Phase(構建階段):
從較小的表(Build 側)構建一個 Hash Table
Key = 連接鍵的值
Value = 該行的數據(或索引)
階段 2 - Probe Phase(探測階段):
遍歷較大的表(Probe 側)
對每一行:
1. 計算連接鍵的 hash 值
2. 在 Hash Table 中查找匹配的行
3. 產生結果行
讓我們通過一個具體例子來理解:
左表(users)- 選為 Build 側:
user_id | name
--------+-------
1 | Alice
2 | Bob
3 | Carol
右表(orders)- 選為 Probe 側:
order_id | user_id | amount
---------+---------+--------
101 | 1 | 100
102 | 2 | 200
103 | 1 | 150
104 | 3 | 300
JOIN users ON orders.user_id = users.user_id
步驟 1: 遍歷 users 表,構建 Hash Table
處理 Row 1: user_id=1, name=Alice
hash(1) = 12345
hash_table[12345] = { user_id: 1, name: "Alice" }
處理 Row 2: user_id=2, name=Bob
hash(2) = 67890
hash_table[67890] = { user_id: 2, name: "Bob" }
處理 Row 3: user_id=3, name=Carol
hash(3) = 11111
hash_table[11111] = { user_id: 3, name: "Carol" }
構建完成的 Hash Table:
┌──────────┬─────────────────────────┐
│ Hash值 │ 數據 │
├──────────┼─────────────────────────┤
│ 12345 │ {user_id: 1, name: Alice}│
│ 67890 │ {user_id: 2, name: Bob} │
│ 11111 │ {user_id: 3, name: Carol}│
└──────────┴─────────────────────────┘
步驟 2: 遍歷 orders 表,查找匹配
處理 order 101: user_id=1, amount=100
1. hash(1) = 12345
2. 在 hash_table[12345] 找到: {user_id: 1, name: "Alice"}
3. 驗證: 1 == 1 ✓ 匹配!
4. 產生結果: {order_id: 101, user_id: 1, name: "Alice", amount: 100}
處理 order 102: user_id=2, amount=200
1. hash(2) = 67890
2. 在 hash_table[67890] 找到: {user_id: 2, name: "Bob"}
3. 驗證: 2 == 2 ✓ 匹配!
4. 產生結果: {order_id: 102, user_id: 2, name: "Bob", amount: 200}
處理 order 103: user_id=1, amount=150
1. hash(1) = 12345
2. 在 hash_table[12345] 找到: {user_id: 1, name: "Alice"}
3. 驗證: 1 == 1 ✓ 匹配!
4. 產生結果: {order_id: 103, user_id: 1, name: "Alice", amount: 150}
處理 order 104: user_id=3, amount=300
1. hash(3) = 11111
2. 在 hash_table[11111] 找到: {user_id: 3, name: "Carol"}
3. 驗證: 3 == 3 ✓ 匹配!
4. 產生結果: {order_id: 104, user_id: 3, name: "Carol", amount: 300}
最終結果:
order_id | user_id | name | amount
---------+---------+-------+--------
101 | 1 | Alice | 100
102 | 2 | Bob | 200
103 | 1 | Alice | 150
104 | 3 | Carol | 300
時間複雜度分析:
設左表有 N 行,右表有 M 行
Nested Loop Join:
- 需要 N × M 次比較
- 時間複雜度: O(N × M)
- 例如: 1000 × 10000 = 1000 萬次比較
Hash Join:
- Build Phase: 遍歷 N 行,每行 O(1) 插入 → O(N)
- Probe Phase: 遍歷 M 行,每行 O(1) 查找 → O(M)
- 總時間複雜度: O(N + M)
- 例如: 1000 + 10000 = 11000 次操作
性能提升: 1000 萬 / 11000 ≈ 909 倍!
關鍵優勢:
讓我們看看 DataFusion 如何實現 Hash Join:
// datafusion/physical-plan/src/joins/hash_join/exec.rs
pub struct HashJoinExec {
/// 左側(build 側)輸入
pub left: Arc<dyn ExecutionPlan>,
/// 右側(probe 側)輸入
pub right: Arc<dyn ExecutionPlan>,
/// 連接條件:(left_col, right_col) 對
pub on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
/// 額外的過濾條件(非等值條件)
pub filter: Option<JoinFilter>,
/// Join 類型(INNER、LEFT、RIGHT、FULL 等)
pub join_type: JoinType,
/// Build 階段的 Future(用於非同步構建 Hash Table)
left_fut: Arc<OnceAsync<JoinLeftData>>,
/// Hash 函數的隨機種子
random_state: RandomState,
/// 分區模式
pub mode: PartitionMode,
// ... 其他欄位
}
關鍵設計決策:
ON a.col1 = b.col1 AND a.col2 = b.col2
)Build 階段完成後,會產生 JoinLeftData
結構:
pub(super) struct JoinLeftData {
/// Hash Table,存儲 hash 值 → 行索引的映射
pub(super) hash_map: Box<dyn JoinHashMapType>,
/// 連接後的單一 RecordBatch(包含所有 build 側數據)
batch: RecordBatch,
/// Build 側連接鍵的值(用於驗證)
values: Vec<ArrayRef>,
/// 用於追蹤哪些 build 側行已被匹配(用於 OUTER JOIN)
visited_indices_bitmap: SharedBitmapBuilder,
/// Probe 執行緒計數器
probe_threads_counter: AtomicUsize,
/// 記憶體保留(用於記憶體管理)
_reservation: MemoryReservation,
/// Build 側的邊界資訊(用於動態過濾優化)
pub(super) bounds: Option<Vec<ColumnBounds>>,
}
重要概念:
Build Phase 是 Hash Join 的第一階段,負責從 build 側構建 Hash Table。讓我們深入理解這個過程:
// 簡化的 Build Phase 實作
async fn build_hash_table(
build_input: SendableRecordBatchStream,
on_columns: &[(PhysicalExprRef, PhysicalExprRef)],
random_state: &RandomState,
) -> Result<JoinLeftData> {
// 步驟 1: 收集所有 build 側的 RecordBatch
let mut batches = Vec::new();
let mut total_rows = 0;
while let Some(batch) = build_input.next().await {
let batch = batch?;
total_rows += batch.num_rows();
batches.push(batch);
}
// 步驟 2: 合併所有 batch 成單一 RecordBatch
let merged_batch = concat_batches(&schema, &batches)?;
// 步驟 3: 評估連接鍵表達式
// 例如:如果 ON 條件是 left.id = right.id,則評估 left.id 欄位
let join_key_values: Vec<ArrayRef> = on_columns
.iter()
.map(|(left_expr, _)| left_expr.evaluate(&merged_batch)?.into_array())
.collect()?;
// 步驟 4: 創建 Hash Table
let mut hash_map = JoinHashMapU64::new();
// 步驟 5: 計算每行的 hash 值並插入 Hash Table
let mut hashes = vec![0u64; total_rows];
create_hashes(&join_key_values, random_state, &mut hashes)?;
// 步驟 6: 以反向順序插入(保持原始順序)
for row_index in (0..total_rows).rev() {
let hash_value = hashes[row_index];
hash_map.insert(hash_value, row_index);
}
// 步驟 7: 創建 visited bitmap(用於 OUTER JOIN)
let visited_bitmap = SharedBitmapBuilder::new(total_rows);
// 步驟 8: 返回 JoinLeftData
Ok(JoinLeftData::new(
Box::new(hash_map),
merged_batch,
join_key_values,
visited_bitmap,
// ... 其他欄位
))
}
這是 DataFusion 的一個巧妙設計。讓我們理解其原因:
假設有 3 個 build 側 batch:
Batch 1: Row 0, Row 1
Batch 2: Row 2, Row 3, Row 4
Batch 3: Row 5, Row 6
合併後的單一 batch:
Index: 0 1 2 3 4 5 6
Row: R0 R1 R2 R3 R4 R5 R6
如果連接鍵有重複值,例如 Row 0 和 Row 5 都是 user_id=1:
方法 A - 正向插入(0 → 6):
insert(hash(1), 0) → hash_table[hash(1)] = 0
insert(hash(1), 5) → hash_table[hash(1)] = 5 (覆蓋了 0)
使用 LIFO(後進先出)結構,查找時會先找到 index 5,再找到 index 0
結果順序: Row 5, Row 0 ❌ 順序顛倒!
方法 B - 反向插入(6 → 0):
insert(hash(1), 5) → hash_table[hash(1)] = 5
insert(hash(1), 0) → hash_table[hash(1)] = 0 (0 在 "top")
查找時會先找到 index 0,再找到 index 5
結果順序: Row 0, Row 5 ✓ 保持原始順序!
DataFusion 在註解中也說明了這一點:
// datafusion/physical-plan/src/joins/hash_join/exec.rs
/// Hash join uses LIFO data structure as a hash table, and in order to retain
/// original build-side input order while obtaining data during probe phase, hash
/// table is updated by iterating batch sequence in reverse order -- it allows to
/// keep rows with smaller indices "on the top" of hash table, and still maintain
/// correct indexing for concatenated build-side data batch.
DataFusion 支援兩種 Hash Table 實現:
pub trait JoinHashMapType {
/// 插入一個 hash 值和對應的行索引
fn insert(&mut self, hash: u64, row_index: usize);
/// 查找匹配的行索引
fn get(&self, hash: u64) -> Option<impl Iterator<Item = usize>>;
/// 返回總行數
fn size(&self) -> usize;
}
// 實現 1: 使用 u32 索引(適合較小的表,< 42 億行)
pub struct JoinHashMapU32 {
// 使用 HashMap<u64, SmallVec<[u32; 1]>>
// SmallVec 優化:如果只有 1 個匹配,不需要 heap 分配
}
// 實現 2: 使用 u64 索引(適合超大表)
pub struct JoinHashMapU64 {
// 使用 HashMap<u64, SmallVec<[u64; 1]>>
}
SmallVec 優化:
大部分情況下,每個 hash 值只對應 1 個行索引:
hash(12345) → [42] // 單一值,存儲在棧上
少數情況下,可能有多個匹配(連接鍵有重複):
hash(67890) → [10, 25, 38] // 多個值,需要 heap 分配
SmallVec<[u32; 1]> 的優勢:
- 單一值時:零 heap 分配,快速
- 多個值時:自動擴展到 heap
- 節省記憶體和提升性能
記憶體需求:
假設 build 側有 1000 萬行,每行平均 100 bytes:
1. 原始數據: 10M × 100 bytes = 1 GB
2. 合併後的 RecordBatch: 1 GB(Arrow 列式格式,高效)
3. Hash Table:
- 10M 個 hash 值(u64): 10M × 8 = 80 MB
- 10M 個索引(u32): 10M × 4 = 40 MB
- HashMap 開銷(約 1.5x): 180 MB
4. 連接鍵 values: 取決於類型,假設 80 MB
總記憶體: 1 GB + 180 MB + 80 MB ≈ 1.26 GB
結論: Hash Table 的開銷相對較小(約 20%)
時間開銷:
操作分解:
1. 收集和合併 batch: O(N),主要是記憶體拷貝
2. 評估連接鍵表達式: O(N),通常很快(簡單欄位引用)
3. 計算 hash 值: O(N),使用高效的 hash 函數(ahash)
4. 插入 Hash Table: O(N) 平均,每次插入 O(1)
總時間: O(N),線性於 build 側的行數
Probe Phase 是 Hash Join 的第二階段,負責遍歷 probe 側數據並在 Hash Table 中查找匹配:
// 簡化的 Probe Phase 實作
async fn probe_hash_table(
probe_input: SendableRecordBatchStream,
join_left_data: Arc<JoinLeftData>,
on_columns: &[(PhysicalExprRef, PhysicalExprRef)],
join_type: JoinType,
) -> Result<Vec<RecordBatch>> {
let mut output_batches = Vec::new();
// 遍歷 probe 側的每個 batch
while let Some(probe_batch) = probe_input.next().await {
let probe_batch = probe_batch?;
let num_rows = probe_batch.num_rows();
// 步驟 1: 評估 probe 側的連接鍵
let probe_join_keys: Vec<ArrayRef> = on_columns
.iter()
.map(|(_, right_expr)| right_expr.evaluate(&probe_batch)?.into_array())
.collect()?;
// 步驟 2: 計算 probe 側的 hash 值
let mut probe_hashes = vec![0u64; num_rows];
create_hashes(&probe_join_keys, &random_state, &mut probe_hashes)?;
// 步驟 3: 查找匹配並記錄索引
let mut build_indices = Vec::new(); // 匹配的 build 側索引
let mut probe_indices = Vec::new(); // 匹配的 probe 側索引
for probe_idx in 0..num_rows {
let hash_value = probe_hashes[probe_idx];
// 在 Hash Table 中查找匹配的 build 側行
if let Some(build_idx_iter) = join_left_data.hash_map().get(hash_value) {
for build_idx in build_idx_iter {
// 步驟 4: 驗證連接鍵是否真正相等(處理 hash 衝突)
if equal_join_keys(
&probe_join_keys,
probe_idx,
join_left_data.values(),
build_idx
)? {
// 找到匹配!
build_indices.push(build_idx);
probe_indices.push(probe_idx);
// 標記 build 側行已被訪問(用於 OUTER JOIN)
join_left_data.visited_indices_bitmap()
.set_bit(build_idx);
}
}
}
}
// 步驟 5: 根據 join 類型調整索引
let (final_build_indices, final_probe_indices) =
adjust_indices_by_join_type(
build_indices,
probe_indices,
num_rows,
join_left_data.batch().num_rows(),
join_type,
)?;
// 步驟 6: 使用索引構建輸出 batch
let output_batch = build_batch_from_indices(
join_left_data.batch(),
&probe_batch,
&final_build_indices,
&final_probe_indices,
&column_indices,
)?;
output_batches.push(output_batch);
}
Ok(output_batches)
}
讓我們通過一個具體例子來理解 Probe Phase:
Build 側(已構建 Hash Table):
Index | user_id | name
------+---------+-------
0 | 1 | Alice
1 | 2 | Bob
2 | 1 | Alice2 (重複的 user_id)
3 | 3 | Carol
Hash Table 內容(簡化表示):
hash(1) → [2, 0] // LIFO: 反向插入,所以 0 在頂部
hash(2) → [1]
hash(3) → [3]
Probe 側 batch:
Index | order_id | user_id | amount
------+----------+---------+--------
0 | 101 | 1 | 100
1 | 102 | 2 | 200
2 | 103 | 1 | 150
3 | 104 | 4 | 300
處理過程:
處理 probe_idx=0 (order_id=101, user_id=1, amount=100):
1. hash(1) = 12345
2. 查找 hash_table[12345] → 找到 [2, 0]
3. 檢查 build_idx=2:
- 比較: probe[0].user_id (1) == build[2].user_id (1) ✓
- 記錄匹配: (build_idx=2, probe_idx=0)
4. 檢查 build_idx=0:
- 比較: probe[0].user_id (1) == build[0].user_id (1) ✓
- 記錄匹配: (build_idx=0, probe_idx=0)
結果: 產生 2 個輸出行(笛卡爾積)
處理 probe_idx=1 (order_id=102, user_id=2, amount=200):
1. hash(2) = 67890
2. 查找 hash_table[67890] → 找到 [1]
3. 檢查 build_idx=1:
- 比較: probe[1].user_id (2) == build[1].user_id (2) ✓
- 記錄匹配: (build_idx=1, probe_idx=1)
結果: 產生 1 個輸出行
處理 probe_idx=2 (order_id=103, user_id=1, amount=150):
1. hash(1) = 12345
2. 查找 hash_table[12345] → 找到 [2, 0]
3. 檢查 build_idx=2:
- 比較: probe[2].user_id (1) == build[2].user_id (1) ✓
- 記錄匹配: (build_idx=2, probe_idx=2)
4. 檢查 build_idx=0:
- 比較: probe[2].user_id (1) == build[0].user_id (1) ✓
- 記錄匹配: (build_idx=0, probe_idx=2)
結果: 產生 2 個輸出行
處理 probe_idx=3 (order_id=104, user_id=4, amount=300):
1. hash(4) = 99999
2. 查找 hash_table[99999] → 未找到
3. 無匹配
結果: 對於 INNER JOIN,不產生輸出;對於 LEFT JOIN,產生 NULL 填充行
最終 build_indices = [2, 0, 1, 2, 0]
最終 probe_indices = [0, 0, 1, 2, 2]
構建輸出 batch:
order_id | user_id | name | amount
---------+---------+--------+--------
101 | 1 | Alice2 | 100 (build[2], probe[0])
101 | 1 | Alice | 100 (build[0], probe[0])
102 | 2 | Bob | 200 (build[1], probe[1])
103 | 1 | Alice2 | 150 (build[2], probe[2])
103 | 1 | Alice | 150 (build[0], probe[2])
Hash 衝突是指不同的連接鍵值產生相同的 hash 值:
範例情況:
hash(user_id=1) = 12345
hash(user_id=999) = 12345 // 衝突!
Hash Table:
hash_table[12345] → [build_idx for user_id=1, build_idx for user_id=999]
Probe 時處理:
當 probe user_id=1 時:
1. hash(1) = 12345
2. 查找 hash_table[12345] → 找到多個候選
3. **必須逐一驗證連接鍵值**:
- build[x].user_id == 1? ✓ 真正匹配
- build[y].user_id == 999? ✗ Hash 衝突,跳過
這就是為何 Probe 階段不能只依賴 hash 值,必須明確比較連接鍵的實際值。
Probe Phase 天然適合並行化:
場景: probe 側有 4 個分區
分區 0 (獨立執行緒 0): ┐
probe_batch_0 → 查找 hash_table → output_batch_0
│
分區 1 (獨立執行緒 1): │ 所有執行緒共享
probe_batch_1 → 查找 hash_table → output_batch_1 同一個 Hash Table
│
分區 2 (獨立執行緒 2): │ (只讀操作,無競爭)
probe_batch_2 → 查找 hash_table → output_batch_2
│
分區 3 (獨立執行緒 3): ┘
probe_batch_3 → 查找 hash_table → output_batch_3
優勢:
- Hash Table 構建一次,多個執行緒共享(節省記憶體)
- Probe 操作只讀,無需鎖(高並行度)
- 線性擴展性: 4 個核心 ≈ 4 倍速度
注意事項:
對於需要追蹤 visited bitmap 的 OUTER JOIN,需要原子操作來更新 bitmap,但這個開銷很小:
// 使用原子操作設置 bitmap
join_left_data.visited_indices_bitmap().set_bit(build_idx);
// 內部使用 atomic compare-and-swap
fn set_bit(&self, index: usize) {
let byte_index = index / 8;
let bit_mask = 1 << (index % 8);
// 原子操作,多執行緒安全
self.bitmap[byte_index].fetch_or(bit_mask, Ordering::Relaxed);
}
Build 側的選擇對性能有巨大影響:
場景: 小表 (1000 行) JOIN 大表 (100 萬行)
選擇 A - 小表做 Build,大表做 Probe:
Build Phase: 1000 行 → Hash Table 占用 ~10 KB
Probe Phase: 100 萬行 × O(1) 查找 = 100 萬次操作
總記憶體: 10 KB
總時間: 約 100 ms
選擇 B - 大表做 Build,小表做 Probe:
Build Phase: 100 萬行 → Hash Table 占用 ~10 MB
Probe Phase: 1000 行 × O(1) 查找 = 1000 次操作
總記憶體: 10 MB (1000 倍差異!)
總時間: 約 200 ms
結論: 應該選擇小表做 Build 側
DataFusion 的物理規劃器會在生成執行計劃時選擇 Build 側:
// 簡化的選擇邏輯
fn should_swap_join_inputs(
left_stats: &Statistics,
right_stats: &Statistics,
join_type: JoinType,
) -> bool {
// 規則 1: 某些 join 類型不能交換
if !join_type.supports_swap() {
return false;
}
// 規則 2: 如果有明確的行數統計,選擇較小的做 build
if let (Some(left_rows), Some(right_rows)) =
(left_stats.num_rows, right_stats.num_rows)
{
return left_rows > right_rows; // 如果左側更大,交換
}
// 規則 3: 使用總字節數估計
if let (Some(left_bytes), Some(right_bytes)) =
(left_stats.total_byte_size, right_stats.total_byte_size)
{
return left_bytes > right_bytes;
}
// 規則 4: 無統計資訊時,保持原樣
false
}
實際案例:
-- 原始查詢
SELECT *
FROM large_orders o
JOIN small_users u ON o.user_id = u.user_id;
-- 優化器決策:
-- large_orders: 1000 萬行,1 GB
-- small_users: 10 萬行,10 MB
-- 物理計劃(可能):
HashJoinExec
├─ small_users (Build 側)
└─ large_orders (Probe 側)
-- 如果優化器判斷 large_orders 更小,會自動交換:
HashJoinExec
├─ large_orders (Build 側)
└─ small_users (Probe 側)
不是所有 Join 類型都能自由交換 Build 和 Probe 側:
INNER JOIN:
A JOIN B == B JOIN A
✓ 可以交換
LEFT OUTER JOIN:
A LEFT JOIN B != B LEFT JOIN A
✗ 不能直接交換
但可以轉換: A LEFT JOIN B == B RIGHT JOIN A
✓ 可以通過改變 Join 類型來交換
RIGHT OUTER JOIN:
A RIGHT JOIN B == B LEFT JOIN A
✓ 可以通過改變 Join 類型來交換
FULL OUTER JOIN:
A FULL JOIN B == B FULL JOIN A
✓ 可以交換
SEMI/ANTI JOIN:
有特定的語義,需要謹慎處理
INNER JOIN 只輸出兩側都有匹配的行:
Build 側: Probe 側:
id | name id | city
---+------- ---+----------
1 | Alice 1 | Taipei
2 | Bob 2 | Taichung
3 | Carol 4 | Kaohsiung
INNER JOIN 處理:
Probe id=1: 在 Hash Table 找到 → 輸出 (1, Alice, Taipei)
Probe id=2: 在 Hash Table 找到 → 輸出 (2, Bob, Taichung)
Probe id=4: 在 Hash Table 未找到 → 不輸出(跳過)
結果:
id | name | city
---+-------+----------
1 | Alice | Taipei
2 | Bob | Taichung
visited_indices_bitmap: [true, true, false]
// Build 側的 id=3 未被匹配
實作要點:
LEFT OUTER JOIN 輸出所有 probe 側的行,即使沒有匹配:
Build 側: Probe 側:
id | name id | city
---+------- ---+----------
1 | Alice 1 | Taipei
2 | Bob 2 | Taichung
4 | Kaohsiung
LEFT OUTER JOIN 處理:
Probe id=1: 在 Hash Table 找到 → 輸出 (1, Taipei, Alice)
Probe id=2: 在 Hash Table 找到 → 輸出 (2, Taichung, Bob)
Probe id=4: 在 Hash Table 未找到 → 輸出 (4, Kaohsiung, NULL)
結果:
id | city | name
---+-----------+-------
1 | Taipei | Alice
2 | Taichung | Bob
4 | Kaohsiung | NULL
// 注意: probe 側的 id=4 雖然沒有匹配,仍然輸出
實作要點:
RIGHT OUTER JOIN 輸出所有 build 側的行,即使沒有匹配:
Build 側: Probe 側:
id | name id | city
---+------- ---+----------
1 | Alice 1 | Taipei
2 | Bob 4 | Kaohsiung
3 | Carol
RIGHT OUTER JOIN 處理:
Probe Phase:
Probe id=1: 找到匹配 → 輸出 (1, Alice, Taipei)
Probe id=4: 未找到匹配 → 跳過
Final Phase(處理未匹配的 build 側行):
Build id=2: visited_bitmap[1] = true → 已匹配,跳過
Build id=3: visited_bitmap[2] = false → 未匹配,輸出 (3, Carol, NULL)
結果:
id | name | city
---+-------+----------
1 | Alice | Taipei
2 | Bob | NULL (未在 probe 側找到)
3 | Carol | NULL (未在 probe 側找到)
// 注意: 需要在 Probe 結束後,額外輸出未匹配的 build 行
實作要點:
FULL OUTER JOIN 輸出兩側的所有行,無論是否匹配:
Build 側: Probe 側:
id | name id | city
---+------- ---+----------
1 | Alice 1 | Taipei
2 | Bob 4 | Kaohsiung
3 | Carol
FULL OUTER JOIN 處理:
Probe Phase:
Probe id=1: 找到匹配 → 輸出 (1, Alice, Taipei)
Probe id=4: 未找到 → 輸出 (4, NULL, Kaohsiung)
Final Phase(處理未匹配的 build 側行):
Build id=2: visited_bitmap[1] = false → 輸出 (2, Bob, NULL)
Build id=3: visited_bitmap[2] = false → 輸出 (3, Carol, NULL)
結果:
id | name | city
---+-------+----------
1 | Alice | Taipei
2 | Bob | NULL
3 | Carol | NULL
4 | NULL | Kaohsiung
實作要點:
// datafusion/physical-plan/src/joins/utils.rs
pub fn adjust_indices_by_join_type(
mut build_indices: Vec<usize>,
mut probe_indices: Vec<usize>,
probe_batch_size: usize,
build_batch_size: usize,
join_type: JoinType,
) -> Result<(UInt64Array, UInt32Array)> {
match join_type {
JoinType::Inner => {
// 直接返回找到匹配的索引對
// 無需額外處理
}
JoinType::Left => {
// 檢查每個 probe 行是否有匹配
let mut matched_probe = vec![false; probe_batch_size];
for &probe_idx in &probe_indices {
matched_probe[probe_idx] = true;
}
// 為未匹配的 probe 行添加 (NULL, probe_idx)
for probe_idx in 0..probe_batch_size {
if !matched_probe[probe_idx] {
build_indices.push(NULL_INDEX); // 特殊值表示 NULL
probe_indices.push(probe_idx);
}
}
}
JoinType::Right => {
// Probe Phase 正常處理
// Final Phase 由調用方處理 visited_indices_bitmap
}
JoinType::Full => {
// 結合 Left 和 Right 的邏輯
// ...
}
// 其他 Join 類型...
}
Ok((
UInt64Array::from(build_indices),
UInt32Array::from(probe_indices),
))
}
Hash Join 的記憶體使用主要來自 Build 側:
記憶體組成:
1. Build 側合併 RecordBatch: 取決於數據大小
2. Hash Table: 約為 Build 側數據的 15-20%
3. Visited Bitmap (OUTER JOIN): 每行 1 bit
優化策略:
- 使用較小的表做 Build 側(優化器自動選擇)
- 使用 u32 索引替代 u64(適用於 < 42 億行)
- 使用 SmallVec 減少小型匹配列表的 heap 分配
- 啟用記憶體限制,防止 OOM
如果 Build 側太大無法放入記憶體,有幾種策略:
策略 1: Grace Hash Join(優雅 Hash Join)
將兩個輸入表分區(partition):
1. 按連接鍵的 hash 值將兩個表分成 N 個分區
2. 對每個分區獨立執行 Hash Join
3. 每個分區可以放入記憶體
好處: 即使總數據很大,每個分區都很小
策略 2: Hybrid Hash Join(混合 Hash Join)
部分分區在記憶體中,部分在磁碟上:
1. 盡可能多的分區放在記憶體中處理
2. 其餘分區 spill 到磁碟
3. 第二輪從磁碟讀取並處理
好處: 充分利用可用記憶體,減少磁碟 I/O
DataFusion 當前主要使用第一種策略,通過 PartitionMode
控制:
pub enum PartitionMode {
/// 收集整個 build 側到單一分區(適合小表)
CollectLeft,
/// 兩側都分區(適合大表)
Partitioned,
/// 自動選擇
Auto,
}
CollectLeft 模式:
Build 側(單一分區): Probe 側(多分區):
┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐
│All │ │ P0 │ │ P1 │ │ P2 │ │ P3 │
│Data │ └─────┘ └─────┘ └─────┘ └─────┘
└─────┘ │ │ │ │
│ ▼ ▼ ▼ ▼
│ ┌─────────────────────────────┐
└────────────────────►│ Shared Hash Table │
│ (由所有 probe 執行緒共享) │
└─────────────────────────────┘
優勢: Probe 階段完全並行,無資料交換
適用: Build 側較小(可放入記憶體)
Partitioned 模式:
Build 側(分區): Probe 側(分區):
┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐
│ P0 │ │ P1 │ │ P2 │ │ P0 │ │ P1 │ │ P2 │
└─────┘ └─────┘ └─────┘ └─────┘ └─────┘ └─────┘
│ │ │ │ │ │
▼ ▼ ▼ ▼ ▼ ▼
┌─────┐ ┌─────┐ ┌─────┐ (相同 partition 編號的
│Hash0│ │Hash1│ │Hash2│ 數據一起處理)
└─────┘ └─────┘ └─────┘
步驟:
1. 按連接鍵 hash 重分區兩個輸入
2. 每個分區獨立執行 Hash Join
3. 無需全局同步
優勢: 適合大型 Build 側,每個分區獨立
適用: 兩個輸入都很大
準確的統計資訊對 Hash Join 性能很重要:
-- 場景 1: 有準確統計
-- 優化器知道 users 只有 10 萬行,orders 有 1000 萬行
SELECT * FROM orders o JOIN users u ON o.user_id = u.user_id;
-- 物理計劃:
HashJoinExec
├─ users (Build 側, 10 萬行) ← 正確選擇
└─ orders (Probe 側, 1000 萬行)
-- 場景 2: 沒有統計(或統計過時)
-- 優化器無法判斷哪個表更小
SELECT * FROM orders o JOIN users u ON o.user_id = u.user_id;
-- 物理計劃(可能):
HashJoinExec
├─ orders (Build 側, 1000 萬行) ← 錯誤選擇!記憶體暴增
└─ users (Probe 側, 10 萬行)
解決方法:
-- 更新統計資訊
ANALYZE TABLE orders;
ANALYZE TABLE users;
今天我們深入探討了 Hash Join 算子的原理和實現:
兩階段執行模型:
Build 側與 Probe 側的選擇:
Hash Table 的設計:
HashMap<hash_value, Vec<row_index>>
結構OnceAsync<JoinLeftData>
允許多個 probe 執行緒共享同一個 Hash TableMemoryReservation
追蹤記憶體使用,支援 SpillingCollectLeft
適合小表,Partitioned
適合大表Hash Join 是實現高效 Join 操作的基礎。然而,它並非萬能——在某些場景下(如輸入已排序、記憶體嚴重受限),Sort-Merge Join 可能是更好的選擇。因此明天我們將探討另一種重要的 Join 算法 -- Sort-Merge Join,並學習優化器如何在不同 Join 策略之間做出選擇。