iT邦幫忙

2021 iThome 鐵人賽

DAY 25
0
Modern Web

『卡夫卡的藏書閣』- 程序猿必須懂的Kafka開發與實作系列 第 25

卡夫卡的藏書閣【Book25】- Kafka - KafkaJS Admin 2

  • 分享至 

  • xImage
  •  

“As Gregor Samsa awoke one morning from uneasy dreams he found himself transformed in > his bed into a gigantic insect.”
― Franz Kafka, The Metamorphosis


取得主題的元資料

await admin.fetchTopicMetadata({ topics: <Array<String>> })

TopicsMetadata 的結構

{
    topics: <Array<TopicMetadata>>,
}

TopicMetadata 的結構

{
    topic: <String>,
    partitions: <Array<PartitionMetadata>> // 預設值為 1
}

PartitionMetadata 的結構

{
    partitionErrorCode: <Number>, // 預設值為 0
    partitionId: <Number>,
    leader: <Number>,
    replicas: <Array<Number>>,
    isr: <Array<Number>>,
}

如果主題清單中的任一個主題不存在的話 admin 會噴錯,如果你忽略不帶參數 topics 會預設讀取所有的主題

await admin.fetchTopicMetadata()

取得主題的偏移量資訊

admin 的方法 fetchTopicOffsets 回傳最新一筆訊息的偏移量、高水位和低水位

await admin.fetchTopicOffsets(topic)
// [
//   { partition: 0, offset: '31004', high: '31004', low: '421' },
//   { partition: 1, offset: '54312', high: '54312', low: '3102' },
//   { partition: 2, offset: '32103', high: '32103', low: '518' },
//   { partition: 3, offset: '28', high: '28', low: '0' },
// ]

依據時間來取得偏移量

指定一個時間點,取得那個時間點最早一筆訊息的偏移量

await admin.fetchTopicOffsetsByTimestamp(topic, timestamp)
// [
//   { partition: 0, offset: '3244' },
//   { partition: 1, offset: '3113' },
// ]

取得消費者群組的偏移量

fetchOffsets 會回傳給定主題清單中的消費者群組的偏移量

await admin.fetchOffsets({ groupId, topics: ['topic1', 'topic2'] })
// [
//   {
//     topic: 'topic1',
//     partitions: [
//       { partition: 0, offset: '31004' },
//       { partition: 1, offset: '54312' },
//       { partition: 2, offset: '32103' },
//       { partition: 3, offset: '28' },
//     ],
//   },
//   {
//     topic: 'topic2',
//     partitions: [
//       { partition: 0, offset: '1234' },
//       { partition: 1, offset: '4567' },
//     ],
//   },
// ]

如果你想取得消費者群組所有有提交偏移量的主題,可以直接忽略不帶參數 topics
使用可選參數 resolveOffsets 可以在不啟動消費者的情況下取得該分區主題的偏移量,通常在呼叫方法 resetOffets (https://kafka.js.org/docs/next/admin#a-name-reset-offsets-a-reset-consumer-group-offsets) 之後使用

await admin.resetOffsets({ groupId, topic })
await admin.fetchOffsets({ groupId, topic, resolveOffsets: false })
// [
//   { partition: 0, offset: '-1' },
//   { partition: 1, offset: '-1' },
//   { partition: 2, offset: '-1' },
//   { partition: 3, offset: '-1' },
// ]

await admin.resetOffsets({ groupId, topic })
await admin.fetchOffsets({ groupId, topic, resolveOffsets: true })
// [
//   { partition: 0, offset: '31004' },
//   { partition: 1, offset: '54312' },
//   { partition: 2, offset: '32103' },
//   { partition: 3, offset: '28' },
// ]

重置消費者群組的偏移量

將消費者群組的偏移量重置為最早的偏移量或是最近一筆的偏移量 ( 預設值為最近一筆的偏移量 ),重置時必須確保消費者群組沒有在運作中,否則此動作無法進行

await admin.resetOffsets({ groupId, topic }) // latest by default
// await admin.resetOffsets({ groupId, topic, earliest: true })

設定消費者群組的偏移量

admin 的方法 setOffsets 可以讓設定任意數值的偏移量給消費者

await admin.setOffsets({
    groupId: <String>,
    topic: <String>,
    partitions: <SeekEntry[]>,
})

SeekEntry 的結構

{
    partition: <Number>,
    offset: <String>,
}

範例

await admin.setOffsets({
    groupId: 'my-consumer-group',
    topic: 'custom-topic',
    partitions: [
        { partition: 0, offset: '35' },
        { partition: 3, offset: '19' },
    ]
})

依照時間點去重置消費者群組的偏移量

合併使用方法 fetchTopicOffsetsByTimestamp 和 setOffsets 可以取得那個時間點最早一筆訊息的偏移量並設定消費者偏移量,重置時必須確保消費者群組沒有在運作中,否則此動作無法進行

await admin.setOffsets({ groupId, topic, partitions: await admin.fetchTopicOffsetsByTimestamp(topic, timestamp) })

上一篇
卡夫卡的藏書閣【Book24】- Kafka - KafkaJS Admin 1
下一篇
卡夫卡的藏書閣【Book26】- Kafka - KafkaJS Admin 3
系列文
『卡夫卡的藏書閣』- 程序猿必須懂的Kafka開發與實作30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言