iT邦幫忙

2025 iThome 鐵人賽

DAY 15
0
Software Development

以MicroPython在ESP32上實作Insulin Delivery Service系列 第 15

Day 15 - IDD Status Changed (3) 之 上 Indicate Buff

  • 分享至 

  • xImage
  •  

昨天咱們已經將 Notify & Indicate 流量控制完成了,接下來就是給 IDD Status Changed 上 Indicate 的 Buff~

1. Indicate 介面

就像 ReadMixin 一樣,Indicate 也可以抽離出共同部分:

class IndicateMixin:
    def send_data(self, conn_handle: int | None, value_handle: int, arg):
        # 如果已經連線
        if conn_handle is not None:
            data = self._build_indicate_payload(arg)
            self._after_build_tx_data()

            if data is not None:
                ble.stack.indicate(conn_handle, value_handle, data)
                return True

        return False

    def _build_indicate_payload(self, arg) -> bytes | bytearray | None:
        return arg

    def _after_build_tx_data(self):
        pass
  • _build_indicate_payload(): 建立待傳送的資料
  • _after_build_tx_data():處理取得資料後的行為
  • send_data():整合 _build_indicate_payload()_after_build_tx_data(),然後送出 Indicate。send_data() 就是呼叫 BleTxScheduler.add() 時,傳入的 fn

2. 上 Indicate Buff

IddStatusChanged 賦予 Indicate,就像給它上 Read Buff 一樣簡單:

  • IddStatusChanged 繼承 IndicateMixin
  • 覆寫 IndicateMixin._build_indicate_payload()
  • 其餘部分無須改變
class IddStatusChanged(
    ble.mixin.IndicateMixin, ble.mixin.ReadMixin, ble.stack.Characteristic
):
    def __init__(self, config: config.Config):
        ble.stack.Characteristic.__init__(self, 0x2B20, read=True, indicate=True)

        self._config = config

    def _build_indicate_payload(self, arg) -> bytes:
        n = self._build_read_rsp(ble.mixin._rbuf_mv)
        return bytes(ble.mixin._rbuf_mv[:n])

    def _build_read_rsp(self, buf: bytearray | memoryview) -> int:
        common.utils.write_uint16(buf, 0, self._config.idd_status_changed_flags)
        return 2

class E2EIddStatusChanged(ble.mixin.E2ETxMixin, IddStatusChanged):
    def __init__(self, config: config.Config):
        ble.mixin.E2ETxMixin.__init__(self)
        IddStatusChanged.__init__(self, config)

    def _build_read_rsp(self, buf: bytearray | memoryview) -> int:
        data_len = super()._build_read_rsp(buf)

        buf[data_len] = self._tx_counter.value
        ble.e2e.Crc.fill_crc(buf, data_len + 1, data_len + 3)

        return data_len + 3

_build_indicate_payload() 裡有個不好的設計,直接使用 ble.mixin._rbuf_mv,這原本是只給 ReadMixin 類別內部使用,並沒有要公開(請參考 IDD Features 的部分)。其實一開始的設計如下:

def _build_indicate_payload(self, arg) -> bytearray:
    return self._build_read_rsp()

def _build_read_rsp(self) -> bytearray:
    buf = bytearray(2)
    common.utils.write_uint16(buf, 0, self._config.idd_status_changed_flags)
    return buf

最初 _build_read_rsp() 所用的 buf 是在函數內部產生,只是因為 _build_read_rsp() 必須在中斷裡被呼叫,而中斷裡不該配置記憶體,後來權宜之下,才把

_build_read_rsp(self) -> bytearray

改為

_build_read_rsp(self, buf: bytearray | memoryview) -> int

導致 _build_indicate_payload() 必須改變實作。當然也可以這樣做:

def _build_indicate_payload(self, arg) -> bytes:
    # IDD Status Changed 的資料長度為 2 或 5 bytes
    buf = bytearray(5)
    n = self._build_read_rsp(buf)
    return bytes(buf[:n])

但因為 IDD Status Changed Indicate 頗頻繁,每次都在 _build_indicate_payload() 裡配置 3 次記憶體(bytearray()buf[:n]bytes()),會否有不好的影響?所以才沿用 ble.mixin._rbuf_mv,這樣至少只需配置 1 次記憶體。

3. 測試 Indicate

為了測試 IDD Status Changed 的 Indicate,咱們讓 IdsServer 可以每秒發送一次 IDD Status Changed:

class IdsServer(ble.stack.Server):
    async def run(self):
        timer = machine.Timer(0, period=1000, callback=_on_timer)

def _on_timer(timer):
    '''計時器中斷函數'''
    micropython.schedule(_on_timestamp_changed, None)

def _on_timestamp_changed(unused):
    '''由 micropython.schedule() 安排執行'''
    ble.stack.BleTxScheduler().add(
        ble.stack.ACT_INDICATE,
        instance._idd_status_changed.send_data,
        instance._idd_status_changed.value_handle,
        None,
    )

這裡可能會讓人疑惑,為什麼不在中斷函數 _on_timer() 裡直接呼叫 BleTxScheduler().add(),而要透過 micropython.schedule()?這是因為:

  1. BleTxScheduler().add()BleTxScheduler.run() 都會存取同一個資源
  2. BleTxScheduler.run() 是 async 函數
  3. 在中斷函數裡,存取主執行緒也在使用的共用資源可能造成 race condition 或資料損毀
  4. 若不使用 micropython.schedule(),則 BleTxScheduler.run() 存取資源前須先停止中斷
  5. BleTxScheduler().add() 不只在中斷函數裡被使用,它存取資源前,也須停止中斷

因為有上述理由,所以使用 micropython.schedule() 是最方便的方法。

另外要提一點,micropython.schedule(fn, arg) 所安排的函數 fn 並不適合長時間運作,也就是說,若 fn 不能快速結束,那需要用 asyncio.create_task() 之類的方法安排執行。

如此,一切就緒。當 IDS server 與 nRF Connect app 建立連線後,便會每秒鐘收到 IDD Status Changed 的 Indicate。


「結束了嗎?結束了嗎?結束了吧!」看官們紛紛不耐煩地吆喝!
好~好~好~
看官至上~
IDD Status Changed 就先這樣吧~


上一篇
Day 14 - IDD Status Changed (2) 之 Notify & Indicate 排程器
下一篇
Day 16 - CCCD Monitor - 為 MicroPython 藍牙新增功能吧!
系列文
以MicroPython在ESP32上實作Insulin Delivery Service31
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言