發佈訊息
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;
public class PubThread extends Thread {
private String topic;
final static String IP = "127.0.0.1";
final static int PORT = 1883;
final static SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public void run(){
MQTT mqtt = new MQTT();
try {
mqtt.setHost(IP, PORT); // 設定ip和port
BlockingConnection connection = mqtt.blockingConnection();
connection.connect(); // 連接Broker
System.out.println("Connected to Broker!");
while (true) {
String value = topic + "-->" +createRandom();
//發佈訊息,TOPIC為"temperature/Wuling",設置傳送品質為AT_LEAST_ONCE,不保留訊息
connection.publish(topic, value.getBytes(), QoS.AT_LEAST_ONCE, false);
System.out.println("Sent messages with temperature=" + value);
Thread.sleep(10000);
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static String createRandom() {
return
sdf.format(new Date()) + ":" +
new BigDecimal(Math.random() * -10 + 30).setScale(1, RoundingMode.FLOOR).toPlainString();
}
}
public class Pub {
final static String TOPIC_NAME = "temperature/Wuling";
final static String IP = "127.0.0.1";
final static int PORT = 1883;
public static void main(String[] args) throws Exception {
startThread("台灣/太魯閣國家公園/武嶺/溫度");
startThread("台灣/太魯閣國家公園/武嶺/濕度");
startThread("台灣/太魯閣國家公園/長春祠/溫度");
startThread("台灣/太魯閣國家公園/長春祠/濕度");
startThread("台灣/墾丁國家公園/溫度");
startThread("台灣/墾丁國家公園/濕度");
startThread("台灣/陽明山國家公園/溫度");
startThread("台灣/陽明山國家公園/濕度");
}
private static void startThread(String topic) {
PubThread t = new PubThread();
t.setTopic(topic);
t.start();
}
}
訂閱
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
public class SubThread extends Thread {
private String topic;
final static String IP = "127.0.0.1";
final static int PORT = 1883;
final static SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public void run(){
MQTT mqtt = new MQTT();
try {
mqtt.setHost(IP, PORT); // 設定ip和port
BlockingConnection connection = mqtt.blockingConnection();
connection.connect(); // 連接Broker
System.out.println("Connected to Broker!");
//設置Topic,傳送品質為EXACTLY_ONCE
Topic[] topics = { new Topic(topic, QoS.EXACTLY_ONCE) };
connection.subscribe(topics);
while (true) {
//取得訊息
Message message = connection.receive(10, TimeUnit.SECONDS);
if (message != null) {
System.out.println("Received messages. " + topic + " " + new String(message.getPayload()));
message.ack(); // 返回ack,告知Broker收到訊息
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static String createRandom() {
return
sdf.format(new Date()) + ":" +
new BigDecimal(Math.random() * -10 + 30).setScale(1, RoundingMode.FLOOR).toPlainString();
}
}
public class Sub {
final static String TOPIC_NAME = "temperature/Wuling";
final static String IP = "127.0.0.1";
final static int PORT = 1883;
public static void main(String[] args) throws Exception {
startThread("台灣/太魯閣國家公園/武嶺/溫度");
startThread("台灣/太魯閣國家公園/#");
startThread("台灣/+/#");
startThread("台灣/+/武嶺/溫度");
}
private static void startThread(String topic) {
SubThread t = new SubThread();
t.setTopic(topic);
t.start();
}
}