iT邦幫忙

2025 iThome 鐵人賽

DAY 24
0
AI & Data

「知其然,更知其所以然:什麼是 Real-time (Streaming) Pipeline?從造輪子到 Flink 與 RisingWave」系列 第 24

【知其然,更知其所以然】Day 24: Flink 大狀態管理與維運挑戰

  • 分享至 

  • xImage
  •  

昨天我們學習了 Kafka 多層架構設計,解決了「JDBC 無法 streaming read」的問題,讓 Silver 層保持在 Kafka 中的流動性。
因商業邏輯的改動,所以需重跑歷史資料,並同時重建狀態,實務上的確只能一筆一筆跑,相比起 batch pipeline 擅長大量運算,streaming 在這方面的確是一個消耗效能與時間的事情。
尤其當真正開始處理 TB 級的大狀態時(資料多且有狀態運算很多),會開始遭遇一些讓人頭疼的問題。所以實務上如果要避免這些難點,就會想辦法減少狀態或增加一些系統複雜度來跨過這些困難

重要提醒:本文中所有數字(狀態大小、處理時間、記憶體用量等)都是為了說明概念而設計的範例數值,實際數據會因硬體配置、業務場景、網路環境等因素而有很大差異。請依據實際情況進行測試和調整。

大狀態困難點

1. Checkpoint 越來越慢:狀態達到 TB 級別時的性能瓶頸

什麼是 Checkpoint?

Checkpoint 是 Flink 的容錯機制核心,它會定期將所有算子的狀態數據保存到持久化存儲中,確保系統故障時能夠恢復:

Checkpoint 流程:

1. JobManager 觸發 Checkpoint
   ↓
2. Source 算子插入 Checkpoint Barrier
   ↓
3. 各算子收到 Barrier,保存當前狀態
   ↓
4. 狀態寫入遠端持久化存儲
   ↓
5. 回報完成狀態給 JobManager

TB 級別狀態帶來的性能瓶頸

當狀態從 GB 級增長到 TB 級時,Checkpoint 時間呈指數級增長:

狀態大小與 Checkpoint 時間:

小狀態 (GB 級):
本地記憶體 → 網路傳輸 → 持久化存儲
  1GB     →   2-3秒    →   完成

大狀態 (TB 級):
本地記憶體 → 網路傳輸 → 持久化存儲
  1TB     →   10-30分鐘  →   完成

實際影響

  • Checkpoint 超時:預設 10 分鐘超時,大狀態經常無法完成
  • 網路頻寬飽和:多個 TaskManager 同時上傳,競爭網路資源
  • 背壓影響:Checkpoint 期間,算子需要暫緩處理,造成整體延遲

為什麼狀態會越來越大?

隨著業務需求增加,JOIN 的表越來越多,每個 JOIN 都需要維護狀態:

-- 簡單 JOIN (狀態較小)
SELECT o.*, c.customer_name 
FROM orders o JOIN customers c ON o.customer_id = c.customer_id

-- 複雜 JOIN (狀態巨大)
SELECT o.*, c.*, p.*, d.*, r.*, pr.*
FROM orders o 
JOIN customers c ON o.customer_id = c.customer_id
JOIN payments p ON o.order_id = p.order_id
JOIN deliveries d ON o.order_id = d.order_id  
JOIN reviews r ON o.order_id = r.order_id
JOIN promotions pr ON o.promo_id = pr.promo_id

狀態增長:每增加一個 JOIN,狀態可能成倍增長。

2. Job 復原緩慢:重啟時重新載入大量狀態的痛苦

什麼是 Job 復原?

當 Flink Job 失敗或需要重啟時,系統會從最近的成功 Checkpoint 恢復所有算子的狀態,這個過程就是 Job 復原:

Job 復原流程:

1. JobManager 選擇最新可用的 Checkpoint
   ↓
2. 分配 TaskManager 資源
   ↓  
3. 從持久化存儲讀取狀態數據
   ↓
4. 重建各算子的內部狀態
   ↓
5. 恢復數據處理

大狀態導致的復原痛苦

TB 級狀態的復原時間可能長達數小時,這對生產環境是災難性的:

復原時間對比:

小狀態 (GB 級):
持久化存儲 → 網路傳輸 → TaskManager
    讀取     →   1-2分鐘   →    狀態重建完成

大狀態 (TB 級):
持久化存儲 → 網路傳輸 → TaskManager
    讀取     →   30-60分鐘  →    狀態重建完成

