iT邦幫忙

2025 iThome 鐵人賽

DAY 24
0
Software Development

以MicroPython在ESP32上實作Insulin Delivery Service系列 第 24

Day 24 - 建立 Control Point 基礎類別

  • 分享至 

  • xImage
  •  

昨天咱們已經有一個 IDD Status Reader Control Point 的雛形了,雖然可以沿著這雛形設計下去,但是 IDD Status Reader Control Point、IDD Command Control Point 和 IDD Record Access Control Point 都具有相似的命令結構,比如以下是 IDD Status Reader Control Point 的命令格式:
https://ithelp.ithome.com.tw/upload/images/20250824/20177799M7Al0aGCJq.png

為了能減少重複邏輯的實作,所以先設計一個 IDS 專用的基礎 Control Point 類別來處理這些事情。

1. 基礎 Control Point

class BaseCP(ble.mixin.WriteMixin, ble.mixin.IndicateMixin, ble.stack.Characteristic):
    def __init__(self, uuid: int | str):
        ble.stack.Characteristic.__init__(self, uuid, write=True, indicate=True)
        ble.mixin.IndicateMixin.__init__(self)

    def _check_att_error(self, data: bytes) -> int:
        if ble.global_var.is_cp_in_progress:
            return ble.consts.ATT_ERR_PROCEDURE_ALREADY_IN_PROGRESS

        elif not self._ind_enabled:
            return ble.consts.ATT_ERR_CCCD_IMPROPERLY_CONFIGURED

        elif len(data) < 2:
            return ble.consts.ATT_ERR_VALUE_NOT_ALLOWED

        else:
            return 0

    def _get_operand(self, data: bytes) -> bytes:
        return data[2:]

    def _respond_error(self, opcode: int, error_code: int):
        ble.stack.BleTxScheduler().add(
            ble.stack.ACT_INDICATE,
            self.send_data,
            self.value_handle,
            (self._build_response_code_payload, opcode, error_code),
        )

    def _build_response_code_payload(
        self, buf: bytearray | memoryview, opcode: int, error_code: int
    ) -> int:
        raise NotImplementedError

    def _build_indicate_payload(self, buf: bytearray | memoryview, arg) -> int:
        fn, *args = arg
        return fn(buf, *args)
  • 為了得到指令的 operand 部分,咱們定義了 _get_operand()
  • 因為 Control Point 時常需要使用 Response Code 來回應,所以設計了 _respond_error()_build_response_code_payload()
  • 因每個 Control Point 的 Response Code 的實作不相同,所以 _build_response_code_payload() 須由子類別負責設計
  • 覆寫 IndicateMixin._build_indicate_payload() 來建立每個指令要回覆的訊息。

然後建立 IDD Status Reader Control Point 類別:

class IddStatusReaderCP(BaseCP):
    def __init__(self):
        super().__init__(0x2B24)
        self._fn_on_opcode = self._on_opcode

    def _parse_write_data(self, data: bytes):
        ble.global_var.is_cp_in_progress = True
        micropython.schedule(self._fn_on_opcode, data)

    def _on_opcode(self, data: bytes):
        common.logger.write(
            f"IDD Status Reader CP: {common.utils.array_to_hex_str(data)}"
        )

        opcode = data[0] | data[1] << 8
        asyncio.create_task(self._on_not_supported_opcode(opcode))

    async def _on_not_supported_opcode(self, opcode: int):
        self._respond_error(opcode, _OP_CODE_NOT_SUPPORTED)

        state = machine.disable_irq()
        ble.global_var.is_cp_in_progress = False
        machine.enable_irq(state)

    def _build_response_code_payload(
        self, buf: bytearray | memoryview, opcode: int, error_code: int
    ) -> int:
        common.utils.write_uint16(buf, 0, _RESPONSE_CODE)
        common.utils.write_uint16(buf, 2, opcode)
        buf[4] = error_code
        return 5
  • _parse_write_data() 是在 ISR 裡被呼叫,而 ISR 裡不應做耗時的作業,所以使用 micropython.schedule() 呼叫 _on_opcode(),讓其執行 Op Code 的相關作業。
  • micropython.schedule() 安排的函數也不能長時間占用執行時間,而有些 Op Code 可能會執行較久,所以安排協程來處理。
  • 在協程存取 ble.global_var.is_cp_in_progress 前,須先禁用中斷,因為此變數會在 ISR 裡被存取。
  • 如果能確保 ble.global_var.is_cp_in_progress = False 是原子操作,且在中斷函數裡只是讀取,那便不須要停用中斷。

