“By believing passionately in something that still does not exist, we create it. The nonexistent is whatever we have not sufficiently desired.”
― Franz Kafka
吸引力法則 只要有心人人都可以是食神
Kafka 腳本的練習到這邊大概結束了,之後 Kafka mirror 的部分會放到後面再介紹
今天開始會用 KafkaJS 這個套件來練習 Kafka 的各種運作,基本的生產者和消費者會需要做哪些設定,相信有實際的範例會更清楚了解 Kafka 的機制。
這邊使用的是 KafkaJS 的 Pre-release versions
安裝的方式就是基本的 npm 和 yarn
npm install kafkajs
或是
yarn add kafkajs
一開始可以將 KafkaJS 的客戶端指定一個或是多個代理 ( broker ) 去實例化它
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092', 'localhost:9093', 'localhost:9094'],
})
接著,可以利用 KafkaJS 客戶端來創建一個生產者 ( producer ),指定一個主題向它傳遞訊息,並在最後中斷連結
const producer = kafka.producer()
await producer.connect()
await producer.send({
topic: 'test-topic',
messages: [
{ value: 'Hello KafkaJS user!' },
],
})
await producer.disconnect()
然後,可以創建一個消費者來消費剛剛產生的訊息,確定剛剛有產生訊息成功,一樣是指定訂閱主題 test-topic
,並且指定 fromBeginning
為啟動,從這個主題的第一筆訊息開始消費,如果設定關閉就是消化訂閱當下之後新產生的訊息
const consumer = kafka.consumer({ groupId: 'test-group' })
await consumer.connect()
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
value: message.value.toString(),
})
},
})
今天是先簡單的入門的範例,明天會開始介紹生產者的相關設定概念,像是傳遞的方式、分區器、順序性、新舊生產者API..等