在KubeEdge小專題中一共用到兩支Python程式,今天要說的是我用python訂閱某一主題的python程式。
底下是程式碼部分,有對應的註解分別說明程式碼:
mport paho.mqtt.client as mqtt
import json
from influxdb import InfluxDBClient
from datetime import datetime, timezone, timedelta
# 連接到InfluxDB
dbClient = InfluxDBClient('192.168.0.7', '8086', 'telegraf', 'telegraf', 'db0')
def on_connect(client, userdata, flags, rc):
print("Connect with result code" + str(rc))
# 訂閱主題
client.subscribe("channels/light/")
def on_message(client, userdata, msg):
# 吐出訂閱到的主題與資料(Debug 用)
print(msg.topic+""+msg.payload.decode('utf-8'))
# 取出訂閱主題的資料值(從MQTT取到的資料格式為json)
extract_light = sensorData(msg.payload.decode('utf-8'))
#print(type(extract_light))
# 取得現在時間
dt1 = datetime.utcnow().replace(tzinfo=timezone.utc)
dt2 = dt1.astimezone(timezone(timedelta(hours=0))) # timezone: UTC+8
time = dt2.strftime("%Y-%m-%d %H:%M:%S")
# 包裝json格式用來放入influxdb資料庫中
toDB_json = [{
"measurement": "Sensor",
"time": time,
"tags":{
'location': "Dormitory"
},
"fields":{
'light': extract_light
}
}]
# 寫入資料點
dbClient.write_points(toDB_json)
# 用來處理訂閱MQTT取得的json格式資料,回傳感測器數值
def sensorData(data):
data = json.loads(data)
light = data["Light"]
return light
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("192.168.0.7", 1883, 60)
client.loop_forever()
package的部份用了幾個。paho.mqtt.client是用來控制MQTT的;json是因為寫入influxDB時使用json格式,因此我們使用這個套件來處理格式轉換;
influxdb是python用來控制influxdb資料庫的套件;datetime是用來處理時區轉換的,因為不確定influxdb接到資料後預設寫入的時間是否符合台灣的時區,所以特別使用這個取得符合台灣時區的資料時間後一併寫入資料庫中。
底下是influxdb資料庫讀取資料的json格式:
toDB_json = [{
"measurement": <數據表名>,
"time": 時間(influxDB以時間作排序依據),
"tags":{
# 用來標記資料標籤,後續作資料選擇時可以使用
'標籤': '健值'
},
"fields":{
# 寫入資料內容
'資料名': 資料
}
}]
連接資料庫部分帶的參數是:
InfluxDBClient(資料庫IP, 資料庫port(預設是8086), '使用者名稱', '使用者密碼', '資料庫名稱')
時區轉換的部分,如同我前面所說的我預想寫入符合台灣時區的當前時間,作為資料寫入時間。但我使用台灣時區(UTC+8)寫入時反而發現Grafana從資料庫取出資料後的顯示的資料紀錄時間反而多了8小時,後來還沒仔細研究是誰的時間轉換出了問題就先把時間改成以UTC+0為時區寫入,Grafana顯示的資料紀錄時間正好對的上。
參考資料:
第11屆IT邦幫忙鐵人賽:使用 Python 進行 Publish & Subscribe,從python入門到物聯網系列
python 寫入 influxdb