iT邦幫忙

2025 iThome 鐵人賽

DAY 27
0
Software Development

以MicroPython在ESP32上實作Insulin Delivery Service系列 第 27

Day 27 - IDD Command Control Point - 啟用 Bolus

  • 分享至 

  • xImage
  •  

昨天咱們為 IDD Status Reader Control Point 增添了 Get Active Bolus IDsGet Active Bolus Delivery 兩功能,但因系統未曾啟用 Bolus,所以無法讀取到有意義的訊息。因此,今天咱們就來讓 Bolus 運作起來。

1. 注射模式

IDS 支援三種 Bolus 注射模式:

Type Strategy
Fast 以一個系統固有速率來注射胰島素
Extended 以使用者設定的胰島素劑量與指定的持續時間來注射胰島素
Multiwave 先以 Fast 策略來注射,完成後,再以 Extended 策略注射

由上表可以知道,對於胰島素的注射,只需實現兩種策略:Fast 和 Extended,而 Multiwave 只要組合 Fast 和 Extended 兩種策略就好。咱們可以使用策略模式來實作,如:

class Delivery:
    def deliver(self, timestamp: int, bolus, fn_consume) -> bool:
        # 回傳值:
        # True:  傳輸中
        # False: 已結束
        raise NotImplementedError

    def finish(self):
        pass

class Bolus:
    def __init__(self):
        # 儲存傳輸策略
        # 愈晚執行的項目愈早加入
        self._deliveries: list[Delivery] | None = None

    def deliver(self, timestamp: int, fn_consume):
        # 每秒呼叫一次

        while self._deliveries:
            # 取得優先序最高的傳輸策略
            d = self._deliveries[-1]

            if d.deliver(timestamp, self, fn_consume):
                # 傳輸中
                return
            else:
                # 此傳輸策略已完成
                self._deliveries.pop()
                d.finish(self)

        self.finish()

然後實作延時和傳輸策略:

class DelayDelivery(Delivery):
    def deliver(self, timestamp: int, bolus: Bolus, fn_consume) -> bool:
        # 每秒呼叫一次。
        # 若 Delay Remaining Time 大於 0,則消耗 1 秒鐘的量,並回傳 True;否則回傳 False。

        if bolus.delay_remaining == 0:
            return False

        bolus.state = core.insulin.consts.BOLUS_STATE_DELAY
        bolus.delay_remaining -= 1

        return True

class FastDelivery(Delivery):
    def __init__(self, iu_per_sec: float):
        self._iu_per_sec = common.fixedfloat.FixedFloat(iu_per_sec)

    def deliver(self, timestamp: int, bolus: Bolus, fn_consume) -> bool:
        # 每秒呼叫一次。
        # 若 Fast Remaining Amount 大於 0,則消耗 1 秒鐘的量,並回傳 True;否則回傳 False。

        if bolus.fast_remaining == common.fixedfloat.FixedFloat.ZERO:
            return False

        bolus.state = core.insulin.consts.BOLUS_STATE_DELIVERING

        if bolus.fast_remaining >= self._iu_per_sec:
            amount = self._iu_per_sec
        else:
            amount = bolus.fast_remaining

        amount = fn_consume(timestamp, amount)

        if amount != common.fixedfloat.FixedFloat.ZERO:
            if bolus.start_timestamp is None:
                bolus.start_timestamp = timestamp

            bolus.fast_remaining -= amount

            common.eventbus.publish(
                core.events.EVENT_BOLUS_DELIVERING, timestamp, amount
            )

        return True

這裡的實作都是平鋪直敘,較需要說明的是:

  • fn_consume(timestamp, amount)
    fn_consume 表示一個函數變數,它的作用是從藥劑瓶取出指定藥劑量來注射,回傳值是實際取出的份量,因為有可能目前藥劑剩餘量已不足。
  • 送出事件 EVENT_BOLUS_DELIVERING
    此事件表示此次傳輸的 Bolus 劑量。因為 IDS 規定,每當累計達某劑量時(由廠商定義),必須傳送 Active Bolus Status ChangedIDD Status Changed
    此行為是可以在 DeliveryBolus 類別裡處理,但為了不讓類別間依賴太緊密,所以交由 Event Bus 機制處置。

