iT邦幫忙

2025 iThome 鐵人賽

DAY 25
0
Software Development

以MicroPython在ESP32上實作Insulin Delivery Service系列 第 25

Day 25 - 給 IDD Status Reader Control Point 掛上點果實吧 (1)

  • 分享至 

  • xImage
  •  

昨天咱們已經把 IDD Status Reader Control Point 的架構完成了,但它對所有輸入的資料都是回傳 Op Code not supported,就像光禿禿的樹一樣。所以現在就來實作一些指令,為 IDD Status Reader Control Point 增添實質功能吧。

雖然昨天咱們已經看過了,但在開始前,還是複習一下 IDD Status Reader Control Point 的指令格式:
https://ithelp.ithome.com.tw/upload/images/20250825/20177799wZg2Mzs3ps.png

因為咱們已經將 BaseCPE2ECPMixin 完成了,IddStatusReaderCP 的基礎也已奠定好,所以要實作特定 Op Code,就只是在 _on_opcode() 方法裡,指派協程去執行而已,比如:

class IddStatusReaderCP(BaseCP):
    def _on_opcode(self, data: bytes):
        mv_data = memoryview(data)
        opcode = mv_data[0] | mv_data[1] << 8
        operand = self._get_operand(mv_data)

        if opcode == _RESET_STATUS:
            asyncio.create_task(self._on_reset_status(operand))

        elif opcode == _GET_ACTIVE_BOLUS_IDS:
            asyncio.create_task(self._on_get_active_bolus_ids(operand))

        elif opcode == _GET_ACTIVE_BOLUS_DELIVERY:
            asyncio.create_task(self._on_get_active_bolus_delivery(operand))

        elif opcode == _GET_TOTAL_DAILY_INSULIN_STATUS:
            asyncio.create_task(self._on_get_total_daily_insulin_status(operand))

        else:
            asyncio.create_task(self._on_not_supported_opcode(opcode))

其他 IDS 的 Control Point 也基本上都是使用此形式來設計。

1. Reset Status

Reset Status 指令的 Operand 欄位如下:
https://ithelp.ithome.com.tw/upload/images/20250825/20177799cWjgcMFe29.png

這裡的 Flags 就是 IDD Status Changed 所使用的欄位:

Flags fit Bit Definition
0 Therapy Control State Changed
1 Operational State Changed
2 Reservoir Status Changed
3 Annunciation Status Changed
4 Total Daily Insulin Status Changed
5 Active Basal Rate Status Changed
6 Active Bolus Status Changed
7 History Event Recorded
All other bits RFU

因為在第 18 天的 Event Bus 裡,IddStatusChanged 類別已經完成了事件 EVENT_IDD_STATUS_RESETTING 的實作,所以要實現 Reset Status 這指令非常簡單:

async def _on_reset_status(self, operand: bytes):
    try:
        if not self._check_condition(
            len(operand) == 2, _RESET_STATUS, _INVALID_OPERAND
        ):
            return

        flags = operand[1] << 8 | operand[0]

        if not self._check_condition(
            flags & 0xFF00 == 0, _RESET_STATUS, _INVALID_OPERAND
        ):
            return

        common.eventbus.publish(core.events.EVENT_IDD_STATUS_RESETTING, flags)

        self._respond_error(_RESET_STATUS, _SUCCESS)

    finally:
        state = machine.disable_irq()
        ble.global_var.is_cp_in_progress = False
        machine.enable_irq(state)
  1. 首先咱們判斷 operand 的資料長度是否為 2 bytes,若是,則繼續執行;否則回應錯誤碼 Invalid Operand
  2. 然後取得要重設的 flags 值,並且檢查 flags 是否在規定範圍內,若否(即 RFU 位元被設定),則回應 Invalid Operand
  3. 接著咱們送出事件 EVENT_IDD_STATUS_RESETTING,表示要重設 flags 所指定的狀態。
  4. 最後送出回應 Success
  5. 在離開方法前,要將 is_cp_in_progress 設為 False,表示 Control Point 的行為已結束。

2. Get Total Daily Insulin Status

此指令沒有 operand,而它的回應結構中,operand 會包含每天的 Bolus、Basal、和二者總和的注射量。要注意的是,因 SFLOAT 精度的原因,總和欄位未必一定等於 Bolus 和 Basal 二欄位的總合:
https://ithelp.ithome.com.tw/upload/images/20250825/20177799RwudaKRuyv.png

為了實現這個指令,咱們先在 Config 類別增加兩個變數:

