iT邦幫忙

2025 iThome 鐵人賽

DAY 15
0
AI & Data

「知其然,更知其所以然:什麼是 Real-time (Streaming) Pipeline?從造輪子到 Flink 與 RisingWave」系列 第 15

【知其然,更知其所以然】Day15:GroupBy 的進化 - 多種聚合器

  • 分享至 

  • xImage
  •  

昨天我們實現了基礎的流式 GroupBy Count,今天將進一步擴展聚合功能。我們將實現完整的聚合操作(sum、avg、max、min)。

重要提醒
本文所有程式碼皆為教學與概念演示用的 Pseudo Code,可以幫助你理解並解決實際場景的問題,但這裡的程式碼不能直接使用,主要目的是用來講解 Stream Processing 的設計思路與核心概念。閱讀時建議把重點放在理解整體架構和設計邏輯上,程式碼細節部分可以輕鬆看過就好。

聚合器架構設計

步驟一:建立聚合器統一介面

聚合器類別繼承結構
┌─────────────────────────────────────────────────────────────┐
│                    BaseAggregator                           │
│  ┌─────────────────────────────────────────────────────┐    │
│  │ + reset(): void                                     │    │
│  │ + update(value): Any (abstract)                     │    │
│  │ + get_result(): Any (abstract)                      │    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────┬───────────────────────────────────┘
                          │ (繼承)
                          ▼
┌─────────────────────────────────────────────────────────────┐
│ CountAggregator    │ SumAggregator     │ MaxAggregator      │
│ • count: int       │ • sum: float      │ • max_value: Any   │
├────────────────────┼───────────────────┼────────────────────┤
│ MinAggregator      │ AvgAggregator     │ MultiAggregator    │
│ • min_value: Any   │ • sum: float      │ • aggregators: {}  │
│                    │ • count: int      │ • agg_specs: {}    │
└─────────────────────────────────────────────────────────────┘

1.1 抽象基類設計

首先定義所有聚合器的通用介面,確保一致的行為模式:

class BaseAggregator(ABC):
    """聚合器基礎介面"""

    def __init__(self):
        self.reset()

    @abstractmethod
    def reset(self):
        """重置聚合狀態"""
        pass

    @abstractmethod
    def update(self, value: Any) -> Any:
        """
        更新聚合狀態並返回當前結果

        :param value: 新的輸入值
        :return: 當前聚合結果
        """
        pass

    @abstractmethod
    def get_result(self) -> Any:
        """獲取當前聚合結果"""
        pass

設計要點

  • 使用抽象基類 (ABC) 確保所有聚合器實現相同介面
  • reset() 方法用於重置聚合狀態,支援視窗化聚合
  • update() 方法接收新值,更新內部狀態並返回結果
  • get_result() 方法獲取當前聚合結果

1.2 基礎聚合器實現

class CountAggregator(BaseAggregator):
    """計數聚合器"""
    
    def reset(self):
        self.count = 0
    
    def update(self, value: Any) -> int:
        self.count += 1
        return self.count
    
    def get_result(self) -> int:
        return self.count

CountAggregator 分析

  • 狀態:僅維護一個 count 計數器
  • 更新:每次調用 update() 計數器加1,忽略輸入值
  • 結果:返回當前計數

1.3 數值聚合器實現

class SumAggregator(BaseAggregator):
    """求和聚合器"""
    
    def reset(self):
        self.sum = 0
    
    def update(self, value: Any) -> float:
        self.sum += float(value)
        return self.sum
    
    def get_result(self) -> float:
        return self.sum


class MaxAggregator(BaseAggregator):
    """最大值聚合器"""
    
    def reset(self):
        self.max_value = None
    
    def update(self, value: Any) -> Any:
        if self.max_value is None or value > self.max_value:
            self.max_value = value
        return self.max_value
    
    def get_result(self) -> Any:
        return self.max_value


