昨天本喵說到,為了避免太快送出 Indicate 導致的失敗 (在還沒收到前一筆 Indicate 的 Confirm 前,就送出下一筆 Indicate),所以會製作一個簡易的流量管控,現在咱們就開始吧~
咱們讓 BleTxScheduler 類別負責管理 Notify 和 Indicate 的送出。這邊要稍微說明一下 Notify 是什麼,其實它和 Indicate 的機制很像,只是送出後,不需要等待 GATT Client 的 Confirm。所以基本上可以不管不顧地拼命送,但對方是否真的有收到就未可知了。
咱們在 BleTxScheduler 使用了單例模式,這表示無論呼叫 BleTxScheduler() 多少次,都會得到同一實例:
class BleTxScheduler:
    _instance = None
    _initialized = False
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance
    def __init__(self, size=8) -> None:
        if self._initialized:
            return
        # 儲存執行 Notify/Indicate 的函數與其參數
        self._queue_ntf_ind: collections.deque[tuple] = collections.deque((), size)
        # 通知系統可以送出下一筆 Indication
        self._ind_done_flag = asyncio.ThreadSafeFlag()
        register_irq_handler(self._ble_isr)
        self._initialized = True
為什麼要使用單例模式呢?因為所有要執行 Notify 和 Indicate 的地方都需要它,若不用單例模式,那每個要使用的模組或類別都必須傳入它的一個參考,這將花費不少記憶體,而且也會增加模組和類別的介面的複雜度。
當然咱們也不一定需要單例模式,還記得 IddServer 類別嗎?當時咱們就只是用個全域變數來使用它。那麼在 BleTxScheduler 上,若使用全域變數,是否可以減少記憶體的使用量呢?咱們可以類似這樣設計:
# ble/stack.py
bleTxScheduler = None
class BleTxScheduler:
    @classmethod
    def init_inst(cls):
        global bleTxScheduler
        if bleTxScheduler is None:
            bleTxScheduler = cls()
# ble/server.py
class IdsServer(ble.stack.Server):
    def __init__(self):
        ble.stack.BleTxScheduler.init_inst()
    async def run(self):
        await ble.stack.bleTxScheduler.run()
async def test():
    ble.stack.bleTxScheduler.add(
        ble.stack.ACT_INDICATE,
        indicate,
        instance._idd_status_changed.value_handle,
        data,
    )
測試結果如下:
| MicroPython 1.25.0 | 單例模式 | 全域變數 | 說明 | 
|---|---|---|---|
| Free Heap RAM (bytes) | 135648 | 134688 | 愈大愈好 | 
| max new split | 102400 | 102400 | 愈大愈好 | 
| No. of 1-blocks | 304 | 324 | 愈小愈好 | 
| No. of 2-blocks | 86 | 91 | 愈小愈好 | 
| max blk sz | 72 | 72 | 愈大愈好 | 
| max free sz | 1051 | 922 | 愈大愈好 | 
| MicroPython 1.26.0 | 單例模式 | 全域變數 | 說明 | 
|---|---|---|---|
| Free Heap RAM (bytes) | 138928 | 137776 | 愈大愈好 | 
| max new split | 106496 | 106496 | 愈大愈好 | 
| No. of 1-blocks | 320 | 340 | 愈小愈好 | 
| No. of 2-blocks | 91 | 96 | 愈小愈好 | 
| max blk sz | 72 | 72 | 愈大愈好 | 
| max free sz | 924 | 865 | 愈大愈好 | 
可以看到,在 BleTxScheduler,單例模式在記憶體使用上,可說是完勝全域變數,但這並非通例。比如 IdsServer 類別,當 BleTxScheduler 使用單例模式時,IdsServer 在兩種模式下各有勝負:
| MicroPython 1.25.0 | 單例模式 | 全域變數 | 說明 | 
|---|---|---|---|
| Free Heap RAM (bytes) | 135168 | 135648 | 愈大愈好 | 
| max new split | 102400 | 102400 | 愈大愈好 | 
| No. of 1-blocks | 304 | 304 | 愈小愈好 | 
| No. of 2-blocks | 85 | 86 | 愈小愈好 | 
| max blk sz | 72 | 72 | 愈大愈好 | 
| max free sz | 1042 | 1051 | 愈大愈好 | 
| MicroPython 1.26.0 | 單例模式 | 全域變數 | 說明 | 
|---|---|---|---|
| Free Heap RAM (bytes) | 138544 | 138928 | 愈大愈好 | 
| max new split | 106496 | 106496 | 愈大愈好 | 
| No. of 1-blocks | 319 | 320 | 愈小愈好 | 
| No. of 2-blocks | 90 | 91 | 愈小愈好 | 
| max blk sz | 72 | 72 | 愈大愈好 | 
| max free sz | 908 | 924 | 愈大愈好 | 
因為綜合來看,IdsServer 使用全域變數方案時,在記憶體使用上看似更好,所以先前才會讓其以全域變數存在。
如先前所述,這些測試都只是一種參考而已,有時一點小變動,就會使結果完全反轉。所以除非真有必要,不要太執著於此,而是先專注於整體的規劃。
BleTxScheduler 類別有個成員變數 _queue_ntf_ind,它負責儲存要執行 Notify 或 Indicate 所需的資訊,現在就是將這些資訊儲存到 _queue_ntf_ind:
def add(self, type: int, fn, value_handle, arg):
    # 佇列項目型態為 tuple。
    # type: 指示此操作是 Notify 或 Indicate。
    # fn: 執行 Notify/Indicate 的實作。
    #     fn(conn_handle: int | None, value_handle: int, arg) -> bool
    # value_handle: 相關 Characteristic 的 handle
    # arg: fn 需要的參數。
    # 此函數只能在主執行緒被呼叫,否則將有同步問題。
    self._queue_ntf_ind.append((type, fn, value_handle, arg))
