iT邦幫忙

2021 iThome 鐵人賽

DAY 13
0
Modern Web

『卡夫卡的藏書閣』- 程序猿必須懂的Kafka開發與實作系列 第 13

卡夫卡的藏書閣【Book13】- KafkaJS 生產者 1

“Slept, awoke, slept, awoke, miserable life.”
― franz kafka
很多人40歲就死了,80歲才入葬,供勉之


今天要創建一個生產者推送訊息給 Kafka,很單純的只要呼叫 KafkaJS 客戶端的 producer function 即可

const producer = kafka.producer()

可選選項們

可選選項 說明 預設數值
createPartitioner 創建一個分區器,之後會再詳述 null
retry 設定重試機制,之後會再詳述 null
metadataMaxAge 在 partition leader 沒有改變(沒有新增代理或是分區)的情況,可以設定間隔多長的區間(毫秒)去強制更新一次元資料,預設是五分鐘 300000毫秒(5分鐘)
allowAutoTopicCreation 當消費者指定一個尚不存在的主題進行消費時,是否要自動創建該主題 true
transactionTimeout 事務狀態會等待多久才去更新的時間區間(毫秒),如果這個數值大於代理 (broker) 中的 transaction.max.timeout.ms 的設定數值,這個事務請求會噴錯、並且顯示InvalidTransactionTimeout error 60000
idempotent 這個參數目前還在實驗階段,如果開啟這個參數生產者將會確保每一條訊息只會寫入一次,必須將 Acks 設定為 -1、重試次數會設定為 MAX_SAFE_INTEGER false
maxInFlightRequests 是否設定同時可以行進的 request 上限數量,如果沒有設定就是無上限 null

這邊很簡單的傳入索引鍵 (key) 和訊息 (value),分區將會以預設的分區器,去決定該消息要傳遞的分區

const producer = kafka.producer()

await producer.connect()
await producer.send({
    topic: 'first-topic',
    messages: [
        { key: 'key1', value: 'hello kafka' },
        { key: 'key2', value: 'I am message!' },
        { key: 'key3', value: 'No metter what' }
    ],
})

當然也可以自訂要傳送的分區,直接輸入該分區的編號即可,像是 partition: 0

const producer = kafka.producer()

await producer.connect()
await producer.send({
    topic: 'first-topic',
    messages: [
        { key: 'key1', value: 'hello kafka', partition: 0 },
        { key: 'key2', value: 'I am message!', partition: 1 },
        { key: 'key3', value: 'No metter what', partition: 2 }
    ],
})

明天會再詳細敘述各個設定參數的詳細內容


上一篇
卡夫卡的藏書閣【Book12】- KafkaJS 安裝
下一篇
卡夫卡的藏書閣【Book14】- KafkaJS 生產者 - 壓縮 2
系列文
『卡夫卡的藏書閣』- 程序猿必須懂的Kafka開發與實作30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言