“Slept, awoke, slept, awoke, miserable life.”
― franz kafka
很多人40歲就死了,80歲才入葬,供勉之
今天要創建一個生產者推送訊息給 Kafka,很單純的只要呼叫 KafkaJS 客戶端的 producer function 即可
const producer = kafka.producer()
可選選項 | 說明 | 預設數值 |
---|---|---|
createPartitioner | 創建一個分區器,之後會再詳述 | null |
retry | 設定重試機制,之後會再詳述 | null |
metadataMaxAge | 在 partition leader 沒有改變(沒有新增代理或是分區)的情況,可以設定間隔多長的區間(毫秒)去強制更新一次元資料,預設是五分鐘 | 300000毫秒(5分鐘) |
allowAutoTopicCreation | 當消費者指定一個尚不存在的主題進行消費時,是否要自動創建該主題 | true |
transactionTimeout | 事務狀態會等待多久才去更新的時間區間(毫秒),如果這個數值大於代理 (broker) 中的 transaction.max.timeout.ms 的設定數值,這個事務請求會噴錯、並且顯示InvalidTransactionTimeout error |
60000 |
idempotent | 這個參數目前還在實驗階段,如果開啟這個參數生產者將會確保每一條訊息只會寫入一次,必須將 Acks 設定為 -1、重試次數會設定為 MAX_SAFE_INTEGER | false |
maxInFlightRequests | 是否設定同時可以行進的 request 上限數量,如果沒有設定就是無上限 | null |
這邊很簡單的傳入索引鍵 (key) 和訊息 (value),分區將會以預設的分區器,去決定該消息要傳遞的分區
const producer = kafka.producer()
await producer.connect()
await producer.send({
topic: 'first-topic',
messages: [
{ key: 'key1', value: 'hello kafka' },
{ key: 'key2', value: 'I am message!' },
{ key: 'key3', value: 'No metter what' }
],
})
當然也可以自訂要傳送的分區,直接輸入該分區的編號即可,像是 partition: 0
const producer = kafka.producer()
await producer.connect()
await producer.send({
topic: 'first-topic',
messages: [
{ key: 'key1', value: 'hello kafka', partition: 0 },
{ key: 'key2', value: 'I am message!', partition: 1 },
{ key: 'key3', value: 'No metter what', partition: 2 }
],
})
明天會再詳細敘述各個設定參數的詳細內容