RabbitMQ 是一套可靠且成熟的 Message Broker,支援多種傳輸協定,包含:AMQP、MQTT 等,同時提供了多種程式語言的 Client,是目前非常受歡迎的 Message Broker。
補充:RabbitMQ 最初是為了實現 AMQP 所打造的,所以 AMQP 也是 RabbitMQ 的核心協定。
AMQP 是一種傳輸協定,全名是 Advanced Message Queuing Protocol,從名稱中可以得知,這個協定具有 Queue 的功能,讓訊息可以先待在 Message Broker 中,達到緩衝的效果。
AMQP 協定有 生產者(Producer/Publisher)、消費者(Consumer) 與 Message Broker 三個角色,Producer 會負責將訊息發送到 Message Broker,它會再將訊息傳遞給 Consumer 或是由 Consumer 提出。
乍看之下與 MQTT 的實現方式差不多,但 Message Broker 的設計上有些不同,AMQP Message Broker 內有兩個角色,分別是 交換機(Exchange) 與 佇列(Queue)。下方是 AMQP 的運作示意圖:
當 Producer 將訊息發送給 Message Broker 時,實際上是先將訊息傳到內部的 Exchange,它再根據某種規則將訊息複製到特定的 Queue 裡面,這個規則又叫 綁定(Binding),當訊息進入 Queue 之後,會將訊息發送到訂閱的 Consumer,或是由 Consumer 從 Queue 中提出。
值得一提的是,AMQP 有設計 訊息確認(Message Acknowledgement) 機制,當 Consumer 收到訊息時,Consumer 要告知 Message Broker 已正確收到訊息並處理,Message Broker 就會將該訊息從 Queue 中移除,這可以確保該訊息有正確地處理。
Exchange 是一個訊息分配的角色,會根據不同種類的 Exchange 與 Binding 來分配訊息到零個、一個或多個 Queue 中。那 Exchange 有哪些不同的種類呢?事實上,AMQP 設計了四種不同的 Exchange,分別是 Direct Exchange、Topic Exchange、Fanout Exchange 與 Headers Exchange,這部分待會會做進一步的說明。
除了 Exchange 的種類之外,Exchange 還有一些重要的設定會影響它的運作,像是:
Name
:Exchange 的名稱。Durability
:Exchange 是否在 Message Broker 重啟後依然存在。Auto-delete
:是否在最後一個 Queue 解除綁定時刪除 Exchange。這類型的 Exchange 運作原理相當單純,每當 Producer 發送訊息到 Direct Exchange 時,這個訊息會附帶一個叫 路由鍵(Routing Key) 的字串,這個 Routing Key 會被用來決定訊息要傳送到哪個 Queue,決定的方式是 Queue 會與 Direct Exchange 綁定一個 綁定鍵(Binding Key),當 Routing Key 與 Binding Key 完全匹配,就會將訊息傳到該 Queue 中。
補充:RabbitMQ 會有一個預設的 Exchange,該 Exchange 是 Direct Exchange,它會自動與所有 Queue Binding,使用的 Binding Key 即該 Queue 的名稱。
這類型的 Exchange 運作原理跟 Direct Exchange 相似,同樣都使用 Routing Key 來決定要將訊息傳到哪些 Queue,但在 Topic Exchange 下,Binding Key 支援層級的功能,使用 點(.) 來表示資訊層級,透過劃分層級讓我們能使用 Wildcards 來撰寫匹配規則,當 Routing Key 符合 Binding Key 的 Pattern 時,就會將訊息傳到該 Queue 中。
注意:在這個情況下,Routing Key 也必須以
.
來替 Routing Key 劃分層級。
下方是幾個 Routing Key 的範例:
crypto.pow.btc
crypto.pos.eth
crypto.pow.doge
下方是採用 Wildcards 的 Binding Key 範例:
crypto.pow.*
crypto.#
會發現 Binding Key 有 *
與 #
兩種特殊符號,它們的用途如下:
*
:該字元的階層可以是任何字元,以 crypto.pow.*
來說,crypto.pow.btc
與crypto.pow.doge
都符合篩選條件。#
:該字元的階層與下方所有階層可以是任何字元,以 crypto.#
來說,上方三個 Routing Key 範例都符合篩選條件。這個類型的 Exchange 與前兩者不同,它 不會 使用 Routing Key 來決定訊息要分配到哪些 Queue,而是直接將訊息傳到 全部的 Queue 中。
這個類型的 Exchange 運作方式很特別,它 不會 使用 Routing Key 來決定訊息要分配到哪些 Queue,而是使用 標頭(Headers) 來決定,Queue 會與 Headers Exchange 綁定一組 Headers,這組 Headers 會附帶一個 x-match
的資訊讓 Headers Exchange 知道要如何分配訊息,當 Producer 傳送訊息到 Headers Exchange 時,會根據該訊息的 Headers 與 Queue 綁定的 Headers 做配對,如果 x-match
設定為 all
,則需要完全相等、如果設定為 any
,則只要有一個相等即可,符合條件就會將該訊息傳到該 Queue 中。
AMQP 下的 Queue 與其他 Message Broker 差不多,但它提供了一些與 Exchange 差不多的設定:
Name
:Queue 的名稱,為 UTF-8 編碼,最長可達 255
Bytes,如果指定為空字串則會由系統自動產生。Durable
:Queue 是否在 Message Broker 重啟後依然存在。Exclusive
:限制只能一個連線使用,當該連線關閉時,Queue 就會被刪除。Auto-delete
:當最後一個 Consumer 取消訂閱時刪除 Queue。不論是 Producer 還是 Consumer 都需要與 RabbitMQ 建立基於 TCP 的 連線(Connection)。Connection 的建立與關閉通常由應用程式控制,生命週期應與應用程式保持一致,也就是在應用程式啟動時建立、在應用程式結束時終止。
Connection 可以透過 通道(Channel) 的機制來實現多項操作,例如:傳送訊息、接收訊息、定義 Queue 等。Channel 是輕量級的,一個 Connection 可開啟多個 Channel,在使用上不用過度重用,建議針對不同工作任務使用不同的 Channel。使用 Channel 可降低建立過多 Connection 對 RabbitMQ 的負擔。
為了減緩大量訊息給應用程式帶來的負擔,Consumer 可透過 Channel 設定 Prefetch 數值,限制一次從 RabbitMQ 取得的未確認訊息數量上限。Prefetch 數值可根據實際狀況動態調整,以獲得最佳的資源利用率。
在終端機輸入下方指令以便從 Docker Hub 下載 RabbitMQ 的 Docker Image:
$ docker pull rabbitmq:management
透過下方指令將 RabbitMQ 架設在 5672
port,並將後台網站架設在 15672
port:
$ docker run --name <NAME> -p 5672:5672 -p 15672:15672 -d rabbitmq:management
架設完畢後,打開瀏覽器並輸入網址 http://localhost:15672,會看到 Admin 的後台網站:
補充:預設情況下,Admin 帳號密碼為
guest
。
在讓兩個服務透過 RabbitMQ 進行交換訊息之前,先試著用 RabbitMQ 提供的 rabbitmqadmin 以及後台網站來實際操作一次 RabbitMQ。首先,進入後台網站來建立 Queue、Exchange 以及 Binding,這邊我們先建立兩個 Queue,分別取名為 test_queue_1
與 test_queue_2
:
接著,建立一個 Topic Exchange,將其名稱設為 test_topic_exchange
:
有了 Queue 與 Exchange,就需要透過 Binding 將 test_queue_1
與 test_queue_2
綁定在 test_topic_exchange
上,並將他們的 Binding Key 分別設為 crypto.pow.*
與 crypto.#
:
環境準備好之後,開啟兩個終端機並透過下方指令進入 Container 中的 bash:
$ docker exec -it <NAME> /bin/bash
進入之後,先使用一個終端機輸入下方指令將訊息發送到 test_topic_exchange
,並指定 Routing Key 為 crypto.pos.eth
:
$ rabbitmqadmin publish exchange=test_topic_exchange routing_key=crypto.pos.eth payload=ETH
接著,透過另一個終端機輸入下方指令從 test_queue_1
提取訊息:
$ rabbitmqadmin get queue=test_queue_1 ackmode=ack_requeue_false
此時會收到沒有資料的訊息,原因是 test_queue_1
使用的 Binding Key 為 crypto.pow.*
,與我們送出的 crypto.pos.eth
不匹配:
接著,再透過下方指令從 test_queue_2
提取訊息:
$ rabbitmqadmin get queue=test_queue_2 ackmode=ack_requeue_false
此時會收到剛剛發送的訊息,因為 crypto.pos.eth
滿足 crypto.#
的條件:
NestJS 實作了 RabbitMQ Transporter,讓微服務應用程式可以用跟其他 Transporter 一樣的開發風格來使用 RabbitMQ。
注意:雖然保持了一致的開發風格,但 RabbitMQ Transporter 在設計上並沒有很好地運用 RabbitMQ 本身的機制,下面會針對這個部分做進一步的說明。
要使用 Rabbit Transporter 之前需要先安裝下方套件:
$ npm install amqplib amqp-connection-manager
補充:amqplib 是一套用於 Node.js 的 AMQP 客戶端,有興趣可以參考 官方文件,而 amqp-connection-manager 是一套包裝 amqplib 的函式庫,主要是用來處理連線狀態的,有興趣也可以參考 官方文件。
這裡建議額外安裝 amqplib
的型別定義檔:
$ npm install @types/amqplib -D
修改載入點 main.ts
的內容,將 transport
設定為 Transport.RMQ
,並根據架設的 RabbitMQ 位址來設定 urls
的資訊,以我們架設起來的 RabbitMQ 來說,會使用的是 amqp://
開頭的位址:
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
async function bootstrap() {
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'test-queue',
queueOptions: {
durable: false,
},
},
},
);
await app.listen();
}
bootstrap();
上方範例除了在 options
使用了 urls
來指定 RabbitMQ 位址外,還指定了要使用 test-queue
這個 Queue,並針對該 Queue 做一些額外的設定,下方列出在 RabbitMQ Transporter 下的 options
較重要的設定屬性:
urls
:連線的位址。queue
:監聽的 Queue 名稱,如果 RabbitMQ 上面沒有該 Queue 則會自動產生一個。queueOptions
:Queue 的相關設定,詳細內容可以參考 amqplib 相關文件,裡面有提到 options
的設定項。noAck
:如果設為 false
,將啟用手動 Message Acknowledgement。headers
:為每個訊息添加的 Headers。prefetchCount
:設置 Channel 的 Prefetch 數值。注意:從上方列出的
options
內容可以得知,Rabbit Transporter 有個缺點,就是無法讓開發者指定 Exchange,它會直接使用屬於 Direct Exchange 的 預設 Exchange,也因為是預設 Exchange,所以 Routing Key 會需要用 Queue 的名稱,這樣才能達成匹配條件,這就表示無法享受到 Topic Exchange 的 Wildcards 特性,這就是 RabbitMQ Transporter 沒有很好運用 RabbitMQ 機制的主因。
RabbitMQ Transporter 也支援 Request-response 與 Event-based 訊息模式。下方是範例程式碼,在 AppController
內實作 sayHello
方法並套用 @MessagePattern
裝飾器,以及實作 listenCryptoBTC
方法並套用 @EventPattern
裝飾器:
import { Controller } from '@nestjs/common';
import { EventPattern, MessagePattern } from '@nestjs/microservices';
@Controller()
export class AppController {
@MessagePattern({ cmd: 'hello' })
sayHello(data: string) {
console.log(data);
return `Hello, ${data}`;
}
@EventPattern('crypto.pow.btc')
listenCryptoBTC(coin: { name: string; }) {
console.log(coin);
}
}
透過 rabbitmqadmin 進行測試,透過下方指令將訊息發送到 RabbitMQ,並設定 Routing Key 為前面定義的 Queue 名稱 - test-queue
,這裡需特別注意,payload
的部分會是一個可反序列化的字串,裡面會有 pattern
,用來讓 RabbitMQ Transporter 配對 Pattern,而 data
才是真正要傳送的資料:
$ rabbitmqadmin publish routing_key=test-queue payload="{\"pattern\":\"crypto.pow.btc\",\"data\":{\"name\":\"BTC\"}}"
注意:RabbitMQ Transporter 無法做到 Wildcards 的特性,所以
pattern
使用*
或#
都是 沒有效果 的。
此時在微服務應用程式的終端機會顯示發送的訊息:
Pattern 設定為可序列化物件的話,要如何透過 rabbitmqadmin 傳送訊息呢?這部分需參照上方傳送訊息的格式,pattern
需設定為序列化後的字串,以 { cmd: 'hello' }
來說,就是帶入 {"cmd":"hello"}
,data
則是本來要傳送的資料:
$ rabbitmqadmin publish routing_key=test-queue payload="{\"pattern\":{\"cmd\":\"hello\"},\"data\":\"HAO\"}"
此時在微服務應用程式的終端機會顯示發送的訊息:
假如要取得該請求的相關資訊,比如:Channel 實例,可以透過 @Ctx
裝飾器取得 RmqContext
。下方是範例程式碼:
import { Controller } from '@nestjs/common';
import {
Ctx,
Payload,
EventPattern,
RmqContext
} from '@nestjs/microservices';
@Controller()
export class AppController {
@EventPattern('crypto.pow.btc')
listenCryptoBTC(
@Payload() item: { name: string; },
@Ctx() ctx: RmqContext
) {
console.log(ctx.getChannelRef());
console.log(item);
}
}
使用 rabbitmqadmin 進行測試,透過下方指令發送 pattern
為 crypto.pow.btc
的訊息:
$ rabbitmqadmin publish routing_key=test-queue payload="{\"pattern\":\"crypto.pow.btc\",\"data\":{\"name\":\"BTC\"}}"
此時在微服務應用程式的終端機會顯示發送的訊息以及 Channel 實例:
有些情況下,可能會希望由開發者來決定 Message Acknowledgement 的時機,首先,透過設定 noAck
為 false
來避免自動處理:
// ...
async function bootstrap() {
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.RMQ,
urls: ['amqp://localhost:5672'],
queue: 'test-queue',
noAck: false,
queueOptions: {
// ...
},
},
);
await app.listen();
}
// ...
來測試一下沒有 Message Acknowledgement 的情況,透過 rabbitmqadmin 發送訊息:
$ rabbitmqadmin publish routing_key=test-queue payload="{\"pattern\":\"crypto.pow.btc\",\"data\":{\"name\":\"BTC\"}}"
此時微服務應用程式依然正常運作,但我們只要重啟服務就會回放剛剛那則訊息:
接著來實現手動 Message Acknowledgement,在 Handler 的地方透過 @Ctx
裝飾器取得 RmqContext
,並從中取出 Channel 實例,我們需要透過 Channel 實例提供的 ack
方法來手動進行 Message Acknowledgement,它需帶入從 RmqContext
的 getMessage
方法取得的原始訊息:
import { Controller } from '@nestjs/common';
import {
Ctx,
Payload,
EventPattern,
RmqContext
} from '@nestjs/microservices';
import { Channel, Message } from 'amqplib';
@Controller()
export class AppController {
@EventPattern('crypto.pow.btc')
listenCryptoBTC(
@Payload() item: { name: string; },
@Ctx() ctx: RmqContext
) {
const channel: Channel = ctx.getChannelRef();
const originMessage = ctx.getMessage() as Message;
channel.ack(originMessage);
console.log(item);
}
}
來測試一下有手動 Message Acknowledgement 的情況,透過 rabbitmqadmin 發送訊息:
$ rabbitmqadmin publish routing_key=test-queue payload="{\"pattern\":\"crypto.pow.btc\",\"data\":{\"name\":\"BTC\"}}"
此時微服務應用程式依然正常運作,重啟後不會回放剛剛的訊息,因為有順利完成 Message Acknowledgement:
修改 AppModule
的內容,透過 ClientsModule
建立 RabbitMQ Transporter 的 ClientProxy
:
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
// ...
@Module({
// ...
imports: [
ClientsModule.register([
{
name: 'RMQ_SERVICE',
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'test-queue',
queueOptions: {
durable: false,
},
}
}
])
]
})
export class AppModule {}
前面有提到 RabbitMQ Transporter 也支援 Request-response 與 Event-based 訊息模式,所以可以使用 ClientProxy
的 send
與 emit
方法。下方是範例程式碼,修改 AppController
的內容,使用 @Inject
裝飾器注入 ClientProxy
,並設計 getHello
與 listenBTC
方法:
import { Inject, Controller, Get } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
@Controller()
export class AppController {
constructor(
@Inject('RMQ_SERVICE') private readonly rabbitMQService: ClientProxy
) {}
@Get()
getHello() {
return this.rabbitMQService.send({ cmd: 'hello' }, 'HAO');
}
@Get('btc')
listenBTC() {
return this.rabbitMQService.emit('crypto.pow.btc', { name: 'BTC' });
}
}
透過 Postman 使用 GET
方法存取 http://localhost:3000,會看到下方的結果:
透過 Postman 使用 GET
方法存取 http://localhost:3000/btc,在微服務應用程式的終端機會看到 { name: 'BTC' }
:
在某些情境下,可能會運用 RabbitMQ Headers 來傳遞一些資訊,針對這種情況,可以運用 NestJS 設計的 RmqRecordBuilder
來實現。以下方程式碼為例,建立一個物件其包含 x-version
為 1
,接著,經由 RmqRecordBuilder
把訊息與 Headers 整理成 Record,最後將它當作資料讓 ClientProxy
來處理:
import { Inject, Controller, Get } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
@Controller()
export class AppController {
constructor(
@Inject('RMQ_SERVICE') private readonly rabbitMQService: ClientProxy
) {}
@Get('btc')
listenBTC() {
const headers = {
'x-version': '1',
};
const record = new RmqRecordBuilder({ name: 'BTC' })
.setOptions({ headers })
.build();
return this.rabbitMQService.emit('crypto.pow.btc', record);
}
修改微服務應用程式的部分,在 AppController
的 listenCryptoBTC
方法裡透過 console.log
將放在原始訊息裡的 Headers 資訊印出:
import { Controller } from '@nestjs/common';
import {
Ctx,
Payload,
EventPattern,
RmqContext
} from '@nestjs/microservices';
import { Channel, Message } from 'amqplib';
@Controller()
export class AppController {
@EventPattern('crypto.pow.btc')
listenCryptoBTC(
@Payload() item: { name: string; },
@Ctx() ctx: RmqContext
) {
const channel: Channel = ctx.getChannelRef();
const originMessage = ctx.getMessage() as Message;
channel.ack(originMessage);
console.log(originMessage.properties.headers);
}
}
透過 Postman 使用 GET
方法存取 http://localhost:3000/btc,在微服務應用程式的終端機會看到 {"x-version":"1"}
:
RabbitMQ 是一個可靠且成熟的 Message Broker,支援多種傳輸協定,包括核心協定 AMQP 和 MQTT。它設計了 Producer、Consumer 和 Message Broker 三個角色,並支援 Message Acknowledgement 以確保訊息正確處理。在 AMQP 中,Exchange 和 Queue 扮演著重要角色,Exchange 負責訊息的分配,它有四種類型:Direct、Topic、Fanout 和 Headers Exchange。而 Queue 用於暫存訊息,進而緩衝應用程式因大量訊息造成負擔。
NestJS 提供了 RabbitMQ Transporter 讓我們可以用跟其他 Transporter 一樣的開發風格來使用 RabbitMQ,針對要設置 Headers 的情況,NestJS 運用 Record Builder 這個角色的定位設計了 RmqRecordBuilder
,讓開發者可以運用它來設置 Headers。然而 RabbitMQ Transporter 存在一些限制,它並沒有很好地運用 RabbitMQ 的機制,甚至無法使用 Wildcards 的方式來訂閱,如果要深度使用 RabbitMQ 的話需要再三評估。