class MinAggregator(BaseAggregator):
    """最小值聚合器"""
    
    def reset(self):
        self.min_value = None
    
    def update(self, value: Any) -> Any:
        if self.min_value is None or value < self.min_value:
            self.min_value = value
        return self.min_value
    
    def get_result(self) -> Any:
        return self.min_value


class AvgAggregator(BaseAggregator):
    """平均值聚合器"""
    
    def reset(self):
        self.sum = 0
        self.count = 0
    
    def update(self, value: Any) -> float:
        self.sum += float(value)
        self.count += 1
        return self.sum / self.count
    
    def get_result(self) -> float:
        return self.sum / self.count if self.count > 0 else 0

各聚合器特點

  • SumAggregator:累計數值,處理數值型別轉換
  • MaxAggregator:追蹤最大值,處理 None 初始狀態
  • MinAggregator:追蹤最小值,同樣處理 None 初始狀態
  • AvgAggregator:維護和與計數,計算平均值

步驟二:複合聚合器設計

MultiAggregator 組合模式架構圖

MultiAggregator 內部結構
┌─────────────────────────────────────────────────────────────┐
│                  MultiAggregator                            │
│  ┌─────────────────────────────────────────────────────┐    │
│  │ agg_specs: {                                        │    │
│  │   "order_count": "count",                           │    │
│  │   "total_amount": "sum(amount)",                    │    │
│  │   "max_amount": "max(amount)"                       │    │
│  │ }                                                   │    │
│  └─────────────────────────────────────────────────────┘    │
│                            │                                │
│                            ▼                                │
│  ┌─────────────────────────────────────────────────────┐    │
│  │ aggregators: {                                      │    │
│  │   "order_count": (CountAggregator(), None),         │    │
│  │   "total_amount": (SumAggregator(), "amount"),      │    │
│  │   "max_amount": (MaxAggregator(), "amount")         │    │
│  │ }                                                   │    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────┬───────────────────────────────────┘
                          │ (組合)
                          ▼
┌─────────────────────────────────────────────────────────────┐
├─────────────────┬───────────────────┬───────────────────────┤
│ CountAggregator │ SumAggregator     │ MaxAggregator         │
│ count = 0       │ sum = 0.0         │ max_value = None      │
└─────────────────┴───────────────────┴───────────────────────┘

2.1 MultiAggregator 核心思想

MultiAggregator 採用組合模式,將多個基礎聚合器組合起來,實現多重聚合功能:

class MultiAggregator(BaseAggregator):
    """複合聚合器:復用現有聚合器,組合成多聚合功能"""
    
    def __init__(self, agg_specs: Dict[str, str]):
        """
        :param agg_specs: 聚合規格,例如:
        {
            "order_count": "count",
            "total_amount": "sum(amount)", 
            "max_amount": "max(amount)",
            "min_amount": "min(amount)"
        }
        """
        self.agg_specs = agg_specs
        self.aggregators = {}  # alias -> (aggregator, field)
        
        # 關鍵:為每個聚合規格創建對應的聚合器實例
        for alias, spec in agg_specs.items():
            if spec == "count":
                self.aggregators[alias] = (CountAggregator(), None)
            elif spec.startswith("sum(") and spec.endswith(")"):
                field = spec[4:-1]
                self.aggregators[alias] = (SumAggregator(), field)
            elif spec.startswith("max(") and spec.endswith(")"):
                field = spec[4:-1]
                self.aggregators[alias] = (MaxAggregator(), field)
            elif spec.startswith("min(") and spec.endswith(")"):
                field = spec[4:-1]
                self.aggregators[alias] = (MinAggregator(), field)
            elif spec.startswith("avg(") and spec.endswith(")"):
                field = spec[4:-1]
                self.aggregators[alias] = (AvgAggregator(), field)
                
        super().__init__()

規格解析邏輯

  1. "count"(CountAggregator(), None) - 不需要欄位值
  2. "sum(amount)"(SumAggregator(), "amount") - 提取欄位名
  3. "max(price)"(MaxAggregator(), "price") - 提取欄位名

2.2 複合聚合邏輯