2. Fast Bolus

為了方便建立 Fast Bolus,咱們建立一個輔助類別:

class FastBolus(Bolus):
    def __init__(
        self,
        created_timestamp: int,
        amount: float,
        iu_per_min: float,
        *,
        delay: int | None = None,
        template: int | None = None,
        activation: int | None = None,
        reason: int | None = None,
    ):
        # delay: unit is minutes

        super().__init__()

        self.created_timestamp = created_timestamp
        self.type = core.insulin.consts.BOLUS_TYPE_FAST
        self.fast_programmed = common.fixedfloat.FixedFloat(amount)
        self.fast_remaining = common.fixedfloat.FixedFloat(self.fast_programmed)
        self.template = template
        self.activation = activation

        self._set_flags(
            delay=delay, template=template, activation=activation, reason=reason
        )

        delivery = FastDelivery(iu_per_min / 60)

        if delay:
            self.delay_programmed = delay * 60
            self.delay_remaining = self.delay_programmed
            self._deliveries = [delivery, DelayDelivery()]
        else:
            self._deliveries = [delivery]
  • iu_per_min 表示每分鐘施打多少 IU。
  • FastDelivery 所需參數是每秒施打多少 IU,所以須將 iu_per_min 除以 60。
  • delay 不為 0 時,表示須延後多少分鐘才能施打胰島素,而 self.delay_programmed 表示延後多少,所以在指定給 self.delay_programmed 前,須先將 delay 乘以 60
  • 因需要先延後才施打,而 self._deliveries 所儲存的項目是排列在愈後面的愈早執行,所以將 DelayDelivery() 放在最後面。

3. 啟用 Bolus

為了啟動 Bolus,在 InsulinManager 增加以下方法:

class InsulinManager:
    def activate_bolus(self, timestamp: int, bolus: core.insulin.bolus.Bolus):
        bolus.id = self._get_next_id()

        if bolus.delay_programmed:
            bolus.state = core.insulin.consts.BOLUS_STATE_DELAY
        else:
            bolus.state = core.insulin.consts.BOLUS_STATE_DELIVERING

        self._boluses.append(bolus)

        common.eventbus.publish(
            core.events.EVENT_IDD_STATUS_ADDING,
            core.events.ACTIVE_BOLUS_STATUS_CHANGED,
        )

activate_bolus() 將新的 bolus 資訊更新後,將它加入 self._boluses,然後通知狀態已更新。

而實際傳輸 bolus,則是由 _deliver_boluses() 負責:

class InsulinManager:
    def _deliver_boluses(self, timestamp: int):
        updated_boluses = []

        for bolus in self._boluses:
            if bolus.is_active():
                bolus.deliver(timestamp, self._consume_reservoir)

            if bolus.is_active():
                updated_boluses.append(bolus)
            else:
                common.eventbus.publish(
                    core.events.EVENT_IDD_STATUS_ADDING,
                    core.events.TOTAL_DAILY_INSULIN_STATUS_CHANGED,
                )

        self._boluses = updated_boluses

傳給 bolus.deliver()_consume_reservoir 會取出需求的藥劑量:

class InsulinManager:
    def _consume_reservoir(
        self, timestamp: int, amount: common.fixedfloat.FixedFloat
    ) -> common.fixedfloat.FixedFloat:
        # 嘗試從 reservoir 取出 amount,並回傳實際使用量。

        if self._config.reservoir_remaining >= amount:
            self._config.reservoir_remaining -= amount
        else:
            amount = self._config.reservoir_remaining
            self._config.reservoir_remaining.set_value(0)

        if amount != common.fixedfloat.FixedFloat.ZERO:
            common.eventbus.publish(
                core.events.EVENT_RESERVOIR_REMAINING_CHANGED,
                timestamp,
                amount,
                self._config.reservoir_remaining,
            )

        return amount

4. 啟動 InsulinManager

InsulinManager 類別的工作已大致完成,_deliver_boluses() 是啟動 bolus 的關鍵,但由誰呼叫它呢?咱們讓 InsulinManager 提供一個與外界的溝通介面:

