昨天我們學會了讓系統在過載時「踩煞車」,但還有個更重要的問題:外送員怎麼知道自己送到哪裡了?今天來聊聊 Checkpoint。
想像一下,你是個勤奮的外送員,一整天送了 500 個包裹。突然間,你的手機沒電了,系統重啟…
糟糕的情況:
理想的情況:
這就是今天要解決的核心問題:如何精確記錄和恢復處理進度。
在流處理世界裡,Checkpoint 就像外送員定期回調度中心報告:
「我是 A 路線的外送員,剛送完第 157 號包裹,請記錄下來。B 路線我送到第 89 號了。」
調度中心會把這些進度寫在白板上:
配送進度記錄
A 路線: ✓ 第 157 號包裹
B 路線: ✓ 第 89 號包裹
最後更新: 14:30
這樣即使外送員手機沒電,重啟後也能從正確位置繼續!
讓我們為 Simple Streaming 加上這個「定期報告進度」的機制。
重要提醒
本文所有程式碼皆為教學與概念演示用的 Pseudo Code,可以幫助你理解並解決實際場景的問題,但這裡的程式碼不能直接使用,主要目的是用來講解 Stream Processing 的設計思路與核心概念。閱讀時建議把重點放在理解整體架構和設計邏輯上,程式碼細節部分可以輕鬆看過就好。
為了讓本日重點更突出,我們會適度簡化前幾天的程式碼實作細節,主要聚焦在今天的 Checkpoint 機制講解。
我們的 Checkpoint 系統需要能夠記錄和保存處理進度。讓我們看看它是如何工作的:
Checkpoint 核心流程圖解
Step 1: Message Processing (訊息處理)
┌─────────────────────────────────────┐
│ New Message Arrives │
│ (topic: orders, partition: 0) │
└─────────────────────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ record_message() │
│ │
│ 記錄:orders[0] -> offset 487 │
└─────────────────────────────────────┘
Step 2: Time Check (時間檢查)
┌─────────────────────────────────────┐
│ should_commit()? │
│ │
│ current_time - start >= 5.0s │
└─────────────────────────────────────┘
│ YES │ NO
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ Trigger │ │ Continue │
│ Checkpoint │ │ Processing │
└─────────────────┘ └─────────────────┘
Step 3: Two-Phase Commit (兩階段提交)
┌─────────────────────────────────────┐
│ commit() │
│ │
│ Phase 1: Flush all sinks │
│ Phase 2: Save to persistent │
└─────────────────────────────────────┘
設計思路說明:
Step 1 - 訊息處理追蹤:每處理一筆訊息,就記錄其 topic、partition 和 offset:
class SimpleCheckpoint:
"""
簡單的進度記錄器
就像外送員的工作日誌,記錄每條路線送到哪裡了
"""
def __init__(self, commit_interval: float = 5.0):
self.commit_interval = commit_interval
# 當前進度(還沒保存的)
self._offsets = {} # {(topic, partition): offset}
# 已確認的進度(已保存的)
self._committed_offsets = {}
# 計時相關
self._created_at = time.time() # 記錄建立時間
logger.info(f"開始記錄配送進度: 每 {commit_interval} 秒保存一次")
def record_message(self, topic: str, partition: int, offset: int):
"""記錄送達一個包裹"""
tp = (topic, partition)
self._offsets[tp] = offset
logger.debug(f"記錄進度: {tp} -> 包裹 #{offset}")
def should_commit(self) -> bool:
"""檢查是否該向調度中心報告了"""
return (time.time() - self._created_at) >= self.commit_interval
def reset(self):
"""重置 checkpoint,開始新的計時週期"""
self._created_at = time.time() # 重新開始計時
self._offsets.clear() # 清空當前進度
logger.debug("Checkpoint 已重置,開始新的計時週期")
當時間到了需要保存進度時,我們執行兩階段提交流程。讓我們詳細分析這個過程:
兩階段提交詳細流程
Phase 1: Flush All Sinks (刷新所有輸出)
┌─────────────────────────────────────┐
│ _flush_sinks() │
│ │
│ PostgreSQL Sink: flush buffer │
│ File Sink: flush buffer │
│ Other Sinks: flush buffer │
└─────────────────────────────────────┘
│
▼
Phase 2: Commit Progress (保存進度)
┌─────────────────────────────────────┐
│ _commit_progress() │
│ │
│ orders[0] -> offset 487 │
│ Save to checkpoint.json │
└─────────────────────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ Return Success │
│ │
│ logger.info("進度保存完成") │
│ return True │
└─────────────────────────────────────┘
階段分析說明:
Phase 1 - 確保數據到達:在記錄進度前,先確保所有 sink 都完成了數據寫入:
def commit(self) -> bool:
"""
兩階段進度保存:
1. 確認所有配送點收到包裹
2. 向調度中心報告進度
"""
logger.info(f"開始保存進度: {self._message_count} 個包裹")
try:
# 階段1: 確認配送點都收到包裹了
logger.debug("階段1: 確認所有配送點")
self._flush_sinks() # 讓所有 sink 確認收到資料
# 階段2: 向調度中心報告進度(關鍵!)
logger.debug("階段2: 報告配送進度")
self._commit_progress()
# 階段3: 更新統計
logger.info("進度保存完成")
return True
except Exception as e:
logger.error(f"進度保存失敗: {e}")
return False
def _commit_progress(self):
"""
向調度中心報告進度 - 這是核心功能!
"""
logger.info("向調度中心報告進度:")
for (topic, partition), offset in self._offsets.items():
# 記錄已確認的進度
self._committed_offsets[(topic, partition)] = offset
logger.info(f"{topic}[{partition}] -> 包裹 #{offset}")
# 關鍵:保存到持久化儲存
self._save_to_persistent_storage()
logger.info(f"總共報告了 {len(self._offsets)} 條路線的進度")
def _save_to_persistent_storage(self):
"""
將 checkpoint 保存到持久化儲存
確保系統重啟後能恢復進度
"""
try:
# 保存到檔案系統
checkpoint_data = {
'committed_offsets': self._committed_offsets,
'timestamp': time.time()
}
with open('checkpoint.json', 'w') as f:
json.dump(checkpoint_data, f)
logger.debug("Checkpoint 已保存到持久化儲存")
except Exception as e:
logger.error(f"保存 checkpoint 失敗: {e}")
raise
在 SimpleStreamingEngine 中,我們需要創建 checkpoint 實例並整合到主處理循環
首先在應用系統啟動時創建 checkpoint:
class SimpleStreamingEngine:
def __init__(self, commit_interval: float = 5.0):
# 創建 checkpoint 實例
self._checkpoint = SimpleCheckpoint(commit_interval)
logger.info(f"StreamingEngine 初始化完成,checkpoint 間隔: {commit_interval}s")
# 其他初始化...
...
然後在主處理循環中加入 checkpoint 觸發機制:
# 在處理每個包裹時記錄進度
def message_handler(message):
# 處理包裹
df.process_message(message)
# 記錄進度
topic = source.topic
partition = message.partition
offset = message.offset
self._checkpoint.record_message(topic, partition, offset)
# 檢查是否該報告進度了 - 關鍵觸發點!
if self._checkpoint.should_commit():
self._try_commit_checkpoint() # 執行三階段保存流程
def _try_commit_checkpoint(self):
"""
嘗試執行 checkpoint - 這是觸發機制的核心
"""
logger.debug("Checkpoint 時間到,開始 commit")
try:
# 執行完整的 checkpoint 流程
success = self._checkpoint.commit()
if success:
# commit 成功,重置 checkpoint 開始下一輪
self._checkpoint.reset()
logger.debug("Checkpoint commit 成功並重置")
else:
# commit 失敗(可能因為過載),保持當前狀態
logger.debug("Checkpoint commit 失敗,保持當前狀態")
except Exception as e:
logger.error(f"Checkpoint commit 異常: {e}")
系統採用定時觸發機制:每隔 commit_interval
秒(例如 5 秒)自動保存進度。
# 定時觸發:在主循環中檢查
if self._checkpoint.should_commit():
self._try_commit_checkpoint()
當外送員(系統)重新上班時,需要從上次中斷的地方繼續工作。讓我們看看恢復流程:
System Restart Recovery Flow (系統重啟恢復流程):
Step 1: Load Checkpoint (載入檢查點)
┌─────────────────────────────────────┐
│ _load_checkpoint_from_storage() │
│ │
│ Read checkpoint.json │
│ orders[0] -> offset 487 │
└─────────────────────────────────────┘
│
▼
Step 2: Set Consumer Position (設定消費位置)
┌─────────────────────────────────────┐
│ kafka_consumer.seek() │
│ │
│ Set orders[0] start at 488 │
│ (offset + 1 to avoid duplicate) │
└─────────────────────────────────────┘
│
▼
Step 3: Resume Processing (恢復處理)
┌─────────────────────────────────────┐
│ Start Message Loop │
│ │
│ Continue from where left off │
│ No data loss or duplication │
│ Business as usual │
└─────────────────────────────────────┘
恢復流程說明:
Step 1 - 載入上次進度:從檔案系統讀取上次保存的 checkpoint:
def run(self):
"""啟動系統時恢復進度"""
# 1. 從持久化儲存讀取上次的進度
saved_progress = self._load_checkpoint_from_storage()
# 2. 告訴 Kafka Consumer 從哪裡開始讀取
if saved_progress:
for (topic, partition), offset in saved_progress.items():
logger.info(f"恢復進度: {topic}[{partition}] 從 offset {offset} 開始")
# 設定 Kafka Consumer 的起始位置
self._kafka_consumer.seek(topic, partition, offset + 1)
logger.info("系統啟動完成,從上次中斷處繼續處理")
# 開始正常的訊息處理循環...
def _load_checkpoint_from_storage(self):
"""
從持久化儲存讀取 checkpoint
"""
try:
# 從檔案系統讀取
with open('checkpoint.json', 'r') as f:
checkpoint_data = json.load(f)
return checkpoint_data['committed_offsets']
except FileNotFoundError:
logger.info("找不到 checkpoint 檔案,從頭開始處理")
return {}
except Exception as e:
logger.warning(f"無法載入 checkpoint: {e}")
return {} # 從頭開始處理
進度恢復的核心概念:
我們今天的示範是直接把當下的狀態 snapshot 下來,然後定期存一次,方便大家理解 Checkpoint 的基本概念。
不過,在真實的流處理系統中(尤其是 Kafka、Flink、RisingWave 這類平台),這種「一次性拍照」的方式有個缺點:
一致性挑戰:快照的時間點要和資料處理的邏輯完全對齊,不然會出現漏資料或重複處理的情況。
在更嚴謹的設計中,Checkpoint 事件會像數據一樣在管道中流動:
Checkpoint Barrier Propagation in Streaming Pipeline:
Timeline:
T1: [data] → [data] → [data] → [data]
T2: [data] → [data] → [data] → [data]
T3: [CHECKPOINT_BARRIER_#123] starts propagation
T4: [data] → [BARRIER] → [data] → [data]
T5: [data] → [data] → [BARRIER] → [data]
T6: [data] → [data] → [data] → [BARRIER]
When each Operator receives Checkpoint Barrier:
┌─────────────────────────────────────┐
│ Source receives BARRIER │
│ 1. Stop processing new data │
│ 2. Save state: offset=487 │
│ 3. Forward BARRIER downstream │
└─────────────────────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ Filter receives BARRIER │
│ 1. Process data before BARRIER │
│ 2. Save state: processed=1234 │
│ 3. Forward BARRIER downstream │
└─────────────────────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ Aggregator receives BARRIER │
│ 1. Save window state and results │
│ 2. Forward BARRIER downstream │
└─────────────────────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ Sink receives BARRIER │
│ 1. Flush output buffer │
│ 2. Save state: written=456 │
│ 3. Report completion to Coordinator│
└─────────────────────────────────────┘
這種「事件流動」的設計確保了:
經過三天的學習,我們的 SimpleStreamingEngine 已經從基礎的資料流處理進化為具備進度管理能力的可靠流處理系統:
Day 8: SimpleStreamingEngine with Checkpoint (帶進度保存的流處理架構)
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ KafkaSource │───►│DataFrame.filter()│───►│PostgreSQLSink │
│ │ │ │ │ │
│ • Topic consume │ │ • Lambda filter │ │ • Batch buffer │
│ • Consumer seek │ │ • Data transform │ │ • Timer trigger │
│ • Offset resume │ │ │ │ • Bulk insert │
│ │ │ │ │ • Progress track│
└─────────────────┘ └──────────────────┘ └─────────────────┘
▲ │
│ ┌─────────────────────────────────────┘
│ │
│ ▼
┌─────────────────────────────────────────────────────────────┐
│ SimpleStreamingEngine │
│ │
│ • Message handler with progress tracking │
│ • Checkpoint trigger every 5 seconds │
│ • Two-phase commit (flush + save) │
│ • Persistent storage (checkpoint.json) │
│ • Auto recovery on system restart │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ SimpleCheckpoint │
│ │
│ • record_message(): Track processing progress │
│ • should_commit(): Time-based trigger (5s) │
│ • commit(): Two-phase progress saving │
│ • reset(): Clear and restart cycle │
│ • Persistent state in checkpoint.json │
└─────────────────────────────────────────────────────────────┘
從基礎處理到可靠系統的進化軌跡:
今天我們為流處理系統實現了「定期保存進度」的能力。這個機制讓系統能夠:
就像優秀的外送員會記錄自己送到哪裡一樣,優秀的流處理系統也需要「記憶力」。雖然每次報告進度會暫停幾毫秒,但換來的是系統的可靠性和可恢復性。
到這裡,我們已經把 Streaming Pipeline 中幾個常見且重要的系統功能走過一輪了:
當然,實際系統裡還有一些比較邊緣、或依場景才會需要的功能,這裡就不花篇幅展開了。接下來,我們要正式進入 Streaming Pipeline 裡最重要、也相對困難的主題:
這個部分會牽涉到:
敬請期待!