“It's only because of their stupidity that they're able to be so sure of themselves.”
― Franz Kafka, The Trial
認識自己,方能認識人生
生產者如何傳送訊息 ( message ) 到多個主題 ( topic )
生產者要同時傳送訊息到多個主題可以使用 sendBatch
,這個在做聚合資料時十分有用。
const topicMessages = [
{
topic: 'topic-a',
messages: [{ key: 'key', value: 'hello topic-a' }],
},
{
topic: 'topic-b',
messages: [{ key: 'key', value: 'hello topic-b' }],
},
{
topic: 'topic-c',
messages: [
{
key: 'key',
value: 'hello topic-c',
headers: {
'correlation-id': '2bfb68bb-893a-423b-a7fa-7b568cad5b67',
},
}
],
}
]
await producer.sendBatch({ topicMessages })
sendBatch 使用方式跟 send 一樣,只是將 topic 和 message 合併成 topicMessages
await producer.sendBatch({
topicMessages: <TopicMessages[]>,
acks: <Number>,
timeout: <Number>,
compression: <CompressionTypes>,
})
topicMessages 是一個陣列,包含一個或多個物件,物件內容是 topic 和 messages,messages 的格式跟 send 使用的 messages 是一樣的。
客製化屬於自己的分區器
生產者是可以接受客製化的分區器,一個分區器是一個函式,它會回傳另外一個實際用來寫分區邏輯的函式
const MyPartitioner = () => {
return ({ topic, partitionMetadata, message }) => {
// select a partition based on some logic
// return the partition number
return 0
}
}
partitionMetadata 是一個陣列裡面包含分區和領袖的資訊,它的格式如下:
{ partitionId: , leader: }`
Example:
[
{ partitionId: 1, leader: 1 },
{ partitionId: 2, leader: 2 },
{ partitionId: 0, leader: 0 }
]
使用客製化分區器的方式很簡單,只要在新增生產者時,使用可選參數 createPartitioner 即可,範例如下:kafka.producer({ createPartitioner: MyPartitioner })
預設的分區器
KafkaJS隨附有兩個預設的分區器:DefaultPartitioner 和 JavaCompatiblePartitioner
JavaCompatiblePartitioner 理論上是可以跟 Java Kafka client 隨附的預設分區器相容的,是否相容這一點很重要,會影響到多個主題聚合的時候(Co-partitioning,當 ksqlDB
在 join 串流資料時,必須要能夠確認資料串流和表格是分區器是相同邏輯下去分配的)。
使用分區器 JavaCompatiblePartitioner 的方式一樣是將它指派給生產者
const { Partitioners } = require('kafkajs')
kafka.producer({ createPartitioner: Partitioners.JavaCompatiblePartitioner })
明天會是生產者的最後一篇,接下來會進入消費者的篇章