iT邦幫忙

2024 iThome 鐵人賽

DAY 2
0

相信各位讀者在上一篇利用Python實作MQTT(ㄧ)的文章中,嘗試實作完後已經迫不及待想知道如何利用Python實作發佈和訂閱了,那我們接下來先進行發布(Publish)的實作。

發布主題

import paho.mqtt.client as mqtt
import time

#定義發布函式
def publish(client, topic):
    msg_count = 0 #計算已發布的訊息數量
    while True: #設定迴圈讓他不斷的發布訊息
        time.sleep(5) #每間隔5秒發一次訊息
        msg = f'訊息: {msg_count}' #想傳送的訊息
        result = client.publish(topic, msg) #發布訊息到主題,回傳發送結果
        rc = result[0] #將發送結果指定到rc變數
        if (rc == 0): #若rc等於0,則發送成功
            print(f'傳送{msg} 到主題 {topic}')
        else:
             print(f'傳送訊息失敗,無法傳送訊息到該主題{topic}')
        msg_count += 1

publish(client, "/home/plug") #主題的部分可自行自訂,格式以"/"分隔

完整發布程式碼

import paho.mqtt.client as mqtt
import time

def connect_MQTT_broker(broker_host, port):
    def on_connect(client, userdata, flags, rc):
        if (rc == 0):
            print("連線成功")
        else:
            print(f'連線失敗, 錯誤代碼={rc}')

    client = mqtt.Client()
    client.on_connect = on_connect
    client.connect(broker_host, port)
    client.loop_start()
    return client

def publish(client, topic):
    msg_count = 0
    while True:
        time.sleep(1)
        msg = f'訊息: {msg_count}'
        result = client.publish(topic, msg)
        rc = result[0]
        if (rc == 0):
            print(f'傳送{msg} 到主題 {topic}')
        else:
             print(f'傳送訊息失敗,無法傳送訊息到該主題{topic}')
        msg_count += 1

iron_client_pub = connect_MQTT_broker("broker.emqx.io", 1883)
publish(iron_client_pub, "/home/plug")

執行結果

連線成功
傳送訊息: 0 到主題 /home/plug
傳送訊息: 1 到主題 /home/plug
傳送訊息: 2 到主題 /home/plug
傳送訊息: 3 到主題 /home/plug
傳送訊息: 4 到主題 /home/plug
傳送訊息: 5 到主題 /home/plug
傳送訊息: 6 到主題 /home/plug
傳送訊息: 7 到主題 /home/plug
傳送訊息: 8 到主題 /home/plug
傳送訊息: 9 到主題 /home/plug
.
.
.

成功執行的讀者,有沒有覺得離學會MQTT又更近一步了啊,下一篇就一起繼續來實作訂閱(Subscribe)的部分吧!


上一篇
[DAY1]使用Python實作MQTT(一)
系列文
讓Python不拍勝-實用套件實作與介紹2
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言