def update(self, event: Dict[str, Any]) -> Dict[str, Any]:
    """更新所有聚合器並返回結果"""
    result = {}
    
    for alias, (aggregator, field) in self.aggregators.items():
        if field is None:
            # count 不需要欄位值
            current_result = aggregator.update(1)
        else:
            # 其他聚合需要欄位值
            value = event.get(field, 0)
            current_result = aggregator.update(value)
        
        result[alias] = current_result
    
    return result

執行流程分析

  1. 遍歷所有配置的聚合器
  2. 根據聚合型別提取對應的欄位值
  3. 調用各聚合器的 update() 方法
  4. 收集所有聚合結果,組成字典返回

實際執行範例

# 輸入事件
event = {"customer_id": "C001", "amount": 120}

# MultiAggregator 配置
specs = {
    "order_count": "count",
    "total_amount": "sum(amount)"
}

# 執行更新
result = multi_agg.update(event)
# 輸出: {"order_count": 1, "total_amount": 120.0}

步驟三:重構 GroupBy 操作

3.1 GroupByOperation 類別設計

class GroupByOperation:
    """
    通用的 GroupBy 操作,支援多種聚合函數
    """
    
    def __init__(self, group_key: str, aggregator_factory: Callable[[], BaseAggregator]):
        self.group_key = group_key
        self.aggregator_factory = aggregator_factory
        
        # 狀態存儲:group_value -> aggregator
        self.group_aggregators = defaultdict(self.aggregator_factory)

核心參數

  • group_key:字串欄位名,用於從事件中提取分組鍵值
  • aggregator_factory:聚合器工廠函數,用於創建新的聚合器實例

狀態管理

  • group_aggregators:使用 defaultdict 自動為新群組創建聚合器
  • 當新群組出現時,工廠函數自動創建對應的聚合器實例

3.2 統一事件處理介面

可以將單一聚合與多聚合統一成一個介面,根據聚合器類型自動判斷處理方式:

def process_event(self, event: Dict[str, Any]) -> Dict[str, Any]:
    """
    統一的事件處理介面,支援單一聚合和多聚合
    
    :param event: 輸入事件
    :return: 聚合結果
    """
    # 1. 提取群組鍵值
    group_value = str(event.get(self.group_key, "unknown"))
    
    # 2. 獲取聚合器實例
    aggregator = self.group_aggregators[group_value]
    
    # 3. 根據聚合器類型選擇處理方式
    if isinstance(aggregator, MultiAggregator):
        # 多聚合:傳入完整事件
        current_results = aggregator.update(event)
        result = {
            "group_key": group_value,
            "timestamp": event.get("timestamp", "unknown"),
            **current_results  # 展開所有聚合值
        }
    else:
        # 單一聚合:提取特定值
        if hasattr(self, 'value_field') and self.value_field:
            aggregate_value = event.get(self.value_field, 0)
        else:
            aggregate_value = 1  # Count 聚合固定為 1
        
        current_result = aggregator.update(aggregate_value)
        result = {
            "group_key": group_value,
            "value": current_result,
            "timestamp": event.get("timestamp", "unknown")
        }
    
    return result

設計優勢

  1. 介面統一:只需要一個 process_event() 方法
  2. 自動識別:根據聚合器類型自動選擇處理邏輯
  3. 程式碼簡化:減少重複的群組鍵提取邏輯
  4. 易於維護:統一的處理流程更容易維護

處理邏輯差異

  • MultiAggregator:傳入完整事件,返回多個聚合結果
  • 基礎聚合器:提取特定欄位值,返回單一聚合結果

3.3 執行範例對比

# 單一聚合範例
count_op = GroupByOperation("customer_id", CountAggregator)  
event = {"customer_id": "C001", "amount": 120}
result = count_op.process_event(event)
# 結果: {"group_key": "C001", "value": 1, "timestamp": "..."}

