iT邦幫忙

2025 iThome 鐵人賽

DAY 10
2
AI & Data

「知其然,更知其所以然:什麼是 Real-time (Streaming) Pipeline?從造輪子到 Flink 與 RisingWave」系列 第 10

【知其然,更知其所以然】Day 10:Join 太慢?Streaming 幫你提前攤平

  • 分享至 

  • xImage
  •  

回顧:上篇的痛點

重要提醒
本文所有程式碼皆為教學與概念演示用的 Pseudo Code,可以幫助你理解並解決實際場景的問題,但這裡的程式碼不能直接使用,主要目的是用來講解 Stream Processing 的設計思路與核心概念。閱讀時建議把重點放在理解整體架構和設計邏輯上,程式碼細節部分可以輕鬆看過就好。

還記得上一篇的咖啡店故事嗎?老闆從只想看訂單數量,進化到想要看訂單明細(orders_detail),最後我們在 Lambda 架構的 Serving Layer 用複雜的 SQL JOIN 解決了問題。

但故事還沒結束。隨著生意越來越好,新的痛點出現了:

咖啡店的新挑戰

  • 咖啡店每小時 1000 筆訂單
  • 每次老闆查看儀表板都要執行:
SELECT
    o.order_id,
    o.total_amount,
    c.customer_name,
    c.customer_type,
    od.product_name,
    od.quantity
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id -- JOIN 客戶表
JOIN orders_detail od ON o.order_id = od.order_id -- JOIN 訂單明細
WHERE o.order_time >= NOW() - INTERVAL 1 HOUR;

問題分析

  • 高峰時期 50 個並發查詢 = 每秒 50 次三表 JOIN!
  • 資料庫 CPU: 35% → 90%
  • 查詢延遲: 200ms → 5000ms
    這時候,技術總監皺著眉頭說:「每次查詢都要做這麼複雜的 JOIN,資料庫快撐不住了。能不能把 JOIN 的工作提前做,讓查詢變簡單?」

工程師的直覺反應:「Streaming 裡面 JOIN Database!」

為什麼會有這個直覺?

這個直覺來自我們熟悉的 Web 開發思維模式:

# 熟悉的 Web API 開發流程
@app.route('/api/orders/<order_id>')
def get_order_detail(order_id):
    # 直接在資料庫做 JOIN - 這是我們最習慣的做法!
    result = db.query("""
    SELECT
    o.order_id,
    o.total_amount,
    c.customer_name,
    c.customer_type,
    p.product_name,
    p.price
    FROM orders o
    JOIN customers c ON o.customer_id = c.customer_id
    JOIN products p ON o.product_id = p.product_id
    WHERE o.id = %s
    """, order_id)
    return result

思維轉移:從 HTTP Request 到 Stream Message

但是在 Stream 處理中,我們沒辦法在流處理引擎裡直接寫 SQL JOIN,所以工程師的大腦會很自然地想:
「既然不能在流處理裡寫 JOIN,那我就在流處理中去查資料庫,手動把資料 JOIN 起來!」

思維轉移過程

Web 開發(資料庫 JOIN):

HTTP Request → 一次 SQL 查詢(JOIN 多表) → 回傳完整資料

Stream 開發(手動 Lookup):

Stream Message → 多次資料庫查詢 → 應用層組合 → 輸出豐富資料

核心流程:理解 Lookup Join 的運作機制

讓我們一步一步拆解 SimpleDataFrame 的 lookup join 實作

Message Flow with Database Lookup
┌─────────────────────────────────────────────────────────────────┐
│                    Original Message                             │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │ { "order_id": 1001, "customer_id": 123 }                │    │
│  └───────────────────────────┬─────────────────────────────┘    │
└──────────────────────────────┼──────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────────┐
│                Database Lookup Process                          │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │ Step 1: Extract join_key = "customer_id" = 123          │    │
│  │ Step 2: Query DB "SELECT name, type FROM customers      │    │
│  │         WHERE customer_id = 123"                        │    │
│  │ Step 3: Get result {"name": "Alice", "type": "VIP"}     │    │
│  └─────────────────────────────────────────────────────────┘    │
└─────────────────────────┬───────────────────────────────────────┘
                          │
                          ▼
┌─────────────────────────────────────────────────────────────────┐
│                   Enriched Message                              │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │ {                                                       │    │
│  │   "order_id": 1001,                                     │    │
│  │   "customer_id": 123,                                   │    │
│  │   "name": "Alice",        ← Added from DB               │    │
│  │   "type": "VIP"           ← Added from DB               │    │
│  │ }                                                       │    │
│  └─────────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────┘

