iT邦幫忙

2021 iThome 鐵人賽

DAY 27
0
Modern Web

『卡夫卡的藏書閣』- 程序猿必須懂的Kafka開發與實作系列 第 27

卡夫卡的藏書閣【Book27】- Kafka - KafkaJS Admin 4

“They say ignorance is bliss.... they're wrong ”
― Franz Kafka


刪除主題的訊息

刪除指定主題的訊息,會刪除到指定分區、指定偏移量之前的所有訊息,如果想要刪除該分區所有訊息偏移量要設定為 -1
需要注意的是,這邊是無法指定刪除開始的點,一率都是從頭開始刪除的

await admin.deleteTopicRecords({
    topic: <String>,
    partitions: <SeekEntry[]>,
})

範例

await admin.deleteTopicRecords({
    topic: 'custom-topic',
    partitions: [
        { partition: 0, offset: '30' }, // delete up to and including offset 29
        { partition: 3, offset: '-1' }, // delete all available records on this partition
    ]
})

新增 存取控制安全機制 ( create ACL )

const {
  AclResourceTypes,
  AclOperationTypes,
  AclPermissionTypes,
  ResourcePatternTypes,
} = require('kafkajs')

const acl = [
  {
    resourceType: AclResourceTypes.TOPIC,
    resourceName: 'topic-name',
    resourcePatternType: ResourcePatternTypes.LITERAL,
    principal: 'User:bob',
    host: '*',
    operation: AclOperationTypes.ALL,
    permissionType: AclPermissionTypes.DENY,
  },
  {
    resourceType: AclResourceTypes.TOPIC,
    resourceName: 'topic-name',
    resourcePatternType: ResourcePatternTypes.LITERAL,
    principal: 'User:alice',
    host: '*',
    operation: AclOperationTypes.ALL,
    permissionType: AclPermissionTypes.ALLOW,
  },
]

await admin.createAcls({ acl })

這邊需要注意安全機制可能在叢集那邊沒開啟,如果是這種情況會噴錯誤:KafkaJSProtocolError: Security features are disabled

刪除 存取控制安全機制 ( DELETE ACL )

const {
  AclResourceTypes,
  AclOperationTypes,
  AclPermissionTypes,
  ResourcePatternTypes,
} = require('kafkajs')

const acl = {
  resourceName: 'topic-name,
  resourceType: AclResourceTypes.TOPIC,
  host: '*',
  permissionType: AclPermissionTypes.ALLOW,
  operation: AclOperationTypes.ANY,
  resourcePatternType: ResourcePatternTypes.LITERAL,
}

await admin.deleteAcls({ filters: [acl] })
// {
//   filterResponses: [
//     {
//     errorCode: 0,
//     errorMessage: null,
//     matchingAcls: [
//         {
//         errorCode: 0,
//         errorMessage: null,
//         resourceType: AclResourceTypes.TOPIC,
//         resourceName: 'topic-name',
//         resourcePatternType: ResourcePatternTypes.LITERAL,
//         principal: 'User:alice',
//         host: '*',
//         operation: AclOperationTypes.ALL,
//         permissionType: AclPermissionTypes.ALLOW,
//         },
//     ],
//     },
//   ],
// }

這邊需要注意安全機制可能在叢集那邊沒開啟,如果是這種情況會噴錯誤:KafkaJSProtocolError: Security features are disabled

取得存取控制安全機制的相關資訊 ( Describe ACL )

const {
  AclResourceTypes,
  AclOperationTypes,
  AclPermissionTypes,
  ResourcePatternTypes,
} = require('kafkajs')

await admin.describeAcls({
  resourceName: 'topic-name,
  resourceType: AclResourceTypes.TOPIC,
  host: '*',
  permissionType: AclPermissionTypes.ALLOW,
  operation: AclOperationTypes.ANY,
  resourcePatternTypeFilter: ResourcePatternTypes.LITERAL,
})
// {
//   resources: [
//     {
//       resourceType: AclResourceTypes.TOPIC,
//       resourceName: 'topic-name,
//       resourcePatternType: ResourcePatternTypes.LITERAL,
//       acls: [
//         {
//           principal: 'User:alice',
//           host: '*',
//           operation: AclOperationTypes.ALL,
//           permissionType: AclPermissionTypes.ALLOW,
//         },
//       ],
//     },
//   ],
// }

這邊需要注意安全機制可能在叢集那邊沒開啟,如果是這種情況會噴錯誤:KafkaJSProtocolError: Security features are disabled

開啟 ACL 安全機制

要開啟Kafka 代理上的 ACL 安全機制,要在設定檔 config/server-ssl.properties 加上以下設定

authorizer.class.name=kafka.security.authorizer.AclAuthorizer

重新啟動 Kafka 代理後,去查看相關 log,你會看會噴錯 ClusterAuthorizationException 在每一行 log 的尾端

org.apache.kafka.common.errors.ClusterAuthorizationException: Request Request(processor=0, connectionId=127.0.0.1:9093-127.0.0.1:62402-0, session=Session(User:CN=localhost,/127.0.0.1), listenerName=ListenerName(SSL), securityProtocol=SSL, buffer=null) is not authorized.

這是因為 User:CN=localhost 沒有被授權去動作,預設上是沒有帳號可以進行任何動作的

查看安全機制相關 log

未授權錯誤會記錄在 INFO level 的 log,位置預設在 logs/kafka-authorizer.log,你應該可以查看到 ClusterAuthorizationException 噴錯記錄下來的 log

Principal = User:CN=localhost is Denied Operation = ClusterAction from host = 127.0.0.1 on resource = Cluster:LITERAL:kafka-cluster for request = UpdateMetadata with resourceRefCount = 1

資料來源:
https://jaceklaskowski.gitbooks.io/apache-kafka/content/kafka-demo-acl-authorization.html


上一篇
卡夫卡的藏書閣【Book26】- Kafka - KafkaJS Admin 3
下一篇
卡夫卡的藏書閣【Book28】- Kafka - MirroMaker
系列文
『卡夫卡的藏書閣』- 程序猿必須懂的Kafka開發與實作30

尚未有邦友留言

立即登入留言