“Love is, that you are the knife which I plunge into myself.”
― Kafka, Franzv
消費者可以重新設定對於某個主題/分區的偏移量,只要使用方法 seek
,這個方法可以在消費者被初始化且開始運作後被呼叫
await consumer.connect()
await consumer.subscribe({ topic: 'example' })
consumer.run({ eachMessage: async ({ topic, message }) => true })
consumer.seek({ topic: 'example', partition: 0, offset: 12384 })
如果在生效的批次中重新設定偏移量,會將該批次的訊息都標註為陳舊訊息並且棄用,這是為了去確保下一個消費的訊息是從重新設定的偏移量開始消費,所以在使用 eachBatch 方法時,請記得要用 isStale() 檢查訊息是否是陳舊訊息
預設的情況下生產者會自動提交重新設定的偏移量,將生產者的自動提交(autoCommit)參數關閉可以避免這個行為
consumer.run({
autoCommit: false,
eachMessage: async ({ topic, message }) => true
})
consumer.seek({ topic: 'example', partition: 0, offset: "12384" })
KafkaJS 預設是按照順序( round robin )去分配分區的,但其實可以自己客製化分區的邏輯給消費者群組使用
一個分區器是一個會回傳帶有介面的物件的方法,以下為範例
const MyPartitionAssigner = ({ cluster }) => ({
name: 'MyPartitionAssigner',
version: 1,
async assign({ members, topics }) {},
protocol({ topics }) {}
})
這方法必須針對每個主題的分區去回傳分派計畫,分派計畫是一個由 memberId
列表和 memberAssignment
組成的,memberAssignment
必須用 MemberAssignment
加密,範例如下
const { AssignerProtocol: { MemberAssignment } } = require('kafkajs')
const MyPartitionAssigner = ({ cluster }) => ({
version: 1,
async assign({ members, topics }) {
// perform assignment
return myCustomAssignmentArray.map(memberId => ({
memberId,
memberAssignment: MemberAssignment.encode({
version: this.version,
assignment: assignment[memberId],
})
}))
}
})
方法 protocol
要回傳 name
和 metadata
,metadata
必須要用 MemberMetadata
去加密,以下為範例
const { AssignerProtocol: { MemberMetadata } } = require('kafkajs')
const MyPartitionAssigner = ({ cluster }) => ({
name: 'MyPartitionAssigner',
version: 1,
protocol({ topics }) {
return {
name: this.name,
metadata: MemberMetadata.encode({
version: this.version,
topics,
}),
}
}
})
當 assigner
完成後將它加到分配器的清單中,這邊的重點是要記得要設定預設的分配器去讓舊的消費者可以使用
const { PartitionAssigners: { roundRobin } } = require('kafkajs')
kafka.consumer({
groupId: 'my-group',
partitionAssigners: [
MyPartitionAssigner,
roundRobin
]
})
這個參數是新加入、實驗階段的參數,未來可能會被移除或是修改掉,這個參數會回傳消費者群組的元資料
const data = await consumer.describeGroup()
// {
// errorCode: 0,
// groupId: 'consumer-group-id-f104efb0e1044702e5f6',
// members: [
// {
// clientHost: '/172.19.0.1',
// clientId: 'test-3e93246fe1f4efa7380a',
// memberAssignment: Buffer,
// memberId: 'test-3e93246fe1f4efa7380a-ff87d06d-5c87-49b8-a1f1-c4f8e3ffe7eb',
// memberMetadata: Buffer,
// },
// ],
// protocol: 'RoundRobinAssigner',
// protocolType: 'consumer',
// state: 'Stable',
// },