iT邦幫忙

2025 iThome 鐵人賽

DAY 13
2
AI & Data

「知其然,更知其所以然:什麼是 Real-time (Streaming) Pipeline?從造輪子到 Flink 與 RisingWave」系列 第 13

【知其然,更知其所以然】Day13:流與流的邂逅 - Streaming Join 的藝術

  • 分享至 

  • xImage
  •  

還記得我們在 Day12 說過的「狀態運算」嗎?以及之前實作的「Database Lookup Join」嗎?

今天我們要更進一步,探討真正的流處理思維:Stream-to-Stream Join。不再依賴外部資料庫查詢,而是讓兩個流直接進行事件配對,這才是流處理的精髓所在。

重新認識 JOIN

在關聯式資料庫裡,JOIN 是把兩個表的資料根據某個條件合併起來。但在流處理的世界裡,「表」變成了「無限的事件流」,這讓 JOIN 變得更加微妙。

傳統 RDB Join:Build-Probe 模式

Step 1: Build Phase
    Table A
    (smaller)         ────────────────────> Hash Table
      │                                   (key→value index)
      │                                       │
      └─ Load all data into memory            │
                                              │
Step 2: Probe Phase                           │
    Table B                                   │
    (larger)                                  │
      │                                       │
      ├─ row1 ──────────┐                     │
      ├─ row2 ──────────┼─ Lookup ───────────►│
      ├─ row3 ──────────┼─ Hash Table         │
      ├─ ...  ──────────┘                     │
      │                                       │
      │                   Match Found ◄───────┘
      │                      │
      ▼                      │
    Join Result ◄────────────┘

特點

  • 一邊 Build(建立索引),一邊 Probe(探測匹配)
  • 小表先全部讀入記憶體建 Hash Table
  • 大表逐筆掃描並探測匹配
  • 一次性操作,有明確的開始和結束

Streaming Join:雙向狀態管理

    Stream A Processing                 Stream B Processing
    ┌─────────────────┐                ┌─────────────────┐
    │                 │                │                 │
    │ Event A         │                │ Event B         │
    │      │          │                │      │          │
    │      ▼          │                │      ▼          │
    │ Store in        │                │ Store in        │
    │ State A         │                │ State B         │
    │      │          │                │      │          │
    │      ▼          │                │      ▼          │
    │ Query State B ──┼────────────────┼── Query State A │
    │      │          │                │      │          │
    │      ▼          │                │      ▼          │
    │ Emit Result     │                │ Emit Result     │
    │                 │                │                 │
    └─────────────────┘                └─────────────────┘
            │                                    │
            │        State Management            │
            │    ┌─────────────────────────┐     │
            └───►│ State Store A           │◄────┘
                 │ Key → [timestamp,value] │
                 │                         │
                 │ State Store B           │
                 │ Key → [timestamp,value] │
                 └─────────────────────────┘

關鍵差異

  • 雙向 Build + Probe:兩個流都要建立狀態存儲,也都要探測對方
  • 持續性操作:沒有明確的結束點,持續接收和處理資料
  • 時間語義:需要考慮事件的時間戳和到達順序
  • 狀態管理:需要智慧地保留和清理歷史資料

Simple Streaming 的 Streaming Join 實作解析

既然理解了理論,我們來看看如何在實際的流處理框架中實作 Streaming Join。

重要提醒
本文所有程式碼皆為教學與概念演示用的 Pseudo Code,可以幫助你理解並解決實際場景的問題,但這裡的程式碼不能直接使用,主要目的是用來講解 Stream Processing 的設計思路與核心概念。閱讀時建議把重點放在理解整體架構和設計邏輯上,程式碼細節部分可以輕鬆看過就好。

為了讓本日重點更突出,我們會適度簡化前幾天的程式碼實作細節,主要聚焦在今天的機制講解。

第一步:狀態存儲設計

class SimpleDataFrame:
    def __init__(self, name: str = "dataframe", sink_callback=None):
        self.name = name
        # Streaming Join 的核心:雙向狀態存儲
        self._join_state = defaultdict(list)  # key -> [events...]
        self._join_partner = None             # 配對的另一個 DataFrame
        self._join_key = None                 # join 的欄位名稱

狀態存儲結構

# _join_state 的內部結構
{
  "O001": [event1, event2, ...],  # 同一個 key 可能對應多個事件
  "O002": [event3, event4, ...],
}

設計考量

  • 使用 defaultdict(list) 支援一對多關係
  • 同一個 order_id 可能有多個明細事件

第二步:join() 方法建立雙向關聯

def join(self, other: 'SimpleDataFrame', on: str) -> 'SimpleDataFrame':
    # 1. 創建新的 DataFrame 收集 join 結果
    joined_df = SimpleDataFrame(f"{self.name}_join_{other.name}")
    
    # 2. 建立雙向關聯
    self._join_partner = other
    other._join_partner = self
    self._join_key = on
    other._join_key = on
    
    # 3. 攔截原有處理邏輯,增加 join 功能
    self_original_process = self.process_message
    
    def enhanced_process(message):
        # 先執行原本的處理邏輯(filter、transform 等)
        result = self_original_process(message)
        
        # 再處理 join 邏輯
        join_results = self._process_join_event(message)
        for join_result in join_results:
            joined_df.process_message(join_result)
        
        return result
    
    self.process_message = enhanced_process
    return joined_df

關鍵設計原則

  • 建立雙向引用:兩個 DataFrame 互相知道對方
  • 事件攔截:在原有處理前加入 join 邏輯
  • 保持原有行為:不破壞 DataFrame 既有功能

