iT邦幫忙

2021 iThome 鐵人賽

DAY 21
0
Modern Web

『卡夫卡的藏書閣』- 程序猿必須懂的Kafka開發與實作系列 第 21

卡夫卡的藏書閣【Book21】- Kafka - KafkaJS 消費者 3

  • 分享至 

  • xImage
  •  

“Now I can look at you in peace; I don't eat you any more.”
― Franz Kafka
腸胃支配者


參數 fromBeginning


消費者群組會依據最後提交的偏移量,從那個偏移量開始讀取訊息,如果偏移量是無效 ( invalid ) 或是未定義 ( not defined ),那就會看參數 fromBeginning 的設定值來決定消費者群組怎麼動作

await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
await consumer.subscribe({ topic: 'other-topic', fromBeginning: false })

如果設定為 true 消費者會從最早的偏移量開始讀取訊息,設定為 false 消費者會從最新的偏移量開始讀取訊息

預設值是 false

參數選項:


kafka.consumer({
  groupId: <String>,
  partitionAssigners: <Array>,
  sessionTimeout: <Number>,
  rebalanceTimeout: <Number>,
  heartbeatInterval: <Number>,
  metadataMaxAge: <Number>,
  allowAutoTopicCreation: <Boolean>,
  maxBytesPerPartition: <Number>,
  minBytes: <Number>,
  maxBytes: <Number>,
  maxWaitTimeInMs: <Number>,
  retry: <Object>,
  maxInFlightRequests: <Number>,
  rackId: <String>
})
  • partitionAssigners
    • 分區器的清單
    • 預設值:[PartitionAssigners.roundRobin]
  • sessionTimeout
    • session 的 timeout 時間(毫秒),消費者會以固定週期傳送心跳給代理 ( broker ),如果在 session timeout 時間超過前,代理都沒有收到心跳,那代理會將這個消費者從消費者群組中移除,並且進行重新分配 ( rebalance )
    • 預設值:30000
  • rebalanceTimeout
    • 消費者協調器等待消費者重新加入消費者群組的最大等待時間(毫秒)
    • 預設值:60000
  • heartbeatInterval
    • 傳送心跳給消費者協調氣的時間間隔,心跳是用來確保消費者的session還活著,這個數值必須設定的比 session timeout 小
    • 預設值:3000
  • metadataMaxAge
    • 多久強迫更新元資料(metadata)一次,即使沒有發生 partition leadership 的改變
    • 預設值:300000 (5 minutes)
  • allowAutoTopicCreation
    • 是否允許自動建立主題 ( topic ),如果消費者消費的主題尚未存在的話
    • 預設值:true
  • maxBytesPerPartition
    • 每個分區最大可以回傳的訊息數量,這個大小至少必須跟單一個訊息的大小一樣大,不然可能會發生生產者傳送了大於消費者可以消費大小的訊息造成卡住
    • 預設值:1048576 (1MB)
  • minBytes
    • 每個消費要求最小的訊息回傳量,如果沒有達到這個最小訊息回傳量,會等待 maxWaitTimeInMs 的設定時間去累積訊息量
    • 預設值:1
  • maxBytes
    • Kafka版本大於 0.10.1.0 支援這個參數,每次消費需求最大的訊息回傳量
    • 預設值:10485760 (10MB)
  • maxWaitTimeInMs
    • 在資料數量滿足 miniBytes 的最大等待資料搜尋的時間
    • 預設值:5000
  • retry
    • 重試機制,在生產者的章節有介紹過
    • 預設值:{ retries: 5 }
  • readUncommitted
    • 設定消費者的隔離程度,預設值設定為 false 消費者不會回傳未提交偏移量的交易訊息
    • 預設值:false
  • maxInFlightRequests
    • 同時最多可以有幾個消費請求,設定為 false 表示沒有上限
    • 預設值:null (no limit)
  • rackId
    • 開啟 rack 的 id
    • 預設值:null

暫停消費和重新開始消費 ( Pause & Resume )


消費者提供了暫停(pause)跟恢復(resume)兩個方法,讓消費者可以在消費一個或多個主題途中暫停或恢復消費,也有提供方法已暫停(paused)去取得目前暫停消費的主題清單
需要注意的是暫停消費一個主題,會是在下一個週期才生效,消費者還是可以讀取當前這個批次的訊息

暫停一個消費者沒有訂閱的主題是不會動作的,恢復消費一個沒有被暫停的主題也是不會動作的
另外,當消費者未啟動時去暫停或是恢復消費都會噴錯

範例,情境是消費者使用的外部依賴系統超過負載,我們可以暫停消費主題,等待一段時間後重新恢復消費主題

await consumer.connect()
await consumer.subscribe({ topic: 'jobs' })

await consumer.run({ eachMessage: async ({ topic, message }) => {
    try {
        await sendToDependency(message)
    } catch (e) {
        if (e instanceof TooManyRequestsError) {
            consumer.pause([{ topic }])
            setTimeout(() => consumer.resume([{ topic }]), e.retryAfter * 1000)
        }

        throw e
    }
}})

這部分是可以做到更細微地控制的,可以暫停主題中的特定分區,而非只能暫停整個主題

舉例來說,這樣可以避免中斷整個消費,只因為其中一個分區過於緩慢

consumer.run({
    partitionsConsumedConcurrently: 3, // Default: 1
    eachMessage: async ({ topic, partition, message }) => {
      // This will be called up to 3 times concurrently
        try {
            await sendToDependency(message)
        } catch (e) {
            if (e instanceof TooManyRequestsError) {
                consumer.pause([{ topic, partitions: [partition] }])
                // Other partitions will keep fetching and processing, until if / when
                // they also get throttled
                setTimeout(() => {
                    consumer.resume([{ topic, partitions: [partition] }])
                    // Other partitions that are paused will continue to be paused
                }, e.retryAfter * 1000)
            }

            throw e
        }
    },
})

可以呼叫 paused 方法去取得暫停的主題清單

const pausedTopicPartitions = consumer.paused()

for (const topicPartitions of pausedTopicPartitions) {
  const { topic, partitions } = topicPartitions
  console.log({ topic, partitions })
}

上一篇
卡夫卡的藏書閣【Book20】- Kafka - KafkaJS 消費者 2
下一篇
卡夫卡的藏書閣【Book22】- Kafka - KafkaJS 消費者 4
系列文
『卡夫卡的藏書閣』- 程序猿必須懂的Kafka開發與實作30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言