iT邦幫忙

第 11 屆 iThome 鐵人賽

DAY 10
1
IoT

簡簡單單的MQTT入門系列 第 10

保留訊息

  • 分享至 

  • xImage
  •  

在發佈訊息時,如果有將Retain設定為true,則MQTT Broker必須保留該訊息,並且替換此主題的任何已存在的訊息,也就是會保留『最新的一筆』。未來有訂閱該主題的訂閱者將會取得該筆訊息。
如果發佈訊息的Payload為空且Retain設定為true,則Broker會丟棄該主題的保留訊息,之後有訂閱該主題的訂閱者將不會收到保留訊息。
如果發佈訊息時將Retain設定為false,MQTT Broker不會將此訊息儲存成保留訊息,也不會丟棄、替換已存在的保留訊息。

以下程式可以用來測試

import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;

public class Collector {

	private static String[] temperatureTypes = new String[] {"Celsius", "Fahrenheit"};
	final static String PUB_TOPIC_NAME = "台灣/太魯閣國家公園/武嶺/設定";
	final static String IP = "127.0.0.1";
	final static int PORT = 1883;

	public static void main(String[] args) throws Exception {
		MQTT mqtt = new MQTT();
		try {
			mqtt.setHost(IP, PORT); // 設定ip和port
			BlockingConnection connection = mqtt.blockingConnection();
			connection.connect(); // 連接Broker
			System.out.println("Connected to Broker!");
			
			//定時發佈訊息
//			while (true) {
//				String value = temperatureTypes[(int) (new Date().getTime() % 2)];
//				//發佈訊息,設置傳送品質為AT_LEAST_ONCE,保留訊息
//				connection.publish(PUB_TOPIC_NAME, temperatureTypes[1].getBytes(), QoS.AT_LEAST_ONCE, true);
//              Thread.sleep(10000);
//			}
			

			//發佈訊息,設置傳送品質為AT_LEAST_ONCE,保留訊息
			connection.publish(PUB_TOPIC_NAME, temperatureTypes[1].getBytes(), QoS.AT_LEAST_ONCE, true);
			Thread.sleep(1000);
			connection.publish(PUB_TOPIC_NAME, temperatureTypes[0].getBytes(), QoS.AT_LEAST_ONCE, false);
			Thread.sleep(1000);
			connection.publish(PUB_TOPIC_NAME, temperatureTypes[0].getBytes(), QoS.AT_LEAST_ONCE, false);
//			Thread.sleep(1000);
//			connection.publish(PUB_TOPIC_NAME, "".getBytes(), QoS.AT_LEAST_ONCE, true);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.concurrent.TimeUnit;

import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;

public class TemperatureSensor {

	private static String temperatureType = "Celsius";
	final static String PUB_TOPIC_NAME = "台灣/太魯閣國家公園/武嶺/溫度";
	final static String SUB_TOPIC_NAME = "台灣/太魯閣國家公園/武嶺/設定";
	final static String IP = "127.0.0.1";
	final static int PORT = 1883;

	public static void main(String[] args) throws Exception {
		MQTT mqtt = new MQTT();
		try {
			mqtt.setHost(IP, PORT); // 設定ip和port
			BlockingConnection connection = mqtt.blockingConnection();
			connection.connect(); // 連接Broker
			System.out.println("Connected to Broker!");
			
			while (true) {
				//先訂閱『設定』主題
				Topic[] topics = { new Topic(SUB_TOPIC_NAME, QoS.EXACTLY_ONCE) };
				connection.subscribe(topics);
				System.out.println("Subscribe " + SUB_TOPIC_NAME);
				Message message = connection.receive(5, TimeUnit.SECONDS);
				if (message != null) {
                    //如果有收到『設定』主題的訊息,更新TemperatureType
					setTemperatureType(new String(message.getPayload()));
					System.out.println("Received messages. " + SUB_TOPIC_NAME + " {" + getTemperatureType() +"}");
					message.ack(); // 返回ack,告知Broker收到訊息
				}
				connection.unsubscribe(new String[] {SUB_TOPIC_NAME});//取消訂閱
				System.out.println("unsubscribe");
				
				String value = createRandom();
				//發佈訊息,設置傳送品質為AT_LEAST_ONCE,保留訊息
				connection.publish(PUB_TOPIC_NAME, value.getBytes(), QoS.AT_LEAST_ONCE, true);
				System.out.println("Sent messages with temperature=" + value);
				Thread.sleep(15000);
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	public static String createRandom() {
		BigDecimal bd = new BigDecimal(Math.random() * -10 + 30).setScale(1, RoundingMode.FLOOR);
		if("Celsius".equals(TemperatureSensor.temperatureType)) {
			return bd.toPlainString() + "C";
		} else {
			return bd.multiply(new BigDecimal(9)).divide(new BigDecimal(5)).add(new BigDecimal(32)).toPlainString() + "F";
		}
	}

	public static String getTemperatureType() {
		return temperatureType;
	}

	public static void setTemperatureType(String temperatureType) {
		TemperatureSensor.temperatureType = temperatureType;
	}
	
}

上一篇
Topic Name、Topic Filter的Java範例
下一篇
Qos(Quality of Service)
系列文
簡簡單單的MQTT入門23
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言