今天要說明另一個小程式,我用來將感測器資料由邊緣端資料庫備份至雲端。
底下是我的程式:
import sys
import traceback
import json
from influxdb import InfluxDBClient
from datetime import datetime, timezone, timedelta
# 分別連上邊緣端以及雲端資料庫
edgedb = InfluxDBClient('192.168.0.7', '8086', 'telegraf', 'telegraf', 'db0')
clouddb = InfluxDBClient('192.168.0.3', '30006', 'telegraf', 'telegraf', 'LightSensorBackup')
# 從邊緣端資料庫取出資料(目前所有資料),
result = edgedb.query('select * from Sensor')
result_list = list(result.get_points())
# 寫入雲端資料庫
try:
for i in range (len(result_list)):
write_json = [{
"measurement": "Sensor",
"time": result_list[i]['time'],
"tags":{
'location': result_list[i]['location']
},
"fields":{
'light': result_list[i]['light']
}
}]
clouddb.write_points(write_json)
except Exception as e:
print(e)
else:
# 取得當前時間(UTC+8)
dt1 = datetime.utcnow().replace(tzinfo=timezone.utc)
dt2 = dt1.astimezone(timezone(timedelta(hours=8)))
# 刪除邊緣端上剛備份的所有資料
start_time = result_list[0]['time']
end_time = result_list[len(result_list) - 1]['time']
delete_cmd = "delete from Sensor where time >= '" + start_time + "' and time <= '" + end_time +"'"
edgedb.query(delete_cmd)
# 顯示這次備份訊息
print('Backup' + str(len(result_list)) + 'data')
print('from' + result_list[0]['time'] + 'to' + result_list[len(result_list)-1]['time'])
print('Backup finish ...')
print('timestamp: '+ d2.strftime("%Y-%m-%d %H:%M:%S"))
註解如同上面所寫的,其實這個方法蠻土法煉鋼的,先query邊緣端的資料存成list後再依序取出寫入雲端資料庫中,最後再透過選取時間範圍的方式刪掉邊緣端上的資料,避免誤刪到還沒上傳過的資料。主要是沒有查到influxDB有沒有跟備份資料庫有關的指令,所以先用這種方法作備份。
其實正確來說KubeEdge應該是能夠將邊緣端資料上傳至雲端的,概念上好像是EventBus會訂閱幾個特定主題,如果我們發布訊息到這幾個主題上便會被接收到,接到後會先核對預先宣告的device中有沒有對應到這則訊息的device,如果有的話會再作上傳的動作。根據我測試官方提供的範例程式以及官方說明文件推論得到的,但我當時測試的時候好像是邊緣端找不到預先宣告的device,所以沒有觸發後續資料上傳至雲端的動作,具體的問題在那裡還沒有發現,如果之後解決這個問題就可以省去另外運行資料庫的麻煩了,也能自動上傳資料到雲端資料庫。
接下來兩天會分別說明Grafana與influxdb的部屬。