iT邦幫忙

2025 iThome 鐵人賽

DAY 17
1
AI & Data

「知其然,更知其所以然:什麼是 Real-time (Streaming) Pipeline?從造輪子到 Flink 與 RisingWave」系列 第 17

【知其然,更知其所以然】Day17:狀態的生死之謎 - 持久化

  • 分享至 

  • xImage
  •  

經過前面的努力,我們打造了功能豐富的流式處理系統:過濾、聚合、窗口、join 樣樣不缺。但現在面臨一個殘酷的現實:程序一重啟,所有狀態灰飛煙滅!

今天我們要理解流式處理最核心的挑戰之一:狀態管理與故障恢復

狀態丟失的痛點

看似完美的系統,其實很脆弱

System State During Runtime:
┌─────────────────────┐
│ In-Memory Aggregate │
│ State               │
├─────────────────────┤
│ C001: count=150     │
│ C002: count=89      │
│ C003: count=203     │
│ ...                 │
└─────────────────────┘
         ↓ Program Crash
┌─────────────────────┐
│ After Restart       │
├─────────────────────┤
│ (Empty State)       │
│                     │
│ Starting from       │
│ scratch...          │
└─────────────────────┘

丟失狀態的後果

  • 聚合重置:客戶訂單統計從零開始
  • 窗口混亂:時間窗口邊界不一致
  • Join 失效:關聯狀態丟失,無法匹配
  • 業務中斷:實時報表顯示錯誤數據

狀態管理的三大核心概念

1. Serialize(序列化)- 把狀態變成可保存的格式

內存中的對象狀態          序列化後的數據
┌─────────────────┐    ┌─────────────────┐
│ CountAggregator │────│ {"count": 42}   │
│ count = 42      │    │                 │
└─────────────────┘    └─────────────────┘

┌─────────────────┐    ┌─────────────────┐
│ TimeWindow      │────│ {"start": 14000,│
│ start = 14:00   │    │  "end": 14:05}  │
│ end = 14:05     │    │                 │
└─────────────────┘    └─────────────────┘

常見序列化格式

  • JSON: 人類可讀,跨語言,但只支持基本數據類型
  • Pickle: Python 專用,支持複雜對象,但不跨語言
  • Protobuf: 高效,跨語言,但需要定義 schema

2. Deserialize(反序列化)- 把保存的數據還原成對象

保存的數據               恢復後的對象狀態
┌─────────────────┐    ┌─────────────────┐
│ {"count": 42}   │────│ CountAggregator │
│                 │    │ count = 42      │
└─────────────────┘    └─────────────────┘

┌─────────────────┐    ┌─────────────────┐
│ {"start": 14000,│────│ TimeWindow      │
│  "end": 14:05}  │    │ start = 14:00   │
│                 │    │ end = 14:05     │
└─────────────────┘    └─────────────────┘

3. Checkpoint(檢查點)- 狀態保存的時機和策略

Step-by-Step Checkpoint Process Timeline:

Time: T1────T2────T3────T4
      │     │     │     │
      ▼     ▼     ▼     ▼
     Event Event Event Event
      │     │     │     │
      └─────┴─────┴─────┘
         Accumulate
        In-Memory State
             │
             ▼
    ┌─ CHECKPOINT TRIGGERED ─┐
    │                        │
    │ Step 1: Pause Events   │◀─── (Block new events)
    │         Processing     │
    │                        │
    │ Step 2: Serialize      │◀─── Convert state to JSON/Binary
    │         Current State  │
    │                        │
    │ Step 3: Upload to      │◀─── Send serialized data
    │         Remote Store   │     to S3/GCS/Kafka
    │                        │
    │ Step 4: Update         │◀─── Mark checkpoint as complete
    │         Metadata       │
    │                        │
    │ Step 5: Resume Event   │◀─── Continue processing
    │         Processing     │
    └────────────────────────┘
             │
             ▼
      Continue────Continue────Continue
     Event T5    Event T6    Event T7
      │           │           │
      └───────────┴───────────┘
         Next Checkpoint Cycle

檢查點策略

  • 時間觸發: 每隔 N 秒保存一次
  • 事件觸發: 每處理 N 個事件保存一次
  • 手動觸發: 在關鍵業務時點手動保存

兩種主要的狀態存儲方案

方案一:遠端存儲

State Saving Process:
┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐
│Stream Proc. │=>│Checkpoint   │=>│Serialize    │=>│Save to      │
│Program      │  │Manager      │  │State        │  │Remote Store │
└─────────────┘  └─────────────┘  └─────────────┘  └─────────────┘

