iT邦幫忙

2025 iThome 鐵人賽

DAY 14
2

上一篇我們征服了「流與流的邂逅」- Streaming Join。今天讓我們探討另一個流處理的核心操作:Streaming GroupBy

如果說 JOIN 是「兩個流的配對」,那麼 GROUP BY 就是「一個流的自我整理」。

重新認識 GROUP BY

傳統 SQL GROUP BY:批次聚合

在 SQL 世界裡,GROUP BY 是這樣的:

SELECT 
    customer_id,
    COUNT(*) as order_count
FROM orders 
GROUP BY customer_id;

特點

  • 一次性處理完整的資料集
  • 等所有資料都讀完才計算結果
  • 結果是靜態的,不會變化

Streaming GROUP BY:持續聚合

但在流處理中,情況完全不同:

# 概念示意
customer_counts = (orders_stream
    .group_by("customer_id")
    .count()
)

挑戰

  • 資料持續到達,沒有「結束」
  • 計數結果需要持續更新
  • 需要在記憶體中維護每個群組的狀態

Streaming GROUP BY 的核心概念

問題場景:咖啡店訂單統計

想像一個咖啡店的訂單流,我們想要即時統計每個客戶的訂單數量:

事件流:
09:00:10 - {"customer_id": "C001", "product": "拿鐵", "amount": 120}
09:00:25 - {"customer_id": "C002", "product": "美式", "amount": 80}
09:00:40 - {"customer_id": "C001", "product": "蛋糕", "amount": 150}  # C001 第2筆
09:01:15 - {"customer_id": "C003", "product": "卡布", "amount": 100}
09:01:30 - {"customer_id": "C001", "product": "餅乾", "amount": 50}   # C001 第3筆

期望輸出:
{"customer_id": "C001", "count": 1}  # 第1筆訂單後
{"customer_id": "C002", "count": 1}  # C002 首次訂單
{"customer_id": "C001", "count": 2}  # C001 第2筆訂單
{"customer_id": "C003", "count": 1}  # C003 首次訂單
{"customer_id": "C001", "count": 3}  # C001 第3筆訂單

核心挑戰:狀態管理

不同於批次處理,流處理需要在記憶體中持續維護每個群組的計數狀態:

# 概念示意的內部狀態
group_counts = {
    "C001": 3,  # 客戶C001已下3筆訂單
    "C002": 1,  # 客戶C002已下1筆訂單
    "C003": 1,  # 客戶C003已下1筆訂單
}

每當新事件到達:

  1. 提取群組鍵:從事件中取出 customer_id
  2. 更新計數:將該群組的計數 +1
  3. 輸出結果:發送更新後的計數

實作:Simple Streaming 的 GroupBy

重要提醒
本文所有程式碼皆為教學與概念演示用的 Pseudo Code,可以幫助你理解並解決實際場景的問題,但這裡的程式碼不能直接使用,主要目的是用來講解 Stream Processing 的設計思路與核心概念。閱讀時建議把重點放在理解整體架構和設計邏輯上,程式碼細節部分可以輕鬆看過就好。

為了讓本日重點更突出,我們會適度簡化前幾天的程式碼實作細節,主要聚焦在今天的機制講解。

GroupBy 處理架構

步驟一:建立狀態存儲
┌─────────────────────────────┐
│ GroupByOperation            │
│ group_key = "customer_id"   │
│ group_counts = {}           │
└─────────────────────────────┘

步驟二:事件處理三階段
Event: {"customer_id": "C001", "product": "拿鐵"}
         │
         ▼
┌─────────────────────────────────────┐
│ Phase1: Extract Key: "C001"         │
│ Phase2: Update State: counts++      │
│ Phase3: Output Result               │
└─────────────────────────────────────┘
         │
         ▼
Result: {"group_key": "C001", "count": 1}

步驟三:API 設計
orders_df.group_by("customer_id").count()
    │              │                │
    │              │                └─ 步驟四:攔截處理
    │              └─ GroupedDataFrame
    └─ SimpleDataFrame

步驟四:事件攔截機制
Original Flow:    Event ──→ [Filter] ──→ [Sink]
                              │
Enhanced Flow:    Event ──→ [Filter] ──→ [GroupBy] ──→ Result DataFrame
                              │              │
                              └─ Original ───┘

關鍵組件

  • 狀態存儲:記憶體中維護每個群組的計數
  • 事件處理:提取鍵值、更新計數、生成結果
  • 結果輸出:每次更新都即時輸出最新計數

讓我們為框架添加最基本的 GroupBy 功能:

步驟一:GroupBy 類別的核心狀態

首先建立狀態存儲:

class GroupByOperation:
    def __init__(self, group_key):
        self.group_key = group_key
        # 核心狀態:group_value -> count
        self.group_counts = defaultdict(int)

設計要點

  • 使用 defaultdict(int) 自動處理新群組,初始值為 0
  • 支援任意數量的群組,動態擴展

步驟二:事件處理的三階段

每個事件到達時的處理流程:

def process_event(self, event) -> dict:
    # 階段1:提取群組鍵
    group_value = str(event.get(self.group_key))
    
    # 階段2:更新計數
    self.group_counts[group_value] += 1
    
    # 階段3:生成結果
    return {
        "group_key": group_value,
        "count": self.group_counts[group_value]
    }

