如同上篇文章所說,這裡選用AWS的DynamoDB進行資料的存放,主要是對把地端(實體)與雲端(虛擬)產生ROS topics,針對有需要的部分記錄下來。
目前已知小車移動會用到的topics主要有兩個,initialpose和clicked_point。
並利用ros的rcply library產生ros node,負責監聽地端的ROS topic,將上述指定的兩個topics deliver給AWS DynamoDB。
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
def on_connect(client, userdata, flags, rc):
client.subscribe("initialpose")
client.subscribe("clicked_point")
就是之前提到在雲端的VM上透過一隻python程式去監聽,負責將從mqtt subscribe到的topics,透過boto3記錄到DynamoDB資料庫中。
boto3可以很方便直接使用aws中的服務(只要在.aws configuration設定好IAM即可),然後就可以直接透過.Table取得要存取的資料表。
import ast
import boto3
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(TABLE_NAME)
def on_message(client, userdata, msg):
items = []
items.append(ast.literal_eval(str(msg.payload.decode("utf-8", "ignore"))))
try:
with table.batch_writer() as batch:
for item in items:
batch.put_item(Item=item)
except ClientError:
raise
這裡要注意on_message拿到的payload資料是encode into byte string,所以要用utf-8 decode一次並使用ast.literal_eval轉成dict的資料型別。
取得資料的時候則可以這樣:
response = table.batch_get_item(
RequestItems={
TABLE_NAME: {
'Keys': [
{
'topic': 'initialpose'
},
{
'topic': 'clicked_point'
}
]
}
}
)
而這個部分以後可以使用Greengrass,等問題解決後取代mqtt,透過Greengrass去存取DynamoDB資料。可參考此程式碼。
import awsiot.greengrasscoreipc as gg
import awsiot.greengrasscoreipc.model as model
self.ipc_client = gg.connect()
operation = self.ipc_client.new_publish_to_iot_core()
operation.activate(model.PublishToIoTCoreRequest(
topic_name=topic,
qos=model.QOS.AT_LEAST_ONCE,
payload=pickle.dumps(msg),
))
這樣就只要在Greengrass的console UI介面去設定Greengrass rule將資料轉至DynamoDB。