State Recovery Process:
┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐
│Program      │=>│Download from│=>│Deserialize  │=>│Restore      │
│Restart      │  │Remote Store │  │State        │  │Program State│
└─────────────┘  └─────────────┘  └─────────────┘  └─────────────┘
遠端存儲結構:
Remote Store (S3/GCS/Azure Blob)
├── checkpoints/app-instance-1/
│   ├── checkpoint_20241201_140000.json
│   ├── checkpoint_20241201_140030.json
│   └── latest.json
└── checkpoints/app-instance-2/
    ├── checkpoint_20241201_140000.json
    └── latest.json

優點

  • 高可用性,防止單點故障
  • 跨機器共享狀態
  • 自動備份與版本控制
  • 擴展性好

缺點

  • 網絡延遲影響性能
  • 依賴外部存儲服務
  • 需要處理網絡異常

方案二:Kafka Topic 存儲

State Saving Process:
┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐
│Stream Proc. │=>│Checkpoint   │=>│Serialize    │=>│Send to      │
│Program      │  │Manager      │  │State        │  │Kafka        │
└─────────────┘  └─────────────┘  └─────────────┘  └─────────────┘

State Recovery Process:
┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐
│Program      │=>│Read from    │=>│Deserialize  │=>│Restore      │
│Restart      │  │Kafka        │  │State        │  │Program State│
└─────────────┘  └─────────────┘  └─────────────┘  └─────────────┘

Kafka Cluster Architecture:
┌─────────────────────────────┐
│     checkpoint-topic        │
├─────────────┬───────────────┤
│ partition-0 │ partition-1   │
│ replicas    │ replicas      │
│ 1,2,3       │ 1,2,3         │
└─────────────┴───────────────┘
Kafka Topic 結構:
Topic: app-checkpoints
├── Key: "customer-aggregator"
│   Value: {"C001": {"count": 42}, "C002": {"count": 33}}
├── Key: "window-state"  
│   Value: {"windows": [...], "timestamps": {...}}
└── Key: "join-state"
    Value: {"order_123": [...], "order_456": [...]}

優點

  • 分散式存儲,高可用
  • 自動複製,數據安全
  • 可以跨機器共享狀態
  • 支持多消費者讀取

缺點

  • 依賴 Kafka 基礎設施
  • 網絡延遲
  • 配置相對複雜

實際例子:電商訂單聚合的狀態恢復

讓我們用一個具體例子來理解整個流程:

場景:實時統計每個客戶的訂單數量

運行中狀態:
┌─────────────────┐
│ customer_stats  │
├─────────────────┤
│ C001: 25 orders │
│ C002: 18 orders │
│ C003: 43 orders │
└─────────────────┘

檢查點保存(JSON格式):
{
  "timestamp": 1635739530000,
  "customer_stats": {
    "C001": {"count": 25},
    "C002": {"count": 18},
    "C003": {"count": 43}
  }
}

程序崩潰重啟後:
1. 讀取檢查點文件
2. 反序列化狀態:C001=25, C002=18, C003=43  
3. 繼續處理新訂單:C001 下了新訂單 → count變成26
4. 保存新檢查點...

結果:無縫恢復,沒有數據丟失!

這個例子展示了狀態管理如何讓系統在故障後「記起」之前的工作。

總結:狀態管理是流式處理的生命線

狀態管理讓流式處理系統具備了「記憶」能力,從一個健忘的計算器進化成有持續記憶的智能系統。通過合理的序列化、檢查點和恢復機制,我們可以構建真正可靠的生產級流式處理系統。

值得注意的是,當狀態規模達到 TB 級別時,會面臨更多挑戰:檢查點時間過長、內存不足、恢復速度慢等問題。這些大狀態(Large State)的處理需要更精細的優化策略,將在後續分享 Flink 與大狀態管理中詳細探討這些進階技術。

Day 18 預告:從 SQL 到流式處理的魔法轉換

「老闆,我只會寫 SQL,這個流式處理太複雜了...」

這是每個 Junior 工程師的心聲。他們對 SELECT COUNT(*) FROM orders WHERE customer_id = 'C001' GROUP BY product_id 駕輕就熟,但面對 orders_df.filter(...).group_by(...).count() 就開始頭疼。

下一章我們要解決這個問題:如何將熟悉的 SQL 語法自動轉換成我們的 SimpleStream 算子?我們將探討 SQL AST 解析、算子映射規則,以及如何讓資料庫老手也能輕鬆上手流式處理。讓 SQL 成為流式處理的入口,而不是阻礙!


上一篇
【知其然,更知其所以然】Day16: 固定時間窗口讓聚合更實用
下一篇
【知其然,更知其所以然】Day18:從 DataFrame 到支援 SQL 語法
系列文
「知其然,更知其所以然:什麼是 Real-time (Streaming) Pipeline?從造輪子到 Flink 與 RisingWave」19
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言