Flink 的一個重要特性是狀態保存和還原機制,它可以讓 Flink 應用在發生故障時,能夠自動恢復到之前的狀態,而不會丟失或重複處理數據。
Flink 的狀態保存和還原機制基於以下幾個概念
Flink 應用中的任何運算子(Operator)都可以有自己的狀態,例如計數器、窗口、緩衝區等。狀態可以分為兩種類型:鍵控狀態(Keyed State)和算子狀態(Operator State)。
鍵控狀態是根據數據流中的鍵(Key)來分組和存儲的,例如每個用戶的消費金額。
算子狀態是不依賴於鍵的,例如一個計時器或一個隊列。
Flink 會定期地對運算子的狀態進行快照,並將快照存儲到外部的持久化存儲系統中,例如 HDFS 或 S3。快照是一種分佈式的全局一致性的檢查點(Checkpoint),它保證了所有運算子的快照都是在同一個邏輯時間點上進行的,而且不會影響數據流的處理。
當 Flink 應用發生故障時,例如某個節點掛掉或者程序出錯,Flink 會從最近的一個快照中恢復運算子的狀態,並重新啟動數據流的處理。Flink 會確保數據流的處理具有精確一次(Exactly-Once)的語義,也就是說,每筆數據只會被處理一次,不會出現丟失或重複。 下面是一個使用 Flink 的 Java API 來實現一個簡單的單詞計數程序的程式範例,它展示了如何使用鍵控狀態來保存每個單詞出現的次數:
@Override
public void open(Configuration parameters) throws Exception {
// 初始化狀態
countState = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
}
@Override
public Tuple2<String, Long> map(String value) throws Exception {
// 獲取當前的計數
Long count = countState.value();
if (count == null) {
// 如果是第一次出現,則設為 1
count = 1L;
} else {
// 如果不是第一次出現,則加 1
count += 1;
}
// 更新狀態
countState.update(count);
// 返回結果
return new Tuple2<>(value, count);
}
});
// 輸出結果
counts.print();
// 啟動 Flink 程式
env.execute("WordCount with State");
還記得前面在介紹 Web UI 時有個 Savepoint 嗎?我們如果有開啟自動儲存的話,上面講的 state 會被寫成 checkpoint,在 Web 的這裡:
在 Overview 裡可以看到有哪些 operator 狀態要被記錄,並且說明當前最新的 checkpoint 儲存路徑。當這個 job 真的遇到狀況,首先系統會試著自動載入最新的 checkpoint 後還原狀態。
在必要的時候,我們也可以看看 History 裡的記錄,決定要從哪個點還原:
不過要注意一點,多久自動儲存一次跟每次儲存要花的時間會有關係。如果你的狀態大到要花一分鐘才能完整寫入,那你的 checkpoint 間隔設定 30 秒的話,你就會看到系統整天在試著存檔而無法好好的去執行你的 job。