# 多聚合範例
multi_factory = lambda: MultiAggregator({
    "order_count": "count", 
    "total_amount": "sum(amount)"
})
multi_op = GroupByOperation("customer_id", multi_factory)
result = multi_op.process_event(event)  # 相同的方法調用!
# 結果: {"group_key": "C001", "order_count": 1, "total_amount": 120.0, "timestamp": "..."}

步驟四:GroupedDataFrame API 設計

4.1 GroupedDataFrame 作為聚合操作的統一介面:

class GroupedDataFrame:
    """
    分組後的 DataFrame,支援多種聚合操作
    """
    
    def __init__(self, source_df: 'SimpleDataFrame', group_key: Union[str, Callable]):
        self.source_df = source_df
        self.group_key = group_key
        
        # 懶載入:等到具體聚合方法被呼叫時才創建
        self.groupby_op = None
        self._aggregation_configured = False

4.2 單一聚合 API 設計

基礎聚合方法

def count(self) -> 'SimpleDataFrame':
    """計數聚合"""
    return self._setup_aggregation(
        CountAggregator,
        aggregation_name="count"
    )

def sum(self, field: str) -> 'SimpleDataFrame':
    """求和聚合"""
    return self._setup_aggregation(
        SumAggregator,
        aggregation_name="sum",
        value_field=field
    )

def max(self, field: str) -> 'SimpleDataFrame':
    """最大值聚合"""
    return self._setup_aggregation(
        MaxAggregator,
        aggregation_name="max",
        value_field=field
    )

def min(self, field: str) -> 'SimpleDataFrame':
    """最小值聚合"""
    return self._setup_aggregation(
        MinAggregator,
        aggregation_name="min",
        value_field=field
    )

def avg(self, field: str) -> 'SimpleDataFrame':
    """平均值聚合"""
    return self._setup_aggregation(
        AvgAggregator,
        aggregation_name="avg",
        value_field=field
    )

4.3 單一聚合的核心實現

所有單一聚合方法都使用 _setup_aggregation 進行統一處理:

def _setup_aggregation(self, aggregator_class, aggregation_name: str, value_field: Optional[str] = None):
    """通用的聚合設定邏輯"""
    if self._aggregation_configured:
        raise RuntimeError("Aggregation already configured for this GroupedDataFrame")
    
    # 1. 創建 GroupBy 操作實例
    self.groupby_op = GroupByOperation(self.group_key, aggregator_class)
    self.value_field = value_field
    
    # 2. 創建結果 DataFrame
    result_df = SimpleDataFrame(f"{self.source_df.name}_{aggregation_name}")
    
    # 3. 設定處理邏輯
    self._setup_processing(result_df)
    
    # 4. 標記為已配置
    self._aggregation_configured = True
    
    return result_df

4.4 事件攔截機制

_setup_processing 方法實現事件攔截和聚合處理:

def _setup_processing(self, result_df: 'SimpleDataFrame'):
    """設定聚合處理邏輯"""
    original_process = self.source_df.process_message
    
    def enhanced_process(message: Dict[str, Any]) -> bool:
        try:
            # 1. 先執行原本的處理流程
            original_result = original_process(message)
            
            # 2. 只有原處理成功才執行聚合
            if original_result:
                # 執行聚合 - 使用統一的 process_event 介面
                result = self.groupby_op.process_event(message)
                
                # 將聚合結果發送到結果 DataFrame
                result_df.process_message(result)
            
            return original_result
        except Exception as e:
            logger.error(f"Error in GroupBy processing: {e}")
            return original_process(message)
    
    # 替換處理函數
    self.source_df.process_message = enhanced_process

4.5 多聚合 API 設計

agg() 方法 - SQL 風格多聚合