業務影響

  • 業務中斷時間延長:原本 5 分鐘能恢復,現在需要 1 小時
  • SLA 違約:無法滿足「服務 5 分鐘內恢復」的承諾
  • 客戶投訴增加:長時間服務不可用

3. 狀態過大導致效能瓶頸:就像資料庫資料太多會變慢一樣

狀態就像是一個資料庫
可以把 Flink 的狀態想像成一個資料庫,當資料量越來越大時,查詢和操作就會越來越慢:

傳統資料庫的效能問題:
小資料庫 (1GB):
├── 查詢速度:毫秒級
├── 索引效果:很好
└── 整體效能:很順暢

大資料庫 (1TB):
├── 查詢速度:秒級甚至分鐘級
├── 索引效果:受限
└── 整體效能:明顯變慢

狀態大小對處理速度的影響
就像資料庫表格越大、JOIN 越慢一樣,Flink 狀態越大,每次狀態操作就越慢

技術解決方案:從問題到實務

面對這些 TB 級狀態的挑戰,我們需要從根本上重新思考狀態管理策略。

Flink 大狀態必須得用 RocksDB 存儲

Flink State Backend 演進:

1. MemoryStateBackend
   - 狀態存在 TaskManager 記憶體中
   - 適合:小狀態(MB - GB級)
   - 問題:狀態大會導致記憶體不足

2. FsStateBackend  
   - 狀態存在記憶體中,Checkpoint 寫入檔案系統
   - 適合:中等狀態(GB級)
   - 問題:大狀態一樣會爆記憶體

3. RocksDBStateBackend
   - 狀態存在 RocksDB(磁碟),記憶體只做緩存
   - 適合:大狀態(TB級)
   - 代價:需要高速磁碟,複雜度增加

現實選擇:當狀態超過 GB 級別時,只剩 RocksDB 一個選擇,除非你有鈔能力。

改用 RocksDB 的現實代價

RocksDB 將狀態存在磁碟上,但這帶來新的挑戰:

硬體需求升級

  • 高速 SSD 成為必需品:RocksDB 頻繁讀寫磁碟,機械硬碟會拖垮性能
  • 成本大幅上升:NVMe SSD 比機械硬碟貴很多
  • 運維複雜度增加:需要懂 RocksDB 調優、磁碟 IO 監控

技術門檻提高

  • 需要了解 RocksDB 的存儲原理
  • 需要會調整磁碟 IO 參數
  • 需要監控磁碟使用率和 IOPS

RocksDB 還有很多可以調優的地方,而且每個公司的 Flink 場景不同,也沒有標準答案。光是 Flink 本身就有很多知識點需要學習,再加上 RocksDB 調優,會對團隊帶來很大的學習負擔。

這邊只是經驗分享,不過多深入 RocksDB 的複雜調優細節,這邊只是讓讀者知道調優 RocksDB 參數可加強有狀態運算。

架構優化:加速狀態存取與減少狀態大小

MiniBatch 聚合優化

傳統問題:每個事件都觸發狀態更新,造成頻繁 IO

Traditional Stream Processing:

Event1 ─┐
Event2 ─┼── [Individual Processing] ─ State Read/Write ─ RocksDB
Event3 ─┘
   ↓
每個事件都要訪問狀態 = 大量 IO 開銷

MiniBatch 解決方案:批量處理,減少狀態訪問次數

MiniBatch Processing:

Event1 ─┐
Event2 ─┼─ [Batch Buffer] ─ [Batch Processing] ─ State Read/Write ─ RocksDB
Event3 ─┘
   ↓
1000個事件只需 1次狀態訪問 = IO 開銷減少 1000倍

透過犧牲少量延遲,換取巨大的性能提升。

Interval JOIN 取代 Regular JOIN

什麼是 Interval JOIN?

Interval JOIN 是一種有時間窗口限制的 JOIN 操作,只會關聯在特定時間範圍內的數據。與無限期保持狀態的 Regular JOIN 不同,Interval JOIN 會自動清理超出時間窗口的舊數據。

-- Regular JOIN:無限期保持所有數據
SELECT * FROM orders o 
JOIN payments p ON o.order_id = p.order_id;

-- Interval JOIN:只關聯 1 小時內的數據
SELECT * FROM orders o 
JOIN payments p ON o.order_id = p.order_id 
AND o.rowtime BETWEEN p.rowtime - INTERVAL '1' HOUR 
                 AND p.rowtime + INTERVAL '1' HOUR;

