iT邦幫忙

2021 iThome 鐵人賽

DAY 18
0
Modern Web

『卡夫卡的藏書閣』- 程序猿必須懂的Kafka開發與實作系列 第 18

卡夫卡的藏書閣【Book18】- Kafka - KafkaJS 生產者 - 6

  • 分享至 

  • xImage
  •  

“I have spent all my life resisting the desire to end it.”
― Franz Kafka, Letters to Milena
To be or not to be


restartOnFailure 失敗時重啟

當消費者耗盡所有的重試次數後,一個非同步 ( async ) 的函式會被呼叫來決定是否重啟消費者(重新設定 consumer.run),例如,這可用於在崩潰之前乾淨地關閉資源,如果這是首選的話。該函數將傳遞錯誤,這允許它根據錯誤類型決定是否退出應用程序或允許它重新啟動。
這個函數具有以下簽名: (error: Error) => Promise<boolean>
如果 promise 解析為 true:消費者將重新啟動
如果 promise 解析為 false:消費者將不會重新啟動
如果承諾拒絕:消費者將重新啟動
如果沒有提供 restartOnFailure:消費者將重新啟動
請注意,該函數只會在 KafkaJS 認為可重試的錯誤時被調用。對於不可重試的錯誤,不會重新啟動使用者並且不會調用 restartOnFailure 函數。請參閱此列表以了解 Kafka 協議中的可重試錯誤,但請注意,在 KafkaJS 中仍會認為某些其他錯誤是可重試的,例如網絡連接錯誤。

交易

KafkaJS 是支援 Kafka transactions
註記: 交易在 Kafka 版本v0.11以上才有支援
在交易中傳送訊息
呼叫異步方式 producer.transaction() 來初始化一個交易,回傳的交易物件包含有方法 send 和 sendBatch,交易完成時可以呼叫方法 transaction.commit() 或是 transaction.abort() 來結束交易,消費者只會看到那些已經提交 ( commit ) 的交易,避免讀取稍後 rollback 掉的資料ㄡ
註記: Kafka 需要生產者做到以下設定去保證訊息只會傳送一次( EoS ("Exactly-once-semantics"))
生產者必須設定 max flight request 為1
生產者必須設定 acks 為-1,也就是必須等待所有副本都回應收到
生產者必須有無限的重試次數
為了要生效 EOS 必須設定 maxInFlightRequests: 1idempotent: true

const client = new Kafka({
  clientId: 'transactional-client',
  brokers: ['kafka1:9092', 'kafka2:9092'],
})
const producer = client.producer({ maxInFlightRequests: 1, idempotent: true })

一次交易中可以傳送多個訊息,當呼叫 transaction.abort 時,所有的訊息都會 rollback

const transaction = await producer.transaction()

try {
  await transaction.send({ topic, messages })

  await transaction.commit()
} catch (e) {
  await transaction.abort()
}

傳送偏移量
將傳送偏移量視為交易的一部分,也就是代表只有交易成功時才會去提交偏移量,這邊使用方法 transaction.sendOffsets()

await transaction.sendOffsets({
  consumerGroupId, topics
})

topics 的結構如下:

[{
  topic: <String>,
  partitions: [{
    partition: <Number>,
    offset: <String>
  }]
}]

上一篇
卡夫卡的藏書閣【Book17】- Kafka - KafkaJS 生產者 - 5
下一篇
卡夫卡的藏書閣【Book19】- Kafka - KafkaJS 消費者 1
系列文
『卡夫卡的藏書閣』- 程序猿必須懂的Kafka開發與實作30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言