def agg(self, **specs) -> 'SimpleDataFrame':
        """
        SQL 風格多聚合:像 SQL 一樣同時計算多種聚合
        
        用法:
        .agg(
            order_count="count",
            total_amount="sum(amount)",
            max_amount="max(amount)", 
            min_amount="min(amount)",
            avg_amount="avg(amount)"
        )
        
        等效 SQL:
        SELECT customer_id,
               COUNT(*) as order_count,
               SUM(amount) as total_amount,
               MAX(amount) as max_amount,
               MIN(amount) as min_amount,
               AVG(amount) as avg_amount
        FROM orders GROUP BY customer_id
        """
        if self._aggregation_configured:
            raise RuntimeError("Aggregation already configured for this GroupedDataFrame")
        
        # 創建多聚合器工廠
        def multi_agg_factory():
            return MultiAggregator(specs)
        
        # 創建 GroupBy 操作實例
        self.groupby_op = GroupByOperation(self.group_key, multi_agg_factory)
        self.value_field = None
        
        # 創建結果 DataFrame
        result_df = SimpleDataFrame(f"{self.source_df.name}_multi_agg")
        
        # 使用統一的處理邏輯
        self._setup_processing(result_df)
        
        self._aggregation_configured = True
        logger.info(f"Configured multi-aggregation: {list(specs.keys())}")
        return result_df

基本多聚合用法

# 客戶訂單統計:同時計算多種聚合
stats_df = (
    orders_df
    .group_by("customer_id")
    .agg(
        order_count="count",           # 訂單數量
        total_spent="sum(amount)",     # 總消費額
        max_order="max(amount)",       # 最大單筆訂單
        min_order="min(amount)",       # 最小單筆訂單
        avg_order="avg(amount)"        # 平均訂單金額
    )
)

輸出結果

{
    "group_key": "C001",
    "timestamp": "2024-01-15T10:30:00Z",
    "order_count": 5,
    "total_spent": 1250.0,
    "max_order": 450.0,
    "min_order": 80.0,
    "avg_order": 250.0
}

4.6 API 設計的統一性

共同特點

  • 所有聚合方法都返回新的 SimpleDataFrame
  • 使用相同的事件攔截機制 _setup_processing()
  • 透過統一的 process_event() 介面處理事件
  • 支援鏈式操作:聚合結果可以繼續 filter、sink 等

差異點

  • 單一聚合:使用具體的聚合器類別 (CountAggregator, SumAggregator 等)
  • 多聚合:使用 MultiAggregator 作為容器,內部管理多個子聚合器
  • 輸出格式:單聚合用 "value" 欄位,多聚合直接展開所有結果欄位

資料生命週期圖

Data Lifecycle in Streaming Aggregation System

┌────────────────────────────────────────────────────┐
│             Phase 1: Event Input                   │
└─────────────────────────────┬──────────────────────┘
                              │
    Raw Event Stream          │{"customer_id": "C001", "amount": 120}
    ┌─────────────────┐       │{"customer_id": "C002", "amount": 200}
    │ KafkaSource     │───────┘{"customer_id": "C001", "amount": 80}
    │ • Topic         │
    │ • Partition     │
    │ • Offset        │
    └─────────────────┘
            │
            ▼
┌────────────────────────────────────────────────────┐
│           Phase 2: DataFrame Processing            │
└─────────────────────────────┬──────────────────────┘
                              │
┌──────────────────────────────────────────────────────────────────┐
│ SimpleDataFrame: orders_df                                       │
│ ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────────┐│
│ │ Original        │  │ Enhanced        │  │ Result DataFrame    ││
│ │ process_message │─▶│ process_message │─▶│ orders_df_count     ││
│ │ • Filter        │  │ • Original logic│  │ • Aggregated results││
│ │ • Transform     │  │ • + Aggregation │  │ • Ready for sink    ││
│ └─────────────────┘  └─────────────────┘  └─────────────────────┘│
└─────────────────────────────┬────────────────────────────────────┘
                              │
                              ▼
┌──────────────────────────────────────────────────────────────────┐
│                     Phase 3: Aggregation Configuration           │
└─────────────────────────────┬────────────────────────────────────┘
                              │
                              ▼
        ┌─────────────────────────────────────┐
        │     GroupedDataFrame                │
        │ ┌─────────────────────────────────┐ │
        │ │ .group_by("customer_id")        │ │
        │ │ • source_df = orders_df         │ │
        │ │ • group_key = "customer_id"     │ │
        │ │ • _aggregation_configured=False │ │
        │ └─────────────────────────────────┘ │
        └─────────────────┬───────────────────┘
                          │
          ┌───────────────┴────────────────┐
          │                                │
          ▼                                ▼