設計優勢

  • 向後相容:原有 filter、sink 功能不受影響
  • 職責分離:join 結果獨立處理,不污染原 DataFrame
  • 鏈式操作:join 結果可繼續進行其他操作

第三步:核心 Join 演算法

def _process_join_event(self, event) -> List[Dict]:
    if not self._join_partner:
        return []
    
    join_value = str(event.get(self._join_key))
    results = []
    
    # 1. 存儲:把自己的事件保存到狀態中
    self._join_state[join_value].append(event)
    
    # 2. 查詢:到對方的狀態存儲找配對
    partner_events = self._join_partner._join_state.get(join_value, [])
    
    # 3. 配對:與找到的每個事件進行合併
    for partner_event in partner_events:
        merged = {**event, **partner_event}
        results.append(merged)
    
    return results

演算法步驟

  1. 存儲(Store):將當前事件按 join key 存入狀態
  2. 查詢(Query):到對方狀態存儲中找相同 key 的事件
  3. 合併(Merge):將找到的事件與當前事件合併輸出

第四步:實際運作流程

讓我們追蹤一個完整的例子:

# 設定 join 關係
orders_df = app.dataframe(source=orders_source)
details_df = app.dataframe(source=details_source)
joined_df = orders_df.join(details_df, on="order_id")

# 事件到達順序
1. detail = {"order_id": "O001", "product": "咖啡", "qty": 2}
2. order = {"order_id": "O001", "user_id": "U123", "total": 500}

事件 1:明細先到達

# details_df 收到明細事件後:

join_value = "O001"

# 1. 存儲:將明細事件存入自己的狀態
details_df._join_state["O001"] = [{"order_id": "O001", "product": "咖啡", "qty": 2}]

# 2. 查詢:到對方(orders_df)找相同 key 的事件
orders_df._join_state.get("O001", [])  # 返回 [],還沒有訂單事件

# 3. 配對:沒找到配對,暫時無輸出
results = []

事件 2:訂單後到達

# orders_df 收到訂單事件後:

join_value = "O001"

# 1. 存儲:將訂單事件存入自己的狀態
orders_df._join_state["O001"] = [{"order_id": "O001", "user_id": "U123", "total": 500}]

# 2. 查詢:到對方(details_df)找相同 key 的事件
details_df._join_state.get("O001", [])  
# 找到了![{"order_id": "O001", "product": "咖啡", "qty": 2}]

# 3. 配對:合併兩個事件
merged = {
    **{"order_id": "O001", "user_id": "U123", "total": 500},    # 當前事件
    **{"order_id": "O001", "product": "咖啡", "qty": 2}         # 配對事件
}
# 輸出: {"order_id": "O001", "user_id": "U123", "total": 500, "product": "咖啡", "qty": 2}

results = [merged]

設計優勢總結

  1. API 簡潔df1.join(df2, on="key") 直觀易懂
  2. 自動處理:狀態管理、事件路由都是自動的
  3. 不破壞原有功能:原 DataFrame 的 filter、sink 照常運作
  4. 支援鏈式操作:join 結果可以繼續 filter、sink
  5. 處理亂序:不管事件到達順序,最終都能正確配對

這個實作完美展示了如何將複雜的 streaming join 邏輯封裝成簡潔的 API,讓開發者能夠專注於業務邏輯而不是底層的狀態管理細節。

架構回顧

Day 13: SimpleStreamingEngine with Streaming JOIN

┌─────────────────┐    ┌──────────────────┐    ┌──────────────────┐
│   KafkaSource   │───►│DataFrame         │───►│PostgreSQLSink    │
│                 │    │                  │    │                  │
│ • Topic consume │    │ • Filter         │    │ • Batch insert   │
│ • Consumer pause│    │ • Lookup Join    │    │ • Timer trigger  │
│ • Offset resume │    │ • Streaming Join │    │ • Overload detect│
└─────────────────┘    └──────────────────┘    └──────────────────┘
         ▲                                               │
         │         ┌─────────────────────────────────────┘
         │         │
         │         ▼
┌─────────────────────────────────────────────────────────────┐
│              SimpleStreamingEngine                          │
│  • Backpressure                                             │
│  • Checkpoint                                               │
│  • State Storage                                            │
└─────────────────────────────────────────────────────────────┘

新增的核心能力

Day10 新增功能

  • Lookup Join:DataFrame 可查詢資料庫豐富資料

Day13 新增功能

  • Streaming Join:兩個流之間可直接進行 JOIN
  • 狀態存儲:雙向狀態管理,支援事件配對與亂序處理

總結

Streaming Join 是流處理從「無腦轉發」邁向「智能關聯」的關鍵技術。

通過雙向狀態管理,我們讓兩個獨立的事件流能夠互相感知、等待配對,最終實現了真正的流與流邂逅。這不僅解決了事件亂序的問題,更重要的是擺脫了對外部資料庫的依賴,讓數據處理完全在流的世界中完成。

下一篇預告:Day14 - Streaming GroupBy

如果說 JOIN 是「流與流的邂逅」,那麼 GROUP BY 就是「流的自我整理」。

在 Day14 中,我們將探討如何實作 GroupBy 功能

敬請期待!


上一篇
【知其然,更知其所以然】Day 12: 讓流處理擁有「記憶」- 狀態運算的秘密
下一篇
【知其然,更知其所以然】Day14:Streaming GroupBy
系列文
「知其然,更知其所以然:什麼是 Real-time (Streaming) Pipeline?從造輪子到 Flink 與 RisingWave」15
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言