iT邦幫忙

2021 iThome 鐵人賽

DAY 20
0
Modern Web

『卡夫卡的藏書閣』- 程序猿必須懂的Kafka開發與實作系列 第 20

卡夫卡的藏書閣【Book20】- Kafka - KafkaJS 消費者 2

  • 分享至 

  • xImage
  •  

“I am in chains. Don't touch my chains.”
― Kafka, Franz
斷開鎖鏈、斷開魂結


併發式的消費分區


預設的情況下 eachMessage 是按照順序去消費分區 ( partition )中指定主題 ( topic ) 的訊息,如果想要一次同時併發取得多個訊息,你可以調高參數 partitionsConsumedConcurrently 的設定值

consumer.run({
    partitionsConsumedConcurrently: 3, // Default: 1
    eachMessage: async ({ topic, partition, message }) => {
        // This will be called up to 3 times concurrently
    },
})

在同一個分區的訊息是保證其順序性的,但是其實是可以同時處理來自不同分區的多個訊息,如果 eachMessage 是包含有異步的工作內容,像是發起網路請求、硬碟I/O...等,那使用這個參數可以提高效能,但是如果 eachMessage 完成是同步的,那這個設定並不會有顯著的影響

這個參數也可以使用在 eachBatch,在 eachBatch 加上參數 partitionsConsumedConcurrently,就可以一次併發處理多個批次的訊息

這邊會建議設定參數 partitionsConsumedConcurrently 的數值不要超過你消費的分區數量,另外一個影響效能的地方是 CPU 的上限,建議不要設定超過你 CPU 顆數,如果是一開始使用就從比較低的數值開始設定測試、觀察是否有效提高吞吐量

自動提交 ( autoCommit )


從 Kafka 取得訊息一定都是批次去取得訊息的,就算你使用 eachMessage 也是,而當整個批次都被消費處理完後,最後一個訊息的偏移量會被自動提交給 Kafka

週期性地去提交偏移量可以讓消費者在從異常狀況復原更容易,像是消費組的重新平衡、去除陳舊資料,但是過於頻繁的提交偏移量會造成網路流量負擔、降低處理速度,因此,自動提交提供了很有彈性的提交偏移量設定,有兩種方式可供選擇

首先,autoCommitInterval 消費者會以固定時間週期去提交偏移量,舉例來說,每五秒提交一次,autoCommitInterval 的預設值是 null,單位毫秒 ( milliseconds )

consumer.run({
  autoCommitInterval: 5000,
  // ...
})

第二種方式是 autoCommitThreshold 消費者會在處理完固定數量的訊息後去提交偏移量,舉例來說,每一百筆訊息提交一次,該參數的預設值是 null

consumer.run({
  autoCommitThreshold: 100,
  // ...
})

你可以同時使用這兩個參數,是可行的,消費者會在滿足其中一個條件 ( 時間或是數量 ) 時就提交偏移量

手動提交 ( Manual committing )


如果你將自動提交偏移量關閉 ( autoCommit ) 你還是可以進行手動提交,提交的方式有以下幾種

  • 使用方法 commitOffsetsIfNecessaryeachBatch 的 callback,這參數仍然會使用 autoCommit 設定參數
  • 在交易 ( transaction ) 中提交偏移量
  • 使用消費者的 commitOffsets 方法來提交

生產者的 commitOffsets 方法是最底層的提交方式,它會忽略所有自動提交的相關設定值,使用這個方法可以自由選擇要提交的偏移量、一次性的提交多個偏移量,這個在某些狀況很有用處,像是製作重置工具,在 consumer.run 後將自動提交關閉,手動去提交偏移量,這邊的偏移量是用來決定消費者要從分區的哪個位置開始消費訊息

consumer.run({
    autoCommit: false,
    eachMessage: async ({ topic, partition, message }) => {
        // Process the message somehow
    },
})

consumer.commitOffsets([
  { topic: 'topic-A', partition: 0, offset: '1' },
  { topic: 'topic-A', partition: 1, offset: '3' },
  { topic: 'topic-B', partition: 0, offset: '2' }
])

值得注意的是你不一定要將消費者的偏移量存在 Kafka 之中,你可以選擇將偏移量存在你想到的機器,像是 SQL 資料庫,可以讓消費者的設定 exactly once 更加有保障

存在 Kafka 之外偏移量的狀妄

  • 在自動提交關閉的情況下使用消費者
  • 將訊息的偏移量+1後存起來,為了避免重複消費的問題
  • 重啟後用 seek 去找到消費者

上一篇
卡夫卡的藏書閣【Book19】- Kafka - KafkaJS 消費者 1
下一篇
卡夫卡的藏書閣【Book21】- Kafka - KafkaJS 消費者 3
系列文
『卡夫卡的藏書閣』- 程序猿必須懂的Kafka開發與實作30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言