昨天咱們已經將 Notify & Indicate 流量控制完成了,接下來就是給 IDD Status Changed 上 Indicate 的 Buff~
就像 ReadMixin
一樣,Indicate 也可以抽離出共同部分:
class IndicateMixin:
def send_data(self, conn_handle: int | None, value_handle: int, arg):
# 如果已經連線
if conn_handle is not None:
data = self._build_indicate_payload(arg)
self._after_build_tx_data()
if data is not None:
ble.stack.indicate(conn_handle, value_handle, data)
return True
return False
def _build_indicate_payload(self, arg) -> bytes | bytearray | None:
return arg
def _after_build_tx_data(self):
pass
_build_indicate_payload()
: 建立待傳送的資料_after_build_tx_data()
:處理取得資料後的行為send_data()
:整合 _build_indicate_payload()
和 _after_build_tx_data()
,然後送出 Indicate。send_data()
就是呼叫 BleTxScheduler.add()
時,傳入的 fn
。給 IddStatusChanged
賦予 Indicate,就像給它上 Read Buff 一樣簡單:
IddStatusChanged
繼承 IndicateMixin
IndicateMixin._build_indicate_payload()
class IddStatusChanged(
ble.mixin.IndicateMixin, ble.mixin.ReadMixin, ble.stack.Characteristic
):
def __init__(self, config: config.Config):
ble.stack.Characteristic.__init__(self, 0x2B20, read=True, indicate=True)
self._config = config
def _build_indicate_payload(self, arg) -> bytes:
n = self._build_read_rsp(ble.mixin._rbuf_mv)
return bytes(ble.mixin._rbuf_mv[:n])
def _build_read_rsp(self, buf: bytearray | memoryview) -> int:
common.utils.write_uint16(buf, 0, self._config.idd_status_changed_flags)
return 2
class E2EIddStatusChanged(ble.mixin.E2ETxMixin, IddStatusChanged):
def __init__(self, config: config.Config):
ble.mixin.E2ETxMixin.__init__(self)
IddStatusChanged.__init__(self, config)
def _build_read_rsp(self, buf: bytearray | memoryview) -> int:
data_len = super()._build_read_rsp(buf)
buf[data_len] = self._tx_counter.value
ble.e2e.Crc.fill_crc(buf, data_len + 1, data_len + 3)
return data_len + 3
_build_indicate_payload()
裡有個不好的設計,直接使用 ble.mixin._rbuf_mv
,這原本是只給 ReadMixin
類別內部使用,並沒有要公開(請參考 IDD Features 的部分)。其實一開始的設計如下:
def _build_indicate_payload(self, arg) -> bytearray:
return self._build_read_rsp()
def _build_read_rsp(self) -> bytearray:
buf = bytearray(2)
common.utils.write_uint16(buf, 0, self._config.idd_status_changed_flags)
return buf
最初 _build_read_rsp()
所用的 buf
是在函數內部產生,只是因為 _build_read_rsp()
必須在中斷裡被呼叫,而中斷裡不該配置記憶體,後來權宜之下,才把
_build_read_rsp(self) -> bytearray
改為
_build_read_rsp(self, buf: bytearray | memoryview) -> int
導致 _build_indicate_payload()
必須改變實作。當然也可以這樣做:
def _build_indicate_payload(self, arg) -> bytes:
# IDD Status Changed 的資料長度為 2 或 5 bytes
buf = bytearray(5)
n = self._build_read_rsp(buf)
return bytes(buf[:n])
但因為 IDD Status Changed Indicate 頗頻繁,每次都在 _build_indicate_payload()
裡配置 3 次記憶體(bytearray()
、buf[:n]
和 bytes()
),會否有不好的影響?所以才沿用 ble.mixin._rbuf_mv
,這樣至少只需配置 1 次記憶體。
為了測試 IDD Status Changed 的 Indicate,咱們讓 IdsServer
可以每秒發送一次 IDD Status Changed:
class IdsServer(ble.stack.Server):
async def run(self):
timer = machine.Timer(0, period=1000, callback=_on_timer)
def _on_timer(timer):
'''計時器中斷函數'''
micropython.schedule(_on_timestamp_changed, None)
def _on_timestamp_changed(unused):
'''由 micropython.schedule() 安排執行'''
ble.stack.BleTxScheduler().add(
ble.stack.ACT_INDICATE,
instance._idd_status_changed.send_data,
instance._idd_status_changed.value_handle,
None,
)
這裡可能會讓人疑惑,為什麼不在中斷函數 _on_timer()
裡直接呼叫 BleTxScheduler().add()
,而要透過 micropython.schedule()
?這是因為:
BleTxScheduler().add()
和 BleTxScheduler.run()
都會存取同一個資源BleTxScheduler.run()
是 async 函數micropython.schedule()
,則 BleTxScheduler.run()
存取資源前須先停止中斷BleTxScheduler().add()
不只在中斷函數裡被使用,它存取資源前,也須停止中斷因為有上述理由,所以使用 micropython.schedule()
是最方便的方法。
另外要提一點,micropython.schedule(fn, arg)
所安排的函數 fn
並不適合長時間運作,也就是說,若 fn
不能快速結束,那需要用 asyncio.create_task()
之類的方法安排執行。
如此,一切就緒。當 IDS server 與 nRF Connect app 建立連線後,便會每秒鐘收到 IDD Status Changed 的 Indicate。
「結束了嗎?結束了嗎?結束了吧!」看官們紛紛不耐煩地吆喝!
好~好~好~
看官至上~
IDD Status Changed 就先這樣吧~