┌─────────────────┐              ┌─────────────────────┐
│ Single Agg      │              │ Multi Agg           │
│ .count()        │              │ .agg(               │
│ .sum("amount")  │              │   count="count",    │
│ .max("amount")  │              │   sum="sum(amount)" │
│ .min("amount")  │              │ )                   │
│ .avg("amount")  │              │                     │
└─────────────────┘              └─────────────────────┘
          │                                │
          ▼                                ▼
┌─────────────────────────────────────────────────────────────────┐
│                        Phase 4: Aggregator Factory              │
└─────────────────────────────┬───────────────────────────────────┘
                              │
          ┌───────────────────┴───────────────────┐
          │                                       │
          ▼                                       ▼
┌─────────────────────────────────┐    ┌───────────────────────────┐
│    Single Aggregator Factory    │    │ Multi Aggregator Factory  │
│                                 │    │                           │
│ aggregator_factory =            │    │ def multi_agg_factory():  │
│   CountAggregator               │    │   return MultiAggregator({│
│   SumAggregator                 │    │     "count": "count",     │
│   MaxAggregator                 │    │     "sum": "sum(amount)"  │
│   MinAggregator                 │    │   })                      │
│   AvgAggregator                 │    │                           │
└─────────────────────────────────┘    └───────────────────────────┘
          │                                       │
          └───────────────────┬───────────────────┘
                              │
                              ▼
┌──────────────────────────────────────────────────────────────────┐
│                      Phase 5: GroupByOperation                   │
└─────────────────────────────┬────────────────────────────────────┘
                              │
┌──────────────────────────────────────────────────────────────────┐
│ GroupByOperation(group_key="customer_id", aggregator_factory)    │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ group_aggregators = defaultdict(aggregator_factory)          │ │
│ │                                                              │ │
│ │ Dynamic State Creation:                                      │ │
│ │ • First "C001" event → Creates aggregator for C001           │ │
│ │ • First "C002" event → Creates aggregator for C002           │ │
│ │ • Subsequent events → Updates existing aggregators           │ │
│ └──────────────────────────────────────────────────────────────┘ │
└─────────────────────────────┬────────────────────────────────────┘
                              │
                              ▼
┌──────────────────────────────────────────────────────────────────┐
│                       Phase 6: Event Processing                  │
└─────────────────────────────┬────────────────────────────────────┘
                              │
Event: {"customer_id": "C001", "amount": 120}
          │
          ▼
┌────────────────────────────────────────────────────────────────────┐
│                        Event Processing Flow                       │
│                                                                    │
│ ┌─────────────────┐  ┌─────────────────┐  ┌───────────────────────┐│
│ │ 1. Extract Key  │  │ 2. Get/Create   │  │ 3. Update Aggregator  ││
│ │                 │  │    Aggregator   │  │                       ││
│ │ group_value =   │─▶│                 │─▶│ aggregator.update()   ││
│ │ "C001"          │  │ aggregator =    │  │ • Count: +1           ││
│ │                 │  │ group_aggs      │  │ • Sum: +120.0         ││
│ │                 │  │ ["C001"]        │  │ • Max: max(old, 120.0)││
│ └─────────────────┘  └─────────────────┘  └───────────────────────┘│
│                                  │                          │      │
│                         ┌────────▼────────┐                 │      │
│                         │ defaultdict     │                 │      │
│                         │ Auto-creates if │                 │      │
│                         │ key not exists  │                 │      │
│                         └─────────────────┘                 │      │
│                                                             │      │
│                                                             ▼      │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ 4. Generate Result                                             │ │
│ │                                                                │ │
│ │ Single Aggregation:           Multi Aggregation:               │ │
│ │ {                             {                                │ │
│ │   "group_key": "C001",          "group_key": "C001",           │ │
│ │   "value": 1,                   "timestamp": "...",            │ │
│ │   "timestamp": "..."            "order_count": 1,              │ │
│ │ }                               "total_amount": 120.0,         │ │
│ │                                 "max_amount": 120.0            │ │
│ │                               }                                │ │
│ └────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────┬──────────────────────────────────────┘
                              │
                              ▼
