上一篇我們征服了「流與流的邂逅」- Streaming Join。今天讓我們探討另一個流處理的核心操作:Streaming GroupBy。
如果說 JOIN 是「兩個流的配對」,那麼 GROUP BY 就是「一個流的自我整理」。
在 SQL 世界裡,GROUP BY 是這樣的:
SELECT
customer_id,
COUNT(*) as order_count
FROM orders
GROUP BY customer_id;
特點:
但在流處理中,情況完全不同:
# 概念示意
customer_counts = (orders_stream
.group_by("customer_id")
.count()
)
挑戰:
想像一個咖啡店的訂單流,我們想要即時統計每個客戶的訂單數量:
事件流:
09:00:10 - {"customer_id": "C001", "product": "拿鐵", "amount": 120}
09:00:25 - {"customer_id": "C002", "product": "美式", "amount": 80}
09:00:40 - {"customer_id": "C001", "product": "蛋糕", "amount": 150} # C001 第2筆
09:01:15 - {"customer_id": "C003", "product": "卡布", "amount": 100}
09:01:30 - {"customer_id": "C001", "product": "餅乾", "amount": 50} # C001 第3筆
期望輸出:
{"customer_id": "C001", "count": 1} # 第1筆訂單後
{"customer_id": "C002", "count": 1} # C002 首次訂單
{"customer_id": "C001", "count": 2} # C001 第2筆訂單
{"customer_id": "C003", "count": 1} # C003 首次訂單
{"customer_id": "C001", "count": 3} # C001 第3筆訂單
不同於批次處理,流處理需要在記憶體中持續維護每個群組的計數狀態:
# 概念示意的內部狀態
group_counts = {
"C001": 3, # 客戶C001已下3筆訂單
"C002": 1, # 客戶C002已下1筆訂單
"C003": 1, # 客戶C003已下1筆訂單
}
每當新事件到達:
customer_id
重要提醒
本文所有程式碼皆為教學與概念演示用的 Pseudo Code,可以幫助你理解並解決實際場景的問題,但這裡的程式碼不能直接使用,主要目的是用來講解 Stream Processing 的設計思路與核心概念。閱讀時建議把重點放在理解整體架構和設計邏輯上,程式碼細節部分可以輕鬆看過就好。
為了讓本日重點更突出,我們會適度簡化前幾天的程式碼實作細節,主要聚焦在今天的機制講解。
步驟一:建立狀態存儲
┌─────────────────────────────┐
│ GroupByOperation │
│ group_key = "customer_id" │
│ group_counts = {} │
└─────────────────────────────┘
步驟二:事件處理三階段
Event: {"customer_id": "C001", "product": "拿鐵"}
│
▼
┌─────────────────────────────────────┐
│ Phase1: Extract Key: "C001" │
│ Phase2: Update State: counts++ │
│ Phase3: Output Result │
└─────────────────────────────────────┘
│
▼
Result: {"group_key": "C001", "count": 1}
步驟三:API 設計
orders_df.group_by("customer_id").count()
│ │ │
│ │ └─ 步驟四:攔截處理
│ └─ GroupedDataFrame
└─ SimpleDataFrame
步驟四:事件攔截機制
Original Flow: Event ──→ [Filter] ──→ [Sink]
│
Enhanced Flow: Event ──→ [Filter] ──→ [GroupBy] ──→ Result DataFrame
│ │
└─ Original ───┘
關鍵組件:
讓我們為框架添加最基本的 GroupBy 功能:
首先建立狀態存儲:
class GroupByOperation:
def __init__(self, group_key):
self.group_key = group_key
# 核心狀態:group_value -> count
self.group_counts = defaultdict(int)
設計要點:
defaultdict(int)
自動處理新群組,初始值為 0每個事件到達時的處理流程:
def process_event(self, event) -> dict:
# 階段1:提取群組鍵
group_value = str(event.get(self.group_key))
# 階段2:更新計數
self.group_counts[group_value] += 1
# 階段3:生成結果
return {
"group_key": group_value,
"count": self.group_counts[group_value]
}
處理邏輯:
接下來建立使用者介面:
class SimpleDataFrame:
def group_by(self, key) -> 'GroupedDataFrame':
return GroupedDataFrame(self, key)
class GroupedDataFrame:
def __init__(self, source_df, group_key):
self.source_df = source_df
self.group_key = group_key
API 設計:
group_by()
返回 GroupedDataFrame
,還不執行聚合count()
才真正開始處理def count(self) -> 'SimpleDataFrame':
# 1. 創建 GroupBy 操作實例
self.groupby_op = GroupByOperation(self.group_key)
# 2. 創建結果 DataFrame
result_df = SimpleDataFrame(f"{self.source_df.name}_count")
# 3. 攔截原有處理邏輯
original_process = self.source_df.process_message
def enhanced_process(message):
# 先執行原邏輯(filter 等)
original_process(message)
# 再執行聚合
result = self.groupby_op.process_event(message)
result_df.process_message(result)
self.source_df.process_message = enhanced_process
return result_df
關鍵機制:
首先看看如何使用這個 GroupBy API:
# 建立資料流
orders_df = engine.dataframe(source=orders_source)
# 設定 GroupBy 計數
customer_counts = (orders_df
.group_by("customer_id") # 按客戶 ID 分組
.count() # 計算每組的事件數量
)
讓我們追蹤一個完整的例子:
# 事件序列
events = [
{"customer_id": "C001", "product": "拿鐵"},
{"customer_id": "C002", "product": "美式"},
{"customer_id": "C001", "product": "蛋糕"},
]
第一個事件處理:
event = {"customer_id": "C001", "product": "拿鐵"}
# 1. 提取分組鍵
group_value = "C001"
# 2. 更新狀態
group_counts["C001"] += 1 # 0 + 1 = 1
# 3. 生成結果
result = {"group_key": "C001", "count": 1}
# 4. 輸出:C001 的第1筆訂單
第二個事件處理:
event = {"customer_id": "C002", "product": "美式"}
# group_counts["C002"] += 1 # 0 + 1 = 1
# result = {"group_key": "C002", "count": 1}
# 狀態:{"C001": 1, "C002": 1}
第三個事件處理:
event = {"customer_id": "C001", "product": "蛋糕"}
# group_counts["C001"] += 1 # 1 + 1 = 2
# result = {"group_key": "C001", "count": 2}
# 狀態:{"C001": 2, "C002": 1}
Day 14: SimpleStreamingEngine with GroupBy
┌─────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ KafkaSource │───►│DataFrame │───►│PostgreSQLSink │
│ │ │ │ │ │
│ • Topic consume │ │ • Filter │ │ • Batch insert │
│ • Consumer pause│ │ • Lookup Join │ │ • Timer trigger │
│ • Offset resume │ │ • Streaming Join │ │ • Overload detect│
│ │ │ • GroupBy │ │ │
└─────────────────┘ └──────────────────┘ └──────────────────┘
▲ │
│ ┌─────────────────────────────────────┘
│ │
│ ▼
┌─────────────────────────────────────────────────────────────┐
│ SimpleStreamingEngine │
│ • Backpressure │
│ • Checkpoint │
│ • State Storage │
└─────────────────────────────────────────────────────────────┘
新增的核心能力:
Day14 新增功能
Streaming GroupBy 是流處理中的重要聚合操作,透過狀態管理實現即時計數。
與傳統批次處理不同,流式 GroupBy 不需要等待所有資料到齊,而是每個事件到達就立即更新對應群組的統計值。這種設計讓我們能夠即時監控系統狀態、追蹤用戶行為、檢測異常模式。
透過簡潔的 API 設計和事件攔截機制,我們成功將複雜的狀態管理邏輯封裝成直觀易用的介面。
我們將擴展這個 GroupBy 實作,添加更多聚合函數(sum, avg, max, min)和窗口功能,打造一個更完整的流式聚合系統。