昨天咱們製作了 Reset Status 和 Get Total Daily Insulin Status 兩個指令,今天咱們來設計 IDS 的核心功能:Get Active Bolus IDs 和 Get Active Bolus Delivery。
在實作之前,先簡單說明一下 Bolus 是什麼:
簡單地說,Bolus 是在用餐階段注射,針對食物或血糖修正的短期快速給藥。
有了概念後,來看看 Bolus 有哪些欄位:
| 欄位 | 描述 | 
|---|---|
| Flags | 指示 Delay Time、Template Number 或 Activation Type 是否存在,以及此 Bolus 存在的理由 | 
| ID | Bolus 獨一無二的編號 | 
| Type | Bolus 的種類,因應用途分為 Fast、Extended 和 Multiwave | 
| Fast Amount | Fast 的劑量 | 
| Extended Amount | Extended 的劑量 | 
| Duration | Extended 的注射總時長 | 
| Delay Time | 在開始注射前的閒置時間 | 
| Template Number | 使用的 Bolus 範本編號 | 
| Activation Type | Bolus 注射的額外資訊,比如注射是因 使用者自訂 或 系統建議 | 
咱們先定義 Bolus 使用到的欄位:
class Bolus:
    def __init__(self):
        self.created_timestamp = 0
        self.start_timestamp: int | None = None
        self.state = core.insulin.consts.BOLUS_STATE_IDLE
        self.flags = 0
        self.id = 0
        self.type = 0
        # Unit: IU
        self.fast_programmed: common.fixedfloat.FixedFloat | None = None
        self.fast_remaining: common.fixedfloat.FixedFloat | None = None
        self.extended_programmed: common.fixedfloat.FixedFloat | None = None
        self.extended_remaining: common.fixedfloat.FixedFloat | None = None
        # Unit: seconds
        self.duration_programmed: int | None = None
        self.duration_remaining: int | None = None
        self.delay_programmed: int | None = None
        self.delay_remaining: int | None = None
        self.template: int | None = None
        self.activation: int | None = None
        # 儲存傳輸策略
        self._deliveries: list | None = None
看官會發現裡面有 4 種使用到以 _programmed 和 _remaining 命名的變數,咱們當然可以將它們改為類別,以更好管理。
Bolus 類別裡有些成員變數不在規格書的記載內,咱們會等使用到時再解釋。
Get Active Bolus IDs 指令沒有 operand,而它回應的 operand 格式如下:
此指令會回應目前處於活動狀態的 Bolus ID,所以咱們在 Bolus 類別裡增加以下方法:
class Bolus:
    def is_active(self) -> bool:
        return self.state > BOLUS_STATE_IDLE
BOLUS_STATE_CANCELLED = micropython.const(-2)
BOLUS_STATE_FINISHED = micropython.const(-1)
BOLUS_STATE_IDLE = micropython.const(0)
BOLUS_STATE_DELAY = micropython.const(1)
BOLUS_STATE_DELIVERING = micropython.const(2)
咱們設計了幾個狀態值,讓處於活動狀態的值都是大於 0,所以可以簡單地判斷 Bolus 是否進行中。
接著在 InsulinManager 類別新增 get_active_bolus_ids():
class InsulinManager:
    def __init__(self, config: config.Config):
        self._boluses: list[core.insulin.bolus.Bolus] = []
    def get_active_bolus_ids(self):
        return tuple(b.id for b in self._boluses if b.is_active())
那麼可以開始實作 Get Active Bolus IDs 了:
async def _on_get_active_bolus_ids(self, operand: bytes):
    try:
        if not self._check_condition(
            self._is_bolus_supported,
            _GET_ACTIVE_BOLUS_IDS,
            _OP_CODE_NOT_SUPPORTED,
        ):
            return
        if not self._check_condition(
            len(operand) == 0, _GET_ACTIVE_BOLUS_IDS, _INVALID_OPERAND
        ):
            return
        ids = self._insulin_mgr.get_active_bolus_ids()
        ble.stack.BleTxScheduler().add(
            ble.stack.ACT_INDICATE,
            self.send_data,
            self.value_handle,
            (_respond, ids),
        )
    finally:
        self.reset_run_flag()
