昨天咱們為 IDD Status Reader Control Point 增添了 Get Active Bolus IDs
和 Get Active Bolus Delivery
兩功能,但因系統未曾啟用 Bolus,所以無法讀取到有意義的訊息。因此,今天咱們就來讓 Bolus 運作起來。
IDS 支援三種 Bolus 注射模式:
Type | Strategy |
---|---|
Fast | 以一個系統固有速率來注射胰島素 |
Extended | 以使用者設定的胰島素劑量與指定的持續時間來注射胰島素 |
Multiwave | 先以 Fast 策略來注射,完成後,再以 Extended 策略注射 |
由上表可以知道,對於胰島素的注射,只需實現兩種策略:Fast 和 Extended,而 Multiwave 只要組合 Fast 和 Extended 兩種策略就好。咱們可以使用策略模式來實作,如:
class Delivery:
def deliver(self, timestamp: int, bolus, fn_consume) -> bool:
# 回傳值:
# True: 傳輸中
# False: 已結束
raise NotImplementedError
def finish(self):
pass
class Bolus:
def __init__(self):
# 儲存傳輸策略
# 愈晚執行的項目愈早加入
self._deliveries: list[Delivery] | None = None
def deliver(self, timestamp: int, fn_consume):
# 每秒呼叫一次
while self._deliveries:
# 取得優先序最高的傳輸策略
d = self._deliveries[-1]
if d.deliver(timestamp, self, fn_consume):
# 傳輸中
return
else:
# 此傳輸策略已完成
self._deliveries.pop()
d.finish(self)
self.finish()
然後實作延時和傳輸策略:
class DelayDelivery(Delivery):
def deliver(self, timestamp: int, bolus: Bolus, fn_consume) -> bool:
# 每秒呼叫一次。
# 若 Delay Remaining Time 大於 0,則消耗 1 秒鐘的量,並回傳 True;否則回傳 False。
if bolus.delay_remaining == 0:
return False
bolus.state = core.insulin.consts.BOLUS_STATE_DELAY
bolus.delay_remaining -= 1
return True
class FastDelivery(Delivery):
def __init__(self, iu_per_sec: float):
self._iu_per_sec = common.fixedfloat.FixedFloat(iu_per_sec)
def deliver(self, timestamp: int, bolus: Bolus, fn_consume) -> bool:
# 每秒呼叫一次。
# 若 Fast Remaining Amount 大於 0,則消耗 1 秒鐘的量,並回傳 True;否則回傳 False。
if bolus.fast_remaining == common.fixedfloat.FixedFloat.ZERO:
return False
bolus.state = core.insulin.consts.BOLUS_STATE_DELIVERING
if bolus.fast_remaining >= self._iu_per_sec:
amount = self._iu_per_sec
else:
amount = bolus.fast_remaining
amount = fn_consume(timestamp, amount)
if amount != common.fixedfloat.FixedFloat.ZERO:
if bolus.start_timestamp is None:
bolus.start_timestamp = timestamp
bolus.fast_remaining -= amount
common.eventbus.publish(
core.events.EVENT_BOLUS_DELIVERING, timestamp, amount
)
return True
這裡的實作都是平鋪直敘,較需要說明的是:
fn_consume(timestamp, amount)
fn_consume
表示一個函數變數,它的作用是從藥劑瓶取出指定藥劑量來注射,回傳值是實際取出的份量,因為有可能目前藥劑剩餘量已不足。EVENT_BOLUS_DELIVERING
Active Bolus Status Changed
的 IDD Status Changed
。Delivery
或 Bolus
類別裡處理,但為了不讓類別間依賴太緊密,所以交由 Event Bus
機制處置。為了方便建立 Fast Bolus,咱們建立一個輔助類別:
class FastBolus(Bolus):
def __init__(
self,
created_timestamp: int,
amount: float,
iu_per_min: float,
*,
delay: int | None = None,
template: int | None = None,
activation: int | None = None,
reason: int | None = None,
):
# delay: unit is minutes
super().__init__()
self.created_timestamp = created_timestamp
self.type = core.insulin.consts.BOLUS_TYPE_FAST
self.fast_programmed = common.fixedfloat.FixedFloat(amount)
self.fast_remaining = common.fixedfloat.FixedFloat(self.fast_programmed)
self.template = template
self.activation = activation
self._set_flags(
delay=delay, template=template, activation=activation, reason=reason
)
delivery = FastDelivery(iu_per_min / 60)
if delay:
self.delay_programmed = delay * 60
self.delay_remaining = self.delay_programmed
self._deliveries = [delivery, DelayDelivery()]
else:
self._deliveries = [delivery]
iu_per_min
表示每分鐘
施打多少 IU。FastDelivery
所需參數是每秒
施打多少 IU,所以須將 iu_per_min
除以 60。delay
不為 0
時,表示須延後多少分鐘
才能施打胰島素,而 self.delay_programmed
表示延後多少秒
,所以在指定給 self.delay_programmed
前,須先將 delay
乘以 60
。self._deliveries
所儲存的項目是排列在愈後面的愈早執行,所以將 DelayDelivery()
放在最後面。為了啟動 Bolus,在 InsulinManager
增加以下方法:
class InsulinManager:
def activate_bolus(self, timestamp: int, bolus: core.insulin.bolus.Bolus):
bolus.id = self._get_next_id()
if bolus.delay_programmed:
bolus.state = core.insulin.consts.BOLUS_STATE_DELAY
else:
bolus.state = core.insulin.consts.BOLUS_STATE_DELIVERING
self._boluses.append(bolus)
common.eventbus.publish(
core.events.EVENT_IDD_STATUS_ADDING,
core.events.ACTIVE_BOLUS_STATUS_CHANGED,
)
activate_bolus()
將新的 bolus 資訊更新後,將它加入 self._boluses
,然後通知狀態已更新。
而實際傳輸 bolus,則是由 _deliver_boluses()
負責:
class InsulinManager:
def _deliver_boluses(self, timestamp: int):
updated_boluses = []
for bolus in self._boluses:
if bolus.is_active():
bolus.deliver(timestamp, self._consume_reservoir)
if bolus.is_active():
updated_boluses.append(bolus)
else:
common.eventbus.publish(
core.events.EVENT_IDD_STATUS_ADDING,
core.events.TOTAL_DAILY_INSULIN_STATUS_CHANGED,
)
self._boluses = updated_boluses
傳給 bolus.deliver()
的 _consume_reservoir
會取出需求的藥劑量:
class InsulinManager:
def _consume_reservoir(
self, timestamp: int, amount: common.fixedfloat.FixedFloat
) -> common.fixedfloat.FixedFloat:
# 嘗試從 reservoir 取出 amount,並回傳實際使用量。
if self._config.reservoir_remaining >= amount:
self._config.reservoir_remaining -= amount
else:
amount = self._config.reservoir_remaining
self._config.reservoir_remaining.set_value(0)
if amount != common.fixedfloat.FixedFloat.ZERO:
common.eventbus.publish(
core.events.EVENT_RESERVOIR_REMAINING_CHANGED,
timestamp,
amount,
self._config.reservoir_remaining,
)
return amount
在 InsulinManager
類別的工作已大致完成,_deliver_boluses()
是啟動 bolus 的關鍵,但由誰呼叫它呢?咱們讓 InsulinManager
提供一個與外界的溝通介面:
class InsulinManager:
def deliver(self, timestamp: int):
"""每秒呼叫一次。"""
self._deliver_boluses(timestamp)
為什麼不讓外界直接呼叫 _deliver_boluses()
呢?那是因為若未來支援 Basal Rate
,也會在 InsulinManager.deliver()
呼叫相關程式。而且,通常裝置必須處在某種狀態下才能傳輸胰島素,所以會統一在 InsulinManager.deliver()
裡判斷後才呼叫注射函數。
然後在 ble/server.py
加入:
class IdsServer(ble.stack.Server):
async def run(self):
timer = machine.Timer(0, period=1000, callback=_on_timer)
await ble.stack.BleTxScheduler().run()
def _on_timer(timer):
ble.global_var.timestamp += 1
micropython.schedule(_on_timestamp_changed, ble.global_var.timestamp)
def _on_timestamp_changed(timestamp: int):
asyncio.create_task(_on_timestamp_changed_async(timestamp))
async def _on_timestamp_changed_async(timestamp: int):
_insulin_mgr.deliver(timestamp)
這樣每秒都會呼叫 InsulinManager.deliver()
,就能更新 bolus 的狀態了。
最後,咱們來實作 IDD Command Control Point 的 Set Bolus
指令。
因為 IDD Command Control Point 和 IDD Status Reader Control Point 的結構完全一樣,所以可以直接複製昨天的 IddStatusReaderCP
和 E2EIddStatusReaderCP
,改變類別名和 UUID
,將不屬於 IDD Command Control Point 的指令刪除後,即可使用。
先看 Set Bolus 指令的 Operand 的欄位結構:
然後是回應的 Operand 結構:
因為只是將 operand
的欄位拆解出來,所以不列出拆解過程:
class IddCommandCP(BaseCP):
async def _on_set_bolus(self, timestamp: int, operand: bytes):
def _respond(buf: bytearray | memoryview, bolus_id: int) -> int:
common.utils.write_uint16(buf, 0, _SET_BOLUS_RESPONSE)
common.utils.write_uint16(buf, 2, bolus.id)
return 4
...
try:
if bolus_type == core.insulin.consts.BOLUS_TYPE_FAST:
bolus = core.insulin.bolus.FastBolus(
timestamp,
fast_amount,
self._config.fast_iu_per_min,
delay=delay,
template=template_number,
activation=activation_type,
reason=flags,
)
elif bolus_type == core.insulin.consts.BOLUS_TYPE_EXTENDED:
bolus = core.insulin.bolus.ExtendedBolus(
timestamp,
extended_amount,
duration,
delay=delay,
template=template_number,
activation=activation_type,
reason=flags,
)
else:
bolus = core.insulin.bolus.MultiwaveBolus(
timestamp,
fast_amount,
self._config.fast_iu_per_min,
extended_amount,
duration,
delay=delay,
template=template_number,
activation=activation_type,
reason=flags,
)
self._insulin_mgr.activate_bolus(timestamp, bolus)
ble.stack.BleTxScheduler().add(
ble.stack.ACT_INDICATE,
self.send_data,
self.value_handle,
(_respond, bolus.id),
)
finally:
self.reset_run_flag()
如此,當咱們將 IddCommandCP
與 E2EIddCommandCP
加入 IDS 後,GATT Client 即可藉由 Set Bolus
指令使相應的 bolus 運作。