那麼,現在咱們的 IDS Server 若與 nRF Connect app 進行寫入 IDD Status Reader Control Point 的測試,只要傳送任何大於 1 byte 的指令,IDS Server 都會回應 Op Code Not Supported 的訊息。

2. Control Point with E2E-Protection

現在為 IddStatusReaderCP 類別增加 E2E-Protection 的支援。先來看架構說明:

# IndicateMixin   WriteMixin
#           \      /
#            BaseCP
#              |
#       Control Point    E2ECPMixin = E2ETxMixin + E2ERxMixin
#              |         /
#          E2E Control Point
#
# IndicateMixin:
#     負責資料的組成和送出
#
# WriteMixin:
#     負責接收資料與檢查 ATT Error
#
# BaseCP:
#     IDS 所有 Control Point 的基礎類別
#
# Control Point:
#     執行 Op Code 的指令與送出回應
#
# E2ECPMixin:
#     實作部分 BaseCP 的功能,以加強 E2E 特有部分。
#     因若讓 E2ECPMixin 繼承 BaseCP 再覆寫,會產生繼承鏈的混亂。
#     維護寫入與傳送資料的 Counter 與 CRC 的計算
#     E2E 相關 ATT Error 的檢查
#     取出非 E2E 部分的資料
#     將送出的資料附加上 E2E 保護機制
#
# E2E Control Point:
#     將 Control Point 送出的資料附加上 E2E 保護機制

首先製作 E2ERxMixin

class E2ERxMixin:
    def __init__(self):
        self._rx_counter = ble.e2e.RxCounter()
        ble.stack.register_irq_handler(self._isr_e2e_rx_mixin)

    def _after_write(self):
        self._rx_counter.inc_counter()

    def check_rx_counter(self, received_counter: int) -> bool:
        return self._rx_counter.check(received_counter)

    def _isr_e2e_rx_mixin(self, event, data):
        if event == _IRQ_CENTRAL_DISCONNECT:
            self._rx_counter.reset()

它和第 12 天介紹的 E2ETxMixin 基本是一樣的,只是多了檢查 E2E-Counter 的部分。

接下來定義 E2ECPMixin

class E2ECPMixin(ble.mixin.E2ETxMixin, ble.mixin.E2ERxMixin):
    def __init__(self):
        ble.mixin.E2ERxMixin.__init__(self)
        ble.mixin.E2ETxMixin.__init__(self)

    def _check_att_error(self, data: bytes) -> int:
        data_len = len(data)

        if data_len < 5:
            return ble.consts.ATT_ERR_VALUE_NOT_ALLOWED

        crc = data[-1] << 8 | data[-2]
        target = ble.e2e.Crc.calc_crc(data, data_len - 2)

        if crc != target:
            return ble.consts.ATT_ERR_INVALID_CRC

        if not self.check_rx_counter(data[-3]):
            return ble.consts.ATT_ERR_INVALID_COUNTER

        return 0

    def _get_operand(self, data: bytes) -> bytes:
        return data[2:-3]

    def _fill_tx_e2e(self, buf: bytearray | memoryview, data_len: int) -> int:
        return ble.ids.utils.fill_e2e(buf, data_len, self._tx_counter)
  • 因為 E2E-Protection 有自己的 ATT Error 檢查機制,所以定義了 _check_att_error()
  • 因為 E2E-Protection 所使用的 operand 的範圍不同,所以須定義 _get_operand()
  • 為了在建構好的資料上覆加 E2E-Protection,所以定義 _fill_tx_e2e()

那麼,最後咱們來建立 E2E-Protection 版的 IDD Status Reader Control Point:

class E2EIddStatusReaderCP(E2ECPMixin, IddStatusReaderCP):
    def __init__(self):
        IddStatusReaderCP.__init__(self)
        E2ECPMixin.__init__(self)

    def _check_att_error(self, data: bytes) -> int:
        att_err = IddStatusReaderCP._check_att_error(self, data)
        if att_err:
            return att_err

        att_err = E2ECPMixin._check_att_error(self, data)
        if att_err:
            return att_err

        return 0

    def _build_indicate_payload(self, buf: bytearray | memoryview, arg) -> int:
        return self._fill_tx_e2e(buf, super()._build_indicate_payload(buf, arg))

如此,當啟用 E2E-Protection 後,便可以收到相應的資料了。


上一篇
Day 23 - 命令 IDS Server 做事 - Write Control Point
下一篇
Day 25 - 給 IDD Status Reader Control Point 掛上點果實吧 (1)
系列文
以MicroPython在ESP32上實作Insulin Delivery Service31
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言