class Config:
    def __init__(self):
        # 已累積的每天 Bolus 施打量
        # Unit: IU
        self.total_daily_bolus_delivered = common.fixedfloat.FixedFloat(0)

        # 已累積的每天 Basal 施打量
        # Unit: IU
        self.total_daily_basal_delivered = common.fixedfloat.FixedFloat(0)

    def to_dict(self):
        return {
            ...

            "total_daily_bolus_delivered": self.total_daily_bolus_delivered.to_json(),
            "total_daily_basal_delivered": self.total_daily_basal_delivered.to_json(),
        }

    @classmethod
    def from_dict(cls, d: dict):
        ...

        obj.total_daily_bolus_delivered = common.fixedfloat.FixedFloat.from_json(
            d["total_daily_bolus_delivered"]
        )
        obj.total_daily_basal_delivered = common.fixedfloat.FixedFloat.from_json(
            d["total_daily_basal_delivered"]
        )

        return obj

雖然咱們是可以在此指令裡直接使用 Config.total_daily_bolus_deliveredConfig.total_daily_bolus_delivered,但為了統一管理,會建立 InsulinManager 類別來管理胰島素相關操作:

class InsulinManager:
    def __init__(self, config: config.Config):
        self._config = config

    def get_total_daily_insulin_status(self):
        return (
            float(self._config.total_daily_bolus_delivered),
            float(self._config.total_daily_basal_delivered),
            float(
                self._config.total_daily_bolus_delivered
                + self._config.total_daily_basal_delivered
            ),
        )

若已遺忘 FixedFloat,或不明白為什麼它可以執行相加動作的看官,請參考第 19 天的 避免浮點數誤差

材料都已準備好了,那就來實現 Get Total Daily Insulin Status 這指令吧:

async def _on_get_total_daily_insulin_status(self, operand: bytes):
    try:
        if not self._check_condition(
            len(operand) == 0, _GET_TOTAL_DAILY_INSULIN_STATUS, _INVALID_OPERAND
        ):
            return

        bolus, basal, total = self._insulin_mgr.get_total_daily_insulin_status()

        ble.stack.BleTxScheduler().add(
            ble.stack.ACT_INDICATE,
            self.send_data,
            self.value_handle,
            (_respond, bolus, basal, total),
        )

    finally:
        state = machine.disable_irq()
        ble.global_var.is_cp_in_progress = False
        machine.enable_irq(state)
  1. 首先,還是先檢查指令的 operand 欄位是否符合規定,若不是,則回應錯誤碼 Invalid Operand
  2. 然後由 InsulinManager.get_total_daily_insulin_status() 取得每天胰島素的相關注射劑量。
  3. 最後以 (回應函數, 參數 1, 參數 2 ...) 的形式將 (_respond, bolus, basal, total) 加入到 BleTxScheduler,讓系統送出 Indicate。
    忘了 BleTxScheduler 的看官,請參考第 14 天的 Notify & Indicate 排程器
  4. 離開此方法前,不要忘了重設 ble.global_var.is_cp_in_progress。如果討厭每次都要寫那麼多一樣的程式的話,就定義一個方法來處理這件事吧。

最後來看如何組成回應的內容 _respond()

    def _respond(
        buf: bytearray | memoryview,
        bolus: float,
        basal: float,
        total: float,
    ) -> int:
        common.utils.write_uint16(buf, 0, _GET_TOTAL_DAILY_INSULIN_STATUS_RESPONSE)

        t = common.sfloat.float_to_sfloat(bolus)
        common.utils.write_uint16(buf, 2, t)

        t = common.sfloat.float_to_sfloat(basal)
        common.utils.write_uint16(buf, 4, t)

        t = common.sfloat.float_to_sfloat(total)
        common.utils.write_uint16(buf, 6, t)

        return 8

_respond() 只是簡單照著規格書上的欄位來存放資料而已,可能有疑惑的是:

Get Total Daily Insulin Status 小節一開始的欄位組成圖不是只有 3 個欄位嗎?怎麼 _respond() 一開始要先存放 _GET_TOTAL_DAILY_INSULIN_STATUS_RESPONSE 這個東西?

那是因為小節一開始給的是 Operand 的欄位圖,而 _GET_TOTAL_DAILY_INSULIN_STATUS_RESPONSEOp Code。此章一開始的整體指令組成有說明,回應的資料開頭必須包含 Op Code


雖然 Get Total Daily Insulin Status 目前讀回的欄位都是 0,也沒有將相關資訊在子夜時重設,但本喵會在之後的說明中完成 ... 可能 ...也許 ... 希望 ... 能講到 ... 吧 ...
(|||゚д゚)


上一篇
Day 24 - 建立 Control Point 基礎類別
下一篇
Day 26 - 給 IDD Status Reader Control Point 掛上點果實吧 (2) 之 Bolus
系列文
以MicroPython在ESP32上實作Insulin Delivery Service31
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言