Regular JOIN 的狀態爆炸

Regular JOIN State Growth:

Month 1: [Orders: 100GB] + [Payments: 50GB] = 150GB
Month 2: [Orders: 200GB] + [Payments: 100GB] = 300GB  
Month 3: [Orders: 300GB] + [Payments: 150GB] = 450GB
   ↓
State keeps growing indefinitely!

Interval JOIN 的狀態控制

Interval JOIN Fixed State:

Hour 1: [Orders: 1GB] + [Payments: 0.5GB] = 1.5GB
Hour 2: [Orders: 1GB] + [Payments: 0.5GB] = 1.5GB
Hour 3: [Orders: 1GB] + [Payments: 0.5GB] = 1.5GB
   ↓
State size remains constant over time!

只保留時間窗口內的數據,超時自動清理,狀態大小維持穩定。

關鍵考量:這個優化策略完全仰賴於商業邏輯的判斷。需要與業務方討論:

  • 訂單和付款的合理時間窗口是多久?1小時、6小時、還是24小時?
  • 超出時間窗口的數據真的可以丟棄嗎?
  • 是否會影響業務準確性?

技術優化必須在業務需求和系統性能之間取得平衡。

Lookup JOIN 減少狀態存儲

Traditional JOIN:需要在狀態中緩存完整維度表

Traditional Stream JOIN:

Orders Stream ────┐
                  ├── [JOIN] ── Output Stream
Customer Stream ──┘
      ↓
需要狀態中緩存: 完整客戶維度表 (10GB)

Lookup JOIN:只緩存熱點數據,需要時查詢資料庫

Lookup JOIN:

Orders Stream ────┐
                  ├── [LOOKUP] ── Output Stream
Customer DB ──────┘
      ↓
只需緩存: 熱點客戶 (100MB)

狀態使用對比

  • Traditional JOIN:完整維度表 = 10GB 狀態
  • Lookup JOIN:熱點緩存 = 100MB 狀態
  • 狀態減少:99% 的狀態被消除

商業邏輯考量:Lookup JOIN 同樣需要評估業務適合性:

  • 查詢頻率:維度表查詢會不會對資料庫造成壓力?
  • 數據一致性:能接受維度數據的輕微延遲嗎?

詳細的 Lookup JOIN 優缺點分析可以參考 Day 10 的討論,這裡著重於狀態優化的角度。

狀態生命週期管理:TTL 自動清理

TTL (Time To Live) 讓狀態可以自動過期清理,避免無限制累積:

Without TTL - State Growth Problem:

Day 1:  [User States: 100MB]
Day 30: [User States: 3GB]
Day 90: [User States: 9GB]
Day 365: [User States: 36GB]
   ↓
State keeps growing forever!
With TTL - Controlled State Size:

Day 1:  [Active States: 100MB] 
Day 30: [Active States: 100MB] + Auto cleanup of 30+ day old data
Day 90: [Active States: 100MB] + Auto cleanup of 30+ day old data
Day 365: [Active States: 100MB] + Auto cleanup of 30+ day old data
   ↓
State size remains stable!

根據業務特性設定不同的過期時間:

Business-Driven TTL Strategy:

Risk Control (1 Hour):
[New Transaction] ──→ [Risk Check] ──→ [Auto Expire: 1h]
短期狀態,快速清理

User Profile (30 Days):  
[User Activity] ──→ [Profile Update] ──→ [Auto Expire: 30d]
中期狀態,定期更新

Historical Stats (365 Days):
[Annual Report] ──→ [Long-term Storage] ──→ [Auto Expire: 1y]  
長期狀態,年度清理

核心原則:TTL 時間設定必須符合業務邏輯,不能為了技術優化而破壞業務需求。

綜合優化架構:實際案例展示

讓我們用一個完整的例子,展示如何整合所有優化策略:

Complete State Optimization Architecture:

