昨天我們打造了強大的 SQL 風格多聚合系統,但有個重要問題:所有聚合都是「全局的」,從程式開始到現在的累積統計。在真實業務中,我們更關心「最近的」、「定期的」統計結果。
今天讓我們為聚合系統加上時間感知能力!
重要提醒
本文所有程式碼皆為教學與概念演示用的 Pseudo Code,可以幫助你理解並解決實際場景的問題,但這裡的程式碼不能直接使用,主要目的是用來講解 Stream Processing 的設計思路與核心概念。閱讀時建議把重點放在理解整體架構和設計邏輯上,程式碼細節部分可以輕鬆看過就好。
# 目前的全局聚合
customer_stats = orders_df.group_by("customer_id").agg(total_orders="count")
# 結果:從程式開始到現在的累積統計
{
"group_key": "C001",
"total_orders": 152 # 從早上9點到現在下午3點的累積
}
問題:
# 有了時間窗口的聚合
hourly_stats = (orders_df
.window(hours(1)) # 每小時一個窗口
.group_by("customer_id")
.agg(hourly_orders="count")
)
# 結果:每小時的獨立統計
{
"group_key": "C001",
"hourly_orders": 12, # 這一小時的訂單數
"window_start": "14:00:00",
"window_end": "15:00:00"
}
價值:
固定窗口(Tumbling Window):
時間軸:09:00 -----> 10:00 -----> 11:00 -----> 12:00
窗口: [09:00-10:00) [10:00-11:00) [11:00-12:00)
事件: 事件1,2,3 事件4,5 事件6
結果: 統計A 統計B 統計C
[start, end)
- 開始包含,結束不包含這一步我們要打造時間窗口的基礎設施 - 就像建房子前要先打地基一樣。
步驟 1.1.1:核心屬性定義
class TimeWindow:
"""時間窗口的基礎類別 - 定義一個時間區間"""
def __init__(self, start_time: int, end_time: int):
# 使用毫秒級時間戳,避免浮點數精度問題
self.start_time = start_time # 窗口開始時間(毫秒)
self.end_time = end_time # 窗口結束時間(毫秒)
步驟 1.1.2:時間包含判斷邏輯
def contains(self, timestamp: int) -> bool:
"""判斷時間戳是否在窗口內"""
# 使用半開區間 [start_time, end_time)
# - start_time 包含:該時刻的事件屬於這個窗口
# - end_time 不包含:該時刻的事件屬於下一個窗口
return self.start_time <= timestamp < self.end_time
# 例子:窗口 [14:00:00, 14:05:00)
# - 14:00:00 包含 ✓
# - 14:02:30 包含 ✓
# - 14:05:00 不包含 ✗ (屬於下個窗口)
步驟 1.1.3:窗口顯示和字串表示
def __str__(self):
"""將窗口轉換為可讀的字串格式"""
# 步驟1:毫秒時間戳轉換為 datetime 對象
start = datetime.fromtimestamp(self.start_time / 1000) # 除以1000:毫秒→秒
end = datetime.fromtimestamp(self.end_time / 1000)
# 步驟2:格式化為 HH:MM:SS 格式
start_str = start.strftime('%H:%M:%S') # "14:00:00"
end_str = end.strftime('%H:%M:%S') # "14:05:00"
# 步驟3:組合成半開區間表示法 [start-end)
return f"[{start_str}-{end_str})"
步驟 1.1.4:窗口相等性和哈希支持
def __eq__(self, other):
"""判斷兩個窗口是否相等"""
# 只有開始和結束時間都相同才認為是同一個窗口
return (self.start_time == other.start_time and
self.end_time == other.end_time)
def __hash__(self):
"""計算窗口的哈希值,支持作為字典鍵使用"""
# 使用 (start_time, end_time) 元組的哈希
return hash((self.start_time, self.end_time))
# 用途:支持窗口對象作為字典鍵
# window_states = {
# TimeWindow(start1, end1): aggregator1,
# TimeWindow(start2, end2): aggregator2
# }
TimeWindow 使用範例:
# 創建一個 5分鐘窗口:14:00:00-14:05:00
window = TimeWindow(1635739800000, 1635740100000)
# 測試時間包含
print(window.contains(1635739980000)) # 14:03:00 -> True
print(window.contains(1635740100000)) # 14:05:00 -> False
# 窗口字串表示
print(str(window)) # "[14:00:00-14:05:00)"
# 作為字典鍵
window_data = {window: "some_aggregation_result"}
步驟 1.2.1:計算器初始化
class FixedWindowCalculator:
"""固定窗口計算器 - 負責時間對齊和窗口管理"""
def __init__(self, window_size_ms: int):
# 存儲窗口大小(毫秒)
self.window_size_ms = window_size_ms
# 例子:
# - 5分鐘窗口:window_size_ms = 5 * 60 * 1000 = 300000
# - 1小時窗口:window_size_ms = 60 * 60 * 1000 = 3600000
步驟 1.2.2:核心窗口對齊算法
def get_window_for_timestamp(self, timestamp: int) -> TimeWindow:
"""根據時間戳計算對應的窗口 - 核心對齊算法"""
# 例子數據:timestamp = 1635739980000 (14:03:00), window_size = 300000ms (5分鐘)
# 步驟1:計算在當前窗口內的偏移量
# 使用取模運算:1635739980000 % 300000 = 180000ms (3分鐘)
# 意義:14:03:00 距離最近窗口邊界(14:00:00)的偏移是3分鐘
offset = timestamp % self.window_size_ms
# 步驟2:向後對齊到窗口開始時間
# 1635739980000 - 180000 = 1635739800000 (14:00:00)
# 無論是 14:01:00, 14:03:30, 14:04:59 都會對齊到 14:00:00
window_start = timestamp - offset
# 步驟3:計算窗口結束時間
# 1635739800000 + 300000 = 1635740100000 (14:05:00)
# 形成窗口 [14:00:00 - 14:05:00)
window_end = window_start + self.window_size_ms
return TimeWindow(window_start, window_end)
步驟 1.2.3:窗口過期檢測機制
def is_window_expired(self, window: TimeWindow, current_time: int,
grace_period_ms: int = 0) -> bool:
"""判斷窗口是否已過期"""
# 過期判斷邏輯:當前時間 >= (窗口結束時間 + 寬限期)
expiry_time = window.end_time + grace_period_ms
is_expired = current_time >= expiry_time
return is_expired
# 例子:
# - 窗口:[14:00:00-14:05:00),結束時間 = 14:05:00
# - 當前時間:14:06:30
# - 無寬限期:14:06:30 >= 14:05:00,已過期 ✓
# - 2分鐘寬限期:14:06:30 >= 14:07:00,未過期 ✗
步驟 1.3.1:時間單位轉換工具
def hours(n: int) -> int:
"""小時轉毫秒"""
# n小時 * 60分鐘/小時 * 60秒/分鐘 * 1000毫秒/秒
return n * 60 * 60 * 1000
# 例子:
# hours(1) = 3600000ms (1小時)
# hours(2) = 7200000ms (2小時)
def minutes(n: int) -> int:
"""分鐘轉毫秒"""
# n分鐘 * 60秒/分鐘 * 1000毫秒/秒
return n * 60 * 1000
# 例子:
# minutes(5) = 300000ms (5分鐘)
# minutes(30) = 1800000ms (30分鐘)
def seconds(n: int) -> int:
"""秒轉毫秒"""
# n秒 * 1000毫秒/秒
return n * 1000
# 例子:
# seconds(30) = 30000ms (30秒)
# seconds(120) = 120000ms (2分鐘)
使用範例:
# 創建不同大小的窗口
calculator_5min = FixedWindowCalculator(minutes(5)) # 5分鐘窗口
calculator_1hour = FixedWindowCalculator(hours(1)) # 1小時窗口
calculator_30sec = FixedWindowCalculator(seconds(30)) # 30秒窗口
# 比直接使用毫秒數更直觀:
# - 好:FixedWindowCalculator(minutes(5))
# - 差:FixedWindowCalculator(300000)
步驟 2.1.1:狀態管理核心設計
class WindowState:
"""窗口狀態管理器 - 管理所有窗口的聚合狀態"""
def __init__(self, aggregator_factory):
# 核心狀態存儲:(group_key, window) -> aggregator
# 例:{("C001", Window[14:00-14:05)): CountAggregator(count=3)}
self.window_aggregators = {}
# 加速查詢:group_key -> Set[windows]
# 例:{"C001": {Window[14:00-14:05), Window[14:05-14:10)}}
self.active_windows = defaultdict(set)
# 追蹤最新時間戳:group_key -> latest_timestamp
# 例:{"C001": 1635739980000}
self.latest_timestamp = defaultdict(int)
# 聚合器工廠函數
self.aggregator_factory = aggregator_factory
步驟 2.1.2:獲取或創建聚合器
def get_or_create_aggregator(self, group_key: str, window: TimeWindow):
"""獲取或創建指定群組和窗口的聚合器"""
# 步驟1:組合複合鍵
composite_key = (group_key, window)
# 步驟2:檢查是否已存在
if composite_key not in self.window_aggregators:
# 步驟3:不存在則創建新聚合器
new_aggregator = self.aggregator_factory()
self.window_aggregators[composite_key] = new_aggregator
# 步驟4:更新活躍窗口索引
self.active_windows[group_key].add(window)
# 步驟5:返回聚合器
return self.window_aggregators[composite_key]
步驟 2.1.3:過期窗口檢測
def get_expired_windows(self, group_key: str, current_timestamp: int,
grace_period_ms: int) -> List[TimeWindow]:
"""找出已過期的窗口"""
expired_windows = []
# 遍歷該群組的所有活躍窗口
for window in list(self.active_windows[group_key]):
# 檢查是否過期:當前時間 >= (窗口結束 + 寬限期)
expiry_time = window.end_time + grace_period_ms
if current_timestamp >= expiry_time:
expired_windows.append(window)
return expired_windows
def remove_expired_windows(self, group_key: str, expired_windows: List[TimeWindow]):
"""清理過期窗口的狀態"""
for window in expired_windows:
# 步驟1:從狀態存儲中移除
composite_key = (group_key, window)
if composite_key in self.window_aggregators:
del self.window_aggregators[composite_key]
# 步驟2:從活躍窗口索引中移除
self.active_windows[group_key].discard(window)
狀態管理流程範例:
# 初始化狀態管理器
window_state = WindowState(lambda: CountAggregator())
# 事件1:C001 在 14:03:00
window1 = TimeWindow(1635739800000, 1635740100000) # [14:00-14:05)
agg1 = window_state.get_or_create_aggregator("C001", window1)
agg1.update(1) # count = 1
# 狀態快照:
# window_aggregators = {("C001", Window[14:00-14:05)): CountAgg(1)}
# active_windows = {"C001": {Window[14:00-14:05)}}
# 事件2:C001 在 14:07:00(新窗口)
window2 = TimeWindow(1635740100000, 1635740400000) # [14:05-14:10)
agg2 = window_state.get_or_create_aggregator("C001", window2)
agg2.update(1) # count = 1
# 狀態快照:
# window_aggregators = {
# ("C001", Window[14:00-14:05)): CountAgg(1),
# ("C001", Window[14:05-14:10)): CountAgg(1)
# }
# active_windows = {"C001": {Window[14:00-14:05), Window[14:05-14:10)}}
步驟 3.1.1:核心架構初始化
class WindowedGroupByOperation:
"""窗口化的 GroupBy 操作 - 整合所有窗口組件"""
def __init__(self, group_key: Union[str, Callable],
window_calculator: FixedWindowCalculator,
aggregator_factory: Callable[[], BaseAggregator],
grace_period_ms: int = 0):
# 步驟1:存儲核心參數
self.group_key = group_key # 分組欄位 ("customer_id")
self.window_calculator = window_calculator # 窗口計算器
self.aggregator_factory = aggregator_factory # 聚合器工廠
self.grace_period_ms = grace_period_ms # 寬限期(毫秒)
# 步驟2:初始化狀態管理器
self.window_state = WindowState(aggregator_factory)
# 設計理念:
# - window_calculator 負責時間對齊
# - window_state 負責狀態管理
# - aggregator_factory 負責聚合邏輯
# - grace_period_ms 提供延遲容忍度
步驟 3.1.2:事件處理核心流程
def process_event(self, event: Dict[str, Any], timestamp_field: str = "timestamp") -> List[Dict[str, Any]]:
"""處理事件並返回觸發的窗口結果"""
# 步驟1:提取和轉換時間戳
timestamp = self._extract_timestamp(event, timestamp_field)
# 步驟2:提取分組鍵值
group_value = self._extract_group_key(event)
# 步驟3:計算事件所屬窗口
window = self.window_calculator.get_window_for_timestamp(timestamp)
# 步驟4:更新聚合狀態
aggregator = self.window_state.get_or_create_aggregator(group_value, window)
aggregator.update(event.get("value", 1))
# 步驟5:更新時間戳追蹤
self.window_state.update_latest_timestamp(group_value, timestamp)
# 步驟6:檢查並處理過期窗口
return self._handle_expired_windows(group_value, timestamp)
步驟 3.1.3:過期窗口處理
def _handle_expired_windows(self, group_key: str, current_timestamp: int) -> List[Dict[str, Any]]:
"""處理過期窗口並返回結果"""
# 步驟1:找出已過期的窗口
expired_windows = self.window_state.get_expired_windows(
group_key, current_timestamp, self.grace_period_ms
)
# 步驟2:為每個過期窗口生成結果
results = []
for window in expired_windows:
# 獲取該窗口的最終聚合結果
final_result = self.window_state.get_window_result(group_key, window)
if final_result is not None:
# 組裝窗口結果
window_result = {
"group_key": group_key, # 分組鍵
"value": final_result, # 聚合結果
"window_start": window.start_time, # 窗口開始時間(毫秒)
"window_end": window.end_time, # 窗口結束時間(毫秒)
"window_start_str": self._format_time(window.start_time), # 可讀開始時間
"window_end_str": self._format_time(window.end_time) # 可讀結束時間
}
results.append(window_result)
# 步驟3:清理過期窗口狀態
if expired_windows:
self.window_state.remove_expired_windows(group_key, expired_windows)
return results
處理流程完整範例:
# 事件處理追蹤:{"customer_id": "C001", "amount": 100, "timestamp": "14:03:00"}
# 步驟1:時間戳提取 -> 1635739980000 (14:03:00的毫秒時間戳)
# 步驟2:分組鍵提取 -> "C001"
# 步驟3:窗口計算 -> TimeWindow[14:00:00-14:05:00) (5分鐘窗口)
# 步驟4:聚合更新 -> CountAggregator.update(1) -> count變為2
# 步驟5:時間追蹤 -> 更新C001的最新時間為1635739980000
# 步驟6:過期檢查 -> 檢查是否有過期窗口,如有則輸出結果並清理狀態
步驟 4.1.1:SimpleDataFrame 窗口方法
class SimpleDataFrame:
"""原有 DataFrame 類別擴展窗口功能"""
def window(self, duration_ms: int, grace_period_ms: int = 0) -> 'WindowedDataFrame':
"""創建固定時間窗口 - 裝飾者模式入口"""
# 步驟1:創建窗口裝飾器
# 將原有 DataFrame 包裝成窗口化版本
return WindowedDataFrame(self, duration_ms, grace_period_ms)
步驟 4.1.2:WindowedDataFrame
class WindowedDataFrame:
"""窗口化的 DataFrame - 裝飾原始 DataFrame"""
def __init__(self, source_df: 'SimpleDataFrame', duration_ms: int, grace_period_ms: int = 0):
# 步驟1:保存原對象
self.source_df = source_df
# 步驟2:存儲窗口配置
self.duration_ms = duration_ms # 窗口大小(毫秒)
self.grace_period_ms = grace_period_ms # 寬限期(毫秒)
# 步驟3:創建窗口計算器
self.window_calculator = FixedWindowCalculator(duration_ms)
# 設計優勢:
# - source_df:保持對原對象的引用,可以調用原有方法
# - window_calculator:專門負責窗口時間計算
# - 職責分離:窗口邏輯與原始邏輯解耦
def group_by(self, key: Union[str, Callable]) -> 'WindowedGroupedDataFrame':
"""窗口化分組 - 返回窗口分組對象"""
return WindowedGroupedDataFrame(self, key)
步驟 4.1.3:WindowedGroupedDataFrame 聚合介面
class WindowedGroupedDataFrame:
"""窗口化分組的 DataFrame - 提供聚合方法"""
def __init__(self, windowed_df: WindowedDataFrame, group_key: Union[str, Callable]):
# 步驟1:保存窗口化 DataFrame 引用
self.windowed_df = windowed_df
self.group_key = group_key
# 步驟2:初始化配置狀態
self._aggregation_configured = False # 防止重複配置
def count(self) -> 'SimpleDataFrame':
"""窗口化計數聚合"""
return self._setup_windowed_aggregation(CountAggregator, "count")
步驟 4.2.1:統一聚合設置方法
def _setup_windowed_aggregation(self, aggregator_class, aggregation_name: str,
value_field: Optional[str] = None) -> 'SimpleDataFrame':
"""設置窗口化聚合 - 統一處理流程"""
# 步驟1:防止重複配置
if self._aggregation_configured:
raise RuntimeError("Aggregation already configured")
# 步驟2:創建窗口化 GroupBy 操作
self.windowed_groupby_op = WindowedGroupByOperation(
group_key=self.group_key, # 分組鍵
window_calculator=self.windowed_df.window_calculator, # 窗口計算器
aggregator_factory=aggregator_class, # 聚合器類型
grace_period_ms=self.windowed_df.grace_period_ms # 寬限期
)
# 步驟3:存儲聚合配置
self.value_field = value_field
# 步驟4:創建結果 DataFrame
result_df = SimpleDataFrame(
f"{self.windowed_df.source_df.name}_windowed_{aggregation_name}"
)
# 步驟5:設置事件處理邏輯
self._setup_windowed_processing(result_df)
# 步驟6:標記為已配置並返回
self._aggregation_configured = True
return result_df
步驟 4.2.2:事件處理攔截機制
def _setup_windowed_processing(self, result_df: 'SimpleDataFrame'):
"""設置窗口化處理邏輯 - 函數攔截模式"""
# 步驟1:保存原始處理函數
original_process = self.windowed_df.source_df.process_message
# 步驟2:定義增強處理函數
def enhanced_process(message: Dict[str, Any]) -> bool:
try:
# 子步驟2.1:先執行原本的處理流程
original_result = original_process(message)
# 子步驟2.2:只有原處理成功才執行窗口化聚合
if original_result:
# 預處理:準備聚合數據
if self.value_field:
message = message.copy()
message["value"] = message.get(self.value_field, 0)
# 執行窗口化聚合
window_results = self.windowed_groupby_op.process_event(message)
# 將每個窗口結果發送到結果 DataFrame
for window_result in window_results:
result_df.process_message(window_result)
return original_result
except Exception as e:
# 子步驟2.3:異常處理 - 退回原始處理
logger.error(f"Error in windowed aggregation processing: {e}")
return original_process(message)
# 步驟3:替換處理函數 - 攔截機制的核心
self.windowed_df.source_df.process_message = enhanced_process
步驟 4.3.1:基本窗口聚合用法
# API 鏈式調用展示
windowed_counts = (orders_df # SimpleDataFrame
.window(minutes(5)) # -> WindowedDataFrame
.group_by("customer_id") # -> WindowedGroupedDataFrame
.count() # -> SimpleDataFrame (結果)
)
今天我們完成了流式聚合系統最重要的進化:從全局累積統計轉向時間分段統計。這個看似簡單的改變,實際上是將分組鍵從一維擴展到二維的架構升級,讓我們的聚合系統具備了時間感知能力。
核心的技術突破在於窗口對齊算法。通過一個優雅的數學公式 timestamp % window_size
,我們解決了事件時間到窗口邊界的映射問題,無論事件何時到達,都能精確計算出它所屬的時間窗口。這種設計既保證了計算效率,又確保了窗口邊界的一致性。
在狀態管理方面,我們建立了以 (group_key, window)
為複合主鍵的存儲結構,每個窗口維護獨立的聚合狀態。更重要的是,我們實現了完整的窗口生命週期管理機制:當窗口過期時,系統會自動輸出最終結果並清理相關狀態,避免記憶體洩漏的同時保證了結果的及時性。
API 設計保持了優雅的一致性,只需在原有的聚合調用前加上 .window(duration)
就能將全局聚合轉換為窗口化聚合。這種設計既維持了向後兼容性,又提供了強大的時間分析能力,讓開發者能夠輕鬆實現按時段的業務監控、異常檢測和趨勢分析。
窗口化聚合為實時數據分析開啟了新的可能性。我們不再只能看到「從開始到現在」的累積數據,而是能夠觀察「每個時段」的具體表現,這對於理解數據的時間模式和及時響應業務變化具有重要意義。
有了窗口化聚合的基礎,我們可以類似地實現更多 SQL 風格的算子:
這些算子的實現模式都很相似:設計專門的聚合器,然後利用我們已有的窗口化框架。重點在於選擇合適的數據結構和算法來高效維護狀態。
隨著聚合功能越來越豐富,我們面臨一個關鍵問題:如何保證狀態的可靠性?當程序崩潰重啟時,所有的聚合狀態都會丟失。下一章我們將探討狀態的持久化保存與故障恢復機制,讓流式處理系統從「功能完整」邁向「生產就緒」。