“I am in chains. Don't touch my chains.”
― Kafka, Franz
斷開鎖鏈、斷開魂結
預設的情況下 eachMessage
是按照順序去消費分區 ( partition )中指定主題 ( topic ) 的訊息,如果想要一次同時併發取得多個訊息,你可以調高參數 partitionsConsumedConcurrently
的設定值
consumer.run({
partitionsConsumedConcurrently: 3, // Default: 1
eachMessage: async ({ topic, partition, message }) => {
// This will be called up to 3 times concurrently
},
})
在同一個分區的訊息是保證其順序性的,但是其實是可以同時處理來自不同分區的多個訊息,如果 eachMessage
是包含有異步的工作內容,像是發起網路請求、硬碟I/O...等,那使用這個參數可以提高效能,但是如果 eachMessage
完成是同步的,那這個設定並不會有顯著的影響
這個參數也可以使用在 eachBatch
,在 eachBatch
加上參數 partitionsConsumedConcurrently
,就可以一次併發處理多個批次的訊息
這邊會建議設定參數 partitionsConsumedConcurrently
的數值不要超過你消費的分區數量,另外一個影響效能的地方是 CPU 的上限,建議不要設定超過你 CPU 顆數,如果是一開始使用就從比較低的數值開始設定測試、觀察是否有效提高吞吐量
從 Kafka 取得訊息一定都是批次去取得訊息的,就算你使用 eachMessage
也是,而當整個批次都被消費處理完後,最後一個訊息的偏移量會被自動提交給 Kafka
週期性地去提交偏移量可以讓消費者在從異常狀況復原更容易,像是消費組的重新平衡、去除陳舊資料,但是過於頻繁的提交偏移量會造成網路流量負擔、降低處理速度,因此,自動提交提供了很有彈性的提交偏移量設定,有兩種方式可供選擇
首先,autoCommitInterval
消費者會以固定時間週期去提交偏移量,舉例來說,每五秒提交一次,autoCommitInterval
的預設值是 null,單位毫秒 ( milliseconds )
consumer.run({
autoCommitInterval: 5000,
// ...
})
第二種方式是 autoCommitThreshold
消費者會在處理完固定數量的訊息後去提交偏移量,舉例來說,每一百筆訊息提交一次,該參數的預設值是 null
consumer.run({
autoCommitThreshold: 100,
// ...
})
你可以同時使用這兩個參數,是可行的,消費者會在滿足其中一個條件 ( 時間或是數量 ) 時就提交偏移量
如果你將自動提交偏移量關閉 ( autoCommit ) 你還是可以進行手動提交,提交的方式有以下幾種
commitOffsetsIfNecessary
在 eachBatch
的 callback,這參數仍然會使用 autoCommit
設定參數commitOffsets
方法來提交生產者的 commitOffsets
方法是最底層的提交方式,它會忽略所有自動提交的相關設定值,使用這個方法可以自由選擇要提交的偏移量、一次性的提交多個偏移量,這個在某些狀況很有用處,像是製作重置工具,在 consumer.run
後將自動提交關閉,手動去提交偏移量,這邊的偏移量是用來決定消費者要從分區的哪個位置開始消費訊息
consumer.run({
autoCommit: false,
eachMessage: async ({ topic, partition, message }) => {
// Process the message somehow
},
})
consumer.commitOffsets([
{ topic: 'topic-A', partition: 0, offset: '1' },
{ topic: 'topic-A', partition: 1, offset: '3' },
{ topic: 'topic-B', partition: 0, offset: '2' }
])
值得注意的是你不一定要將消費者的偏移量存在 Kafka 之中,你可以選擇將偏移量存在你想到的機器,像是 SQL 資料庫,可以讓消費者的設定 exactly once
更加有保障
存在 Kafka 之外偏移量的狀妄