Orders Stream ────┐                                  ┌──── OrderDetail Stream
                  │                                  │
                  ▼                                  ▼
            ┌─────────────────────────────────────────────┐
            │        [Interval JOIN: 1h window]           │
            │         (Recent data only)                  │
            └─────────────────┬───────────────────────────┘
                              │
                              ▼
            ┌─────────────────────────────────────────────┐
            │        [MiniBatch: 1000 events]             │
            └─────────────────┬───────────────────────────┘
                              │
                              ▼
            ┌─────────────────────────────────────────────┐
            │             [RocksDB Storage]               │
            │           [TTL: 7 days cleanup]             │
            └─────────────────┬───────────────────────────┘
                              │
                              ▼
            ┌─────────────────────────────────────────────┐
            │            [Lookup JOIN Layer]              │
            │  ┌──────────────┬──────────────┐            │
            │  ▼              │              ▼            │
            │┌────────────┐   │   ┌─────────────────┐     │
            ││Orders      │◄──┼──►│OrderDetail      │     │
            ││Historical  │   │   │Historical       │     │
            ││Redis/KV DB │   │   │Redis/KV DB      │     │
            │└────────────┘   │   └─────────────────┘     │
            │        ▲        │        ▲                  │
            │        │        ▼        │                  │
            │    Compensate [Output] Compensate           │
            │    lost data  (Complete) lost data          │
            └─────────────────────────────────────────────┘
                              │
                              ▼
                ┌─────────────────────┐       ┌───────────────────┐
                │ [Final Output OLAP] │◄──────│ [Batch recaculate]│
                └─────────────────────┘       └───────────────────┘

各層優化策略說明
1. Interval JOIN (1小時窗口)

  • Orders 和 OrderDetail 只保留 1 小時內的數據進行 JOIN
  • 超時數據自動清理,狀態從 TB 級降到 GB 級

2. MiniBatch 聚合 (1000 events)

  • 收集 1000 個訂單事件後批量處理
  • 狀態讀寫次數減少 1000 倍

3. RocksDB 存儲

  • 大狀態存在高速 SSD 上

4. TTL 自動清理 (7天)

  • 聚合結果 7 天後自動過期
  • 防止狀態無限累積

5. 歷史數據補償 (Compensate)(Optional)
適用場景:業務需要完整歷史數據的情況

  • 問題:Interval JOIN 和 TTL 清理掉的歷史數據導致狀態不完整,是否需要歷史數據補償取決於業務場景
  • 解決方案:將歷史的 Orders 和 OrderDetail 數據預先存在 response latency 夠低 的 infra,如Redis/KV DB 中
  • 補償機制:當處理需要歷史數據時,通過 Lookup JOIN 從 Redis/KV DB 查詢補足缺失的歷史狀態

6. Batch Engine 負責重算新商業邏輯

  • 問題:Streaming Engine 需一筆一筆跑,不擅長重跑歷史資料
  • 解決方案:讓 streaming job 重新提交後只開始新資料的部分,狀態會在搭配歷史數據補償後補上,而 OLAP DB 裡面的資料,透過 batch engine 重跑後直接寫入

效果對比

Before Optimization:
- State Size: 5TB (all historical data)
- Checkpoint Time: 45 minutes  
- Recovery Time: 4 hours

After Optimization:
- State Size: 50GB (recent + aggregated data)
- Checkpoint Time: 1 minutes
- Recovery Time: 1 minutes  

關鍵平衡:犧牲少量狀態完整性與些微增加系統複雜度,換取系統穩定性和性能提升。

總結

大狀態是流處理中的性能殺手,尤其當狀態規模達到 TB 級別時,系統面臨檢查點過慢、故障恢復困難、性能下降等問題。

介紹了許多優化的方向,但優化總是有代價的。工程師的智慧就在於找到業務需求和系統性能的最佳平衡點。

真正的生產級 streaming data pipeline 不是追求功能的完美,而是在複雜的約束條件下做出合理的權衡。理解這些權衡的本質,才是掌握流式處理技術的關鍵。

Day 25 預告:Flink + Iceberg

過去幾天我們深入了解了 Flink + Kafka 的 Kappa 架構,這套組合在流式處理領域流行了很久。從基礎的流式 JOIN、狀態管理,到高級的多層架構設計和性能優化,我們見識了這個經典組合的強大能力和常見問題。

然而,隨著近幾年 Data Lakehouse 概念的興起,加上許多業務場景的實時性要求其實只需要分鐘級(而非毫秒到秒級),所以誕生了一個新的流式架構組合:Flink + Iceberg


上一篇
【知其然,更知其所以然】Day 23: 多層架構設計
下一篇
【知其然,更知其所以然】Day 25: Flink + Iceberg
系列文
「知其然,更知其所以然:什麼是 Real-time (Streaming) Pipeline?從造輪子到 Flink 與 RisingWave」26
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言