iT邦幫忙

2025 iThome 鐵人賽

DAY 19
0
Software Development

以MicroPython在ESP32上實作Insulin Delivery Service系列 第 19

Day 19 - 傳輸 IDD Status & 避免浮點數誤差

  • 分享至 

  • xImage
  •  

既然是胰島素輸送裝置,那最重要的資訊當然是目前藥劑量剩餘多少,以及裝置是否在運行等狀態了。而負責傳輸這些資訊的就是 IDD Status,今天咱們就來完成它吧~

1. 資料欄位

IDD Status 的欄位組成如下:
https://ithelp.ithome.com.tw/upload/images/20250819/20177799lxJQBCs2MX.png

欄位 說明
Therapy Control State 指示現在是否在施打藥劑,如 Stop、Pause 和 Run
Operational State 指示裝置的運作狀態,如 Standby、Priming、Waiting 和 Ready
Reservoir Remaining Amount 目前剩餘藥劑量
Flags 指示藥劑容器是否已經與裝置連接

為了方便測試,咱們把 IDD Status 所需欄位放在 Config 類別裡:

class Config:
    def __init__(self):
        self.idd_status_therapy_ctrl_state = ble.ids.consts.THERAPY_STATE_STOP
        self.idd_status_operational_state = ble.ids.consts.OPERATIONAL_STATE_OFF
        self.idd_status_flags = ble.ids.consts.IDD_STATUS_FLAGS_RESERVOIR_ATTACHED

        # 目前 reservoir 殘餘量
        # Unit: IU
        self.reservoir_remaining = common.fixedfloat.FixedFloat(200)

除了 reservoir_remaining 外,其餘變數的型態都是 int,那為什麼 reservoir_remaining 的型態是 FixedFloat?規格書的型態不是 SFLOAT 嗎?就算 SFLOAT 不適合人類閱讀,但不是用 float 就好了嗎?

其實不使用 float 是為了避免精度誤差導致的不一致,舉一個假設的例子(此範例未必會在現實中發生):

reservoir_remaining 為 1.0 IU,假設每次減少 0.2 IU:

次序 reservoir_remaining 預期
1 0.8 0.8
2 0.6 0.6
3 0.400001 0.4
4 0.200001 0.2
5 0.000001 0.0

這在胰島素注射時的一些狀態判斷會產生一些問題。當然,要解決是很容易,比如:

if abs(reservoir_remaining - some_target) < SOME_THRESHOLD:
    # do something

只是即使包裝成函數,每次都要特別呼叫,也是蠻瑣碎的。而且在一些情境下,如列印,也要特別處理,因此才會設計 FixedFloat

除了增加這些變數,不要忘了與 JSON 之間的轉換前須先轉為字典型式:

class Config:
    def to_dict(self):
        return {
            ...

            "idd_status_therapy_ctrl_state": self.idd_status_therapy_ctrl_state,
            "idd_status_operational_state": self.idd_status_operational_state,
            "idd_status_flags": self.idd_status_flags,
            "reservoir_remaining": self.reservoir_remaining.to_json(),
        }

    @classmethod
    def from_dict(cls, d: dict):
        ...

        obj.idd_status_therapy_ctrl_state = d["idd_status_therapy_ctrl_state"]
        obj.idd_status_operational_state = d["idd_status_operational_state"]
        obj.idd_status_flags = d["idd_status_flags"]
        obj.reservoir_remaining = common.fixedfloat.FixedFloat.from_json(
            d["reservoir_remaining"]
        )

        ...

2. FixedFloat

因為咱們希望精度是到小數點第 7 位,所以會對傳進的 intfloat 型態的數值乘上 10^7

_FLOAT_SCALE = 10_000_000

class FixedFloat:
    @classmethod
    def _to_raw(cls, value) -> int:
        if isinstance(value, FixedFloat):
            return value._raw

        elif isinstance(value, int):
            return value * _FLOAT_SCALE

        elif isinstance(value, float):
            return math.floor(value * _FLOAT_SCALE + 0.5)

        else:
            raise TypeError("Unsupported operand type")

    def __init__(self, value):
        self._raw = self._to_raw(value)

在咱們的 IDS server 裡,因會對其進行加減法,所以必須覆寫其相關方法:

class FixedFloat:
    @classmethod
    def _from_raw(cls, raw: int):
        obj = cls(0)
        obj._raw = raw
        return obj

    def __add__(self, other):
        return self._from_raw(self._raw + self._to_raw(other))

    def __sub__(self, other):
        return self._from_raw(self._raw - self._to_raw(other))

    def __iadd__(self, other):
        self._raw += self._to_raw(other)
        return self

    def __isub__(self, other):
        self._raw -= self._to_raw(other)
        return self

