在資料流處理的世界裡,很多人覺得「即時」等於「一條一條馬上寫入 DB」。但這就好比外送員每接到一單,就立刻停下手上的工作、騎車送到客人家 - 送得慢、油耗高,還容易堵在半路。
我們在 Day 5 說過,一個更聰明的做法,是先過濾、後入庫,再批量寫入 DB。今天,我們就來聊聊批量寫入最佳實踐。
重要提醒
本文所有程式碼皆為教學與概念演示用的 Pseudo Code,可以幫助你理解並解決實際場景的問題,但這裡的程式碼不能直接使用,主要目的是用來講解 Stream Processing 的設計思路與核心概念。閱讀時建議把重點放在理解整體架構和設計邏輯上,程式碼細節部分可以輕鬆看過就好。
我們在前面提到,批量寫入(batch insert)能大幅提升吞吐與效能。下面這段程式碼是一個 SimplePostgreSQLSink 的實作,它會:
在 init 中,我們初始化了用來暫存資料的 _buffer:
class SimplePostgreSQLSink(BaseSink):
"""
支援 batch 寫入的 PostgreSQL Sink
"""
def __init__(self, batch_size: int = 100):
self.batch_size = batch_size
self.connection = None # DB 連接
# Batch 相關
self._buffer: List[Dict[str, Any]] = []
這是對外提供的主要入口,每當收到一筆資料時:
┌─────────────────────┐
│ Buffer │
│ │
│ [data1, data2, ...] │
└─────────────────────┘
│
▼
┌──────────────────────┐
│ Count Trigger │
│ │
│ size ≥ 100? │
│ Yes → Flush to DB │
└──────────────────────┘
def write(self, message: Dict[str, Any]):
"""
將 message 加入 buffer,達到 batch_size 時批次寫入
"""
# 從 message 取得實際資料
data = message.get('value', {})
if not data:
return
# 加入 buffer
self._buffer.append(data)
# 達到批次大小,批量寫入
if len(self._buffer) >= self.batch_size:
# buffer 滿了,批量寫入
# ... 執行批量插入
self._buffer.clear() # 清空 buffer
核心特色:累積到指定筆數後,批量寫入資料庫。
如果資料流入速度很慢,buffer 可能很久才滿,這樣就失去了即時性。
解決方案:除了「筆數觸發」,還要加上「時間觸發」。
┌─────────────────────┐
│ Buffer │
│ │
│ [data1, data2, ...] │
└─────────────────────┘
│ │
│ │
▼ ▼
┌──────────┐ ┌──────────┐
│ Count │ │ Timer │
│ Trigger │ │ Trigger │
│ │ │ │
│size ≥ 100│ │100ms tick│
└──────────┘ └──────────┘
│ │
└─────┬───┘
│
▼
┌─────────────────────┐
│ Flush to DB │
│ │
│ INSERT batch data │
│ Clear buffer │
└─────────────────────┘
def __init__(self, batch_size: int = 100):
self.batch_size = batch_size
self._buffer = []
self._start_timer() # 啟動定時器
def write(self, message):
data = message.get('value', {})
self._buffer.append(data)
# 筆數觸發:滿了就寫入
if len(self._buffer) >= self.batch_size:
self._flush_and_clear()
def _start_timer(self):
# 每 100ms 檢查一次是否需要 flush
timer = threading.Timer(0.1, self._timer_flush)
timer.start()
def _timer_flush(self):
if self._buffer: # 有資料就寫入
self._flush_and_clear()
self._start_timer() # 重新啟動計時器
核心概念:雙重觸發機制,確保資料既能批量處理,又不會延遲太久。
批量寫入是 Stream Processing 中的核心優化技術,它解決了「一筆一筆寫入」帶來的性能瓶頸:
在雙 11、黑五這種訂單洪峰時,這種設計能讓系統從容應對,而不是被一條條寫入拖垮。同時在深夜低峰期,也不會讓用戶等太久才看到資料更新。
這種批量寫入設計並非我們的創新,而是業界的標準做法:
知名 Streaming Engine:
batch.size
和 batch.interval
Database Client 也都有:
executeBatch()
方法COPY
指令和 execute_values()
insertMany()
批量插入原因很簡單:這是性能優化的必然選擇,任何高性能系統都會採用類似設計。
下一步思考:當 Consumer 處理速度跟不上 Producer 時會發生什麼?這就帶出了「背壓與流量控制」的重要性。
想像你的 SimpleStreamEngine 每秒處理 1000 筆資料,但資料庫寫入已經優化到極限,硬體瓶頸讓它只能每秒寫入 500 筆。多出來的 500 筆去哪了?堆積在記憶體裡,直到系統 OOM 崩潰!
明天我們會探討: