iT邦幫忙

2023 iThome 鐵人賽

DAY 21
0

Flink 的一個重要特性是狀態保存和還原機制,它可以讓 Flink 應用在發生故障時,能夠自動恢復到之前的狀態,而不會丟失或重複處理數據。

Flink 的狀態保存和還原機制基於以下幾個概念

狀態 State

Flink 應用中的任何運算子(Operator)都可以有自己的狀態,例如計數器、窗口、緩衝區等。狀態可以分為兩種類型:鍵控狀態(Keyed State)和算子狀態(Operator State)。

鍵控狀態是根據數據流中的鍵(Key)來分組和存儲的,例如每個用戶的消費金額。

算子狀態是不依賴於鍵的,例如一個計時器或一個隊列。

快照(Snapshot)

Flink 會定期地對運算子的狀態進行快照,並將快照存儲到外部的持久化存儲系統中,例如 HDFS 或 S3。快照是一種分佈式的全局一致性的檢查點(Checkpoint),它保證了所有運算子的快照都是在同一個邏輯時間點上進行的,而且不會影響數據流的處理。

還原(Restore)

當 Flink 應用發生故障時,例如某個節點掛掉或者程序出錯,Flink 會從最近的一個快照中恢復運算子的狀態,並重新啟動數據流的處理。Flink 會確保數據流的處理具有精確一次(Exactly-Once)的語義,也就是說,每筆數據只會被處理一次,不會出現丟失或重複。 下面是一個使用 Flink 的 Java API 來實現一個簡單的單詞計數程序的程式範例,它展示了如何使用鍵控狀態來保存每個單詞出現的次數:

    @Override
    public void open(Configuration parameters) throws Exception {
        // 初始化狀態
        countState = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public Tuple2<String, Long> map(String value) throws Exception {
        // 獲取當前的計數
        Long count = countState.value();
        if (count == null) {
            // 如果是第一次出現,則設為 1
            count = 1L;
        } else {
            // 如果不是第一次出現,則加 1
            count += 1;
        }
        // 更新狀態
        countState.update(count);
        // 返回結果
        return new Tuple2<>(value, count);
    }
});

// 輸出結果
counts.print();
// 啟動 Flink 程式
env.execute("WordCount with State");

還記得前面在介紹 Web UI 時有個 Savepoint 嗎?我們如果有開啟自動儲存的話,上面講的 state 會被寫成 checkpoint,在 Web 的這裡:

Untitled

在 Overview 裡可以看到有哪些 operator 狀態要被記錄,並且說明當前最新的 checkpoint 儲存路徑。當這個 job 真的遇到狀況,首先系統會試著自動載入最新的 checkpoint 後還原狀態。

在必要的時候,我們也可以看看 History 裡的記錄,決定要從哪個點還原:

Untitled

不過要注意一點,多久自動儲存一次跟每次儲存要花的時間會有關係。如果你的狀態大到要花一分鐘才能完整寫入,那你的 checkpoint 間隔設定 30 秒的話,你就會看到系統整天在試著存檔而無法好好的去執行你的 job。


上一篇
Flink key-by - Day20
下一篇
來寫一個 Flink Streaming Job 吧 - Day22
系列文
用 Airflow & Flink 來開發 ETL 吧30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言