接著為其加上比較運算子:

class FixedFloat:
    def __eq__(self, other):
        return self._raw == self._to_raw(other)

    def __lt__(self, other):
        return self._raw < self._to_raw(other)

    def __le__(self, other):
        return self._raw <= self._to_raw(other)

    def __gt__(self, other):
        return self._raw > self._to_raw(other)

    def __ge__(self, other):
        return self._raw >= self._to_raw(other)

    def __ne__(self, other):
        return self._raw != self._to_raw(other)

FixedFloat 是以 float 型態被使用,所以須覆寫 __float__(),且其實體會在 JSON 格式之間轉換,所以定義了 to_json()from_json()

class FixedFloat:
    def __float__(self):
        return self._raw / _FLOAT_SCALE

    def to_json(self):
        return float(self)

    @classmethod
    def from_json(cls, data: float):
        return cls(data)

3. IDD Status 類別

先前咱們已經製作過 IDD Status Changed 類別了,而 IDD Status 基本和其一樣:

class IddStatus(ble.mixin.IndicateMixin, ble.mixin.ReadMixin, ble.stack.Characteristic):
    def __init__(self, config: config.Config):
        ble.stack.Characteristic.__init__(self, 0x2B21, read=True, indicate=True)
        ble.mixin.IndicateMixin.__init__(self)

        self._config = config

    def _build_indicate_payload(self, arg) -> bytes:
        n = self._build_read_rsp(ble.mixin._rbuf_mv)
        return bytes(ble.mixin._rbuf_mv[:n])

    def _build_read_rsp(self, buf: bytearray | memoryview) -> int:
        buf[0] = self._config.idd_status_therapy_ctrl_state
        buf[1] = self._config.idd_status_operational_state

        remaining = common.sfloat.float_to_sfloat(
            float(self._config.reservoir_remaining)
        )
        common.utils.write_uint16(buf, 2, remaining)

        buf[4] = self._config.idd_status_flags

        return 5

class E2EIddStatus(ble.mixin.E2ETxMixin, IddStatus):
    def __init__(self, config: config.Config):
        ble.mixin.E2ETxMixin.__init__(self)
        IddStatus.__init__(self, config)

    def _build_read_rsp(self, buf: bytearray | memoryview) -> int:
        data_len = super()._build_read_rsp(buf)
        buf[data_len] = self._tx_counter.value
        ble.e2e.Crc.fill_crc(buf, data_len + 1, data_len + 3)
        return data_len + 3

大概很多看官對 E2EIddStatusE2EIddStatusChanged 幾乎一樣,尤其是 _build_read_rsp(),會看得很不順眼吧!資料後面需要加上 E2E-Counter 和 E2E-CRC 的 characteristic 共有 7 個(IDD Features 和 IDD History Data 所需的 E2E-Protection 欄位有點不同),若每次都要重寫一樣的程式碼,既浪費記憶體,又容易出錯,所以可以用一個函數來做一樣的事,比如:

def fill_e2e(buf: bytearray | memoryview, fn_get_data, counter: ble.e2e.Counter) -> int:
    # 將 E2E-Counter 和 E2E-CRC 附加到資料後面,適用:
    # IDD Status Changed、
    # IDD Status、
    # IDD Annunciation Status、
    # IDD Status Reader Control Point、
    # IDD Command Control Point、
    # IDD Command Data、
    # IDD Record Access Control Point。

    # fn_get_data(buf) -> int
    # 將資料填充到 buf,並傳回資料長度

    data_len = fn_get_data(buf)
    buf[data_len] = counter.value
    ble.e2e.Crc.fill_crc(buf, data_len + 1, data_len + 3)

    return data_len + 3

使用時如下:

def _build_read_rsp(self, buf: bytearray | memoryview) -> int:
    return ble.ids.utils.fill_e2e(buf, super()._build_read_rsp, self._tx_counter)

但每個 characteristic 還是需要完全一樣的程式片段,至於這樣是否有比較好,或有更好的做法,就各抒己見吧。


上一篇
Day 18 - Event Bus - 讓 IDD Status Changed 優雅地執行吧
下一篇
Day 20 - 警告!警告!警告!基礎 Annunciation!
系列文
以MicroPython在ESP32上實作Insulin Delivery Service31
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言