昨天我們學習了 Kafka 多層架構設計,解決了「JDBC 無法 streaming read」的問題,讓 Silver 層保持在 Kafka 中的流動性。
因商業邏輯的改動,所以需重跑歷史資料,並同時重建狀態,實務上的確只能一筆一筆跑,相比起 batch pipeline 擅長大量運算,streaming 在這方面的確是一個消耗效能與時間的事情。
尤其當真正開始處理 TB 級的大狀態時(資料多且有狀態運算很多),會開始遭遇一些讓人頭疼的問題。所以實務上如果要避免這些難點,就會想辦法減少狀態或增加一些系統複雜度來跨過這些困難
重要提醒:本文中所有數字(狀態大小、處理時間、記憶體用量等)都是為了說明概念而設計的範例數值,實際數據會因硬體配置、業務場景、網路環境等因素而有很大差異。請依據實際情況進行測試和調整。
Checkpoint 是 Flink 的容錯機制核心,它會定期將所有算子的狀態數據保存到持久化存儲中,確保系統故障時能夠恢復:
Checkpoint 流程:
1. JobManager 觸發 Checkpoint
↓
2. Source 算子插入 Checkpoint Barrier
↓
3. 各算子收到 Barrier,保存當前狀態
↓
4. 狀態寫入遠端持久化存儲
↓
5. 回報完成狀態給 JobManager
當狀態從 GB 級增長到 TB 級時,Checkpoint 時間呈指數級增長:
狀態大小與 Checkpoint 時間:
小狀態 (GB 級):
本地記憶體 → 網路傳輸 → 持久化存儲
1GB → 2-3秒 → 完成
大狀態 (TB 級):
本地記憶體 → 網路傳輸 → 持久化存儲
1TB → 10-30分鐘 → 完成
實際影響:
隨著業務需求增加,JOIN 的表越來越多,每個 JOIN 都需要維護狀態:
-- 簡單 JOIN (狀態較小)
SELECT o.*, c.customer_name
FROM orders o JOIN customers c ON o.customer_id = c.customer_id
-- 複雜 JOIN (狀態巨大)
SELECT o.*, c.*, p.*, d.*, r.*, pr.*
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
JOIN payments p ON o.order_id = p.order_id
JOIN deliveries d ON o.order_id = d.order_id
JOIN reviews r ON o.order_id = r.order_id
JOIN promotions pr ON o.promo_id = pr.promo_id
狀態增長:每增加一個 JOIN,狀態可能成倍增長。
當 Flink Job 失敗或需要重啟時,系統會從最近的成功 Checkpoint 恢復所有算子的狀態,這個過程就是 Job 復原:
Job 復原流程:
1. JobManager 選擇最新可用的 Checkpoint
↓
2. 分配 TaskManager 資源
↓
3. 從持久化存儲讀取狀態數據
↓
4. 重建各算子的內部狀態
↓
5. 恢復數據處理
TB 級狀態的復原時間可能長達數小時,這對生產環境是災難性的:
復原時間對比:
小狀態 (GB 級):
持久化存儲 → 網路傳輸 → TaskManager
讀取 → 1-2分鐘 → 狀態重建完成
大狀態 (TB 級):
持久化存儲 → 網路傳輸 → TaskManager
讀取 → 30-60分鐘 → 狀態重建完成
業務影響:
狀態就像是一個資料庫
可以把 Flink 的狀態想像成一個資料庫,當資料量越來越大時,查詢和操作就會越來越慢:
傳統資料庫的效能問題:
小資料庫 (1GB):
├── 查詢速度:毫秒級
├── 索引效果:很好
└── 整體效能:很順暢
大資料庫 (1TB):
├── 查詢速度:秒級甚至分鐘級
├── 索引效果:受限
└── 整體效能:明顯變慢
狀態大小對處理速度的影響
就像資料庫表格越大、JOIN 越慢一樣,Flink 狀態越大,每次狀態操作就越慢
面對這些 TB 級狀態的挑戰,我們需要從根本上重新思考狀態管理策略。
Flink State Backend 演進:
1. MemoryStateBackend
- 狀態存在 TaskManager 記憶體中
- 適合:小狀態(MB - GB級)
- 問題:狀態大會導致記憶體不足
2. FsStateBackend
- 狀態存在記憶體中,Checkpoint 寫入檔案系統
- 適合:中等狀態(GB級)
- 問題:大狀態一樣會爆記憶體
3. RocksDBStateBackend
- 狀態存在 RocksDB(磁碟),記憶體只做緩存
- 適合:大狀態(TB級)
- 代價:需要高速磁碟,複雜度增加
現實選擇:當狀態超過 GB 級別時,只剩 RocksDB 一個選擇,除非你有鈔能力。
RocksDB 將狀態存在磁碟上,但這帶來新的挑戰:
硬體需求升級:
技術門檻提高:
RocksDB 還有很多可以調優的地方,而且每個公司的 Flink 場景不同,也沒有標準答案。光是 Flink 本身就有很多知識點需要學習,再加上 RocksDB 調優,會對團隊帶來很大的學習負擔。
這邊只是經驗分享,不過多深入 RocksDB 的複雜調優細節,這邊只是讓讀者知道調優 RocksDB 參數可加強有狀態運算。
傳統問題:每個事件都觸發狀態更新,造成頻繁 IO
Traditional Stream Processing:
Event1 ─┐
Event2 ─┼── [Individual Processing] ─ State Read/Write ─ RocksDB
Event3 ─┘
↓
每個事件都要訪問狀態 = 大量 IO 開銷
MiniBatch 解決方案:批量處理,減少狀態訪問次數
MiniBatch Processing:
Event1 ─┐
Event2 ─┼─ [Batch Buffer] ─ [Batch Processing] ─ State Read/Write ─ RocksDB
Event3 ─┘
↓
1000個事件只需 1次狀態訪問 = IO 開銷減少 1000倍
透過犧牲少量延遲,換取巨大的性能提升。
什麼是 Interval JOIN?
Interval JOIN 是一種有時間窗口限制的 JOIN 操作,只會關聯在特定時間範圍內的數據。與無限期保持狀態的 Regular JOIN 不同,Interval JOIN 會自動清理超出時間窗口的舊數據。
-- Regular JOIN:無限期保持所有數據
SELECT * FROM orders o
JOIN payments p ON o.order_id = p.order_id;
-- Interval JOIN:只關聯 1 小時內的數據
SELECT * FROM orders o
JOIN payments p ON o.order_id = p.order_id
AND o.rowtime BETWEEN p.rowtime - INTERVAL '1' HOUR
AND p.rowtime + INTERVAL '1' HOUR;
Regular JOIN 的狀態爆炸:
Regular JOIN State Growth:
Month 1: [Orders: 100GB] + [Payments: 50GB] = 150GB
Month 2: [Orders: 200GB] + [Payments: 100GB] = 300GB
Month 3: [Orders: 300GB] + [Payments: 150GB] = 450GB
↓
State keeps growing indefinitely!
Interval JOIN 的狀態控制:
Interval JOIN Fixed State:
Hour 1: [Orders: 1GB] + [Payments: 0.5GB] = 1.5GB
Hour 2: [Orders: 1GB] + [Payments: 0.5GB] = 1.5GB
Hour 3: [Orders: 1GB] + [Payments: 0.5GB] = 1.5GB
↓
State size remains constant over time!
只保留時間窗口內的數據,超時自動清理,狀態大小維持穩定。
關鍵考量:這個優化策略完全仰賴於商業邏輯的判斷。需要與業務方討論:
技術優化必須在業務需求和系統性能之間取得平衡。
Traditional JOIN:需要在狀態中緩存完整維度表
Traditional Stream JOIN:
Orders Stream ────┐
├── [JOIN] ── Output Stream
Customer Stream ──┘
↓
需要狀態中緩存: 完整客戶維度表 (10GB)
Lookup JOIN:只緩存熱點數據,需要時查詢資料庫
Lookup JOIN:
Orders Stream ────┐
├── [LOOKUP] ── Output Stream
Customer DB ──────┘
↓
只需緩存: 熱點客戶 (100MB)
狀態使用對比:
商業邏輯考量:Lookup JOIN 同樣需要評估業務適合性:
詳細的 Lookup JOIN 優缺點分析可以參考 Day 10 的討論,這裡著重於狀態優化的角度。
TTL (Time To Live) 讓狀態可以自動過期清理,避免無限制累積:
Without TTL - State Growth Problem:
Day 1: [User States: 100MB]
Day 30: [User States: 3GB]
Day 90: [User States: 9GB]
Day 365: [User States: 36GB]
↓
State keeps growing forever!
With TTL - Controlled State Size:
Day 1: [Active States: 100MB]
Day 30: [Active States: 100MB] + Auto cleanup of 30+ day old data
Day 90: [Active States: 100MB] + Auto cleanup of 30+ day old data
Day 365: [Active States: 100MB] + Auto cleanup of 30+ day old data
↓
State size remains stable!
根據業務特性設定不同的過期時間:
Business-Driven TTL Strategy:
Risk Control (1 Hour):
[New Transaction] ──→ [Risk Check] ──→ [Auto Expire: 1h]
短期狀態,快速清理
User Profile (30 Days):
[User Activity] ──→ [Profile Update] ──→ [Auto Expire: 30d]
中期狀態,定期更新
Historical Stats (365 Days):
[Annual Report] ──→ [Long-term Storage] ──→ [Auto Expire: 1y]
長期狀態,年度清理
核心原則:TTL 時間設定必須符合業務邏輯,不能為了技術優化而破壞業務需求。
讓我們用一個完整的例子,展示如何整合所有優化策略:
Complete State Optimization Architecture:
Orders Stream ────┐ ┌──── OrderDetail Stream
│ │
▼ ▼
┌─────────────────────────────────────────────┐
│ [Interval JOIN: 1h window] │
│ (Recent data only) │
└─────────────────┬───────────────────────────┘
│
▼
┌─────────────────────────────────────────────┐
│ [MiniBatch: 1000 events] │
└─────────────────┬───────────────────────────┘
│
▼
┌─────────────────────────────────────────────┐
│ [RocksDB Storage] │
│ [TTL: 7 days cleanup] │
└─────────────────┬───────────────────────────┘
│
▼
┌─────────────────────────────────────────────┐
│ [Lookup JOIN Layer] │
│ ┌──────────────┬──────────────┐ │
│ ▼ │ ▼ │
│┌────────────┐ │ ┌─────────────────┐ │
││Orders │◄──┼──►│OrderDetail │ │
││Historical │ │ │Historical │ │
││Redis/KV DB │ │ │Redis/KV DB │ │
│└────────────┘ │ └─────────────────┘ │
│ ▲ │ ▲ │
│ │ ▼ │ │
│ Compensate [Output] Compensate │
│ lost data (Complete) lost data │
└─────────────────────────────────────────────┘
│
▼
┌─────────────────────┐ ┌───────────────────┐
│ [Final Output OLAP] │◄──────│ [Batch recaculate]│
└─────────────────────┘ └───────────────────┘
各層優化策略說明:
1. Interval JOIN (1小時窗口):
2. MiniBatch 聚合 (1000 events):
3. RocksDB 存儲:
4. TTL 自動清理 (7天):
5. 歷史數據補償 (Compensate)(Optional):
適用場景:業務需要完整歷史數據的情況
6. Batch Engine 負責重算新商業邏輯:
效果對比:
Before Optimization:
- State Size: 5TB (all historical data)
- Checkpoint Time: 45 minutes
- Recovery Time: 4 hours
After Optimization:
- State Size: 50GB (recent + aggregated data)
- Checkpoint Time: 1 minutes
- Recovery Time: 1 minutes
關鍵平衡:犧牲少量狀態完整性與些微增加系統複雜度,換取系統穩定性和性能提升。
大狀態是流處理中的性能殺手,尤其當狀態規模達到 TB 級別時,系統面臨檢查點過慢、故障恢復困難、性能下降等問題。
介紹了許多優化的方向,但優化總是有代價的。工程師的智慧就在於找到業務需求和系統性能的最佳平衡點。
真正的生產級 streaming data pipeline 不是追求功能的完美,而是在複雜的約束條件下做出合理的權衡。理解這些權衡的本質,才是掌握流式處理技術的關鍵。
過去幾天我們深入了解了 Flink + Kafka 的 Kappa 架構,這套組合在流式處理領域流行了很久。從基礎的流式 JOIN、狀態管理,到高級的多層架構設計和性能優化,我們見識了這個經典組合的強大能力和常見問題。
然而,隨著近幾年 Data Lakehouse 概念的興起,加上許多業務場景的實時性要求其實只需要分鐘級(而非毫秒到秒級),所以誕生了一個新的流式架構組合:Flink + Iceberg。