“They say ignorance is bliss.... they're wrong ”
― Franz Kafka
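Deletes records from the specified topic. For each listed partition, every record before the given offset is deleted. To delete all records on a partition, set the offset to -1.
Note that you cannot choose where deletion starts: it always begins at the start of the partition.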
await admin.deleteTopicRecords({
  topic: <String>,
  partitions: <SeekEntry[]>,
})
Example
await admin.deleteTopicRecords({
  topic: 'custom-topic',
  partitions: [
    { partition: 0, offset: '30' }, // delete up to and including offset 29
    { partition: 3, offset: '-1' }, // delete all available records on this partition
  ]
})
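Because the call takes one entry per partition, clearing a whole topic means listing its partitions first. The following is a minimal sketch, assuming `admin` is an already connected KafkaJS admin client and using `admin.fetchTopicOffsets` to discover the partitions. `purgeTopic` is a hypothetical helper name, not part of KafkaJS.

```javascript
// Hypothetical helper (not part of KafkaJS): delete every available record
// on every partition of a topic.
async function purgeTopic(admin, topic) {
  // fetchTopicOffsets resolves to one entry per partition of the topic
  const offsets = await admin.fetchTopicOffsets(topic)

  // offset '-1' means "delete all available records on this partition"
  const partitions = offsets.map(({ partition }) => ({ partition, offset: '-1' }))

  await admin.deleteTopicRecords({ topic, partitions })
  return partitions
}
```

Taking `admin` as a parameter keeps the helper easy to exercise with a stub before pointing it at a real cluster, which matters for an operation that cannot be undone.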
const {
  AclResourceTypes,
  AclOperationTypes,
  AclPermissionTypes,
  ResourcePatternTypes,
} = require('kafkajs')
const acl = [
  {
    resourceType: AclResourceTypes.TOPIC,
    resourceName: 'topic-name',
    resourcePatternType: ResourcePatternTypes.LITERAL,
    principal: 'User:bob',
    host: '*',
    operation: AclOperationTypes.ALL,
    permissionType: AclPermissionTypes.DENY,
  },
  {
    resourceType: AclResourceTypes.TOPIC,
    resourceName: 'topic-name',
    resourcePatternType: ResourcePatternTypes.LITERAL,
    principal: 'User:alice',
    host: '*',
    operation: AclOperationTypes.ALL,
    permissionType: AclPermissionTypes.ALLOW,
  },
]
await admin.createAcls({ acl })
Note that security features may not be enabled on the cluster. In that case the call fails with: KafkaJSProtocolError: Security features are disabled
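If the same script has to run against clusters with and without security enabled, one option is to treat that specific error as "nothing to do". The sketch below assumes the error can be recognised by the message text quoted above. `createAclsIfSecured` is a hypothetical helper name, and matching on the message is an illustrative choice rather than an official KafkaJS pattern.

```javascript
// Hypothetical helper: create ACLs, but tolerate clusters where security is off.
// Resolves to true when the ACLs were created and false when the cluster
// reported that security features are disabled.
async function createAclsIfSecured(admin, acl) {
  try {
    await admin.createAcls({ acl })
    return true
  } catch (err) {
    // Assumption: the error message contains the text shown in the note above
    const message = err && typeof err.message === 'string' ? err.message : ''
    if (message.includes('Security features are disabled')) {
      return false
    }
    throw err // anything else is a real failure
  }
}
```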
const {
  AclResourceTypes,
  AclOperationTypes,
  AclPermissionTypes,
  ResourcePatternTypes,
} = require('kafkajs')
const acl = {
  resourceName: 'topic-name',
  resourceType: AclResourceTypes.TOPIC,
  host: '*',
  permissionType: AclPermissionTypes.ALLOW,
  operation: AclOperationTypes.ANY,
  resourcePatternType: ResourcePatternTypes.LITERAL,
}
await admin.deleteAcls({ filters: [acl] })
// {
//   filterResponses: [
//     {
//       errorCode: 0,
//       errorMessage: null,
//       matchingAcls: [
//         {
//           errorCode: 0,
//           errorMessage: null,
//           resourceType: AclResourceTypes.TOPIC,
//           resourceName: 'topic-name',
//           resourcePatternType: ResourcePatternTypes.LITERAL,
//           principal: 'User:alice',
//           host: '*',
//           operation: AclOperationTypes.ALL,
//           permissionType: AclPermissionTypes.ALLOW,
//         },
//       ],
//     },
//   ],
// }
Note that security features may not be enabled on the cluster. In that case the call fails with: KafkaJSProtocolError: Security features are disabled
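The response is nested per filter, so checking what was actually removed takes a little unpacking. This is a minimal sketch that assumes the response shape shown in the comment above and that an `errorCode` of 0 means success. `listDeletedAcls` is a hypothetical helper name.

```javascript
// Hypothetical helper: list the ACL bindings a deleteAcls call actually removed.
function listDeletedAcls(response) {
  return response.filterResponses.flatMap(({ errorCode, matchingAcls }) => {
    // Skip filters that failed as a whole (assumption: 0 means success)
    if (errorCode !== 0) return []
    return matchingAcls
      .filter((acl) => acl.errorCode === 0)
      .map(({ principal, host, resourceName }) => ({ principal, host, resourceName }))
  })
}

// Usage:
// const response = await admin.deleteAcls({ filters: [acl] })
// console.log(listDeletedAcls(response))
```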
const {
  AclResourceTypes,
  AclOperationTypes,
  AclPermissionTypes,
  ResourcePatternTypes,
} = require('kafkajs')
await admin.describeAcls({
  resourceName: 'topic-name',
  resourceType: AclResourceTypes.TOPIC,
  host: '*',
  permissionType: AclPermissionTypes.ALLOW,
  operation: AclOperationTypes.ANY,
  resourcePatternTypeFilter: ResourcePatternTypes.LITERAL,
})
// {
//   resources: [
//     {
//       resourceType: AclResourceTypes.TOPIC,
//       resourceName: 'topic-name',
//       resourcePatternType: ResourcePatternTypes.LITERAL,
//       acls: [
//         {
//           principal: 'User:alice',
//           host: '*',
//           operation: AclOperationTypes.ALL,
//           permissionType: AclPermissionTypes.ALLOW,
//         },
//       ],
//     },
//   ],
// }
Note that security features may not be enabled on the cluster. In that case the call fails with: KafkaJSProtocolError: Security features are disabled
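The result groups ACLs by resource, which is awkward when you want one row per binding, for example to print a table or diff two clusters. This is a minimal sketch, assuming the result shape shown in the comment above. `flattenAcls` is a hypothetical helper name.

```javascript
// Hypothetical helper: turn the grouped describeAcls result into one flat
// entry per ACL binding, copying the resource fields onto each entry.
function flattenAcls(result) {
  return result.resources.flatMap(
    ({ resourceType, resourceName, resourcePatternType, acls }) =>
      acls.map((acl) => ({ resourceType, resourceName, resourcePatternType, ...acl }))
  )
}

// Usage:
// const result = await admin.describeAcls({ /* filter as above */ })
// console.table(flattenAcls(result))
```

Each flattened entry has the same fields as an entry passed to `createAcls`, so the output can in principle be fed back in to recreate the bindings elsewhere.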
To enable ACL authorization on the Kafka broker, add the following setting to the config/server-ssl.properties configuration file:
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
After restarting the Kafka broker, check its logs. You will see a ClusterAuthorizationException at the end of each log line:
org.apache.kafka.common.errors.ClusterAuthorizationException: Request Request(processor=0, connectionId=127.0.0.1:9093-127.0.0.1:62402-0, session=Session(User:CN=localhost,/127.0.0.1), listenerName=ListenerName(SSL), securityProtocol=SSL, buffer=null) is not authorized.
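This happens because User:CN=localhost has not been authorized to perform the operation. By default, no principal is allowed to perform any operation.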
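Since nothing is allowed by default, the broker principal needs an explicit ALLOW entry. The sketch below builds one in the same shape that `createAcls` takes. It assumes the denied operation is ClusterAction on the cluster resource, which Kafka names `kafka-cluster`. `buildClusterActionAcl` is a hypothetical helper name. Whether the client itself is permitted to create ACLs depends on the broker setup (for example the `super.users` setting), which this sketch does not cover.

```javascript
// Hypothetical helper: build an ALLOW entry for a broker principal.
// `enums` is expected to be the object returned by require('kafkajs').
function buildClusterActionAcl(enums, principal) {
  const {
    AclResourceTypes,
    AclOperationTypes,
    AclPermissionTypes,
    ResourcePatternTypes,
  } = enums
  return {
    resourceType: AclResourceTypes.CLUSTER,
    resourceName: 'kafka-cluster', // assumption: Kafka's name for the cluster resource
    resourcePatternType: ResourcePatternTypes.LITERAL,
    principal,
    host: '*',
    operation: AclOperationTypes.CLUSTER_ACTION,
    permissionType: AclPermissionTypes.ALLOW,
  }
}

// Usage (needs a reachable cluster and a client allowed to alter ACLs):
// const acl = [buildClusterActionAcl(require('kafkajs'), 'User:CN=localhost')]
// await admin.createAcls({ acl })
```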
Authorization failures are logged at INFO level, by default in logs/kafka-authorizer.log, where you should find the entry that corresponds to the ClusterAuthorizationException.
The logged entry:
Principal = User:CN=localhost is Denied Operation = ClusterAction from host = 127.0.0.1 on resource = Cluster:LITERAL:kafka-cluster for request = UpdateMetadata with resourceRefCount = 1
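When many of these entries pile up, it helps to pull out the principal, operation and resource so you can see which ACLs are missing. The parser below is a rough sketch based only on the single log line shown above, so the format may differ between Kafka versions. `parseDenial` is a hypothetical helper name.

```javascript
// Hypothetical parser for the authorizer denial format shown above.
const DENIED_PATTERN =
  /Principal = (.+?) is Denied Operation = (\S+) from host = (\S+) on resource = (\S+) for request = (\S+)/

// Returns the interesting fields of a denial line, or null if the line
// does not look like a denial entry.
function parseDenial(line) {
  const match = DENIED_PATTERN.exec(line)
  if (!match) return null
  const [, principal, operation, host, resource, request] = match
  return { principal, operation, host, resource, request }
}
```

Applied to the entry above, this yields the principal `User:CN=localhost`, the operation `ClusterAction` and the resource `Cluster:LITERAL:kafka-cluster`.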
Source:
https://jaceklaskowski.gitbooks.io/apache-kafka/content/kafka-demo-acl-authorization.html