“Now I can look at you in peace; I don't eat you any more.”
― Franz Kafka
腸胃支配者
fromBeginning
:消費者群組會依據最後提交的偏移量,從那個偏移量開始讀取訊息,如果偏移量是無效 ( invalid ) 或是未定義 ( not defined ),那就會看參數 fromBeginning
的設定值來決定消費者群組怎麼動作
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
await consumer.subscribe({ topic: 'other-topic', fromBeginning: false })
如果設定為 true
消費者會從最早的偏移量開始讀取訊息,設定為 false
消費者會從最新的偏移量開始讀取訊息
預設值是 false
kafka.consumer({
groupId: <String>,
partitionAssigners: <Array>,
sessionTimeout: <Number>,
rebalanceTimeout: <Number>,
heartbeatInterval: <Number>,
metadataMaxAge: <Number>,
allowAutoTopicCreation: <Boolean>,
maxBytesPerPartition: <Number>,
minBytes: <Number>,
maxBytes: <Number>,
maxWaitTimeInMs: <Number>,
retry: <Object>,
maxInFlightRequests: <Number>,
rackId: <String>
})
消費者提供了暫停(pause)跟恢復(resume)兩個方法,讓消費者可以在消費一個或多個主題途中暫停或恢復消費,也有提供方法已暫停(paused)去取得目前暫停消費的主題清單
需要注意的是暫停消費一個主題,會是在下一個週期才生效,消費者還是可以讀取當前這個批次的訊息
暫停一個消費者沒有訂閱的主題是不會動作的,恢復消費一個沒有被暫停的主題也是不會動作的
另外,當消費者未啟動時去暫停或是恢復消費都會噴錯
範例,情境是消費者使用的外部依賴系統超過負載,我們可以暫停消費主題,等待一段時間後重新恢復消費主題
await consumer.connect()
await consumer.subscribe({ topic: 'jobs' })
await consumer.run({ eachMessage: async ({ topic, message }) => {
try {
await sendToDependency(message)
} catch (e) {
if (e instanceof TooManyRequestsError) {
consumer.pause([{ topic }])
setTimeout(() => consumer.resume([{ topic }]), e.retryAfter * 1000)
}
throw e
}
}})
這部分是可以做到更細微地控制的,可以暫停主題中的特定分區,而非只能暫停整個主題
舉例來說,這樣可以避免中斷整個消費,只因為其中一個分區過於緩慢
consumer.run({
partitionsConsumedConcurrently: 3, // Default: 1
eachMessage: async ({ topic, partition, message }) => {
// This will be called up to 3 times concurrently
try {
await sendToDependency(message)
} catch (e) {
if (e instanceof TooManyRequestsError) {
consumer.pause([{ topic, partitions: [partition] }])
// Other partitions will keep fetching and processing, until if / when
// they also get throttled
setTimeout(() => {
consumer.resume([{ topic, partitions: [partition] }])
// Other partitions that are paused will continue to be paused
}, e.retryAfter * 1000)
}
throw e
}
},
})
可以呼叫 paused 方法去取得暫停的主題清單
const pausedTopicPartitions = consumer.paused()
for (const topicPartitions of pausedTopicPartitions) {
const { topic, partitions } = topicPartitions
console.log({ topic, partitions })
}