┌────────────────────────────────────────────────────────────────────┐
│                          Phase 7: Result Output                    │
└─────────────────────────────┬──────────────────────────────────────┘
                              │
                              ▼
┌────────────────────────────────────────────────────────────────────┐
│ Result DataFrame Processing                                        │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ result_df.process_message(aggregated_result)                   │ │
│ │          │                                                     │ │
│ │          ▼                                                     │ │
│ │ ┌──────────────────┐    ┌─────────────────┐                    │ │
│ │ │ Further          │    │ Sink            │                    │ │
│ │ │ Processing       │───▶│ Processing      │                    │ │
│ │ │ • More filters   │    │ • PostgreSink   │                    │ │
│ │ │ • Transformations│    │                 │                    │ │
│ │ │ • Chain ops      │    │                 │                    │ │
│ │ └──────────────────┘    └─────────────────┘                    │ │
│ └────────────────────────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────────────┘

狀態演進時間軸

Single-Aggregation State Evolution Timeline:

Time T1: First Event {"customer_id": "C001", "amount": 120}
├─ GroupByOperation State:
   └─ group_aggregators = {"C001": CountAggregator(count=1)}


Time T2: Second Event {"customer_id": "C002", "amount": 200}  
├─ GroupByOperation State:
   └─ group_aggregators = {
        "C001": CountAggregator(count=1),
        "C002": CountAggregator(count=1)     ← New group created
      }


Time T3: Third Event {"customer_id": "C001", "amount": 80}
├─ GroupByOperation State:
   └─ group_aggregators = {
        "C001": CountAggregator(count=2),    ← Updated existing
        "C002": CountAggregator(count=1)
      }

Multi-Aggregation State Evolution Timeline:

Time T1: {"customer_id": "C001", "amount": 120}
├─ MultiAggregator["C001"] = {
     "order_count": CountAggregator(count=1),
     "total_amount": SumAggregator(sum=120.0),
     "max_amount": MaxAggregator(max=120.0)
   }


Time T2: {"customer_id": "C001", "amount": 80}
├─ MultiAggregator["C001"] = {
     "order_count": CountAggregator(count=2),     ← +1
     "total_amount": SumAggregator(sum=200.0),    ← +80.0
     "max_amount": MaxAggregator(max=120.0)       ← max(120.0, 80.0)
   }

總結

我們採用抽象工廠模式建立聚合器的統一介面。Count、Sum、Max、Min、Avg 這些聚合器都實現相同的抽象介面,確保一致的行為模式。

將基礎的 Count 聚合擴展為完整的多聚合系統,實現了 SQL 風格的流式聚合介面。這是從簡單功能到複雜系統的重要演進。

Day 16 預告

目前我們的聚合是「全局的」——從開始到現在的所有數據。但在真實世界中,我們往往更關心「最近一小時」、「每天」或「過去五分鐘」的統計。

下一章我們將加入最基本的時間窗口功能——固定時間窗口,讓聚合變得更加實用。比如每5分鐘統計一次,每小時產生一份報告。

到時候,我們的語法會變成這樣:

orders_df.window(minutes(5)).group_by("customer_id").agg(orders_count="count")

從無時間概念的聚合,進化到有時間感知的聚合,讓我們一起迎接更實用的流式分析能力!


上一篇
【知其然,更知其所以然】Day14:Streaming GroupBy
系列文
「知其然,更知其所以然:什麼是 Real-time (Streaming) Pipeline?從造輪子到 Flink 與 RisingWave」15
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言