昨天我們演示了CQRS的系統演化,將一個單體架構一步步演化成一個事件驅動架構。在事件驅動架構中,有一個靈魂角色是訊息佇列,扮演訊息存儲和傳遞的角色。
但有這麼多訊息佇列系統和實踐方法,又該如何在其中進行選擇?在這篇文章中,我們會仔細介紹訊息佇列的各個考量點,並且以Redis作為示範來解釋每個考量點的影響。
此外,Redis雖然廣泛被作為快取使用,但實際上以Redis為核心的訊息佇列實踐也不少見,因此,這篇文章也能作為一個快速入門。
在選擇一個訊息佇列時有許多面向要考慮:
在這節我會解釋每個面向。
這裡的傳播方式指的是一個訊息要怎麼送到目的地,有兩種傳播型態:
一對一很容易理解,當訊息生產者發送一個訊息進入訊息佇列,這個訊息只會傳送給一個消費者收到。
一對多則是當生產者送出一個訊息後,該訊息會被多個消費者收到。值得一提的是,生產者只送出一個訊息,但是這個訊息會被訊息佇列複製給多個消費者。這樣的行為也稱為扇出。
抵達方式很有趣,多數的訊息佇列都有自己的抵達保證。有三種常見的保證:
至多一次相對容易達成。基本上只要是訊息佇列,至少都有這個保證。
訊息消費者可能收到一次訊息,或根本沒收到。沒收到的情況有幾種,首先,訊息因為網路傳輸的問題遺失了,或者,雖然消費者有收到,但在處理的過程中出現錯誤,導致訊息沒正確完成。
在至多一次保證下,訊息遺失就再也找不回來了。
至少一次通常在一些主流的訊息佇列上很常見,例如RabbitMQ或Kafka等。相較於至多一次,至少一次是一個較強的保證。訊息佇列可以保證每個訊息都有被處理。
儘管如此,訊息也有可能被處理很多次。舉例來說,一個消費者在處理完後並沒有知會訊息佇列,因此訊息佇列又將訊息遞送一次。在至少一次的保證下,保持訊息處理的冪等性是非常重要的。
僅有一次是最嚴格的保證。在僅有一次保證下,所有的訊息只會被處理一次。即使是熱門的訊息佇列也不一定有支持這樣的保證,例如RabbiMQ就沒有。但若是正確設定和使用Kafka,是可以做到僅有一次的,只是會犧牲掉效能。
持久化指的是當訊息送進佇列後會不會消失。這也有三種持久化型態:
我們都知道這些代表什麼意思。
但有趣的是,使用硬碟持久化會比較慢嗎?答案是不一定。
這取決於持久化是怎麼實作的,Kafka利用LSM-tree和零複製的技術做到即便存硬碟也不會損失效能,甚至效能比存記憶體的RabbitMQ更好。
另一個存硬碟也不損失效能的例子是Cassandra資料庫,他同樣利用LSM-tree的方式提升效能,甚至讓Cassandra是一個很擅長寫入的資料庫。
雙軌制是一個特殊模式,為了提升效能,訊息佇列首先將訊息寫入記憶體,並定期寫入硬碟以保障持久化。RabbitMQ就是個雙軌制典型的例子,值得一提的是,RabbitMQ也可以設定成只存硬碟。
我認為,水平擴展是一個訊息佇列最重要的考量。
處理訊息通常很花時間,因此我們必須要啟用更多消費者來處理訊息,也就是水平擴展。
要支援水平擴展有個很重要的特性稱為消費者群組。消費者群組讓處理訊息的消費者不再是一個,而可以是多個,同時,消費者群組也可以套用在一對多的情況。使一個訊息送至多個消費者群組,而不是一個。
討論完訊息佇列的一些特性,讓我們來看看Redis如何成為一個訊息佇列,並且檢視每個訊息佇列特性的意涵。
Redis要作為訊息佇列有三種做法:
我們會一個一個介紹,最後提供一個總結。
Pub/Sub是一個常見的通知方案,這個功能幾乎跟Redis一起誕生。消費者透過SUBSCRIBE
訂閱一個主題,在生產者PUBLISH
訊息進主題後就會收到。
身為一個傳統Pub/Sub的功能,這當然也支援扇出的機制。更有甚者,也可以透過PSUBSCRIBE
做到一定程度的訊息路由。
但是,Pub/Sub並不是Redis的熱門功能。最大的問題是,訊息只有至多一次保證。當訊息送出,但消費者沒正確接到或成功處理,這個訊息就會消失。
更糟糕的是,Redis並沒有持久化訊息。所以當Redis故障,所有的訊息就直接消失。
讓我們總結一下Pub/Sub:
列表在Redis中是一個有用的資料結構,透過列表也可以簡單做到先進先出佇列(FIFO)。秘訣是使用BLPOP
等待訊息進入列表,但建議加個超時限制。
根據上圖,我們可以看到如果有多個消費者在等待同一個列表,那麼他們就會自動變成一個消費者群組而不需要額外設定。
另一方面,列表無法把訊息扇出。一但一個訊息被消費者接走,其他人就再也收不到了,即便這個訊息沒被成功處理。
儘管如此,列表是能夠在記憶體中持久化訊息的,此外,如果有開啟AOF
或RDB
那麼訊息可以備份在硬碟上。不過我必須要說,Redis並不是完全持久化。
總結一下:
介紹完Pub/Sub和串流,我們注意到這兩個都不怎麼好,這兩個方案都有自己的缺點。因此,串流作為解決方案在Redis 5.0版本之後被提出。
因為串流複雜多了,因此我們先總結一下串流的特性:
就結果來說,串流解決Pub/Sub和列表所有碰到的問題,並且額外提供至少一次保證。
這個圖形長相像是Pub/Sub但工作流程卻更接近列表。
生產者可以在任何時間透過XADD
將訊息送入Redis,可以想像成是有一個列表把所有訊息保存下來。消費者也可以在任何時間透過XREAD
將訊息取出,在XREAD
後面用到的運算元指的是要從哪開始拿訊息。
$
: 無論之前有多少訊息,我都只從現在開始拿。0-0
: 總是從頭開始拿。<id>
: 從一個特定的訊息開始拿。除了單一消費者外,串流也支援消費者群組。
為了做到至少一次保證,就像多數訊息佇列一樣,消費者必須在成功處理訊息後利用XACK
知會Redis
<
是一個特殊的運算元,指的是從群組內還沒有人拿過的訊息開始。
解釋完整個流程,我用一個實際的Nodejs
範例示範消費者的正確行為。
let lastid = "0-0";
let checkBacklog = true
while (true) {
const myid = checkBacklog ? lastid : ">";
const items = await redis.xreadgroup('GROUP',GroupName,ConsumerName,'BLOCK','2000','COUNT','10','STREAMS',StreamName,myid);
if (!items) continue;
checkBacklog = !(items[0][1].length == 0);
items[0][1].forEach(elem => {
const [id, fields] = elem;
await processMessage(id,fields);
await redis.xack(StreamName,GroupName,id);
lastid = id;
});
}
請注意,所有的消費者都有他自己的名字ConsumerName
。
首先,消費者先從最前頭讀取,用來判斷自己最後的位置。如果回應是空陣列表示消費者可以從lastid
知道該從哪開始。最後知會Redis結束的id
。
在分散式系統中,我們很難命名一個消費者。舉例來說,消費者可能在k8s
中執行,那我們該如何命名每個pod
?就算我們可以在一開始鎖定名字,我們該怎麼面對水平擴展後產生的新消費者?因此,在分散式系統中命名是不實際的。
就算如此,我們也不能用uuid
來命名消費者,然後用過就丟。因為Redis會維護一個名字和最後位置的對應表。如果每次都建立一個隨機名字,那麼那張對應表就會越來越大,最糟的是,那些被取出卻沒知會的訊息再也不可能被處理。
好在,Redis提供一個方法讓人認領待處理的訊息。工作流程如下:
因此,整個消費者的啟動流程如下:
XPENDING StreamName GroupName
XCLAIM StreamName GroupName <ConsumerName in uuid> <min-idle-time> <ID-1> <ID-2> ... <ID-N>
min-idle-time
是一個有用的機制,我們可以避免很多消費者同時認領一個訊息。當第一個消費者開始認領訊息時,這個訊息就不再是閒置了,因此其餘消費者也無法重複認領。
透過正確使用XCLAIM
,我們就可以使用隨機名字也不用擔心對應表會不斷長大,因為訊息會被別人認領走,而對應表中沒用的紀錄會被刪除。
當串流內保存越來越多訊息,記憶體用量會變成Redis的災難。
如果我們去看Redis的串流手冊,會發現有一個XDEL
可以用。但XDEL
實際上並不會真的刪除訊息,而是將訊息標註成不可用,但訊息還是在那。
那我們該如何避免記憶體洩漏?
我們可以在XADD
時使用MAXLEN
這個參數:
XADD StreamName MAXLEN 1000 * foo bar
但有件事得要知道,MAXLEN
會影響Redis的效能,他會卡住Redis的主執行緒直到清除完畢,此時其他Redis命令都無法執行。如果有許多訊息不斷流入,那Redis會很忙於維持MAXLEN
。
有個替代方案,與其固定長度,不如讓Redis自己決定該要留多長,並在Redis的空閑時間處理。因此,命令會變成:
XADD StreamName MAXLEN ~ 1000 * foo bar
這個~
符號表示最大長度約略是1000,但可以是900也可以是1300,Redis會自己選擇適合的長度並在適合的時間處理。
讓我們總結一下這三個方案。
Pub/Sub | 列表 | 串流 | |
---|---|---|---|
扇出 | V | V | |
遞送保證 | At-most-once | At-most-once | At-least-once |
水平擴展 | V | V | |
持久化 | AOF, RDB | AOF, RDB | |
複雜度 | low | low | high |
有一個剛沒提到的特性,複雜度,這指的不只是技術的複雜度,同時也是實作生產者的複雜度。
就我觀點來說,這每個方案都有自己的優缺點,也有自己適合的場景。
值得一提的是,為什麼Redis串流是鬆散的串流處理?
因為Redis的消費者群組不如Kafka,他無法保證訊息的順序。若是無法保證訊息順序,那在一個高流量的環境,也會很難成功做到水平擴展。