iT邦幫忙

2021 iThome 鐵人賽

DAY 12
0
Modern Web

『卡夫卡的藏書閣』- 程序猿必須懂的Kafka開發與實作系列 第 12

卡夫卡的藏書閣【Book12】- KafkaJS 安裝

  • 分享至 

  • xImage
  •  

“By believing passionately in something that still does not exist, we create it. The nonexistent is whatever we have not sufficiently desired.”
― Franz Kafka
吸引力法則 只要有心人人都可以是食神


Kafka 腳本的練習到這邊大概結束了,之後 Kafka mirror 的部分會放到後面再介紹

今天開始會用 KafkaJS 這個套件來練習 Kafka 的各種運作,基本的生產者和消費者會需要做哪些設定,相信有實際的範例會更清楚了解 Kafka 的機制。

這邊使用的是 KafkaJS 的 Pre-release versions

  • 原始碼的連結:https://github.com/tulios/kafkajs
  • 文件的連結:https://kafka.js.org/docs/next/getting-started

安裝 KafkaJS

安裝的方式就是基本的 npm 和 yarn

npm install kafkajs

或是

yarn add kafkajs

基本用法

一開始可以將 KafkaJS 的客戶端指定一個或是多個代理 ( broker ) 去實例化它

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092', 'localhost:9093', 'localhost:9094'],
})

接著,可以利用 KafkaJS 客戶端來創建一個生產者 ( producer ),指定一個主題向它傳遞訊息,並在最後中斷連結

const producer = kafka.producer()

await producer.connect()
await producer.send({
  topic: 'test-topic',
  messages: [
    { value: 'Hello KafkaJS user!' },
  ],
})

await producer.disconnect()

然後,可以創建一個消費者來消費剛剛產生的訊息,確定剛剛有產生訊息成功,一樣是指定訂閱主題 test-topic,並且指定 fromBeginning 為啟動,從這個主題的第一筆訊息開始消費,如果設定關閉就是消化訂閱當下之後新產生的訊息

const consumer = kafka.consumer({ groupId: 'test-group' })

await consumer.connect()
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    console.log({
      value: message.value.toString(),
    })
  },
})

今天是先簡單的入門的範例,明天會開始介紹生產者的相關設定概念,像是傳遞的方式、分區器、順序性、新舊生產者API..等


上一篇
卡夫卡的藏書閣【Book11】- Kafka Connect 2
下一篇
卡夫卡的藏書閣【Book13】- KafkaJS 生產者 1
系列文
『卡夫卡的藏書閣』- 程序猿必須懂的Kafka開發與實作30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言