昨天咱們已經將 Notify & Indicate 流量控制完成了,接下來就是給 IDD Status Changed 上 Indicate 的 Buff~
就像 ReadMixin 一樣,Indicate 也可以抽離出共同部分:
class IndicateMixin:
    def send_data(self, conn_handle: int | None, value_handle: int, arg):
        # 如果已經連線
        if conn_handle is not None:
            data = self._build_indicate_payload(arg)
            self._after_build_tx_data()
            if data is not None:
                ble.stack.indicate(conn_handle, value_handle, data)
                return True
        return False
    def _build_indicate_payload(self, arg) -> bytes | bytearray | None:
        return arg
    def _after_build_tx_data(self):
        pass
_build_indicate_payload(): 建立待傳送的資料_after_build_tx_data():處理取得資料後的行為send_data():整合 _build_indicate_payload() 和 _after_build_tx_data(),然後送出 Indicate。send_data() 就是呼叫 BleTxScheduler.add() 時,傳入的 fn。給 IddStatusChanged 賦予 Indicate,就像給它上 Read Buff 一樣簡單:
IddStatusChanged 繼承 IndicateMixin
IndicateMixin._build_indicate_payload()
class IddStatusChanged(
    ble.mixin.IndicateMixin, ble.mixin.ReadMixin, ble.stack.Characteristic
):
    def __init__(self, config: config.Config):
        ble.stack.Characteristic.__init__(self, 0x2B20, read=True, indicate=True)
        self._config = config
    def _build_indicate_payload(self, arg) -> bytes:
        n = self._build_read_rsp(ble.mixin._rbuf_mv)
        return bytes(ble.mixin._rbuf_mv[:n])
    def _build_read_rsp(self, buf: bytearray | memoryview) -> int:
        common.utils.write_uint16(buf, 0, self._config.idd_status_changed_flags)
        return 2
class E2EIddStatusChanged(ble.mixin.E2ETxMixin, IddStatusChanged):
    def __init__(self, config: config.Config):
        ble.mixin.E2ETxMixin.__init__(self)
        IddStatusChanged.__init__(self, config)
    def _build_read_rsp(self, buf: bytearray | memoryview) -> int:
        data_len = super()._build_read_rsp(buf)
        buf[data_len] = self._tx_counter.value
        ble.e2e.Crc.fill_crc(buf, data_len + 1, data_len + 3)
        return data_len + 3
_build_indicate_payload() 裡有個不好的設計,直接使用 ble.mixin._rbuf_mv,這原本是只給 ReadMixin 類別內部使用,並沒有要公開(請參考 IDD Features 的部分)。其實一開始的設計如下:
def _build_indicate_payload(self, arg) -> bytearray:
    return self._build_read_rsp()
def _build_read_rsp(self) -> bytearray:
    buf = bytearray(2)
    common.utils.write_uint16(buf, 0, self._config.idd_status_changed_flags)
    return buf
最初 _build_read_rsp() 所用的 buf 是在函數內部產生,只是因為 _build_read_rsp() 必須在中斷裡被呼叫,而中斷裡不該配置記憶體,後來權宜之下,才把
_build_read_rsp(self) -> bytearray
改為
_build_read_rsp(self, buf: bytearray | memoryview) -> int
導致 _build_indicate_payload() 必須改變實作。當然也可以這樣做:
def _build_indicate_payload(self, arg) -> bytes:
    # IDD Status Changed 的資料長度為 2 或 5 bytes
    buf = bytearray(5)
    n = self._build_read_rsp(buf)
    return bytes(buf[:n])
但因為 IDD Status Changed Indicate 頗頻繁,每次都在 _build_indicate_payload() 裡配置 3 次記憶體(bytearray()、buf[:n] 和 bytes()),會否有不好的影響?所以才沿用 ble.mixin._rbuf_mv,這樣至少只需配置 1 次記憶體。
為了測試 IDD Status Changed 的 Indicate,咱們讓 IdsServer 可以每秒發送一次 IDD Status Changed:
class IdsServer(ble.stack.Server):
    async def run(self):
        timer = machine.Timer(0, period=1000, callback=_on_timer)
def _on_timer(timer):
    '''計時器中斷函數'''
    micropython.schedule(_on_timestamp_changed, None)
def _on_timestamp_changed(unused):
    '''由 micropython.schedule() 安排執行'''
    ble.stack.BleTxScheduler().add(
        ble.stack.ACT_INDICATE,
        instance._idd_status_changed.send_data,
        instance._idd_status_changed.value_handle,
        None,
    )
這裡可能會讓人疑惑,為什麼不在中斷函數 _on_timer() 裡直接呼叫 BleTxScheduler().add(),而要透過 micropython.schedule()?這是因為:
BleTxScheduler().add() 和 BleTxScheduler.run() 都會存取同一個資源BleTxScheduler.run() 是 async 函數micropython.schedule(),則 BleTxScheduler.run() 存取資源前須先停止中斷BleTxScheduler().add() 不只在中斷函數裡被使用,它存取資源前,也須停止中斷因為有上述理由,所以使用 micropython.schedule() 是最方便的方法。
另外要提一點,micropython.schedule(fn, arg) 所安排的函數 fn 並不適合長時間運作,也就是說,若 fn 不能快速結束,那需要用 asyncio.create_task() 之類的方法安排執行。
如此,一切就緒。當 IDS server 與 nRF Connect app 建立連線後,便會每秒鐘收到 IDD Status Changed 的 Indicate。
「結束了嗎?結束了嗎?結束了吧!」看官們紛紛不耐煩地吆喝!
好~好~好~
看官至上~
IDD Status Changed 就先這樣吧~