iT邦幫忙

2023 iThome 鐵人賽

DAY 7
0

之前練習時寫的

const mqtt = require('mqtt');
const { Kafka } = require('kafkajs');

// MQTT連接
const mqttBrokerUrl = 'mqtt://localhost'; // Mosquitto Broker的URL
const mqttTopic = 'your/mqtt/topic'; // MQTT主题

// Kafka連接
const kafkaBrokers = ['localhost:9092']; // Kafka Broker的地址列表
const kafkaTopic = 'your-kafka-topic'; // 要发送到的Kafka主题

// 連到MQTT Broker
const mqttClient = mqtt.connect(mqttBrokerUrl);

// 連到到Kafka
const kafka = new Kafka({
  brokers: kafkaBrokers,
});

const producer = kafka.producer();

// 處理MQTT
mqttClient.on('connect', () => {
  console.log('Connected to MQTT broker');
  mqttClient.subscribe(mqttTopic);
});

mqttClient.on('message', (topic, message) => {
  console.log(`Received MQTT message: ${message.toString()}`);

  // 發送到Kafka
  (async () => {
    await producer.connect();
    await producer.send({
      topic: kafkaTopic,
      messages: [{ value: message.toString() }],
    });
    await producer.disconnect();
    console.log(`Sent message to Kafka: ${message.toString()}`);
  })();
});

// 關閉事件
process.on('SIGINT', async () => {
  await producer.disconnect();
  mqttClient.end();
  process.exit(0);
});


上一篇
<Day6> 元件 X kafka
下一篇
<Day8>
系列文
擱淺中掙扎29
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言