“I usually solve problems by letting them devour me.”
― Franz Kafka
打不贏就加入他
消費者可以利用多台機器和進程來消費很多主題、進行負載平衡,視為一個消費者群組,一但當有消費者失敗的時候,會自動分配給這個消費者群組中的其他成員,對於代理 ( broker ) 來說,每個消費者必須擁有一個在消費者群組中唯一的ID
const consumer = kafka.consumer({ groupId: 'my-group' })
await consumer.connect()
await consumer.subscribe({ topic: 'topic-A' })
// 可以多次呼叫訂閱方法
await consumer.subscribe({ topic: 'topic-B' })
await consumer.subscribe({ topic: 'topic-C' })
// 可以設定從第一則訊息開始讀取
await consumer.subscribe({ topic: 'topic-D', fromBeginning: true })
RegExp
去訂閱多個同系列的主題Alternatively, you can subscribe to multiple topics at once using a RegExp:
await consumer.connect()
await consumer.subscribe({ topic: /topic-(eu|us)-.*/i })
值得注意的是,消費者不會追蹤訂閱主題是否建立,所以如果今天有一個代理擁有主題 topic-1
和 topic-2
,但是你利用 RegExp
下了條件 /topic-.*/
去訂閱主題後,主題 topic-3
在那之後創建出來,消費者並不會自動去新增訂閱 topic-3
KafkaJS 提供了兩種方式去處理消費的訊息
首先,eachmessage
提供一個最方便和簡單的方式去使用消費者 API,它每次只讀取一個訊息,他在實作在 eachBatch
之上,並且會依據你的設定自動的去提交 ( commmit )你的偏移量 ( offsets )、心跳監測 ( heartbeat),如果你的需求是單純的,那使用 eachMessage
是個好選擇
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
key: message.key.toString(),
value: message.value.toString(),
headers: message.headers,
})
},
})
有一些使用上的需求需要直接以批次去處理訊息,eachBatch
提供了一些功能讓你可以更方的的去實作,像是resolveOffset
、heartbeat
、commitOffsetsIfNecessary
、uncommittedOffsets
、 isRunning
、isStale
,基本上所有處理完成的訊息,eachBatch
會自動去提供偏移量
註記:eachBatch
是更進階的使用方式,因此你必須了解 session timeouts 和 heartbeats 是怎麼設定的
await consumer.run({
eachBatchAutoResolve: true,
eachBatch: async ({
batch,
resolveOffset,
heartbeat,
commitOffsetsIfNecessary,
uncommittedOffsets,
isRunning,
isStale,
}) => {
for (let message of batch.messages) {
console.log({
topic: batch.topic,
partition: batch.partition,
highWatermark: batch.highWatermark,
message: {
offset: message.offset,
key: message.key.toString(),
value: message.value.toString(),
headers: message.headers,
}
})
resolveOffset(message.offset)
await heartbeat()
}
},
})
KafkaJS
會自動提教批次處理的最後一個偏移量heartbeatInterval
的設定值去跟代理傳送心跳autoCommitInterval
和 autoCommitThreshold
的設定值,去提交偏移量,要注意的是,如果這個參數未被使用,那eachBatch
就不會自動提交偏移量範例:
consumer.run({
eachBatchAutoResolve: false,
eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
for (let message of batch.messages) {
if (!isRunning() || isStale()) break
await processMessage(message)
resolveOffset(message.offset)
await heartbeat()
}
}
})
上面這則例子,如果消費者在中途需要關閉,該批次剩餘的訊息不會被消費、也不會提交偏移量,因此,你可以安心的關掉消費者,而不用擔心會遺失訊息,當然如果消費者批次處理訊息時,遇到其他原因而中斷,也不會去處理剩餘的訊息。