// Written earlier as a practice exercise.
const mqtt = require('mqtt');
const { Kafka } = require('kafkajs');
// MQTT connection settings. Each value can be overridden through an
// environment variable; the literal is the default used when it is unset.
const mqttBrokerUrl = process.env.MQTT_BROKER_URL || 'mqtt://localhost'; // Mosquitto broker URL
const mqttTopic = process.env.MQTT_TOPIC || 'your/mqtt/topic'; // MQTT topic to subscribe to

// Kafka connection settings. KAFKA_BROKERS is a comma-separated list of
// host:port entries, e.g. "kafka-1:9092,kafka-2:9092".
const kafkaBrokers = (process.env.KAFKA_BROKERS || 'localhost:9092')
  .split(',')
  .map((broker) => broker.trim())
  .filter((broker) => broker !== '');
const kafkaTopic = process.env.KAFKA_TOPIC || 'your-kafka-topic'; // Kafka topic to publish to

// Connect to the MQTT broker.
const mqttClient = mqtt.connect(mqttBrokerUrl);

// Build the Kafka client and a producer. The producer is only created here;
// it is connected separately via producer.connect().
const kafka = new Kafka({
  brokers: kafkaBrokers,
});
const producer = kafka.producer();
// MQTT handling: subscribe to the source topic once the broker connection is up.
mqttClient.on('connect', () => {
  console.log('Connected to MQTT broker');
  mqttClient.subscribe(mqttTopic, (err) => {
    // Without this callback a failed subscription goes unnoticed and the
    // bridge would simply never receive any messages.
    if (err) {
      console.error(`Failed to subscribe to MQTT topic ${mqttTopic}:`, err);
    }
  });
});

// An EventEmitter with no 'error' listener throws when 'error' is emitted,
// which would crash the process on a connection problem. Log it instead.
mqttClient.on('error', (err) => {
  console.error('MQTT client error:', err);
});
// Promise for the single shared producer connection; null until the first
// message arrives (or after a failed connection attempt).
let producerReady = null;

/**
 * Connects the Kafka producer the first time it is needed and reuses that
 * connection afterwards.
 *
 * Connecting and disconnecting per message is slow and racy: with several
 * messages in flight, one message's disconnect can close the connection while
 * another message is still sending.
 *
 * @returns {Promise<void>} Resolves once the producer is connected.
 */
function ensureProducerConnected() {
  if (producerReady === null) {
    producerReady = producer.connect().catch((err) => {
      // Forget the failed attempt so the next message retries the connection.
      producerReady = null;
      throw err;
    });
  }
  return producerReady;
}

// Forward every received MQTT message to the Kafka topic.
mqttClient.on('message', async (topic, message) => {
  const payload = message.toString();
  console.log(`Received MQTT message: ${payload}`);
  try {
    await ensureProducerConnected();
    await producer.send({
      topic: kafkaTopic,
      messages: [{ value: payload }],
    });
    console.log(`Sent message to Kafka: ${payload}`);
  } catch (err) {
    // Catch here so a Kafka failure is logged instead of surfacing as an
    // unhandled promise rejection. The producer stays open; it is
    // disconnected on shutdown, not per message.
    console.error('Failed to send message to Kafka:', err);
  }
});
// Shutdown handling: on Ctrl+C, close the Kafka producer and the MQTT client
// before exiting.
process.on('SIGINT', async () => {
  let exitCode = 0;
  try {
    await producer.disconnect();
  } catch (err) {
    // Keep shutting down even if Kafka cannot be closed cleanly, but report
    // the failure through a non-zero exit code.
    exitCode = 1;
    console.error('Failed to disconnect Kafka producer:', err);
  } finally {
    // Exit from the end() callback so the MQTT client has finished closing
    // before the process terminates.
    mqttClient.end(false, {}, () => {
      process.exit(exitCode);
    });
  }
});