iT邦幫忙

2021 iThome 鐵人賽

DAY 19
0
Modern Web

『卡夫卡的藏書閣』- 程序猿必須懂的Kafka開發與實作系列 第 19

卡夫卡的藏書閣【Book19】- Kafka - KafkaJS 消費者 1

  • 分享至 

  • xImage
  •  

“I usually solve problems by letting them devour me.”
― Franz Kafka
打不贏就加入他


消費者

消費者可以利用多台機器和進程來消費很多主題、進行負載平衡,視為一個消費者群組,一但當有消費者失敗的時候,會自動分配給這個消費者群組中的其他成員,對於代理 ( broker ) 來說,每個消費者必須擁有一個在消費者群組中唯一的ID

創建一個消費者
const consumer = kafka.consumer({ groupId: 'my-group' })
讓消費者訂閱一些主題
await consumer.connect()

await consumer.subscribe({ topic: 'topic-A' })

// 可以多次呼叫訂閱方法
await consumer.subscribe({ topic: 'topic-B' })
await consumer.subscribe({ topic: 'topic-C' })

// 可以設定從第一則訊息開始讀取
await consumer.subscribe({ topic: 'topic-D', fromBeginning: true })
當然,你也可以利用 RegExp 去訂閱多個同系列的主題

Alternatively, you can subscribe to multiple topics at once using a RegExp:

await consumer.connect()
await consumer.subscribe({ topic: /topic-(eu|us)-.*/i })

值得注意的是,消費者不會追蹤訂閱主題是否建立,所以如果今天有一個代理擁有主題 topic-1topic-2,但是你利用 RegExp 下了條件 /topic-.*/ 去訂閱主題後,主題 topic-3 在那之後創建出來,消費者並不會自動去新增訂閱 topic-3


KafkaJS 提供了兩種方式去處理消費的訊息

eachMessage

首先,eachmessage 提供一個最方便和簡單的方式去使用消費者 API,它每次只讀取一個訊息,他在實作在 eachBatch 之上,並且會依據你的設定自動的去提交 ( commmit )你的偏移量 ( offsets )、心跳監測 ( heartbeat),如果你的需求是單純的,那使用 eachMessage 是個好選擇

await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
        console.log({
            key: message.key.toString(),
            value: message.value.toString(),
            headers: message.headers,
        })
    },
})

eachBatch

有一些使用上的需求需要直接以批次去處理訊息,eachBatch 提供了一些功能讓你可以更方的的去實作,像是
resolveOffsetheartbeatcommitOffsetsIfNecessaryuncommittedOffsetsisRunningisStale,基本上所有處理完成的訊息,eachBatch 會自動去提供偏移量

註記:eachBatch 是更進階的使用方式,因此你必須了解 session timeouts 和 heartbeats 是怎麼設定的

await consumer.run({
    eachBatchAutoResolve: true,
    eachBatch: async ({
        batch,
        resolveOffset,
        heartbeat,
        commitOffsetsIfNecessary,
        uncommittedOffsets,
        isRunning,
        isStale,
    }) => {
        for (let message of batch.messages) {
            console.log({
                topic: batch.topic,
                partition: batch.partition,
                highWatermark: batch.highWatermark,
                message: {
                    offset: message.offset,
                    key: message.key.toString(),
                    value: message.value.toString(),
                    headers: message.headers,
                }
            })

            resolveOffset(message.offset)
            await heartbeat()
        }
    },
})
  • eachBatchAutoResolve
    • 在每個批次完成後自動提供偏移量,如果設定為開啟,KafkaJS會自動提教批次處理的最後一個偏移量
    • 預設值: true
  • batch.highWatermark
    • 該主題分區最一個被提交訊息的偏移量,主要是方便計算延遲
  • resolveOffset()
    • 用來標記批次中的訊息是否已經處理,如果出現錯誤,消費者會自動提交這些已處理訊息的偏移量
  • heartbeat(): Promise
    • 根據 heartbeatInterval 的設定值去跟代理傳送心跳
  • commitOffsetsIfNecessary(offsets?): Promise
    • 根據 autoCommitIntervalautoCommitThreshold 的設定值,去提交偏移量,要注意的是,如果這個參數未被使用,那eachBatch 就不會自動提交偏移量
  • uncommittedOffsets()
    • 回傳所有尚未提交的主題分區的偏移量
  • isRunning()
    • 回傳消費者的運行狀態,運行中回傳 true,反之回傳 false
  • isStale()
    • 回傳批次處理的訊息是否已經過時、應該要捨棄掉

範例:

consumer.run({
    eachBatchAutoResolve: false,
    eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
        for (let message of batch.messages) {
            if (!isRunning() || isStale()) break
            await processMessage(message)
            resolveOffset(message.offset)
            await heartbeat()
        }
    }
})

上面這則例子,如果消費者在中途需要關閉,該批次剩餘的訊息不會被消費、也不會提交偏移量,因此,你可以安心的關掉消費者,而不用擔心會遺失訊息,當然如果消費者批次處理訊息時,遇到其他原因而中斷,也不會去處理剩餘的訊息。


上一篇
卡夫卡的藏書閣【Book18】- Kafka - KafkaJS 生產者 - 6
下一篇
卡夫卡的藏書閣【Book20】- Kafka - KafkaJS 消費者 2
系列文
『卡夫卡的藏書閣』- 程序猿必須懂的Kafka開發與實作30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言