昨天我們實現了基礎的流式 GroupBy Count,今天將進一步擴展聚合功能。我們將實現完整的聚合操作(sum、avg、max、min)。
重要提醒
本文所有程式碼皆為教學與概念演示用的 Pseudo Code,可以幫助你理解並解決實際場景的問題,但這裡的程式碼不能直接使用,主要目的是用來講解 Stream Processing 的設計思路與核心概念。閱讀時建議把重點放在理解整體架構和設計邏輯上,程式碼細節部分可以輕鬆看過就好。
聚合器類別繼承結構
┌─────────────────────────────────────────────────────────────┐
│ BaseAggregator │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ + reset(): void │ │
│ │ + update(value): Any (abstract) │ │
│ │ + get_result(): Any (abstract) │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────┬───────────────────────────────────┘
│ (繼承)
▼
┌─────────────────────────────────────────────────────────────┐
│ CountAggregator │ SumAggregator │ MaxAggregator │
│ • count: int │ • sum: float │ • max_value: Any │
├────────────────────┼───────────────────┼────────────────────┤
│ MinAggregator │ AvgAggregator │ MultiAggregator │
│ • min_value: Any │ • sum: float │ • aggregators: {} │
│ │ • count: int │ • agg_specs: {} │
└─────────────────────────────────────────────────────────────┘
首先定義所有聚合器的通用介面,確保一致的行為模式:
class BaseAggregator(ABC):
"""聚合器基礎介面"""
def __init__(self):
self.reset()
@abstractmethod
def reset(self):
"""重置聚合狀態"""
pass
@abstractmethod
def update(self, value: Any) -> Any:
"""
更新聚合狀態並返回當前結果
:param value: 新的輸入值
:return: 當前聚合結果
"""
pass
@abstractmethod
def get_result(self) -> Any:
"""獲取當前聚合結果"""
pass
設計要點:
reset()
方法用於重置聚合狀態,支援視窗化聚合update()
方法接收新值,更新內部狀態並返回結果get_result()
方法獲取當前聚合結果class CountAggregator(BaseAggregator):
"""計數聚合器"""
def reset(self):
self.count = 0
def update(self, value: Any) -> int:
self.count += 1
return self.count
def get_result(self) -> int:
return self.count
CountAggregator 分析:
count
計數器update()
計數器加1,忽略輸入值class SumAggregator(BaseAggregator):
"""求和聚合器"""
def reset(self):
self.sum = 0
def update(self, value: Any) -> float:
self.sum += float(value)
return self.sum
def get_result(self) -> float:
return self.sum
class MaxAggregator(BaseAggregator):
"""最大值聚合器"""
def reset(self):
self.max_value = None
def update(self, value: Any) -> Any:
if self.max_value is None or value > self.max_value:
self.max_value = value
return self.max_value
def get_result(self) -> Any:
return self.max_value
class MinAggregator(BaseAggregator):
"""最小值聚合器"""
def reset(self):
self.min_value = None
def update(self, value: Any) -> Any:
if self.min_value is None or value < self.min_value:
self.min_value = value
return self.min_value
def get_result(self) -> Any:
return self.min_value
class AvgAggregator(BaseAggregator):
"""平均值聚合器"""
def reset(self):
self.sum = 0
self.count = 0
def update(self, value: Any) -> float:
self.sum += float(value)
self.count += 1
return self.sum / self.count
def get_result(self) -> float:
return self.sum / self.count if self.count > 0 else 0
各聚合器特點:
MultiAggregator 內部結構
┌─────────────────────────────────────────────────────────────┐
│ MultiAggregator │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ agg_specs: { │ │
│ │ "order_count": "count", │ │
│ │ "total_amount": "sum(amount)", │ │
│ │ "max_amount": "max(amount)" │ │
│ │ } │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ aggregators: { │ │
│ │ "order_count": (CountAggregator(), None), │ │
│ │ "total_amount": (SumAggregator(), "amount"), │ │
│ │ "max_amount": (MaxAggregator(), "amount") │ │
│ │ } │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────┬───────────────────────────────────┘
│ (組合)
▼
┌─────────────────────────────────────────────────────────────┐
├─────────────────┬───────────────────┬───────────────────────┤
│ CountAggregator │ SumAggregator │ MaxAggregator │
│ count = 0 │ sum = 0.0 │ max_value = None │
└─────────────────┴───────────────────┴───────────────────────┘
MultiAggregator 採用組合模式,將多個基礎聚合器組合起來,實現多重聚合功能:
class MultiAggregator(BaseAggregator):
"""複合聚合器:復用現有聚合器,組合成多聚合功能"""
def __init__(self, agg_specs: Dict[str, str]):
"""
:param agg_specs: 聚合規格,例如:
{
"order_count": "count",
"total_amount": "sum(amount)",
"max_amount": "max(amount)",
"min_amount": "min(amount)"
}
"""
self.agg_specs = agg_specs
self.aggregators = {} # alias -> (aggregator, field)
# 關鍵:為每個聚合規格創建對應的聚合器實例
for alias, spec in agg_specs.items():
if spec == "count":
self.aggregators[alias] = (CountAggregator(), None)
elif spec.startswith("sum(") and spec.endswith(")"):
field = spec[4:-1]
self.aggregators[alias] = (SumAggregator(), field)
elif spec.startswith("max(") and spec.endswith(")"):
field = spec[4:-1]
self.aggregators[alias] = (MaxAggregator(), field)
elif spec.startswith("min(") and spec.endswith(")"):
field = spec[4:-1]
self.aggregators[alias] = (MinAggregator(), field)
elif spec.startswith("avg(") and spec.endswith(")"):
field = spec[4:-1]
self.aggregators[alias] = (AvgAggregator(), field)
super().__init__()
規格解析邏輯:
(CountAggregator(), None)
- 不需要欄位值(SumAggregator(), "amount")
- 提取欄位名(MaxAggregator(), "price")
- 提取欄位名def update(self, event: Dict[str, Any]) -> Dict[str, Any]:
"""更新所有聚合器並返回結果"""
result = {}
for alias, (aggregator, field) in self.aggregators.items():
if field is None:
# count 不需要欄位值
current_result = aggregator.update(1)
else:
# 其他聚合需要欄位值
value = event.get(field, 0)
current_result = aggregator.update(value)
result[alias] = current_result
return result
執行流程分析:
update()
方法實際執行範例:
# 輸入事件
event = {"customer_id": "C001", "amount": 120}
# MultiAggregator 配置
specs = {
"order_count": "count",
"total_amount": "sum(amount)"
}
# 執行更新
result = multi_agg.update(event)
# 輸出: {"order_count": 1, "total_amount": 120.0}
class GroupByOperation:
"""
通用的 GroupBy 操作,支援多種聚合函數
"""
def __init__(self, group_key: str, aggregator_factory: Callable[[], BaseAggregator]):
self.group_key = group_key
self.aggregator_factory = aggregator_factory
# 狀態存儲:group_value -> aggregator
self.group_aggregators = defaultdict(self.aggregator_factory)
核心參數:
group_key
:字串欄位名,用於從事件中提取分組鍵值aggregator_factory
:聚合器工廠函數,用於創建新的聚合器實例狀態管理:
group_aggregators
:使用 defaultdict 自動為新群組創建聚合器可以將單一聚合與多聚合統一成一個介面,根據聚合器類型自動判斷處理方式:
def process_event(self, event: Dict[str, Any]) -> Dict[str, Any]:
"""
統一的事件處理介面,支援單一聚合和多聚合
:param event: 輸入事件
:return: 聚合結果
"""
# 1. 提取群組鍵值
group_value = str(event.get(self.group_key, "unknown"))
# 2. 獲取聚合器實例
aggregator = self.group_aggregators[group_value]
# 3. 根據聚合器類型選擇處理方式
if isinstance(aggregator, MultiAggregator):
# 多聚合:傳入完整事件
current_results = aggregator.update(event)
result = {
"group_key": group_value,
"timestamp": event.get("timestamp", "unknown"),
**current_results # 展開所有聚合值
}
else:
# 單一聚合:提取特定值
if hasattr(self, 'value_field') and self.value_field:
aggregate_value = event.get(self.value_field, 0)
else:
aggregate_value = 1 # Count 聚合固定為 1
current_result = aggregator.update(aggregate_value)
result = {
"group_key": group_value,
"value": current_result,
"timestamp": event.get("timestamp", "unknown")
}
return result
設計優勢:
process_event()
方法處理邏輯差異:
# 單一聚合範例
count_op = GroupByOperation("customer_id", CountAggregator)
event = {"customer_id": "C001", "amount": 120}
result = count_op.process_event(event)
# 結果: {"group_key": "C001", "value": 1, "timestamp": "..."}
# 多聚合範例
multi_factory = lambda: MultiAggregator({
"order_count": "count",
"total_amount": "sum(amount)"
})
multi_op = GroupByOperation("customer_id", multi_factory)
result = multi_op.process_event(event) # 相同的方法調用!
# 結果: {"group_key": "C001", "order_count": 1, "total_amount": 120.0, "timestamp": "..."}
class GroupedDataFrame:
"""
分組後的 DataFrame,支援多種聚合操作
"""
def __init__(self, source_df: 'SimpleDataFrame', group_key: Union[str, Callable]):
self.source_df = source_df
self.group_key = group_key
# 懶載入:等到具體聚合方法被呼叫時才創建
self.groupby_op = None
self._aggregation_configured = False
基礎聚合方法:
def count(self) -> 'SimpleDataFrame':
"""計數聚合"""
return self._setup_aggregation(
CountAggregator,
aggregation_name="count"
)
def sum(self, field: str) -> 'SimpleDataFrame':
"""求和聚合"""
return self._setup_aggregation(
SumAggregator,
aggregation_name="sum",
value_field=field
)
def max(self, field: str) -> 'SimpleDataFrame':
"""最大值聚合"""
return self._setup_aggregation(
MaxAggregator,
aggregation_name="max",
value_field=field
)
def min(self, field: str) -> 'SimpleDataFrame':
"""最小值聚合"""
return self._setup_aggregation(
MinAggregator,
aggregation_name="min",
value_field=field
)
def avg(self, field: str) -> 'SimpleDataFrame':
"""平均值聚合"""
return self._setup_aggregation(
AvgAggregator,
aggregation_name="avg",
value_field=field
)
所有單一聚合方法都使用 _setup_aggregation
進行統一處理:
def _setup_aggregation(self, aggregator_class, aggregation_name: str, value_field: Optional[str] = None):
"""通用的聚合設定邏輯"""
if self._aggregation_configured:
raise RuntimeError("Aggregation already configured for this GroupedDataFrame")
# 1. 創建 GroupBy 操作實例
self.groupby_op = GroupByOperation(self.group_key, aggregator_class)
self.value_field = value_field
# 2. 創建結果 DataFrame
result_df = SimpleDataFrame(f"{self.source_df.name}_{aggregation_name}")
# 3. 設定處理邏輯
self._setup_processing(result_df)
# 4. 標記為已配置
self._aggregation_configured = True
return result_df
_setup_processing
方法實現事件攔截和聚合處理:
def _setup_processing(self, result_df: 'SimpleDataFrame'):
"""設定聚合處理邏輯"""
original_process = self.source_df.process_message
def enhanced_process(message: Dict[str, Any]) -> bool:
try:
# 1. 先執行原本的處理流程
original_result = original_process(message)
# 2. 只有原處理成功才執行聚合
if original_result:
# 執行聚合 - 使用統一的 process_event 介面
result = self.groupby_op.process_event(message)
# 將聚合結果發送到結果 DataFrame
result_df.process_message(result)
return original_result
except Exception as e:
logger.error(f"Error in GroupBy processing: {e}")
return original_process(message)
# 替換處理函數
self.source_df.process_message = enhanced_process
agg() 方法 - SQL 風格多聚合:
def agg(self, **specs) -> 'SimpleDataFrame':
"""
SQL 風格多聚合:像 SQL 一樣同時計算多種聚合
用法:
.agg(
order_count="count",
total_amount="sum(amount)",
max_amount="max(amount)",
min_amount="min(amount)",
avg_amount="avg(amount)"
)
等效 SQL:
SELECT customer_id,
COUNT(*) as order_count,
SUM(amount) as total_amount,
MAX(amount) as max_amount,
MIN(amount) as min_amount,
AVG(amount) as avg_amount
FROM orders GROUP BY customer_id
"""
if self._aggregation_configured:
raise RuntimeError("Aggregation already configured for this GroupedDataFrame")
# 創建多聚合器工廠
def multi_agg_factory():
return MultiAggregator(specs)
# 創建 GroupBy 操作實例
self.groupby_op = GroupByOperation(self.group_key, multi_agg_factory)
self.value_field = None
# 創建結果 DataFrame
result_df = SimpleDataFrame(f"{self.source_df.name}_multi_agg")
# 使用統一的處理邏輯
self._setup_processing(result_df)
self._aggregation_configured = True
logger.info(f"Configured multi-aggregation: {list(specs.keys())}")
return result_df
基本多聚合用法:
# 客戶訂單統計:同時計算多種聚合
stats_df = (
orders_df
.group_by("customer_id")
.agg(
order_count="count", # 訂單數量
total_spent="sum(amount)", # 總消費額
max_order="max(amount)", # 最大單筆訂單
min_order="min(amount)", # 最小單筆訂單
avg_order="avg(amount)" # 平均訂單金額
)
)
輸出結果:
{
"group_key": "C001",
"timestamp": "2024-01-15T10:30:00Z",
"order_count": 5,
"total_spent": 1250.0,
"max_order": 450.0,
"min_order": 80.0,
"avg_order": 250.0
}
共同特點:
_setup_processing()
process_event()
介面處理事件差異點:
Data Lifecycle in Streaming Aggregation System
┌────────────────────────────────────────────────────┐
│ Phase 1: Event Input │
└─────────────────────────────┬──────────────────────┘
│
Raw Event Stream │{"customer_id": "C001", "amount": 120}
┌─────────────────┐ │{"customer_id": "C002", "amount": 200}
│ KafkaSource │───────┘{"customer_id": "C001", "amount": 80}
│ • Topic │
│ • Partition │
│ • Offset │
└─────────────────┘
│
▼
┌────────────────────────────────────────────────────┐
│ Phase 2: DataFrame Processing │
└─────────────────────────────┬──────────────────────┘
│
┌──────────────────────────────────────────────────────────────────┐
│ SimpleDataFrame: orders_df │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────────┐│
│ │ Original │ │ Enhanced │ │ Result DataFrame ││
│ │ process_message │─▶│ process_message │─▶│ orders_df_count ││
│ │ • Filter │ │ • Original logic│ │ • Aggregated results││
│ │ • Transform │ │ • + Aggregation │ │ • Ready for sink ││
│ └─────────────────┘ └─────────────────┘ └─────────────────────┘│
└─────────────────────────────┬────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────┐
│ Phase 3: Aggregation Configuration │
└─────────────────────────────┬────────────────────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ GroupedDataFrame │
│ ┌─────────────────────────────────┐ │
│ │ .group_by("customer_id") │ │
│ │ • source_df = orders_df │ │
│ │ • group_key = "customer_id" │ │
│ │ • _aggregation_configured=False │ │
│ └─────────────────────────────────┘ │
└─────────────────┬───────────────────┘
│
┌───────────────┴────────────────┐
│ │
▼ ▼
┌─────────────────┐ ┌─────────────────────┐
│ Single Agg │ │ Multi Agg │
│ .count() │ │ .agg( │
│ .sum("amount") │ │ count="count", │
│ .max("amount") │ │ sum="sum(amount)" │
│ .min("amount") │ │ ) │
│ .avg("amount") │ │ │
└─────────────────┘ └─────────────────────┘
│ │
▼ ▼
┌─────────────────────────────────────────────────────────────────┐
│ Phase 4: Aggregator Factory │
└─────────────────────────────┬───────────────────────────────────┘
│
┌───────────────────┴───────────────────┐
│ │
▼ ▼
┌─────────────────────────────────┐ ┌───────────────────────────┐
│ Single Aggregator Factory │ │ Multi Aggregator Factory │
│ │ │ │
│ aggregator_factory = │ │ def multi_agg_factory(): │
│ CountAggregator │ │ return MultiAggregator({│
│ SumAggregator │ │ "count": "count", │
│ MaxAggregator │ │ "sum": "sum(amount)" │
│ MinAggregator │ │ }) │
│ AvgAggregator │ │ │
└─────────────────────────────────┘ └───────────────────────────┘
│ │
└───────────────────┬───────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────┐
│ Phase 5: GroupByOperation │
└─────────────────────────────┬────────────────────────────────────┘
│
┌──────────────────────────────────────────────────────────────────┐
│ GroupByOperation(group_key="customer_id", aggregator_factory) │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ group_aggregators = defaultdict(aggregator_factory) │ │
│ │ │ │
│ │ Dynamic State Creation: │ │
│ │ • First "C001" event → Creates aggregator for C001 │ │
│ │ • First "C002" event → Creates aggregator for C002 │ │
│ │ • Subsequent events → Updates existing aggregators │ │
│ └──────────────────────────────────────────────────────────────┘ │
└─────────────────────────────┬────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────┐
│ Phase 6: Event Processing │
└─────────────────────────────┬────────────────────────────────────┘
│
Event: {"customer_id": "C001", "amount": 120}
│
▼
┌────────────────────────────────────────────────────────────────────┐
│ Event Processing Flow │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌───────────────────────┐│
│ │ 1. Extract Key │ │ 2. Get/Create │ │ 3. Update Aggregator ││
│ │ │ │ Aggregator │ │ ││
│ │ group_value = │─▶│ │─▶│ aggregator.update() ││
│ │ "C001" │ │ aggregator = │ │ • Count: +1 ││
│ │ │ │ group_aggs │ │ • Sum: +120.0 ││
│ │ │ │ ["C001"] │ │ • Max: max(old, 120.0)││
│ └─────────────────┘ └─────────────────┘ └───────────────────────┘│
│ │ │ │
│ ┌────────▼────────┐ │ │
│ │ defaultdict │ │ │
│ │ Auto-creates if │ │ │
│ │ key not exists │ │ │
│ └─────────────────┘ │ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ 4. Generate Result │ │
│ │ │ │
│ │ Single Aggregation: Multi Aggregation: │ │
│ │ { { │ │
│ │ "group_key": "C001", "group_key": "C001", │ │
│ │ "value": 1, "timestamp": "...", │ │
│ │ "timestamp": "..." "order_count": 1, │ │
│ │ } "total_amount": 120.0, │ │
│ │ "max_amount": 120.0 │ │
│ │ } │ │
│ └────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────┬──────────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────────────┐
│ Phase 7: Result Output │
└─────────────────────────────┬──────────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────────────┐
│ Result DataFrame Processing │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ result_df.process_message(aggregated_result) │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌──────────────────┐ ┌─────────────────┐ │ │
│ │ │ Further │ │ Sink │ │ │
│ │ │ Processing │───▶│ Processing │ │ │
│ │ │ • More filters │ │ • PostgreSink │ │ │
│ │ │ • Transformations│ │ │ │ │
│ │ │ • Chain ops │ │ │ │ │
│ │ └──────────────────┘ └─────────────────┘ │ │
│ └────────────────────────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────────────┘
Single-Aggregation State Evolution Timeline:
Time T1: First Event {"customer_id": "C001", "amount": 120}
├─ GroupByOperation State:
└─ group_aggregators = {"C001": CountAggregator(count=1)}
Time T2: Second Event {"customer_id": "C002", "amount": 200}
├─ GroupByOperation State:
└─ group_aggregators = {
"C001": CountAggregator(count=1),
"C002": CountAggregator(count=1) ← New group created
}
Time T3: Third Event {"customer_id": "C001", "amount": 80}
├─ GroupByOperation State:
└─ group_aggregators = {
"C001": CountAggregator(count=2), ← Updated existing
"C002": CountAggregator(count=1)
}
Multi-Aggregation State Evolution Timeline:
Time T1: {"customer_id": "C001", "amount": 120}
├─ MultiAggregator["C001"] = {
"order_count": CountAggregator(count=1),
"total_amount": SumAggregator(sum=120.0),
"max_amount": MaxAggregator(max=120.0)
}
Time T2: {"customer_id": "C001", "amount": 80}
├─ MultiAggregator["C001"] = {
"order_count": CountAggregator(count=2), ← +1
"total_amount": SumAggregator(sum=200.0), ← +80.0
"max_amount": MaxAggregator(max=120.0) ← max(120.0, 80.0)
}
我們採用抽象工廠模式建立聚合器的統一介面。Count、Sum、Max、Min、Avg 這些聚合器都實現相同的抽象介面,確保一致的行為模式。
將基礎的 Count 聚合擴展為完整的多聚合系統,實現了 SQL 風格的流式聚合介面。這是從簡單功能到複雜系統的重要演進。
目前我們的聚合是「全局的」——從開始到現在的所有數據。但在真實世界中,我們往往更關心「最近一小時」、「每天」或「過去五分鐘」的統計。
下一章我們將加入最基本的時間窗口功能——固定時間窗口,讓聚合變得更加實用。比如每5分鐘統計一次,每小時產生一份報告。
到時候,我們的語法會變成這樣:
orders_df.window(minutes(5)).group_by("customer_id").agg(orders_count="count")
從無時間概念的聚合,進化到有時間感知的聚合,讓我們一起迎接更實用的流式分析能力!