SUB/PUB 模式經常用來實作全頻廣播或通知等功能,今天我們會介紹如何完成這個模式,以及小小透漏一下這模式還可以衍生出新的用途
正如同第五天文章所說的,SUB/PUB 是 Subscribe (訂閱) / Publishe (發布) 模式,很常用在事件驅動的架構中,各種案例在 NodeJS 比比皆是。下面就是一個 nodeJS 電子信件發送通知的範例。
const EventEmitter = require('events');
// 創建一個自定義事件發布者
class EmailNotificationService extends EventEmitter {
sendEmailNotification(subscriber, eventType, eventData) {
// 模擬發送電子郵件通知的操作
console.log(`給 ${subscriber} 發送了一封關於事件 ${eventType} 的電子郵件通知`);
console.log(`事件詳細信息:${eventData}`);
}
}
const emailService = new EmailNotificationService();
// 訂閱者訂閱了事件通知
emailService.on('notification', (subscriber, eventType, eventData) => {
emailService.sendEmailNotification(subscriber, eventType, eventData);
});
// 模擬事件觸發,並傳入參數
const eventSubscriber = 'user@example.com';
const eventType = '新產品上架';
const eventDetails = '新產品上架通知:現在有一款全新的產品可供購買。';
emailService.emit('notification', eventSubscriber, eventType, eventDetails);
emailService 透過 on() 訂閱 了 notification
這個 topic 主題,然後在後面的 function (subscriber, eventType, eventData) { emailService.sendEmailNotification(subscriber, eventType, eventData) }
則是代表用來處理每個新通知的處理常式。
最後是透過 emit() 發布 新的通知,後面的 eventSubscriber, eventType, eventDetails 則是本次通知的訂閱者、事件類別、事件內容。
再細一點的說明: on('topic', handler) 只有作到事件綁定,這是 SUB/PUB 模式成立的前置動作,呼叫完 on() 之後,PUB 端發送的事件就會被送到所有監聽(訂閱)該事件的 SUB 端。
跟昨天差不多,我們一樣先從一個基本的案例出發,逐步完善 SUB/PUB 模式
# PUB
async def publish():
uri = "ws://localhost:8765" # WebSockets 服務器地址和端口
async with websockets.connect(uri) as websocket:
while True:
message = input("請輸入要發布的消息:")
await websocket.send(message)
asyncio.get_event_loop().run_until_complete(publish())
# SUB
async def subscribe():
uri = "ws://localhost:8765" # WebSockets 服務器地址和端口
topic = "news" # 訂閱的主題
async with websockets.connect(uri) as websocket:
await websocket.send(f"subscribe {topic}")
while True:
message = await websocket.recv()
print(f"收到消息:{message}")
asyncio.get_event_loop().run_until_complete(subscribe())
這邊由 PUB 端主動送出訊息,SUB 端也會不斷地去接收訊息。但仔細觀察會注意到這只是單一 SUB/PUB 的案例,而且上面的例子沒有處理 SUB 端想要訂閱的請求,顯然是不足的
# 存储订阅者的集合,每个主题都有一个订阅者列表
subscriptions = {}
async def publish(websocket, path):
async for message in websocket:
# 解析客户端发送的消息
if message.startswith("subscribe"):
_, topic = message.split()
if topic not in subscriptions:
subscriptions[topic] = set()
subscriptions[topic].add(websocket)
elif message.startswith("unsubscribe"):
_, topic = message.split()
if topic in subscriptions and websocket in subscriptions[topic]:
subscriptions[topic].remove(websocket)
elif message.startswith("publish"):
_, topic, content = message.split(maxsplit=2)
if topic in subscriptions:
for subscriber in subscriptions[topic]:
await subscriber.send(content)
這邊的實作大概有了雛型,上述例子讓 publish() 可以讓使用者處理 subscribe, unsubscribe 的請求,然後其實那兩個請求是可以作成 REQ/RES (API-liked) 的形式
相對於前面訂閱主題/接收通知的案例,也有一種用法是把通知當成一種「控制訊號」(command)。為什麼這裡要談控制訊號呢?因為在回合制遊戲中,玩家其實是不能任意進行活動的。玩家必須等到主機告知輪到他的回合了,他才能進行操作。這種就很適合把回合開始的通知當成控制訊號來用。大家其實可以再針對自己的需要進行調整
上面這個疑問是遊戲在發送通知的時候,有些通知(玩家抽牌結果)只會發給行動玩家,有些通知則會發送給非行動玩家。我覺得這是企圖控制 topic 發送範圍的想法,比較不是 SUB/PUB 想要關注的部分
關於這個部分,我們可以留到實作玩家回合制的時候再好好地聊
今天我們學會如何寫出 SUB/PUB 模式的 websocket 通訊,以及學會一個把通知當作控制訊號的用法。接下來還有什麼 websocket 相關的概念或知識呢?讓我們期待明天的分享吧~