各位看官好啊~昨天咱們終於將 IDD Features 完成了,各位應該也受夠了只能一直點擊 Read、Read、Read 個不停吧!那麼接下來就做做不一樣的事吧 —— 玩玩 Indicate。而說到 Indicate,就不能不提 IDD Status Changed!那麼咱們開始 IDD Status Changed 的實作之旅吧~
IDD Status Changed 的欄位如下:
咱們只需記錄 Flags 欄位,所以在 config.py 的 Config 類別裡,增加一個成員變數:
class Config:
def __init__(self):
...
self.idd_status_changed_flags = 0
def to_dict(self) -> dict[str, str | int | tuple]:
return {
...
"idd_status_changed_flags": self.idd_status_changed_flags,
}
@classmethod
def from_dict(cls, d: dict[str, str | int | tuple]):
...
obj.idd_status_changed_flags = d["idd_status_changed_flags"]
因 IDD Status Changed 和 IDD Features 一樣具有 Read 屬性,所以可以沿用 IddFeatures 類別的形式,為 IDD Status Changed 建立一個類別雛形:
class IddStatusChanged(ble.mixin.ReadMixin, ble.stack.Characteristic):
def __init__(self, config: config.Config):
ble.stack.Characteristic.__init__(self, 0x2B20, read=True, indicate=True)
self._config = config
def _build_read_rsp(self, buf: bytearray | memoryview) -> int:
common.utils.write_uint16(buf, 0, self._config.idd_status_changed_flags)
return 2
class E2EIddStatusChanged(ble.mixin.E2ETxMixin, IddStatusChanged):
def __init__(self, config: config.Config):
ble.mixin.E2ETxMixin.__init__(self)
IddStatusChanged.__init__(self, config)
def _build_read_rsp(self, buf: bytearray | memoryview) -> int:
data_len = super()._build_read_rsp(buf)
buf[data_len] = self._tx_counter.value
ble.e2e.Crc.fill_crc(buf, data_len + 1, data_len + 3)
return data_len + 3
因為 IDD Status Changed 具備 Indicate 屬性,所以在正式將 Indicate 加入 IddStatusChanged 類別前,必須先知道 Indicate 是什麼。Indicate 是 GATT Server 主動傳送資料給 GATT Client 的一種機制(相對於 Server 回應 Client 的 Read),它的流程如下:
這個流程裡有個重點,就是第三步的「收到 Confirm 後,送出下一筆 Indicate」。也就是說,GATT Server 理應不該在收到 GATT Client 的 Confirm 前,就送出下一筆 Indicate。
但在使用某些 BLE 函式庫時會發現,即使沒有等待 Confirm 就送出下一筆 Indicate,似乎也不會出錯。這是因為這些函式庫會自行控制流程,讓使用者產生可以同時送出多筆 Indicate 的錯覺。所以在正式使用一個尚不熟悉的 BLE 函式庫前,咱們先試試它對 Indicate 的支援程度如何。咱們使用以下程式來簡單測試 MicroPython 對 Indicate 的支援:
class IdsServer(ble.stack.Server):
async def run(self):
await _on_ntf_ind_task()
def _ble_isr(self, event, data):
if event == _IRQ_CENTRAL_CONNECT:
self.conn_handle, _, _ = data
micropython.schedule(_on_connect, None)
elif event == _IRQ_GATTS_INDICATE_DONE:
micropython.schedule(_on_indicate_done, None)
def _on_connect(addr_type):
common.logger.write("Connected")
asyncio.create_task(test())
def _on_indicate_done(unused):
common.logger.write("Indicate done")
async def _on_ntf_ind_task():
while True:
await asyncio.sleep_ms(100)
async def test():
common.logger.write("wait 3 seconds")
await asyncio.sleep(3)
data = bytearray(1)
for i in range(8):
data[0] = i + 1
ble.stack.indicate(
instance.conn_handle, instance._idd_status_changed.value_handle, data
)
當與 nRF Connect app 建立連線後,在 Thonny IDE 的 Shell 窗格會出現以下錯誤:
在送出第 5 筆 Indicate 時,出現記憶體不足的例外;而在 nRF Connect app 上,確實有收到前 4 筆 Indicate:
不過這邊有個奇異的現象,就是若只送出 4 筆 Indicate,那麼雖然不會有任何錯誤出現在 Shell 窗格上,但是只會顯示收到 1 筆「Indicate done」。這部分不知是因 nRF Connect app 沒有送出 Confirm,或者是因為 Confirm 太快,而所用的板子的效能不足,導致 MicroPython 捨棄了某些中斷。
如果在應用上,可以保證同時間的 Indicate 數量不會超過執行環境的上限,那並不須自行做流量控制。但本喵無法保證在 IDS 裡可以滿足此條件,所以會實作一個簡易版的流量控制,它一次只允許一個 Indicate。