重要提醒
本文所有程式碼皆為教學與概念演示用的 Pseudo Code,可以幫助你理解並解決實際場景的問題,但這裡的程式碼不能直接使用,主要目的是用來講解 Stream Processing 的設計思路與核心概念。閱讀時建議把重點放在理解整體架構和設計邏輯上,程式碼細節部分可以輕鬆看過就好。
還記得上一篇的咖啡店故事嗎?老闆從只想看訂單數量,進化到想要看訂單明細(orders_detail),最後我們在 Lambda 架構的 Serving Layer 用複雜的 SQL JOIN 解決了問題。
但故事還沒結束。隨著生意越來越好,新的痛點出現了:
SELECT
o.order_id,
o.total_amount,
c.customer_name,
c.customer_type,
od.product_name,
od.quantity
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id -- JOIN 客戶表
JOIN orders_detail od ON o.order_id = od.order_id -- JOIN 訂單明細
WHERE o.order_time >= NOW() - INTERVAL 1 HOUR;
這個直覺來自我們熟悉的 Web 開發思維模式:
# 熟悉的 Web API 開發流程
@app.route('/api/orders/<order_id>')
def get_order_detail(order_id):
# 直接在資料庫做 JOIN - 這是我們最習慣的做法!
result = db.query("""
SELECT
o.order_id,
o.total_amount,
c.customer_name,
c.customer_type,
p.product_name,
p.price
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
JOIN products p ON o.product_id = p.product_id
WHERE o.id = %s
""", order_id)
return result
但是在 Stream 處理中,我們沒辦法在流處理引擎裡直接寫 SQL JOIN,所以工程師的大腦會很自然地想:
「既然不能在流處理裡寫 JOIN,那我就在流處理中去查資料庫,手動把資料 JOIN 起來!」
Web 開發(資料庫 JOIN):
HTTP Request → 一次 SQL 查詢(JOIN 多表) → 回傳完整資料
Stream 開發(手動 Lookup):
Stream Message → 多次資料庫查詢 → 應用層組合 → 輸出豐富資料
讓我們一步一步拆解 SimpleDataFrame 的 lookup join 實作
Message Flow with Database Lookup
┌─────────────────────────────────────────────────────────────────┐
│ Original Message │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ { "order_id": 1001, "customer_id": 123 } │ │
│ └───────────────────────────┬─────────────────────────────┘ │
└──────────────────────────────┼──────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Database Lookup Process │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Step 1: Extract join_key = "customer_id" = 123 │ │
│ │ Step 2: Query DB "SELECT name, type FROM customers │ │
│ │ WHERE customer_id = 123" │ │
│ │ Step 3: Get result {"name": "Alice", "type": "VIP"} │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────┬───────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Enriched Message │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ { │ │
│ │ "order_id": 1001, │ │
│ │ "customer_id": 123, │ │
│ │ "name": "Alice", ← Added from DB │ │
│ │ "type": "VIP" ← Added from DB │ │
│ │ } │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
提醒: 這裡的 SimpleDataFrame 實作專注於展示 lookup join 的核心概念,省略了生產環境所需的連線池管理、異常處理等複雜細節。同時也簡化了前幾天實作的背壓控制、checkpoint 機制、sink 管理等流計算核心功能,重點聚焦在 database lookup 的實作邏輯上。
class SimpleDataFrame:
def __init__(self, name: str = "dataframe"):
self.name = name
# 核心:資料庫連線 - 支援 lookup join 的關鍵
self._db_connection = None
def setup_database(self, connection_string: str):
"""設定資料庫連線"""
# 建立連線
self._db_connection = create_connection(connection_string)
def lookup_join(self, table_name: str, join_key: str, select_fields: list):
"""
核心方法:添加資料庫 lookup join
範例:df.lookup_join("customers", "customer_id", ["name", "type"])
"""
# 建構查詢 SQL
fields_sql = ", ".join(select_fields)
query_sql = f"SELECT {fields_sql} FROM {table_name} WHERE {join_key} = ?"
# 關鍵:Function Wrapping 技術
original_process = self.process_message
def enhanced_process(message):
# 1. 先執行原本的處理邏輯
processed_message = original_process(message)
# 2. 再執行 database lookup 豐富資料
return self._perform_lookup(processed_message, query_sql, join_key)
# 3. 替換處理函數 - 核心魔法!
self.process_message = enhanced_process
return self
def _perform_lookup(self, message, query_sql, join_key):
"""執行資料庫查詢並合併結果"""
join_value = message.get(join_key)
if not join_value:
return message
# 查詢資料庫
result = execute_query(self._db_connection, query_sql, join_value)
# 合併結果到原始訊息
if result:
return {**message, **result} # 字典合併
return message
lookup_join 方法的精髓在於函數包裝技術,讓我們用圖解說明:
Function Wrapping Process
┌─────────────────────────────────────────────────────────────────┐
│ Original DataFrame │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ process_message(msg) { │ │
│ │ // Original processing logic │ │
│ │ 1. Apply filters │ │
│ │ 2. Send to sinks │ │
│ │ return success │ │
│ │ } │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────┬───────────────────────────────────────┘
│ Function Wrapping
▼
┌─────────────────────────────────────────────────────────────────┐
│ Enhanced DataFrame │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ enhanced_process_message(msg) { │ │
│ │ // Enhanced processing logic │ │
│ │ 1. Call original_process(msg) │ │
│ │ 2. Perform database lookup ← NEW! │ │
│ │ 3. Merge results into message ← NEW! │ │
│ │ return enriched_message │ │
│ │ } │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
關鍵程式碼片段解析:
# 1. 保存原始處理函數
original_process = self.process_message
# 2. 定義增強版處理函數
def enhanced_process(message):
# 先執行原本的處理流程(過濾、轉換等)
processed_message = original_process(message)
# 再執行 lookup,豐富處理後的訊息
return self._perform_lookup(processed_message, query_sql, join_key)
# 3. 替換處理函數 - 這是關鍵魔法!
self.process_message = enhanced_process
Complete Message Processing Flow
┌─────────────────────────────────────────────────────────────────┐
│ Step 1: Message Arrives │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ {"order_id": 1001, "customer_id": 123, "amount": 85} │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────┬───────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Step 2: Enhanced Process Message Called │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ enhanced_process_message(message) │ │
│ │ 1. First call original_process(message) │ │
│ │ 2. Apply existing filters, transformations │ │
│ │ 3. Then perform database lookup │ │
│ │ 4. Extract customer_id = 123 │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────┬───────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Step 3: Database Lookup Process │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ SQL: SELECT name, type FROM customers WHERE id=123 │ │
│ │ Result: {"name": "Alice", "type": "VIP"} │ │
│ │ Merge with processed message │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────┬───────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Step 4: Final Enriched Message │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ { │ │
│ │ "order_id": 1001, │ │
│ │ "customer_id": 123, │ │
│ │ "amount": 85, │ │
│ │ "name": "Alice", ← Added by lookup │ │
│ │ "type": "VIP" ← Added by lookup │ │
│ │ } │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
想像一個簡單的訂單處理系統:
資料來源:
orders
:訂單基本資料 {"order_id": 1001, "customer_id": 123}
customers
:客戶資料流 {"customer_id": 123, "name": "Alice", "type": "VIP"}
customers
表:透過另一個 streaming pipeline 從 customers topic 同步過來目標:
將訂單流中的客戶資訊即時豐富化,產出完整的訂單資料
系統架構:
Kafka customers topic → Streaming Pipeline A → PostgreSQL customers table
↓ lookup
Kafka orders topic → Streaming Pipeline B → enriched orders output
Pipeline A: 客戶資料同步(背景執行)
# 將客戶資料從 Kafka 同步到 PostgreSQL
sync_engine = SimpleStreamingEngine("customer-sync")
customer_df = sync_engine.dataframe(source=customers_kafka_source)
customer_df.sink(postgres_customers_table) # 持續同步客戶資料
Pipeline B: 訂單豐富化(主要邏輯)
# 訂單豐富化 streaming pipeline
sync_engine = SimpleStreamingEngine("order-enrichment")
# 建立 DataFrame 並設定資料庫
df = sync_engine.dataframe(source=orders_kafka_source)
df.setup_database("postgresql://localhost/orders_db")
# 執行 lookup join
enriched_df = df.lookup_join("customers", "customer_id", ["name", "type", "email"])
# 輸出到目標
enriched_df.sink(output_sink)
執行流程
系統運作時,每當有新的訂單訊息流入:
原始訊息進入:{"order_id": 1001, "customer_id": 123}
執行 Lookup:
SELECT name, type, email FROM customers WHERE customer_id = 123
資料合併:
{
"order_id": 1001,
"customer_id": 123,
"name": "Alice Chen", ← 來自資料庫
"type": "VIP", ← 來自資料庫
"email": "alice@email.com" ← 來自資料庫
}
輸出豐富化結果:完整的訂單資料被送到下游系統
開發體驗:像寫 SQL JOIN 一樣簡單,但在 Streaming 中執行
立即可用:不需要改變現有的資料庫結構
彈性擴展:可以輕鬆添加多個 lookup join
# 進階範例:多重 lookup join
df.lookup_join("customers", "customer_id", ["name", "type"])
.lookup_join("products", "product_id", ["product_name", "price"])
.filter(lambda msg: msg.get("type") == "VIP")
.sink(output_sink)
這樣簡潔的 API 設計讓開發者可以用熟悉的鏈式語法,快速建構複雜的資料豐富化流程。
處理延遲分析:
在高吞吐場景下,這個延遲可能無法接受,但大部分 streaming pipeline 是可接受秒級的
這是 streaming lookup join 最棘手的問題:當資料庫中的參考資料更新時,會有一段時間窗口內,新流入的訊息還是會查到舊資料。
讓我們用具體場景說明這個問題:
Data Consistency Gap Timeline
┌─────────────────────────────────────────────────────────────────┐
│ T0: Database State │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ products table: {"product_id": 123, "price": 85} │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────┬───────────────────────────────────────┘
│ Price Update Event
▼
┌─────────────────────────────────────────────────────────────────┐
│ T1: Price Update (takes 500ms to propagate) │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ products table: {"product_id": 123, "price": 90} │ │
│ │ Status: Update in progress... │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────┬───────────────────────────────────────┘
│ Order arrives during update window
▼
┌─────────────────────────────────────────────────────────────────┐
│ T1.1: Order Processing (within consistency gap) │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Incoming: {"order_id": 1001, "product_id": 123} │ │
│ │ Lookup result: price = 85 ← Wrong! Should be 90 │ │
│ │ Output: Wrong pricing in enriched order │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────┬───────────────────────────────────────┘
│ Update completes
▼
┌─────────────────────────────────────────────────────────────────┐
│ T2: Consistency Restored │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ All new orders will get correct price = 90 │ │
│ │ But orders processed during T1-T1.5 have wrong data │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
常見解決方案
1. 接受最終一致性
2. 雙向觸發機制(Delta Join)
# 不只訂單流觸發 lookup,產品價格更新也觸發重算
price_update_stream.process(lambda msg:
reprocess_recent_orders(msg.product_id, msg.new_price))
這個挑戰提醒我們:streaming lookup join 雖然開發便利,但在對資料一致性要求嚴格的場景下,需要仔細設計容錯機制。
這次我們從「老闆要更多細節」的需求出發,一路走到「JOIN 太慢怎麼辦」的實戰。
我們採用了 Database Lookup Join 的方案,在 Streaming Pipeline 中即時查詢資料庫來豐富資料,實現了查詢前就先把資料「攤平」。
開發體驗極佳:語法直覺,像寫 SQL JOIN 一樣簡單
立即可用:不需要改動現有資料庫結構,今天寫完今天上線
彈性高:可以輕鬆添加多重 lookup,支援複雜的資料豐富化
維護簡單:邏輯集中在 streaming pipeline,容易除錯和監控
延遲增加: Lookup 雖方便,但每筆資料都要去查一次資料庫,對高吞吐系統來說,延遲很容易累積。
資料一致性窗口期: Lookup 依賴當下的資料庫狀態,資料更新延遲會直接反映在結果裡,甚至可能出現錯誤的資訊。
這些挑戰讓我們開始思考:「既然 Database Lookup 有延遲和一致性問題,能不能把 JOIN 完全搬到 Stream 層來做?讓兩個資料流直接在記憶體中 JOIN,徹底避開資料庫瓶頸?」
在這次的經驗中,技術總監腦中閃過一個大膽的念頭:
「如果資料的所有轉換、聚合、JOIN,都在 Streaming 層就完成,那我們的查詢是不是能快到飛起?」
於是,一場從 Lambda 架構向 Kappa 架構演進悄悄展開。