處理邏輯

  1. 提取鍵值:從事件中取出分組欄位
  2. 增量更新:對該群組計數 +1
  3. 即時輸出:返回最新的計數結果

步驟三:建立 GroupBy API

接下來建立使用者介面:

class SimpleDataFrame:
    def group_by(self, key) -> 'GroupedDataFrame':
        return GroupedDataFrame(self, key)

class GroupedDataFrame:
    def __init__(self, source_df, group_key):
        self.source_df = source_df
        self.group_key = group_key

API 設計

  • group_by() 返回 GroupedDataFrame,還不執行聚合
  • 延遲執行:等到呼叫 count() 才真正開始處理

步驟四:count() 方法的實現

def count(self) -> 'SimpleDataFrame':
    # 1. 創建 GroupBy 操作實例
    self.groupby_op = GroupByOperation(self.group_key)
    
    # 2. 創建結果 DataFrame
    result_df = SimpleDataFrame(f"{self.source_df.name}_count")
    
    # 3. 攔截原有處理邏輯
    original_process = self.source_df.process_message
    
    def enhanced_process(message):
        # 先執行原邏輯(filter 等)
        original_process(message)
        
        # 再執行聚合
        result = self.groupby_op.process_event(message)
        result_df.process_message(result)
    
    self.source_df.process_message = enhanced_process
    return result_df

關鍵機制

  • 事件攔截:在原有處理後加入聚合邏輯
  • 結果分離:聚合結果送到新的 DataFrame
  • 鏈式呼叫:返回的 DataFrame 可繼續操作

實際運作流程

使用方式

首先看看如何使用這個 GroupBy API:

# 建立資料流
orders_df = engine.dataframe(source=orders_source)

# 設定 GroupBy 計數
customer_counts = (orders_df
    .group_by("customer_id")  # 按客戶 ID 分組
    .count()                  # 計算每組的事件數量
)

處理流程追蹤

讓我們追蹤一個完整的例子:

# 事件序列
events = [
    {"customer_id": "C001", "product": "拿鐵"},
    {"customer_id": "C002", "product": "美式"}, 
    {"customer_id": "C001", "product": "蛋糕"},
]

第一個事件處理:

event = {"customer_id": "C001", "product": "拿鐵"}

# 1. 提取分組鍵
group_value = "C001"

# 2. 更新狀態
group_counts["C001"] += 1  # 0 + 1 = 1

# 3. 生成結果
result = {"group_key": "C001", "count": 1}

# 4. 輸出:C001 的第1筆訂單

第二個事件處理:

event = {"customer_id": "C002", "product": "美式"}

# group_counts["C002"] += 1  # 0 + 1 = 1
# result = {"group_key": "C002", "count": 1}

# 狀態:{"C001": 1, "C002": 1}

第三個事件處理:

event = {"customer_id": "C001", "product": "蛋糕"}

# group_counts["C001"] += 1  # 1 + 1 = 2  
# result = {"group_key": "C001", "count": 2}

# 狀態:{"C001": 2, "C002": 1}

架構回顧

Day 14: SimpleStreamingEngine with GroupBy

┌─────────────────┐    ┌──────────────────┐    ┌──────────────────┐
│   KafkaSource   │───►│DataFrame         │───►│PostgreSQLSink    │
│                 │    │                  │    │                  │
│ • Topic consume │    │ • Filter         │    │ • Batch insert   │
│ • Consumer pause│    │ • Lookup Join    │    │ • Timer trigger  │
│ • Offset resume │    │ • Streaming Join │    │ • Overload detect│
│                 │    │ • GroupBy        │    │                  │
└─────────────────┘    └──────────────────┘    └──────────────────┘
         ▲                                               │
         │         ┌─────────────────────────────────────┘
         │         │
         │         ▼
┌─────────────────────────────────────────────────────────────┐
│              SimpleStreamingEngine                          │
│  • Backpressure                                             │
│  • Checkpoint                                               │
│  • State Storage                                            │
└─────────────────────────────────────────────────────────────┘

新增的核心能力
Day14 新增功能

  • GroupBy:流式分組聚合,支援即時計數統計
  • 增量狀態管理:維護每個群組的聚合狀態

總結

Streaming GroupBy 是流處理中的重要聚合操作,透過狀態管理實現即時計數。

與傳統批次處理不同,流式 GroupBy 不需要等待所有資料到齊,而是每個事件到達就立即更新對應群組的統計值。這種設計讓我們能夠即時監控系統狀態、追蹤用戶行為、檢測異常模式。

透過簡潔的 API 設計和事件攔截機制,我們成功將複雜的狀態管理邏輯封裝成直觀易用的介面。

Day 15 預告

我們將擴展這個 GroupBy 實作,添加更多聚合函數(sum, avg, max, min)和窗口功能,打造一個更完整的流式聚合系統。


上一篇
【知其然,更知其所以然】Day13:流與流的邂逅 - Streaming Join 的藝術
系列文
「知其然,更知其所以然:什麼是 Real-time (Streaming) Pipeline?從造輪子到 Flink 與 RisingWave」14
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言