還記得我們在 Day12 說過的「狀態運算」嗎?以及之前實作的「Database Lookup Join」嗎?
今天我們要更進一步,探討真正的流處理思維:Stream-to-Stream Join。不再依賴外部資料庫查詢,而是讓兩個流直接進行事件配對,這才是流處理的精髓所在。
在關聯式資料庫裡,JOIN 是把兩個表的資料根據某個條件合併起來。但在流處理的世界裡,「表」變成了「無限的事件流」,這讓 JOIN 變得更加微妙。
Step 1: Build Phase
Table A
(smaller) ────────────────────> Hash Table
│ (key→value index)
│ │
└─ Load all data into memory │
│
Step 2: Probe Phase │
Table B │
(larger) │
│ │
├─ row1 ──────────┐ │
├─ row2 ──────────┼─ Lookup ───────────►│
├─ row3 ──────────┼─ Hash Table │
├─ ... ──────────┘ │
│ │
│ Match Found ◄───────┘
│ │
▼ │
Join Result ◄────────────┘
特點:
Stream A Processing Stream B Processing
┌─────────────────┐ ┌─────────────────┐
│ │ │ │
│ Event A │ │ Event B │
│ │ │ │ │ │
│ ▼ │ │ ▼ │
│ Store in │ │ Store in │
│ State A │ │ State B │
│ │ │ │ │ │
│ ▼ │ │ ▼ │
│ Query State B ──┼────────────────┼── Query State A │
│ │ │ │ │ │
│ ▼ │ │ ▼ │
│ Emit Result │ │ Emit Result │
│ │ │ │
└─────────────────┘ └─────────────────┘
│ │
│ State Management │
│ ┌─────────────────────────┐ │
└───►│ State Store A │◄────┘
│ Key → [timestamp,value] │
│ │
│ State Store B │
│ Key → [timestamp,value] │
└─────────────────────────┘
關鍵差異:
既然理解了理論,我們來看看如何在實際的流處理框架中實作 Streaming Join。
重要提醒
本文所有程式碼皆為教學與概念演示用的 Pseudo Code,可以幫助你理解並解決實際場景的問題,但這裡的程式碼不能直接使用,主要目的是用來講解 Stream Processing 的設計思路與核心概念。閱讀時建議把重點放在理解整體架構和設計邏輯上,程式碼細節部分可以輕鬆看過就好。
為了讓本日重點更突出,我們會適度簡化前幾天的程式碼實作細節,主要聚焦在今天的機制講解。
class SimpleDataFrame:
def __init__(self, name: str = "dataframe", sink_callback=None):
self.name = name
# Streaming Join 的核心:雙向狀態存儲
self._join_state = defaultdict(list) # key -> [events...]
self._join_partner = None # 配對的另一個 DataFrame
self._join_key = None # join 的欄位名稱
狀態存儲結構:
# _join_state 的內部結構
{
"O001": [event1, event2, ...], # 同一個 key 可能對應多個事件
"O002": [event3, event4, ...],
}
設計考量:
defaultdict(list)
支援一對多關係def join(self, other: 'SimpleDataFrame', on: str) -> 'SimpleDataFrame':
# 1. 創建新的 DataFrame 收集 join 結果
joined_df = SimpleDataFrame(f"{self.name}_join_{other.name}")
# 2. 建立雙向關聯
self._join_partner = other
other._join_partner = self
self._join_key = on
other._join_key = on
# 3. 攔截原有處理邏輯,增加 join 功能
self_original_process = self.process_message
def enhanced_process(message):
# 先執行原本的處理邏輯(filter、transform 等)
result = self_original_process(message)
# 再處理 join 邏輯
join_results = self._process_join_event(message)
for join_result in join_results:
joined_df.process_message(join_result)
return result
self.process_message = enhanced_process
return joined_df
關鍵設計原則:
設計優勢:
def _process_join_event(self, event) -> List[Dict]:
if not self._join_partner:
return []
join_value = str(event.get(self._join_key))
results = []
# 1. 存儲:把自己的事件保存到狀態中
self._join_state[join_value].append(event)
# 2. 查詢:到對方的狀態存儲找配對
partner_events = self._join_partner._join_state.get(join_value, [])
# 3. 配對:與找到的每個事件進行合併
for partner_event in partner_events:
merged = {**event, **partner_event}
results.append(merged)
return results
演算法步驟:
讓我們追蹤一個完整的例子:
# 設定 join 關係
orders_df = app.dataframe(source=orders_source)
details_df = app.dataframe(source=details_source)
joined_df = orders_df.join(details_df, on="order_id")
# 事件到達順序
1. detail = {"order_id": "O001", "product": "咖啡", "qty": 2}
2. order = {"order_id": "O001", "user_id": "U123", "total": 500}
事件 1:明細先到達
# details_df 收到明細事件後:
join_value = "O001"
# 1. 存儲:將明細事件存入自己的狀態
details_df._join_state["O001"] = [{"order_id": "O001", "product": "咖啡", "qty": 2}]
# 2. 查詢:到對方(orders_df)找相同 key 的事件
orders_df._join_state.get("O001", []) # 返回 [],還沒有訂單事件
# 3. 配對:沒找到配對,暫時無輸出
results = []
事件 2:訂單後到達
# orders_df 收到訂單事件後:
join_value = "O001"
# 1. 存儲:將訂單事件存入自己的狀態
orders_df._join_state["O001"] = [{"order_id": "O001", "user_id": "U123", "total": 500}]
# 2. 查詢:到對方(details_df)找相同 key 的事件
details_df._join_state.get("O001", [])
# 找到了![{"order_id": "O001", "product": "咖啡", "qty": 2}]
# 3. 配對:合併兩個事件
merged = {
**{"order_id": "O001", "user_id": "U123", "total": 500}, # 當前事件
**{"order_id": "O001", "product": "咖啡", "qty": 2} # 配對事件
}
# 輸出: {"order_id": "O001", "user_id": "U123", "total": 500, "product": "咖啡", "qty": 2}
results = [merged]
df1.join(df2, on="key")
直觀易懂這個實作完美展示了如何將複雜的 streaming join 邏輯封裝成簡潔的 API,讓開發者能夠專注於業務邏輯而不是底層的狀態管理細節。
Day 13: SimpleStreamingEngine with Streaming JOIN
┌─────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ KafkaSource │───►│DataFrame │───►│PostgreSQLSink │
│ │ │ │ │ │
│ • Topic consume │ │ • Filter │ │ • Batch insert │
│ • Consumer pause│ │ • Lookup Join │ │ • Timer trigger │
│ • Offset resume │ │ • Streaming Join │ │ • Overload detect│
└─────────────────┘ └──────────────────┘ └──────────────────┘
▲ │
│ ┌─────────────────────────────────────┘
│ │
│ ▼
┌─────────────────────────────────────────────────────────────┐
│ SimpleStreamingEngine │
│ • Backpressure │
│ • Checkpoint │
│ • State Storage │
└─────────────────────────────────────────────────────────────┘
新增的核心能力:
Day10 新增功能
Day13 新增功能
Streaming Join 是流處理從「無腦轉發」邁向「智能關聯」的關鍵技術。
通過雙向狀態管理,我們讓兩個獨立的事件流能夠互相感知、等待配對,最終實現了真正的流與流邂逅。這不僅解決了事件亂序的問題,更重要的是擺脫了對外部資料庫的依賴,讓數據處理完全在流的世界中完成。
如果說 JOIN 是「流與流的邂逅」,那麼 GROUP BY 就是「流的自我整理」。
在 Day14 中,我們將探討如何實作 GroupBy 功能
敬請期待!