為了快速了解如何使用 MicroPython 開發 GATT 伺服器,咱們來實作一個最簡單的 Insulin Delivery Service。
要使用 MicroPython 的藍牙模組時,第一步是呼叫 bluetooth.BLE 類別實體的 active(True):
def main():
# 取得系統唯一的 bluetooth.BLE 物件
ble = bluetooth.BLE()
# 開啟 BLE radio
# 在呼叫任何 BLE 方法前,必須先啟用 BLE
ble.active(True)
# 取得本地端的藍芽位址及類型
mac = ble.config("mac")
print(f"Loacal address: {addr_to_str(mac[1])} ({addr_type_to_str(mac[0])})")
if __name__ == "__main__":
main()
print("done")
完整程式請參考 step01.py
接下來咱們來執行它:
點擊功能表「View」勾選「Files」開啟檔案窗格
檔案窗格的「MicroPython device」部分應可顯示裝置內的檔案,剛燒錄完的裝置的根目錄下至少有 boot.py。
在 Files 窗格選擇要執行的檔案後,快速點擊滑鼠左鍵兩下將其打開,然後按下 F5 執行,可以看到 Shell 窗格印出了藍牙位址以及「done」的字樣:
順利開啟藍牙後,為了讓 GATT 用戶端能夠發現並連接我們的 IDS 伺服器,必須讓伺服器發送可被連接的廣播:
async def main():
# 取得系統唯一的 bluetooth.BLE 物件
ble = bluetooth.BLE()
# 開啟 BLE radio
# 在呼叫任何 BLE 方法前,必須先啟用 BLE
ble.active(True)
# 發送廣播
adv_data = build_advertising_payload()
local_name = "IDS 🍭"
resp_data = build_scan_response_payload(local_name)
ble.gap_advertise(_adv_interval_us, adv_data, resp_data=resp_data)
# 主執行緒會一直等下去
flag = asyncio.ThreadSafeFlag()
await flag.wait()
if __name__ == "__main__":
asyncio.run(main())
print("done")
完整程式請參考 step02.py
廣播的發送是藉由 ble.gap_advertise() 來達成:
它會以指定的間隔(以微秒為單位)開始廣播,此間隔將向下捨入到最接近的 625 微秒的倍數。若要停止廣播,將 interval_us 設為 None。
當程式第一次呼叫 gap_advertise() 時,若只傳入 interval_us,GATT 用戶端得到的伺服器資訊將只有藍牙位址,這並不足以讓使用者明確區分搜尋到的裝置。為了方便識別,通常會再傳入參數 adv_data,甚至 resp_data,來提供更多資訊給使用者。
為了讓伺服器在開始廣播後,主執行緒不會立刻結束(印出 done),本喵在 main() 的最後建立一個asyncio.ThreadSafeFlag 物件,以其 wait() 方法讓主執行緒永遠等待。
為了試驗咱們的伺服器是否運作正常,在手機上安裝 nRF Connect app 來擔當 GATT 用戶端。執行此 app 的 SCAN,並點擊咱們的 IDS 伺服器,可看到類似如下資訊:
當咱們點擊選伺服器資訊右上角的 CONNECT,便可以與其進行連接,並且存取裝置上固有的 Characteristics。但有一個嚴重的問題,就是當 nRF Connect 與 IDS 伺服器斷開連線後,若要再度與其連接,會發現 nRF Connect 無法與 IDS 伺服器建立連線。這是因為此時的伺服器並沒有發送可被連接的廣播,所以用戶端無法與其連線。伺服器在連線成功後,會自動停止廣播;而當斷開連線後,並不會自行重啟廣播,需要由設計者自行啟用。因此咱們必須監控 BLE 的斷線事件,一旦收到通知,便要求藍牙再次廣播。
使用 ble.irq() 將自訂函數 ble_isr() 設為監聽藍牙事件的回呼函數:
async def main():
# 取得系統唯一的 bluetooth.BLE 物件
ble = bluetooth.BLE()
# 指定處理 BLE 事件的函數
ble.irq(ble_isr)
# 開啟 BLE radio
# 在呼叫任何 BLE 方法前,必須先啟用 BLE
ble.active(True)
當收到斷線事件,ble_isr() 會以 micropython.schedule() 安排 ble.gap_advertise(),在離開 ISR 後儘快執行:
def ble_isr(event, data):
ble = bluetooth.BLE()
if event == _IRQ_CENTRAL_CONNECT:
conn_handle, addr_type, addr = data
print(
f"Connected to {addr_to_str(addr)} ({addr_type_to_str(addr_type)}): "
f"conn_handle({conn_handle})"
)
elif event == _IRQ_CENTRAL_DISCONNECT:
conn_handle, addr_type, addr = data
print(
f"Disconnected from {addr_to_str(addr)} ({addr_type_to_str(addr_type)}): "
f"conn_handle({conn_handle})"
)
# 要求 MicroPython 在 BLE 中斷後,儘快重新廣播
micropython.schedule(ble.gap_advertise, _adv_interval_us)
完整程式請參考 step03.py
在這裡傳給 ble.gap_advertise() 的參數只有 _adv_interval_us,但本喵曾在「發送廣播」小節裡提過,若沒有傳入 adv_data 和 resp_data,會導致用戶端收到的廣播資訊只有藍牙位址,那為什麼這裡卻只傳入 _adv_interval_us 一個參數呢?這是因為 ble.gap_advertise() 設計成若 adv_data 或 resp_data 為 None,則上一次傳遞給 gap_advertise() 的資料將被重複使用。
有三種安全機制可供設定:le_secure、bond 和 mitm,可依需求自行設定。這裡最重要的設定是 IDS 伺服器的輸入輸出能力,因咱們只需伺服器顯示 PIN,所以使用 ble.config() 將 io 的值設為 0 (_IO_CAPABILITY_DISPLAY_ONLY):
_IO_CAPABILITY_DISPLAY_ONLY = micropython.const(0)
_IO_CAPABILITY_KEYBOARD_ONLY = micropython.const(2)
_IO_CAPABILITY_NO_INPUT_OUTPUT = micropython.const(3)
_IO_CAPABILITY_KEYBOARD_DISPLAY = micropython.const(4)
async def main():
# 取得系統唯一的 bluetooth.BLE 物件
ble = bluetooth.BLE()
# 開啟 BLE radio
# 在呼叫任何 BLE 方法前,必須先啟用 BLE
ble.active(True)
# 啟用安全機制
ble.config(le_secure=True)
ble.config(bond=True)
ble.config(mitm=True)
# 指定本裝置只能顯示連線密碼
ble.config(io=_IO_CAPABILITY_DISPLAY_ONLY)
為了處理配對事件,必須在 ble_isr() 裡監聽 IRQ 31 (_IRQ_PASSKEY_ACTION)。且因伺服器的 I/O 設定是 _IO_CAPABILITY_DISPLAY_ONLY,所以需處理 action 為 3 (_PASSKEY_ACTION_DISPLAY) 時的情況:
# BLE 事件
_IRQ_PASSKEY_ACTION = micropython.const(31)
# 配對類型
_PASSKEY_ACTION_DISPLAY = micropython.const(3)
# 配對的密碼
_passkey = 123456
def ble_isr(event, data):
ble = bluetooth.BLE()
if event == _IRQ_PASSKEY_ACTION:
conn_handle, action, passkey = data
print(
f"PASSKEY_ACTION: conn_handle({conn_handle}), action({action}), passkey({passkey})"
)
if action == _PASSKEY_ACTION_DISPLAY:
micropython.schedule(
lambda params: ble.gap_passkey(*params), (conn_handle, action, _passkey)
)
完整程式請參考 step04.py
當 event 和 action 符合時,會安排 ble.gap_passkey() 在 ISR 結束後,設定預期的 passkey。
以 nRF Connect app 來與其配對:
i. 一開始可以看到 IDS 伺服器的配對狀態為 NOT BONDED
ii. 點擊右上角 DISCONNECT 右邊的圖示
iii. 點擊 Bond
此時可以在 Thonny 的 Shell 窗格看到 IDS 伺服器收到 _IRQ_PASSKEY_ACTION 事件:
iv. 等待用戶端顯示配對要求,然後輸入 PIN
v. 當配對成功,可以觀察到配對資訊變為 BONDED
要註冊 GATT服務的第一步是建立其支援服務的 tuple,格式為:
(
(
Service_1_UUID,
(
(
Characteristic_1_UUID,
Characteristic_1_Flags,
(
(Descriptor_1, Descriptor_1_Flags),
# 其他 Descriptor
)
),
# 其他 Characteristic
)
),
# 其他 Service
)
# Characteristic 的屬性
_FLAG_READ = micropython.const(0x0002)
# Insulin Delivery Service 相關 UUID
_IDS_UUID = micropython.const(0x183A)
_IDD_FEATURES_UUID = micropython.const(0x2B23)
def build_ids_definition():
"""創建 Insulin Delivery Service Characteristics"""
ids = bluetooth.UUID(_IDS_UUID)
features = (bluetooth.UUID(_IDD_FEATURES_UUID), _FLAG_READ)
return (ids, (features,))
服務 tuple 建立好後,用 ble.gatts_register_services() 註冊 GATT 服務:
async def main():
# 取得系統唯一的 bluetooth.BLE 物件
ble = bluetooth.BLE()
# 指定處理 BLE 事件的函數
ble.irq(ble_isr)
# 開啟 BLE radio
# 在呼叫任何 BLE 方法前,必須先啟用 BLE
ble.active(True)
# 註冊 IDS
ids = build_ids_definition()
services = (ids,)
handles = ble.gatts_register_services(services)
ids_handles = handles[0]
print("IDS Characteristic value handles: " + str(ids_handles))
ble.gatts_register_services() 返回的是 Services 包含的 Characteristics 的 value handles,格式為:
(
(
Service_1_Characteristic_1_value_handle,
Service_1_Characteristic_2_value_handle,
# 其他 value handles
),
(
Service_2_Characteristic_1_value_handle,
Service_2_Characteristic_2_value_handle,
# 其他 value handles
),
# 其他 Service 的 Characteristic value handles
)
因為目前的伺服器只有一個 Service - IDS,其內只有一個 Characteristic - IDD Features,所以由 handles 得到 IDD Features 的 value handle 是 16:
註冊了 GATT 服務後,還需要處理用戶端對於伺服器的 Characteristic 的讀寫要求。因為我們只賦予 IDD Features 能被 Read 的 Flags,所以只需處理 Characteristic Read 的事件:
# BLE 事件
_IRQ_GATTS_READ_REQUEST = micropython.const(4)
def ble_isr(event, data):
ble = bluetooth.BLE()
if event == _IRQ_GATTS_READ_REQUEST:
conn_handle, value_handle = data
rsp = bytes((0x01, 0x02, 0x03))
# 將要回覆的讀取要求寫入 characteristic 裡
ble.gatts_write(value_handle, rsp)
print(f"Send read response: {tuple(rsp)}")
完整程式請參考 step05.py
如此,當使用者在 nRF Connect 點擊讀取圖示後:
便可得到回應:
然後在 Thonny 的 Shell 窗格可以看到紀錄:
至此,一個基礎的 GATT server 便完成了。在之後的日子,本喵會一步步重構,完善 characteristics 的功能。