程式架構:SimpleDataFrame 類別的核心結構

提醒: 這裡的 SimpleDataFrame 實作專注於展示 lookup join 的核心概念,省略了生產環境所需的連線池管理、異常處理等複雜細節。同時也簡化了前幾天實作的背壓控制、checkpoint 機制、sink 管理等流計算核心功能,重點聚焦在 database lookup 的實作邏輯上。

class SimpleDataFrame:
    def __init__(self, name: str = "dataframe"):
        self.name = name
        # 核心:資料庫連線 - 支援 lookup join 的關鍵
        self._db_connection = None
        
    def setup_database(self, connection_string: str):
        """設定資料庫連線"""
        # 建立連線
        self._db_connection = create_connection(connection_string)
    
    def lookup_join(self, table_name: str, join_key: str, select_fields: list):
        """
        核心方法:添加資料庫 lookup join
        
        範例:df.lookup_join("customers", "customer_id", ["name", "type"])
        """
        # 建構查詢 SQL
        fields_sql = ", ".join(select_fields)  
        query_sql = f"SELECT {fields_sql} FROM {table_name} WHERE {join_key} = ?"
        
        # 關鍵:Function Wrapping 技術
        original_process = self.process_message
        
        def enhanced_process(message):
            # 1. 先執行原本的處理邏輯
            processed_message = original_process(message)
            # 2. 再執行 database lookup 豐富資料
            return self._perform_lookup(processed_message, query_sql, join_key)
        
        # 3. 替換處理函數 - 核心魔法!
        self.process_message = enhanced_process
        return self
    
    def _perform_lookup(self, message, query_sql, join_key):
        """執行資料庫查詢並合併結果"""
        join_value = message.get(join_key)
        if not join_value:
            return message
        
        # 查詢資料庫
        result = execute_query(self._db_connection, query_sql, join_value)
        
        # 合併結果到原始訊息
        if result:
            return {**message, **result}  # 字典合併
        return message

技術解析:Function Wrapping 魔法

lookup_join 方法的精髓在於函數包裝技術,讓我們用圖解說明:

Function Wrapping Process
┌─────────────────────────────────────────────────────────────────┐
│                   Original DataFrame                            │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │ process_message(msg) {                                  │    │
│  │   // Original processing logic                          │    │
│  │   1. Apply filters                                      │    │
│  │   2. Send to sinks                                      │    │
│  │   return success                                        │    │
│  │ }                                                       │    │
│  └─────────────────────────────────────────────────────────┘    │
└─────────────────────────┬───────────────────────────────────────┘
                          │ Function Wrapping
                          ▼
┌─────────────────────────────────────────────────────────────────┐
│                Enhanced DataFrame                               │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │ enhanced_process_message(msg) {                         │    │
│  │   // Enhanced processing logic                          │    │
│  │   1. Call original_process(msg)                         │    │
│  │   2. Perform database lookup  ← NEW!                    │    │
│  │   3. Merge results into message ← NEW!                  │    │
│  │   return enriched_message                               │    │
│  │ }                                                       │    │
│  └─────────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────┘

關鍵程式碼片段解析:

# 1. 保存原始處理函數
original_process = self.process_message

# 2. 定義增強版處理函數
def enhanced_process(message):
    # 先執行原本的處理流程(過濾、轉換等)
    processed_message = original_process(message)
    # 再執行 lookup,豐富處理後的訊息
    return self._perform_lookup(processed_message, query_sql, join_key)

# 3. 替換處理函數 - 這是關鍵魔法!
self.process_message = enhanced_process

完整流程:資料處理流程追蹤

Complete Message Processing Flow
┌─────────────────────────────────────────────────────────────────┐
│ Step 1: Message Arrives                                         │
│ ┌─────────────────────────────────────────────────────────┐     │
│ │ {"order_id": 1001, "customer_id": 123, "amount": 85}    │     │
│ └─────────────────────────────────────────────────────────┘     │
└─────────────────────────┬───────────────────────────────────────┘
                          │
                          ▼
