iT邦幫忙

2025 iThome 鐵人賽

DAY 22
0
Software Development

以MicroPython在ESP32上實作Insulin Delivery Service系列 第 22

Day 22 - 怒吼吧!IDD Annunciation Status!

  • 分享至 

  • xImage
  •  

1. 警示通知的事件

咱們昨天終於邁進 AnnunciationManager 的第一步 —— 取得最高優先權的警示通知了,但咱們並不清楚 警示通知AnnunciationManagerIDD Annunciation Status 之間到底有什麼情愛糾葛,所以讓咱們先釐清他們之間的關係吧:
https://ithelp.ithome.com.tw/upload/images/20250822/20177799r9GeSFssBk.png

  1. 某時某地 P 捅了婁子,觸發了警示通知 A
  2. P 慌忙地向 AnnunciationManager 通報有新的出包 A 了
  3. AnnunciationManager 無可奈何下把 A 吃下肚
  4. AnnunciationManager 反芻後,把最緊急的出包 T 吐向 IDD Annunciation Status
  5. IDD Annunciation Status 收到 T 後,把 T 狠狠地砸向 GATT Client

看官一臉嫌棄:這什麼噁心說明 ... (´π`)

總之呢 ~ AnnunciationManager 類別的責任有 3 個:

  1. 儲存警示通知
  2. 更新警示通知的狀態
  3. 將最高優先權的警示通知交由 IDD Annunciation Status 送出

上面流程可以很容易地轉換為程式碼,比如:

def trigger():
    annunc = create_some_annunc()
    annunc_mgr.add(annunc)

class AnnunciationManager:
    def add(self, annunc):
        annunc_fired.append(annunc)
        top = self.get_top_priority_annunc()
        annunc_status.send(top)

class IddAnnunciationStatus:
    def send(self, annunc):
        ble.indicate(annunc)

非常直觀的設計,但組件間彼此的關係太緊密了。還記得第 18 天說的 Event Bus 嗎?為了降低組件間的相依性,由事件觸發者送出訊息,訂閱者則處理訊息,發送者和訂閱者彼此互不知道對方的存在。咱們當然也可以將此應用在警示通知系統上,首先定義事件:

# 新的 annunciation 產生
# args:
# annunc: BaseAnnunciation
EVENT_ANNUNCIATION_ADDING = 2

# 送出 annunciation
# args:
# annunc: BaseAnnunciation
EVENT_ANNUNCIATION_SENDING = 3

然後因為 AnnunciationManager 要接收新的警示通知,所以要讓它訂閱 EVENT_ANNUNCIATION_ADDING

class AnnunciationManager:
    def __init__(self, config: config.Config):
        self._config = config

        common.eventbus.subscribe(
            core.events.EVENT_ANNUNCIATION_ADDING, self._on_annunciation_adding
        )

    def _on_annunciation_adding(self, annunc: core.annunc.base.BaseAnnunciation):
        """增加新的 annunciation。
        注意:
        若 annunc.flags 包含 ANNUNCIATION_PRESENT,
        即使 annunc.id 不為 0,
        此函數都將重設 annunciation.id。"""

        if annunc.flags and annunc.flags & core.annunc.consts.ANNUNCIATION_PRESENT != 0:
            annunc.id = self._get_next_id()

        self._config.annunc_fired.append(annunc)
        top = self.get_top_priority_annunc()

        if top is not None and top.id == annunc.id:
            common.eventbus.publish(core.events.EVENT_ANNUNCIATION_SENDING, top)
            common.eventbus.publish(
                core.events.EVENT_IDD_STATUS_ADDING,
                core.events.ANNUNCIATION_STATUS_CHANGED,
            )
  • 處理事件 EVENT_ANNUNCIATION_ADDING 的是 _on_annunciation_adding()
  • _on_annunciation_adding() 除了將警示通知儲存起來,若新的項目具有最高優先權,還會送出兩個事件:
    • EVENT_ANNUNCIATION_SENDING,由 IDD Annunciation Status 送出警報通知。
    • EVENT_IDD_STATUS_ADDING,由 IDD Status Changed 處理警報通知的狀態已改變。

2. 送出外部匯入的警示通知

AnnunciationManager 已經把新增警示通知完成了,接下來可以把匯入的警示通知 Config.annunc_ready 一一加入到 AnnunciationManager

class AnnunciationManager:
    def __init__(self, config: config.Config):
        self._ready_idx = 0

    def _send_ready_annuncs(self, sys_timestamp: int):
        cnt = len(self._config.annunc_ready)

        while self._ready_idx < cnt:
            annunc = self._config.annunc_ready[self._ready_idx]

            if annunc.fired_timestamp < 0:
                self._ready_idx += 1

            elif annunc.fired_timestamp <= sys_timestamp:
                common.eventbus.publish(core.events.EVENT_ANNUNCIATION_ADDING, annunc)
                self._ready_idx += 1

            else:
                break

此方法的設計概念是:只要警示通知的觸發時間不小於 0,且小於等於目前的系統計數器,就會將其加入到 AnnunciationManager

因為 _send_ready_annuncs() 需要傳入系統計數器,所以在 AnnunciationManager 加入一個方法,讓外部能在每次更新計數器時呼叫:

class AnnunciationManager:
    def refresh(self, sys_timestamp: int):
        # 送出使用者測試用的 Annunciation
        self._send_ready_annuncs(sys_timestamp)

3. IDD Annunciation Status

因為咱們已經設計過 IDD Status Changed 和 IDD Status 這 2 個和 IDD Annunciation Status 有一樣存取屬性的 characteristics 了,所以要設計 IDD Annunciation Status 應該是非常簡單:

class IddAnnunciationStatus(
    ble.mixin.IndicateMixin, ble.mixin.ReadMixin, ble.stack.Characteristic
):
    def __init__(self, annunc_mgr: core.annunc.manager.AnnunciationManager):
        ble.stack.Characteristic.__init__(self, 0x2B22, read=True, indicate=True)
        ble.mixin.IndicateMixin.__init__(self)

        self._annunc_mgr = annunc_mgr

        common.eventbus.subscribe(
            core.events.EVENT_ANNUNCIATION_SENDING, self._on_annunciation_sending
        )

    def _build_indicate_payload(
        self, buf: bytearray | memoryview, arg: core.annunc.base.BaseAnnunciation
    ) -> int:
        return arg.to_bytes(buf)

    def _build_read_rsp(self, buf: bytearray | memoryview) -> int:
        annunc = self._annunc_mgr.get_top_priority_annunc()

        if annunc:
            return annunc.to_bytes(buf)
        else:
            buf[0] = 0
            return 1

    def _on_annunciation_sending(self, annunc: core.annunc.base.BaseAnnunciation):
        ble.stack.BleTxScheduler().add(
            ble.stack.ACT_INDICATE, self.send_data, self.value_handle, annunc
        )
  • 使用 _on_annunciation_sending() 來處理事件 EVENT_ANNUNCIATION_SENDING
  • _build_read_rsp() 會傳回優先權最高的警示通知的位元組型式,若沒有警示通知,則傳回 [0]
  • _build_indicate_payload() 會傳回警示通知的位元組型式

為了更好的使用 Indicate,咱們將 Day 15 - IDD Status Changed (3) 之 上 Indicate BuffIndicateMixin 改為:

# For Notify and Indicate
_nibuf = bytearray(20)
_nibuf_mv = memoryview(_nibuf)

class IndicateMixin:
    def send_data(self, conn_handle: int | None, value_handle: int, arg):
        if conn_handle is not None and self._ind_enabled:
            n = self._build_indicate_payload(_nibuf_mv, arg)
            self._after_build_tx_data()
            ble.stack.indicate(conn_handle, value_handle, _nibuf_mv[:n])
            return True

        return False

    def _build_indicate_payload(self, buf: bytearray | memoryview, arg) -> int:
        return 0

4. 建立測試用的設定檔

Day 05 - 製作外部設定檔 裡,咱們已經設計過 build_config.py 來建置設定檔,所以可以修改它來加入測試用的警示通知的設定檔,例如:

def _build_config():
    c = config.Config()

    c.annunc_ready.append(BaseAnnunciation(BATTERY_LOW, fired_timestamp=30))
    c.annunc_ready.append(
        TemperatureAnnunciation(
            27.5,
            TemperatureAnnunciation.FLAGS_UNIT_CELSIUS,
            TemperatureAnnunciation.CONTEXT_DEVICE,
            fired_timestamp=50,
        )
    )
    c.annunc_ready.append(BaseAnnunciation(OCCLUSION_DETECTED, fired_timestamp=70))

    c.annunc_fired.append(BaseAnnunciation(BATTERY_FULL, id=20001))
    c.annunc_fired.append(BaseAnnunciation(RESERVOIR_EMPTY, id=20002))

    with open(config.CONFIG_PATH, "w", encoding="utf-8") as fp:
        # MicroPython 不支援這些命名參數
        json.dump(c.to_dict(), fp, ensure_ascii=False, indent=4)

咱們加入 Battery Low、Temperature 和 Occlusion Detected 到未觸發列表裡,以及 Battery Full 和 Reservoir Empty 到已觸發列表中,然後執行 _build_config() 後,會得到如下內容:

{
    "annunc_snoozing_time": 180,
    "annunc_ready": [
        {
            "flags": 1,
            "id": 0,
            "type": 195,
            "status": 51,
            "fired_timestamp": 30,
            "remaining_snoozed_time": 0
        },
        {
            "flags": 7,
            "id": 0,
            "type": 869,
            "status": 51,
            "fired_timestamp": 50,
            "remaining_snoozed_time": 0,
            "value": 27.5,
            "temperature_flags": 1,
            "context": 15,
            "lower_bound": null,
            "upper_bound": null
        },
        {
            "flags": 1,
            "id": 0,
            "type": 60,
            "status": 51,
            "fired_timestamp": 70,
            "remaining_snoozed_time": 0
        }
    ],
    "annunc_fired": [
        {
            "flags": 1,
            "id": 20001,
            "type": 240,
            "status": 51,
            "fired_timestamp": -1,
            "remaining_snoozed_time": 0
        },
        {
            "flags": 1,
            "id": 20002,
            "type": 90,
            "status": 51,
            "fired_timestamp": -1,
            "remaining_snoozed_time": 0
        }
    ]
}

5. 執行

已觸發警示通知是:

Type ID EMWR
Battery Full 20001 Reminder
Reservoir Empty 20002 Maintenance

而待發送的通知為:

Type ID EMWR Fired Timestamp
Battery Low 20003 Warning 30
Temperature 20004 Reminder 50
Occlusion Detected 20005 Maintenance 70

當執行以上程式時會發現:

  1. 一開始讀取 IDD Annunciation Status 時,傳回的是 ID 為 20002 的 Reservoir Empty,而非 ID 為 20001 的 Battery Full。這是因為 Maintenance 的優先權高於 Reminder。
  2. 在時間為 30 和 50 秒時,應該要觸發 Battery Low 和 Temperature,但都沒有 Indicate 送出,即使讀取,也依然是 Reservoir Empty,這也是因為 Maintenance 高於 Warning 和 Reminder 的關係。
  3. 在時間為 70 秒時,收到 Occlusion Detected 和 IDD Status Changed 的 Indicate,這是因為雖然都是 Maintenance,但 Occlusion Detected 的觸發時間更晚,優先權更高。

總算把 IDS 必備的 5 個 characteristics 中的 4 個寫完了,進度完全落後 ing ...
明天應該可以進入 IDD Status Reader Control Point 了 ... 吧 ... ㄟ ... 完全沒把握
(>﹏<)


上一篇
Day 21 - Temperature 通知 & 優先權管理
下一篇
Day 23 - 命令 IDS Server 做事 - Write Control Point
系列文
以MicroPython在ESP32上實作Insulin Delivery Service31
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言