“I have spent all my life resisting the desire to end it.”
― Franz Kafka, Letters to Milena
To be or not to be
當消費者耗盡所有的重試次數後,一個非同步 ( async
) 的函式會被呼叫來決定是否重啟消費者(重新設定 consumer.run
),例如,這可用於在崩潰之前乾淨地關閉資源,如果這是首選的話。該函數將傳遞錯誤,這允許它根據錯誤類型決定是否退出應用程序或允許它重新啟動。
這個函數具有以下簽名: (error: Error) => Promise<boolean>
如果 promise 解析為 true:消費者將重新啟動
如果 promise 解析為 false:消費者將不會重新啟動
如果承諾拒絕:消費者將重新啟動
如果沒有提供 restartOnFailure:消費者將重新啟動
請注意,該函數只會在 KafkaJS 認為可重試的錯誤時被調用。對於不可重試的錯誤,不會重新啟動使用者並且不會調用 restartOnFailure 函數。請參閱此列表以了解 Kafka 協議中的可重試錯誤,但請注意,在 KafkaJS 中仍會認為某些其他錯誤是可重試的,例如網絡連接錯誤。
KafkaJS 是支援 Kafka transactions
註記: 交易在 Kafka 版本v0.11以上才有支援
在交易中傳送訊息
呼叫異步方式 producer.transaction()
來初始化一個交易,回傳的交易物件包含有方法 send 和 sendBatch
,交易完成時可以呼叫方法 transaction.commit()
或是 transaction.abort()
來結束交易,消費者只會看到那些已經提交 ( commit ) 的交易,避免讀取稍後 rollback 掉的資料ㄡ
註記: Kafka 需要生產者做到以下設定去保證訊息只會傳送一次( EoS ("Exactly-once-semantics"))
生產者必須設定 max flight request 為1
生產者必須設定 acks 為-1,也就是必須等待所有副本都回應收到
生產者必須有無限的重試次數
為了要生效 EOS 必須設定 maxInFlightRequests: 1
和 idempotent: true
const client = new Kafka({
clientId: 'transactional-client',
brokers: ['kafka1:9092', 'kafka2:9092'],
})
const producer = client.producer({ maxInFlightRequests: 1, idempotent: true })
一次交易中可以傳送多個訊息,當呼叫 transaction.abort 時,所有的訊息都會 rollback
const transaction = await producer.transaction()
try {
await transaction.send({ topic, messages })
await transaction.commit()
} catch (e) {
await transaction.abort()
}
傳送偏移量
將傳送偏移量視為交易的一部分,也就是代表只有交易成功時才會去提交偏移量,這邊使用方法 transaction.sendOffsets()
。
await transaction.sendOffsets({
consumerGroupId, topics
})
topics 的結構如下:
[{
topic: <String>,
partitions: [{
partition: <Number>,
offset: <String>
}]
}]