┌─────────────────────────────────────────────────────────────────┐
│ Step 2: Enhanced Process Message Called                         │
│ ┌─────────────────────────────────────────────────────────┐     │
│ │ enhanced_process_message(message)                       │     │
│ │ 1. First call original_process(message)                 │     │
│ │ 2. Apply existing filters, transformations              │     │
│ │ 3. Then perform database lookup                         │     │
│ │ 4. Extract customer_id = 123                            │     │
│ └─────────────────────────────────────────────────────────┘     │
└─────────────────────────┬───────────────────────────────────────┘
                          │
                          ▼
┌─────────────────────────────────────────────────────────────────┐
│ Step 3: Database Lookup Process                                 │
│ ┌─────────────────────────────────────────────────────────┐     │
│ │ SQL: SELECT name, type FROM customers WHERE id=123      │     │
│ │ Result: {"name": "Alice", "type": "VIP"}                │     │
│ │ Merge with processed message                            │     │
│ └─────────────────────────────────────────────────────────┘     │
└─────────────────────────┬───────────────────────────────────────┘
                          │
                          ▼
┌─────────────────────────────────────────────────────────────────┐
│ Step 4: Final Enriched Message                                  │
│ ┌─────────────────────────────────────────────────────────┐     │
│ │ {                                                       │     │
│ │   "order_id": 1001,                                     │     │
│ │   "customer_id": 123,                                   │     │
│ │   "amount": 85,                                         │     │
│ │   "name": "Alice",     ← Added by lookup                │     │
│ │   "type": "VIP"        ← Added by lookup                │     │
│ │ }                                                       │     │
│ └─────────────────────────────────────────────────────────┘     │
└─────────────────────────────────────────────────────────────────┘

實戰應用場景

想像一個簡單的訂單處理系統:

場景設定

資料來源:

  • Kafka 主題 orders:訂單基本資料 {"order_id": 1001, "customer_id": 123}
  • Kafka 主題 customers:客戶資料流 {"customer_id": 123, "name": "Alice", "type": "VIP"}
  • PostgreSQL customers 表:透過另一個 streaming pipeline 從 customers topic 同步過來

目標:
將訂單流中的客戶資訊即時豐富化,產出完整的訂單資料

系統架構:

Kafka customers topic → Streaming Pipeline A → PostgreSQL customers table
                                                        ↓ lookup
Kafka orders topic → Streaming Pipeline B → enriched orders output

實作步驟

Pipeline A: 客戶資料同步(背景執行)

# 將客戶資料從 Kafka 同步到 PostgreSQL
sync_engine = SimpleStreamingEngine("customer-sync")
customer_df = sync_engine.dataframe(source=customers_kafka_source)
customer_df.sink(postgres_customers_table)  # 持續同步客戶資料

Pipeline B: 訂單豐富化(主要邏輯)

# 訂單豐富化 streaming pipeline
sync_engine = SimpleStreamingEngine("order-enrichment")

# 建立 DataFrame 並設定資料庫
df = sync_engine.dataframe(source=orders_kafka_source)
df.setup_database("postgresql://localhost/orders_db")

# 執行 lookup join
enriched_df = df.lookup_join("customers", "customer_id", ["name", "type", "email"])

# 輸出到目標
enriched_df.sink(output_sink)

執行流程

系統運作時,每當有新的訂單訊息流入:

  1. 原始訊息進入{"order_id": 1001, "customer_id": 123}

  2. 執行 Lookup

    SELECT name, type, email FROM customers WHERE customer_id = 123
    
  3. 資料合併

    {
      "order_id": 1001,
      "customer_id": 123,
      "name": "Alice Chen",      ← 來自資料庫
      "type": "VIP",             ← 來自資料庫  
      "email": "alice@email.com" ← 來自資料庫
    }
    
  4. 輸出豐富化結果:完整的訂單資料被送到下游系統

核心優勢

開發體驗:像寫 SQL JOIN 一樣簡單,但在 Streaming 中執行
立即可用:不需要改變現有的資料庫結構
彈性擴展:可以輕鬆添加多個 lookup join

# 進階範例:多重 lookup join  
df.lookup_join("customers", "customer_id", ["name", "type"])
  .lookup_join("products", "product_id", ["product_name", "price"])  
  .filter(lambda msg: msg.get("type") == "VIP")
  .sink(output_sink)

這樣簡潔的 API 設計讓開發者可以用熟悉的鏈式語法,快速建構複雜的資料豐富化流程。

現實的挑戰

1. 延遲增加

處理延遲分析:

  • 純 stream 處理:~1ms
  • 單次 database lookup:~50ms

在高吞吐場景下,這個延遲可能無法接受,但大部分 streaming pipeline 是可接受秒級的

2. 資料一致性窗口期