咱們讓此函數只能在主執行緒被執行,亦即不能在中斷函數裡呼叫。因為 Notify 和 Indicate 都是 GATT Server 主動送出訊息,所以本就不必特地在中斷裡去呼叫。當然,是可以設計成在計時器中斷裡送出 Notify / Indicate,不過為了讓計時器中斷處理程序可以儘快完成,所以會用 micropython.schedule() 來安排執行。
為什麼 fn 所接收的參數是 conn_handle、value_handle 和 arg 呢?其實這是依賴於 ble.stack.indicate(conn_handle, value_handle, data) 和 ble.stack.notify(conn_handle, value_handle, data)。那為什麼是用 arg 這個名稱,而不是 data 呢?這是因為咱們預期 fn 會利用 arg 來創建要傳輸的資料。也就是 fn 會先藉由 arg 建立傳送的資料後,再使用 ble.stack.indicate() 或 ble.stack.notify() 傳送資料。
接下來就是讓排程器不斷由佇列裡取出要送出的項目,若是 Indicate,就等待 _ind_done_flag 因收到 _IRQ_GATTS_INDICATE_DONE 而解除等待:
async def run(self):
    """因 _queue_ntf_ind 只會在主執行緒被存取,所以無須同步。"""
    while True:
        while self._queue_ntf_ind:
            type, fn, value_handle, arg = self._queue_ntf_ind.popleft()
            successful = fn(_conn_handle, value_handle, arg)
            if successful and type == ACT_INDICATE:
                try:
                    await asyncio.wait_for(self._ind_done_flag.wait(), 5)
                except asyncio.TimeoutError:
                    common.logger.write("Indicate timeout!")
        await asyncio.sleep_ms(100)
def _ble_isr(self, event, data):
    if event == _IRQ_GATTS_INDICATE_DONE:
        self._ind_done_flag.set()
可能有看官會覺得為什麼要在佇列為空的時候,使用 sleep_ms()?這樣不是浪費執行片段嗎?其實完全可以如下這樣做:
def __init__(self, size=8) -> None:
    # 用來通知 _queue_ntf_ind 不為空
    self._new_item_event = asyncio.Event()
def add(self, type: int, fn, value_handle, arg):
    self._queue_ntf_ind.append((type, fn, value_handle, arg))
    self._new_item_event.set()
async def run(self):
    """因 _queue_ntf_ind 只會在主執行緒被存取,所以無須同步。"""
    while True:
        while self._queue_ntf_ind:
            ...
        # 如果沒有資料,就等到有資料為止
        self._new_item_event.clear()
        await self._new_item_event.wait()
但這只能在 micropython 1.26.0 運作,在 micropython 1.25.0 會無法啟用 run() 協程,這可能是 micropython 1.25.0 的問題。
因 BleTxScheduler 已是單例,所以只要如下使用即可:
class IdsServer(ble.stack.Server):
    async def run(self):
        await ble.stack.BleTxScheduler().run()
async def test():
    for i in range(8):
        data = bytearray(1)
        data[0] = i + 1
        ble.stack.BleTxScheduler().add(
            ble.stack.ACT_INDICATE,
            indicate,
            instance._idd_status_changed.value_handle,
            data,
        )
Notify / Indicate 排程完成了,明天咱們就能將它應用到 IDD Statys Changed characteristic 上。