咱們昨天終於邁進 AnnunciationManager
的第一步 —— 取得最高優先權的警示通知了,但咱們並不清楚 警示通知
和 AnnunciationManager
與 IDD Annunciation Status
之間到底有什麼情愛糾葛,所以讓咱們先釐清他們之間的關係吧:
AnnunciationManager
通報有新的出包 A 了AnnunciationManager
無可奈何下把 A 吃下肚AnnunciationManager
反芻後,把最緊急的出包 T 吐向 IDD Annunciation Status
IDD Annunciation Status
收到 T 後,把 T 狠狠地砸向 GATT Client看官一臉嫌棄:這什麼噁心說明 ... (´π`)
總之呢 ~ AnnunciationManager
類別的責任有 3 個:
上面流程可以很容易地轉換為程式碼,比如:
def trigger():
annunc = create_some_annunc()
annunc_mgr.add(annunc)
class AnnunciationManager:
def add(self, annunc):
annunc_fired.append(annunc)
top = self.get_top_priority_annunc()
annunc_status.send(top)
class IddAnnunciationStatus:
def send(self, annunc):
ble.indicate(annunc)
非常直觀的設計,但組件間彼此的關係太緊密了。還記得第 18 天說的 Event Bus 嗎?為了降低組件間的相依性,由事件觸發者送出訊息,訂閱者則處理訊息,發送者和訂閱者彼此互不知道對方的存在。咱們當然也可以將此應用在警示通知系統上,首先定義事件:
# 新的 annunciation 產生
# args:
# annunc: BaseAnnunciation
EVENT_ANNUNCIATION_ADDING = 2
# 送出 annunciation
# args:
# annunc: BaseAnnunciation
EVENT_ANNUNCIATION_SENDING = 3
然後因為 AnnunciationManager
要接收新的警示通知,所以要讓它訂閱 EVENT_ANNUNCIATION_ADDING
:
class AnnunciationManager:
def __init__(self, config: config.Config):
self._config = config
common.eventbus.subscribe(
core.events.EVENT_ANNUNCIATION_ADDING, self._on_annunciation_adding
)
def _on_annunciation_adding(self, annunc: core.annunc.base.BaseAnnunciation):
"""增加新的 annunciation。
注意:
若 annunc.flags 包含 ANNUNCIATION_PRESENT,
即使 annunc.id 不為 0,
此函數都將重設 annunciation.id。"""
if annunc.flags and annunc.flags & core.annunc.consts.ANNUNCIATION_PRESENT != 0:
annunc.id = self._get_next_id()
self._config.annunc_fired.append(annunc)
top = self.get_top_priority_annunc()
if top is not None and top.id == annunc.id:
common.eventbus.publish(core.events.EVENT_ANNUNCIATION_SENDING, top)
common.eventbus.publish(
core.events.EVENT_IDD_STATUS_ADDING,
core.events.ANNUNCIATION_STATUS_CHANGED,
)
EVENT_ANNUNCIATION_ADDING
的是 _on_annunciation_adding()
_on_annunciation_adding()
除了將警示通知儲存起來,若新的項目具有最高優先權,還會送出兩個事件:
EVENT_ANNUNCIATION_SENDING
,由 IDD Annunciation Status 送出警報通知。EVENT_IDD_STATUS_ADDING
,由 IDD Status Changed 處理警報通知的狀態已改變。AnnunciationManager
已經把新增警示通知完成了,接下來可以把匯入的警示通知 Config.annunc_ready
一一加入到 AnnunciationManager
:
class AnnunciationManager:
def __init__(self, config: config.Config):
self._ready_idx = 0
def _send_ready_annuncs(self, sys_timestamp: int):
cnt = len(self._config.annunc_ready)
while self._ready_idx < cnt:
annunc = self._config.annunc_ready[self._ready_idx]
if annunc.fired_timestamp < 0:
self._ready_idx += 1
elif annunc.fired_timestamp <= sys_timestamp:
common.eventbus.publish(core.events.EVENT_ANNUNCIATION_ADDING, annunc)
self._ready_idx += 1
else:
break
此方法的設計概念是:只要警示通知的觸發時間不小於 0,且小於等於目前的系統計數器,就會將其加入到 AnnunciationManager
。
因為 _send_ready_annuncs()
需要傳入系統計數器,所以在 AnnunciationManager
加入一個方法,讓外部能在每次更新計數器時呼叫:
class AnnunciationManager:
def refresh(self, sys_timestamp: int):
# 送出使用者測試用的 Annunciation
self._send_ready_annuncs(sys_timestamp)
因為咱們已經設計過 IDD Status Changed 和 IDD Status 這 2 個和 IDD Annunciation Status 有一樣存取屬性的 characteristics 了,所以要設計 IDD Annunciation Status 應該是非常簡單:
class IddAnnunciationStatus(
ble.mixin.IndicateMixin, ble.mixin.ReadMixin, ble.stack.Characteristic
):
def __init__(self, annunc_mgr: core.annunc.manager.AnnunciationManager):
ble.stack.Characteristic.__init__(self, 0x2B22, read=True, indicate=True)
ble.mixin.IndicateMixin.__init__(self)
self._annunc_mgr = annunc_mgr
common.eventbus.subscribe(
core.events.EVENT_ANNUNCIATION_SENDING, self._on_annunciation_sending
)
def _build_indicate_payload(
self, buf: bytearray | memoryview, arg: core.annunc.base.BaseAnnunciation
) -> int:
return arg.to_bytes(buf)
def _build_read_rsp(self, buf: bytearray | memoryview) -> int:
annunc = self._annunc_mgr.get_top_priority_annunc()
if annunc:
return annunc.to_bytes(buf)
else:
buf[0] = 0
return 1
def _on_annunciation_sending(self, annunc: core.annunc.base.BaseAnnunciation):
ble.stack.BleTxScheduler().add(
ble.stack.ACT_INDICATE, self.send_data, self.value_handle, annunc
)
_on_annunciation_sending()
來處理事件 EVENT_ANNUNCIATION_SENDING
_build_read_rsp()
會傳回優先權最高的警示通知的位元組型式,若沒有警示通知,則傳回 [0]_build_indicate_payload()
會傳回警示通知的位元組型式為了更好的使用 Indicate,咱們將 Day 15 - IDD Status Changed (3) 之 上 Indicate Buff 的 IndicateMixin
改為:
# For Notify and Indicate
_nibuf = bytearray(20)
_nibuf_mv = memoryview(_nibuf)
class IndicateMixin:
def send_data(self, conn_handle: int | None, value_handle: int, arg):
if conn_handle is not None and self._ind_enabled:
n = self._build_indicate_payload(_nibuf_mv, arg)
self._after_build_tx_data()
ble.stack.indicate(conn_handle, value_handle, _nibuf_mv[:n])
return True
return False
def _build_indicate_payload(self, buf: bytearray | memoryview, arg) -> int:
return 0
在 Day 05 - 製作外部設定檔 裡,咱們已經設計過 build_config.py
來建置設定檔,所以可以修改它來加入測試用的警示通知的設定檔,例如:
def _build_config():
c = config.Config()
c.annunc_ready.append(BaseAnnunciation(BATTERY_LOW, fired_timestamp=30))
c.annunc_ready.append(
TemperatureAnnunciation(
27.5,
TemperatureAnnunciation.FLAGS_UNIT_CELSIUS,
TemperatureAnnunciation.CONTEXT_DEVICE,
fired_timestamp=50,
)
)
c.annunc_ready.append(BaseAnnunciation(OCCLUSION_DETECTED, fired_timestamp=70))
c.annunc_fired.append(BaseAnnunciation(BATTERY_FULL, id=20001))
c.annunc_fired.append(BaseAnnunciation(RESERVOIR_EMPTY, id=20002))
with open(config.CONFIG_PATH, "w", encoding="utf-8") as fp:
# MicroPython 不支援這些命名參數
json.dump(c.to_dict(), fp, ensure_ascii=False, indent=4)
咱們加入 Battery Low、Temperature 和 Occlusion Detected 到未觸發列表裡,以及 Battery Full 和 Reservoir Empty 到已觸發列表中,然後執行 _build_config()
後,會得到如下內容:
{
"annunc_snoozing_time": 180,
"annunc_ready": [
{
"flags": 1,
"id": 0,
"type": 195,
"status": 51,
"fired_timestamp": 30,
"remaining_snoozed_time": 0
},
{
"flags": 7,
"id": 0,
"type": 869,
"status": 51,
"fired_timestamp": 50,
"remaining_snoozed_time": 0,
"value": 27.5,
"temperature_flags": 1,
"context": 15,
"lower_bound": null,
"upper_bound": null
},
{
"flags": 1,
"id": 0,
"type": 60,
"status": 51,
"fired_timestamp": 70,
"remaining_snoozed_time": 0
}
],
"annunc_fired": [
{
"flags": 1,
"id": 20001,
"type": 240,
"status": 51,
"fired_timestamp": -1,
"remaining_snoozed_time": 0
},
{
"flags": 1,
"id": 20002,
"type": 90,
"status": 51,
"fired_timestamp": -1,
"remaining_snoozed_time": 0
}
]
}
已觸發警示通知是:
Type | ID | EMWR |
---|---|---|
Battery Full | 20001 | Reminder |
Reservoir Empty | 20002 | Maintenance |
而待發送的通知為:
Type | ID | EMWR | Fired Timestamp |
---|---|---|---|
Battery Low | 20003 | Warning | 30 |
Temperature | 20004 | Reminder | 50 |
Occlusion Detected | 20005 | Maintenance | 70 |
當執行以上程式時會發現:
總算把 IDS 必備的 5 個 characteristics 中的 4 個寫完了,進度完全落後 ing ...
明天應該可以進入 IDD Status Reader Control Point 了 ... 吧 ... ㄟ ... 完全沒把握
(>﹏<)