這是 streaming lookup join 最棘手的問題:當資料庫中的參考資料更新時,會有一段時間窗口內,新流入的訊息還是會查到舊資料

讓我們用具體場景說明這個問題:

Data Consistency Gap Timeline
┌─────────────────────────────────────────────────────────────────┐
│ T0: Database State                                              │
│ ┌─────────────────────────────────────────────────────────┐     │
│ │ products table: {"product_id": 123, "price": 85}        │     │
│ └─────────────────────────────────────────────────────────┘     │
└─────────────────────────┬───────────────────────────────────────┘
                          │ Price Update Event
                          ▼
┌─────────────────────────────────────────────────────────────────┐
│ T1: Price Update (takes 500ms to propagate)                     │
│ ┌─────────────────────────────────────────────────────────┐     │
│ │ products table: {"product_id": 123, "price": 90}        │     │
│ │ Status: Update in progress...                           │     │
│ └─────────────────────────────────────────────────────────┘     │
└─────────────────────────┬───────────────────────────────────────┘
                          │ Order arrives during update window
                          ▼
┌─────────────────────────────────────────────────────────────────┐
│ T1.1: Order Processing (within consistency gap)                 │
│ ┌─────────────────────────────────────────────────────────┐     │
│ │ Incoming: {"order_id": 1001, "product_id": 123}         │     │
│ │ Lookup result: price = 85  ← Wrong! Should be 90        │     │
│ │ Output: Wrong pricing in enriched order                 │     │
│ └─────────────────────────────────────────────────────────┘     │
└─────────────────────────┬───────────────────────────────────────┘
                          │ Update completes
                          ▼
┌─────────────────────────────────────────────────────────────────┐
│ T2: Consistency Restored                                        │
│ ┌─────────────────────────────────────────────────────────┐     │
│ │ All new orders will get correct price = 90              │     │
│ │ But orders processed during T1-T1.5 have wrong data     │     │
│ └─────────────────────────────────────────────────────────┘     │
└─────────────────────────────────────────────────────────────────┘

常見解決方案

1. 接受最終一致性

  • 適用場景:對即時性要求不高的分析場景
  • 作法:定期校正歷史資料

2. 雙向觸發機制(Delta Join)

# 不只訂單流觸發 lookup,產品價格更新也觸發重算
price_update_stream.process(lambda msg: 
    reprocess_recent_orders(msg.product_id, msg.new_price))

這個挑戰提醒我們:streaming lookup join 雖然開發便利,但在對資料一致性要求嚴格的場景下,需要仔細設計容錯機制

總結

這次我們從「老闆要更多細節」的需求出發,一路走到「JOIN 太慢怎麼辦」的實戰。
我們採用了 Database Lookup Join 的方案,在 Streaming Pipeline 中即時查詢資料庫來豐富資料,實現了查詢前就先把資料「攤平」。

Database Lookup Join 的優勢

開發體驗極佳:語法直覺,像寫 SQL JOIN 一樣簡單
立即可用:不需要改動現有資料庫結構,今天寫完今天上線
彈性高:可以輕鬆添加多重 lookup,支援複雜的資料豐富化
維護簡單:邏輯集中在 streaming pipeline,容易除錯和監控

Database Lookup Join 的挑戰

延遲增加: Lookup 雖方便,但每筆資料都要去查一次資料庫,對高吞吐系統來說,延遲很容易累積。
資料一致性窗口期: Lookup 依賴當下的資料庫狀態,資料更新延遲會直接反映在結果裡,甚至可能出現錯誤的資訊。

這些挑戰讓我們開始思考:「既然 Database Lookup 有延遲和一致性問題,能不能把 JOIN 完全搬到 Stream 層來做?讓兩個資料流直接在記憶體中 JOIN,徹底避開資料庫瓶頸?」

下一篇預告:Kappa 架構,全部上 Streaming!

在這次的經驗中,技術總監腦中閃過一個大膽的念頭:
「如果資料的所有轉換、聚合、JOIN,都在 Streaming 層就完成,那我們的查詢是不是能快到飛起?」
於是,一場從 Lambda 架構向 Kappa 架構演進悄悄展開。


上一篇
【知其然,更知其所以然】Day 9: Join
下一篇
【知其然,更知其所以然】Day 11: 走向 Kappa 架構
系列文
「知其然,更知其所以然:什麼是 Real-time (Streaming) Pipeline?從造輪子到 Flink 與 RisingWave」14
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言