class InsulinManager:
    def deliver(self, timestamp: int):
        """每秒呼叫一次。"""
        self._deliver_boluses(timestamp)

為什麼不讓外界直接呼叫 _deliver_boluses() 呢?那是因為若未來支援 Basal Rate,也會在 InsulinManager.deliver() 呼叫相關程式。而且,通常裝置必須處在某種狀態下才能傳輸胰島素,所以會統一在 InsulinManager.deliver() 裡判斷後才呼叫注射函數。

然後在 ble/server.py 加入:

class IdsServer(ble.stack.Server):
    async def run(self):
        timer = machine.Timer(0, period=1000, callback=_on_timer)
        await ble.stack.BleTxScheduler().run()

def _on_timer(timer):
    ble.global_var.timestamp += 1
    micropython.schedule(_on_timestamp_changed, ble.global_var.timestamp)

def _on_timestamp_changed(timestamp: int):
    asyncio.create_task(_on_timestamp_changed_async(timestamp))

async def _on_timestamp_changed_async(timestamp: int):
    _insulin_mgr.deliver(timestamp)

這樣每秒都會呼叫 InsulinManager.deliver(),就能更新 bolus 的狀態了。

5. IDD Command Control Point

最後,咱們來實作 IDD Command Control Point 的 Set Bolus 指令。

因為 IDD Command Control Point 和 IDD Status Reader Control Point 的結構完全一樣,所以可以直接複製昨天的 IddStatusReaderCPE2EIddStatusReaderCP,改變類別名和 UUID,將不屬於 IDD Command Control Point 的指令刪除後,即可使用。

先看 Set Bolus 指令的 Operand 的欄位結構:
https://ithelp.ithome.com.tw/upload/images/20250827/20177799ujOwFeLBsK.png

然後是回應的 Operand 結構:
https://ithelp.ithome.com.tw/upload/images/20250827/20177799UJBFFQVTQN.png

因為只是將 operand 的欄位拆解出來,所以不列出拆解過程:

class IddCommandCP(BaseCP):
    async def _on_set_bolus(self, timestamp: int, operand: bytes):
        def _respond(buf: bytearray | memoryview, bolus_id: int) -> int:
            common.utils.write_uint16(buf, 0, _SET_BOLUS_RESPONSE)
            common.utils.write_uint16(buf, 2, bolus.id)
            return 4

        ...

        try:
            if bolus_type == core.insulin.consts.BOLUS_TYPE_FAST:
                bolus = core.insulin.bolus.FastBolus(
                    timestamp,
                    fast_amount,
                    self._config.fast_iu_per_min,
                    delay=delay,
                    template=template_number,
                    activation=activation_type,
                    reason=flags,
                )

            elif bolus_type == core.insulin.consts.BOLUS_TYPE_EXTENDED:
                bolus = core.insulin.bolus.ExtendedBolus(
                    timestamp,
                    extended_amount,
                    duration,
                    delay=delay,
                    template=template_number,
                    activation=activation_type,
                    reason=flags,
                )

            else:
                bolus = core.insulin.bolus.MultiwaveBolus(
                    timestamp,
                    fast_amount,
                    self._config.fast_iu_per_min,
                    extended_amount,
                    duration,
                    delay=delay,
                    template=template_number,
                    activation=activation_type,
                    reason=flags,
                )

            self._insulin_mgr.activate_bolus(timestamp, bolus)

            ble.stack.BleTxScheduler().add(
                ble.stack.ACT_INDICATE,
                self.send_data,
                self.value_handle,
                (_respond, bolus.id),
            )

        finally:
            self.reset_run_flag()

如此,當咱們將 IddCommandCPE2EIddCommandCP 加入 IDS 後,GATT Client 即可藉由 Set Bolus 指令使相應的 bolus 運作。


上一篇
Day 26 - 給 IDD Status Reader Control Point 掛上點果實吧 (2) 之 Bolus
下一篇
Day 28 - 留下足跡吧 - History Data
系列文
以MicroPython在ESP32上實作Insulin Delivery Service31
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言