Op Code not supported
operand 的長度InsulinManager.get_active_bolus_ids() 得到活躍中的 Bolus IDis_cp_in_progress,但不像昨天是直接設定,而是呼叫特定方法來完成對於 Bolus 支援度的檢查,可以在 Config 類別新增以下內容:
class Config:
    def _refresh(self, flags):
        self.is_fast_bolus_supported = bool(flags & 0x0020)
        self.is_extended_bolus_supported = bool(flags & 0x0040)
        self.is_multiwave_bolus_supported = bool(flags & 0x0080)
    def is_bolus_supported(self) -> bool:
        return (
            self.is_fast_bolus_supported
            or self.is_extended_bolus_supported
            or self.is_multiwave_bolus_supported
        )
至於回應內容的建構如下:
def _respond(buf: bytearray | memoryview, ids: tuple[int, ...]) -> int:
    # IDS 規定只傳回最舊的 7 個 Bolus ID
    bolus_cnt = min(len(ids), 7)
    common.utils.write_uint16(buf, 0, _GET_ACTIVE_BOLUS_IDS_RESPONSE)
    buf[2] = bolus_cnt
    i = 3
    for j in range(bolus_cnt):
        common.utils.write_uint16(buf, i, ids[j])
        i += 2
    return i
7 個i 不只計算 Bolus ID 要存放的位置,也表示最後資料的長度。Get Active Bolus Delivery 是根據 Bolus ID 和 Bolus Value Selection 來取得對應的胰島素劑量,它的 operand 定義如下:
Bolus Value Selection:
| Description | Value | 
|---|---|
| Programmed | 0x0F | 
| Remaining | 0x33 | 
| Delivered | 0x3C | 
| RFU | All not already defined Hamming code values within 0x00 and 0xFF | 
| Prohibited | All other values | 
因要根據 Bolus ID 來取得對應的 Bolus,所以在 InsulinManager 增加以下方法:
class InsulinManager:
    def get_active_bolus(self, id):
        return next(
            (b for b in self._boluses if b.is_active() and b.id == id),
            None
        )
已經取得 Bolus 了,那麼來看看要回應什麼:
為了回應此指令,咱們根據定義在 Bolus 新增方法:
class Bolus:
    def to_bytes(self, buf: bytearray | memoryview, offset: int, selection: int) -> int:
        data_len = 10
        buf[offset] = self.flags
        common.utils.write_uint16(buf, offset + 1, self.id)
        buf[offset + 3] = self.type
        if selection == core.insulin.consts.BOLUS_SELECTION_PROGRAMMED:
            ...
        elif selection == core.insulin.consts.BOLUS_SELECTION_REMAINING:
            ...
        elif selection == core.insulin.consts.BOLUS_SELECTION_DELIVERED:
            ...
        else:
            return 0
        if self.template is not None:
            buf[offset + data_len] = self.template
            data_len += 1
        if self.activation is not None:
            buf[offset + data_len] = self.activation
            data_len += 1
        return data_len
Bolus.to_bytes() 只是單純地填寫資料,沒有需要說明之處。而 Get Active Bolus Delivery 就和之前的流程一樣:
async def _on_get_active_bolus_delivery(self, operand: bytes):
    try:
        if not self._check_condition(
            self._is_bolus_supported,
            _GET_ACTIVE_BOLUS_DELIVERY,
            _OP_CODE_NOT_SUPPORTED,
        ):
            return
        if not self._check_condition(
            len(operand) == 3, _GET_ACTIVE_BOLUS_DELIVERY, _INVALID_OPERAND
        ):
            return
        id = operand[1] << 8 | operand[0]
        bolus = self._insulin_mgr.get_active_bolus(id)
        if not self._check_condition(
            bolus is not None, _GET_ACTIVE_BOLUS_DELIVERY, _PROCEDURE_NOT_APPLICABLE
        ):
            return
        selection = operand[2]
        ble.stack.BleTxScheduler().add(
            ble.stack.ACT_INDICATE,
            self.send_data,
            self.value_handle,
            (_respond, bolus, selection),
        )
    finally:
        self.reset_run_flag()
唯一須要特別說明的是:
Procedure not applicable