需求分析
後台同時對接了網頁,微信公眾號,iOS 以及 Android客戶端。在某些特定場景下,比如一個用戶接收到其他用戶的提問,我們就需要向這個用戶推送一條消息 (設備狀態)。用戶或者在手機上收到了一條彈窗通知,或者在網頁上看到了消息圖標顯示小紅點 (告警消息)。
公眾號消息推送使用客服接口推送消息。
移動端的消息推送使用國內某些知名的推送平台。在用戶從APP登錄的時候,APP會主動向推送平台設置自己的ID。後台將消息發送到推送平台時指明這個ID即可。
網頁的消息推送一般常見的實現方法有輪詢,長連接,WebSocket等等。在這裡我們使用的是WebSocket 。
消息傳遞的基本流程
後台服務器在某些情況下生成了一條消息, 首先將消息保存到本地數據庫,這樣客戶端可以調用API顯示消息列表。隨後消息被放入任務隊列,任務隊列將消息通過推送平台發送至APP,通過微信公眾號後台發送至用戶微信客戶端。
為了將消息通過WebSocket發送至在線的用戶手中,我們先將消息發佈到 Redis。訂閱了 Redis 的 Node 收到消息,將消息通過 WebSocket 傳遞至與之連接的瀏覽器。
一個大致的消息流如下圖所示:
Redis的發佈訂閱機制
所謂的 Publish/Subscribe,可以讓發佈者將消息發佈至某一個 channel,所有訂閱了這個 channel 的訂閱者就可以立即收到這個消息。在 Redis 的發佈訂閱機制裡面,一個消息可以被發佈至多個channel,訂閱者也可以同時訂閱多個channel的消息。
127.0.0.1:6379> subscribe message-channel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "message-channel"
3) (integer) 1
為了訂閱一個名為 message-channel 的消息,我們可以在 Redis 命令行下執行
127.0.0.1:6379> publish message-channel "This is a message"
(integer) 1
127.0.0.1:6379>
消息發佈之後,原本訂閱的那個終端就可以收到消息了:
使用 Python 程式語言發佈消息·Node訂閱消息
向 Redis 發布的消息的數據類型必須為 byte,如果我們需要傳遞複雜的數據結構,就需要將數據 dump 為 json 格式。
import json
from redis import StrictRedis
client = StrictRedis()
data = {
"uid": "user id",
"message": {
"content": "You have a message",
"message_id": "message id"
}
}
client.publish("message-channel", json.dumps(data))
訂閱 message-channel 的 Node 在獲取消息之後,將消息體解析,獲取到裡面的 user id ,根據這個 id 決定消息發送的對象。如果此時用戶不在線,消息就不會被發出。
var redis = require('redis');
var redisListener = redis.createClient();
redisListener.subscribe(config.get('redis_message_channel'));
redisListener.on('message', function(channel, data){
console.log('get a redis message', channel, data);
var data = JSON.parse(data);
io.sockets.in(data.uid).emit('message', data.message);
})
創建WebSocket的服務
本文對 Socket.IO 以及 WebSocket 沒有加以嚴格的區分,但嚴格的來說,Socket.IO 並非完全是 WebSocket。Socket.IO 是一個封裝了 WebSocket 協議的庫,隱藏了底層協議的細節,提供比較高層次的功能。它首先嘗試創建一個長連接,在可能的情況之下嘗試將連接升級到更加輕量級的 WebSocket。此外它還提供了一些更加高級的功能,比如斷線檢測,斷線重連等。
Socket.IO 的服務需要使用它自帶的 client 去連接服務,瀏覽器默認的 WebSocket 對象是不能用的。
如下代碼可以創建一個簡單的Socket.IO服務
var app = require('express')();
var server = require('http').createServer(app);
var io = require('socket.io')(server);
io.on('connection', function(){
});
server.listen(3000);
此時 socket 會運行一個自帶的 http 服務,你可以打開 http://127.0.0.1:3000/, 開啓調試工具並執行如下代碼:
// 加载socket.io 这个库
var script = document.createElement('script');
script.src = 'http://127.0.0.1:3000/socket.io/socket.io.js';
document.body.append(script);
// 连接到服务器,将任何收到的消息log到console
var socket = io.connect();
socket.on('message', function(data){
console.log(data);
})
WebSocket的權限驗證
對於每一個 WebSocket 連接,需要驗證連接人的身份,驗證後才能夠向這個連接發送消息。
一般的 HTTP 請求協議可以通過驗證 cookie,或者在HTTP頭部放置 token 達到驗證的目的。WebSocket 也可以用類似的方法,不同之處在於 WebSocket 只需要在連接建立時驗證一次即可。注意此時 WebSocket 服務以及後端的 HTTP 服務必須在同一個網域底下,不然後端服務的 cookie 不會被傳遞給 WebSocket 服務。一個可行的做法是使用 nginx 同時反向代理後端的 HTTP 以及WebSocket。
後端的 Node 服務接收到連接請求之後,將 cookie 轉發給Web服務做驗證。轉發給 Web 做驗證的原因在於 WebSocket 常用於高併發的場景,應該避免 Node 服務直接請求數據庫。
比如我們使用cookie驗證用戶,那麼我們可以這樣:
var cookie = require('cookie');
io.on('connection', function(){
var cookies = cookie.parse(socket.handshake.headers.cookie || '');
//將cookie通過http協議發送至後端服務器驗證。
var uid = validateCookie(cookies);
if(uid){
//加入一個房間,房間號即為用戶id
socket.join(uid);
}
else{
socket.disconnect()
}
});
如果驗證失敗,主動關閉連接,或者通知客戶端關閉。如果驗證成功,我們可以讓這個連接監聽加入一個專門的 room,為了簡單起見我們直接使用戶的 id。這樣,從Redis獲取的消息體裡面也有用戶 id,我們可以據此將指定的消息送入指定用戶的瀏覽器裡面。
可能存在的問題
按照我們的需求,任何一個消息最終只會被分發給一個用戶。而 Socket.IO 的設計初衷則是基於聊天室的。它認為一個消息有可能會被分發給一個聊天室里的所有用戶。在這個矛盾之下,你會發現這個系統的水平擴展並不是很方便。
目前這個系統的消息會被發佈到單一的 Redis channel,並且只有一個 Node 進程在處理所有的連接。考慮連接過多,單一進程無法處理的情況,為了擴展,一般的做法無非是:
1增加單一機器上面的 Node 進程數。
2增加多台物理機器,每台物理機器運行多個 Node 服務。
上面的(1比較容易實現,簡單的來說,每一個用戶的 WebSocket 會被隨機分配到任何一個 Node 進程。所有 Node 進程訂閱同一個 Redis channel。這樣一個消息會被所有 Node 查看到,然後可能會被其中一個進程傳遞給自己正在連接的用戶。
在(1這個方法里,消息如果僅僅被相關的 Node 進程捕獲就好了,畢竟最終只會有一個 Node 進程處理這個消息。但退一步講,哪怕消息被傳遞給了所有Node進程也應該不會有太大性能的問題。考慮實現(2的機制,難道對於一個消息也要將它發佈多多台物理機器的多個 Node 進程上面去?
一個可行的擴展方法是基於 uid 做一致性 hash,客戶端的連接按照 uid 被 hash 到指定的 Node 進程。Node 進程按照同樣的算法處理指定 uid 的消息,當然這個已經超出了本文的討論範圍。