iT邦幫忙

2021 iThome 鐵人賽

DAY 26
0
Modern Web

『卡夫卡的藏書閣』- 程序猿必須懂的Kafka開發與實作系列 第 26

卡夫卡的藏書閣【Book26】- Kafka - KafkaJS Admin 3

“You are at once both the quiet and the confusion of my heart.”
― Franz Kafka


取得叢集資訊

可以取得叢集中代理的資訊,通常跟訊息、事件處理無關,這些資訊是用來監測跟做維護動作

await admin.describeCluster()
// {
//   brokers: [
//     { nodeId: 0, host: 'localhost', port: 9092 }
//   ],
//   controller: 0,
//   clusterId: 'f8QmWTB8SQSLE6C99G4qzA'
// }

取得設定檔

取得指定資源的設定資訊

await admin.describeConfigs({
  includeSynonyms: <boolean>,
  resources: <ResourceConfigQuery[]>
})

ResourceConfigQuery 的結構如下

{
    type: <ConfigResourceType>,
    name: <String>,
    configNames: <String[]>
}

回傳指定資源的所有設定檔

const { ConfigResourceTypes } = require('kafkajs')

await admin.describeConfigs({
  includeSynonyms: false,
  resources: [
    {
      type: ConfigResourceTypes.TOPIC,
      name: 'topic-name'
    }
  ]
})

回傳指定資源的指定設定檔內容

const { ConfigResourceTypes } = require('kafkajs')

await admin.describeConfigs({
  includeSynonyms: false,
  resources: [
    {
      type: ConfigResourceTypes.TOPIC,
      name: 'topic-name',
      configNames: ['cleanup.policy']
    }
  ]
})

資源類型對照表如下:

  UNKNOWN: 0,
  TOPIC: 2,
  BROKER: 4,
  BROKER_LOGGER: 8,

回傳範例

{
    resources: [
        {
            configEntries: [{
                configName: 'cleanup.policy',
                configValue: 'delete',
                isDefault: true,
                configSource: 5,
                isSensitive: false,
                readOnly: false
            }],
            errorCode: 0,
            errorMessage: null,
            resourceName: 'topic-name',
            resourceType: 2
        }
    ],
    throttleTime: 0
}

修改設定檔

alterConfigs 可以修改指定資源的相關設定

await admin.alterConfigs({
    validateOnly: false,
    resources: <ResourceConfig[]>
})

ResourceConfig 的結構如下

{
    type: <ConfigResourceType>,
    name: <String>,
    configEntries: <ResourceConfigEntry[]>
}

ResourceConfigEntry 的結構如下

{
    name: <String>,
    value: <String>
}

範例

const { ConfigResourceTypes } = require('kafkajs')

await admin.alterConfigs({
    resources: [{
        type: ConfigResourceTypes.TOPIC,
        name: 'topic-name',
        configEntries: [{ name: 'cleanup.policy', value: 'compact' }]
    }]
})

回傳範例

{
    resources: [{
        errorCode: 0,
        errorMessage: null,
        resourceName: 'topic-name',
        resourceType: 2,
    }],
    throttleTime: 0,
}

取得消費者群組的清單

取得代理中有效的消費者群組清單

await admin.listGroups()

回傳範例

{
    groups: [
        {groupId: 'testgroup', protocolType: 'consumer'}
    ]
}

取得消費者群組的詳細資料

用消費者群組ID去取得消費者群組的詳細資料,使用方式跟 consumer.describeGroup() (https://kafka.js.org/docs/next/consuming#describe-group) 類似,但是允許你取得多個消費者群組的資料

await admin.describeGroups([ 'testgroup' ])
// {
//   groups: [{
//     errorCode: 0,
//     groupId: 'testgroup',
//     members: [
//       {
//         clientHost: '/172.19.0.1',
//         clientId: 'test-3e93246fe1f4efa7380a',
//         memberAssignment: Buffer,
//         memberId: 'test-3e93246fe1f4efa7380a-ff87d06d-5c87-49b8-a1f1-c4f8e3ffe7eb',
//         memberMetadata: Buffer,
//       },
//     ],
//     protocol: 'RoundRobinAssigner',
//     protocolType: 'consumer',
//     state: 'Stable',
//   }]
// `

可以使用 AssignerProtocol 的解碼方法去拿到 memeberMetadata 和 memberAssignment 的資料
範例如下

const memberMetadata = AssignerProtocol.MemberMetadata.decode(memberMetadata)
const memberAssignment = AssignerProtocol.MemberAssignment.decode(memberAssignment)

刪除消費者群組

用消費者群組ID去刪除消費者群組
註記:只能夠刪除沒有跟任何消費者有連線的消費者群組

await admin.deleteGroups([groupId])

範例:

await admin.deleteGroups(['group-test'])

範例回傳

[
    {groupId: 'testgroup', errorCode: 'consumer'}
]

因為此方法可以一次刪除多個消費者群組,其中一個或多個群組可能會刪除失敗,這些資訊會在報錯訊息中

try {
    await admin.deleteGroups(['a', 'b', 'c'])
} catch (error) {
  // error.name 'KafkaJSDeleteGroupsError'
  // error.groups = [{
  //   groupId: a
  //   error: KafkaJSProtocolError
  // }]
}

上一篇
卡夫卡的藏書閣【Book25】- Kafka - KafkaJS Admin 2
下一篇
卡夫卡的藏書閣【Book27】- Kafka - KafkaJS Admin 4
系列文
『卡夫卡的藏書閣』- 程序猿必須懂的Kafka開發與實作30

尚未有邦友留言

立即登入留言