NATS 是一套輕量、安全且高效的訊息傳遞系統,經常用於微服務之間的溝通、物聯網設備的訊息傳遞、雲端原生(Cloud Native) 應用等。
NATS 採用 Publish/Subscribe 機制來交換訊息,Subscriber 會訂閱某個特定的 主題(Subject),當 Publisher 發送該 Subject 的訊息時,NATS 會將訊息轉發給 Subscriber。
NATS Subject 是 Publisher 與 Subscriber 溝通的介面,格式為 ASCII 編碼的字串,建議使用字元為 a
到 z
、A
到 Z
及 0
到 9
。它具有 階層(Hierarchy) 的概念,使用 點(.) 來表示資訊層級。下方是幾個 NATS Subject 範例:
shop.order.created
shop.order.cancelled
shop.product.created
shop.product.updated
NATS Subject 的 Hierarchy 設計可以讓 Subscriber 透過 Wildcards 來訂閱符合條件的 Subject,大幅降低逐一訂閱的成本。下方是採用 Wildcards 的 Subject:
shop.*.created
shop.>
會發現有 *
與 >
兩種特殊符號,它們的用途如下:
*
:該字元的階層可以是任何字元,以 shop.*.created
來說,shop.order.created
與shop.product.created
都符合篩選條件。>
:該字元的階層與下方所有階層可以是任何字元,以 shop.>
來說,上方四個範例的 Subject 都符合篩選條件。補充:如果有看過前一篇介紹 MQTT 的話,會發現 Subject 的整體設計跟 MQTT Topic 大同小異,僅差在名詞與使用的字元、編碼不同,但概念基本上是差不多的。
實務上經常會遇到需要水平擴展應用程式的情況,假如我們擴展了某個 Subscriber,此時會發生一個狀況,就是這些擴展出來的 Subscriber 會訂閱相同的 Subject,當 Publisher 發送訊息時,很可能導致 重工 的問題發生,所以 NATS 提供 佇列群組(Queue Groups) 的功能來實現 負載平衡(Load Balance)。
Queue Groups 的運作原理很簡單,Subscriber 在訂閱的時候,告訴 NATS 自己屬於哪個 Queue Group,當 Publisher 發送訊息時,NATS 會隨機分配給其中一個 Subscriber。
NATS 除了使用 Publish/Subscribe 的方式來交換訊息,它還提供了 Request/Reply 的機制。NATS 實現 Request/Reply 的方式也是基於 Publish/Subscribe,當 Subscriber 收到訊息時,會將要回覆的訊息發送到回覆用的 Subject,這個 Subject 稱為 inbox。
NATS 有提供三種服務品質,分別是:最多傳輸一次、至少傳輸一次 及 確實傳輸一次。要採用某一種服務品質取決於使用的是 Core NATS 或是使用 NATS 的 JetStream 附加功能。
補充:如果有看過前一篇介紹 MQTT 的話,會發現 NATS 提供的三種 QoS 與 MQTT 定義的 QoS0、QoS1、QoS2 有相同的意義。
Core NATS 指的就是 NATS 本身,它並沒有持久性處理的功能,所以只提供最多傳輸一次的 QoS,假如 Subscriber 在 Publisher 發送訊息後才訂閱 Subject,就會發生 訊息遺失 的問題。
補充:雖然 Core NATS 不提供持久性處理,但可以運用 Request/Reply 機制讓開發者透過 Reply 來自行處理失敗等情況。
NATS JetStream 是內建於 NATS 中的附加功能,實現了持久性處理,提供至少傳輸一次及確實傳輸一次的 QoS。
補充:NATS JetStream 比起 Core NATS 有更多更複雜的概念,為了不讓本篇文的內容失焦,故不做進一步說明,有興趣可以參考官方文件。
在終端機輸入下方指令以便從 Docker Hub 下載 NATS 的 Docker Image:
$ docker pull nats
在啟動 NATS 之前,需要預先準備可以操作 NATS 的環境,這邊推薦使用 nats-box 這個輕量級的 NATS 工具程式,它以 Docker Image 的方式提供給開發者使用,是非常方便的工具。透過下方指令從 Docker Hub 下載它的 Docker Image:
$ docker pull natsio/nats-box
NATS 跟 nats-box 都會以 Container 的方式運作,為了讓它們之間可以進行通訊,故需要建立一個 Docker Bridge Network 來建立通訊的橋樑:
$ docker network create <NETWORK_NAME>
接下來就可以架設 NATS 了,透過下方指令將 NATS 架設在 4222
port 並運行於 <NETWORK_NAME>
上:
$ docker run --name <NAME> --network <NETWORK_NAME> -p 4222:4222 -d nats
在讓兩個服務透過 NATS 進行交換訊息之前,先試著用 nats-box 來操作。在終端機輸入下方指令啟動 nats-box 並將它運行於 <NETWORK_NAME>
上:
$ docker run --network <NETWORK_NAME> --rm -it natsio/nats-box
接著,開啟一個新的終端機並輸入下方指令來找出 NATS 的 IP 位址:
$ docker network inspect <NETWORK_NAME>
找到 IP 之後,回到啟動 nats-box 的終端機並使用下方指令訂閱 Subject 為 order.created
的訊息:
$ nats sub -s nats://<NATS_SERVER_IP>:4222 order.created
接著,使用新的終端機啟動 nats-box 並輸入下方指令來發送 Subject 為 order.created
的訊息:
$ nats pub -s nats://<NATS_SERVER_IP>:4222 order.created Hello
此時 Subscriber 會收到訊息:
接著,來測試一下使用 Wildcards 進行訂閱,透過下方指令訂閱符合 shop.*.created
的 Subject:
$ nats sub -s nats://<NATS_SERVER_IP>:4222 shop.*.created
此時發送 Subject 為 shop.order.created
的訊息:
$ nats pub -s nats://<NATS_SERVER_IP>:4222 shop.order.created Hello
Subscriber 會順利收到發送的訊息:
最後來測試一下使用 Queue Group 負載平衡,在兩個終端機啟動 nats-box 並輸入下方指令訂閱 shop.order.created
的 Subject:
$ nats sub -s nats://<NATS_SERVER_IP>:4222 --queue <QUEUE_NAME> shop.order.created
此時發送 Subject 為 shop.order.created
的訊息:
$ nats pub -s nats://<NATS_SERVER_IP>:4222 shop.order.created Hello
只有一個 Subscriber 會收到發送的訊息:
NestJS 實作了 NATS Transporter,讓微服務應用程式可以用跟其他 Transporter 一樣的開發風格來使用 NATS。
要使用 NATS Transporter 之前需要先安裝下方套件:
$ npm install nats
補充:nats 是一套用於 Node.js 的 NATS 客戶端,有興趣可以參考 官方文件。
修改載入點 main.ts
的內容,將 transport
設定為 Transport.NATS
,並根據架設的 NATS 位址來設定 servers
的資訊:
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
async function bootstrap() {
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.NATS,
options: {
servers: ['nats://localhost:4222'],
}
},
);
await app.listen();
}
bootstrap();
上方範例在 options
只使用了 servers
,事實上,NATS Transporter 的 options
內容十分豐富,詳細設定可以參考 nats 官方文件。
NATS Transporter 也支援 Request-response 與 Event-based 訊息模式。下方是範例程式碼,在 AppController
內實作 sayHello
方法並套用 @MessagePattern
裝飾器,以及實作 onCreated
方法並套用 @EventPattern
裝飾器:
import { Controller } from '@nestjs/common';
import { EventPattern, MessagePattern } from '@nestjs/microservices';
@Controller()
export class AppController {
@MessagePattern({ cmd: 'hello' })
sayHello(data: string) {
console.log(data);
return `Hello, ${data}`;
}
@EventPattern('shop.*.created')
onCreated(item: { name: string; }) {
console.log(item);
}
}
使用 nats-box 來進行測試,透過下方指令發送 Subject 為 shop.order.created
的訊息:
$ nats pub -s nats://<NATS_SERVER_IP>:4222 shop.order.created '{"name":"noodle"}'
此時在微服務應用程式的終端機會顯示發送的訊息,因為 Subject 有符合訂閱的篩選條件:
Pattern 設定為可序列化物件的話,要如何透過 nats-box 傳送訊息呢?這部分的處理方式與 Mosquitto Command 相同,將該可序列化物件轉成 JSON 字串即可,以上方程式碼來說,我們要將 { cmd: 'hello' }
轉成字串 '{"cmd":"hello"}'
當作 Subject 名稱:
$ nats request -s nats://<NATS_SERVER_IP>:4222 "{\"cmd\":\"hello\"}" HAO
補充:
request
指令會自動產生inbox
並訂閱,當 Subscriber 透過該inbox
回覆訊息時,終端機會出現該訊息。
此時會發現微服務應用程式的終端機顯示了錯誤訊息,原因是 NATS Transporter 在預設情況下,使用的是 nats
的 JSONCodec
來反序列化訊息的,表示訊息必須要是 可反序列化的字串:
補充:從 NestJS 的原始碼即可得知,針對 NATS 的訊息會使用
nats
的JSONCodec
來反序列化訊息,有興趣的話可以參考這裡。
如果我們期望在發送訊息時使用不可反序列化的字串並且在 Controller
拿到該訊息,如上方範例的操作方式,這時候要如何處理呢?我們需要自訂 Deserializer
來針對訊息做客製化的處理。下方是範例程式碼,新增一個 NatsCustomDeserializer
的 class
並實作 Deserializer
介面:
import { Deserializer } from '@nestjs/microservices';
import { NatsRequestJSONDeserializer } from '@nestjs/microservices/deserializers';
import { StringCodec, JSONCodec } from 'nats';
export class NatsCustomDeserializer implements Deserializer {
// 實例化預設的 Deserializer
private originDeserializer = new NatsRequestJSONDeserializer();
deserialize(value: Uint8Array, options?: Record<string, any>) {
try {
return this.originDeserializer.deserialize(value, options);
} catch (e) {
const CUSTOM_VALUE_PROP_NAME = '__custom_value__';
// 用 `StringCodec` 進行訊息解碼
const decodedValue = StringCodec().decode(value);
// 使用 `JSONCodec` 重新進行編碼
const encodedValue = JSONCodec().encode({ [CUSTOM_VALUE_PROP_NAME]: decodedValue });
const handledRequest = this.originDeserializer.deserialize(encodedValue, options);
const customValue = handledRequest.data[CUSTOM_VALUE_PROP_NAME];
// 覆寫 `data`,改成使用 `__custom_value__` 的資料,並添加 `id` 讓 Request-response 能順利運作
return { ...handledRequest, data: customValue, id: crypto.randomUUID() };
}
}
}
預設情況下是使用 NatsRequestJSONDeserializer
,這邊自訂的 Deserializer
設計思路是基於它來進行例外處理,假如無法成功反序列化,那就改成使用 nats
的 StringCodec
來解碼,為了讓原本的流程可以順利運作,這裡自訂了一個 __custom_value__
的屬性將本來的訊息做一層包裝,再將它帶給 NatsRequestJSONDeserializer
進行處理,處理完畢後,將本來的 data
屬性值進行覆寫,改成使用 data
裡面的 __custom_value__
,這裡需要特別注意,需要在回傳時多帶 id
,這樣 Request-response 才能正常運作。
實作完 NatsCustomDeserializer
後,要在 main.ts
套用它:
// ...
async function bootstrap() {
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.NATS,
options: {
servers: ['nats://localhost:4222'],
deserializer: new NatsCustomDeserializer(),
}
},
);
// ...
}
// ...
此時再次執行上方的指令,就會順利在微服務應用程式的終端機顯示訊息:
而執行 nats-box 的終端機也會顯示收到的回覆訊息:
假如要取得該請求的相關資訊,比如:Subject 名稱,可以透過 @Ctx
裝飾器取得 NatsContext
。下方是範例程式碼:
import { Controller } from '@nestjs/common';
import {
Ctx,
Payload,
EventPattern,
NatsContext
} from '@nestjs/microservices';
@Controller()
export class AppController {
@EventPattern('shop.*.created')
onCreated(
@Payload() item: { name: string; },
@Ctx() ctx: NatsContext
) {
console.log(ctx.getSubject());
console.log(item);
}
}
使用 nats-box 來進行測試,透過下方指令發送 Subject 為 shop.order.created
的訊息:
$ nats pub -s nats://<NATS_SERVER_IP>:4222 shop.order.created '{"name":"noodle"}'
此時在微服務應用程式的終端機會顯示發送的訊息以及 Subject 名稱:
修改 AppModule
的內容,透過 ClientsModule
建立 NATS Transporter 的 ClientProxy
:
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
// ...
@Module({
// ...
imports: [
ClientsModule.register([
{
name: 'NATS_SERVICE',
transport: Transport.NATS,
options: {
servers: ['nats://localhost:4222'],
}
}
])
]
})
export class AppModule {}
NATS Transporter 也支援 Request-response 與 Event-based 訊息模式,所以可以使用 ClientProxy
的 send
與 emit
方法。下方是範例程式碼,修改 AppController
的內容,使用 @Inject
裝飾器注入 ClientProxy
,並設計 getHello
與 onOrderCreated
方法:
import { Inject, Controller, Get } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
@Controller()
export class AppController {
constructor(
@Inject('NATS_SERVICE') private readonly natsService: ClientProxy
) {}
@Get()
getHello() {
return this.natsService.send({ cmd: 'hello' }, 'HAO');
}
@Get('orderCreated')
onOrderCreated() {
return this.natsService.emit('shop.order.created', { name: 'test' });
}
}
透過 Postman 使用 GET
方法存取 http://localhost:3000,會看到下方的結果:
透過 Postman 使用 GET
方法存取 http://localhost:3000/orderCreated,在微服務應用程式的終端機會看到 { name: 'test' }
:
在某些情境下,可能會運用 NATS Header 來傳遞一些資訊,針對這種情況,可以運用 NestJS 設計的 NatsRecordBuilder
來實現。以下方程式碼為例,透過 nats
的 headers
來建立 Header 資訊,這邊我們定義 x-version
為 1
,並經由 NatsRecordBuilder
把訊息與 Header 整理成 Record,最後將它當作資料讓 ClientProxy
來處理:
import { Inject, Controller, Get } from '@nestjs/common';
import { ClientProxy, NatsRecordBuilder } from '@nestjs/microservices';
import * as nats from 'nats';
@Controller()
export class AppController {
constructor(
@Inject('NATS_SERVICE') private readonly natsService: ClientProxy
) {}
@Get('orderCreated')
onOrderCreated() {
const headers = nats.headers();
headers.set('x-version', '1');
const record = new NatsRecordBuilder({ name: 'test' })
.setHeaders(headers)
.build();
return this.natsService.emit('shop.order.created', record);
}
}
修改微服務應用程式的部分,在 AppController
的 onCreated
方法裡透過 console.log
將 Header 資訊印出:
import { Controller } from '@nestjs/common';
import {
Ctx,
Payload,
EventPattern,
NatsContext
} from '@nestjs/microservices';
import { MsgHdrsImpl } from 'nats';
@Controller()
export class AppController {
@EventPattern('shop.*.created')
onCreated(
@Payload() item: { name: string; },
@Ctx() ctx: NatsContext
) {
const headers: MsgHdrsImpl = ctx.getHeaders();
const version = headers.get('x-version');
console.log(version);
}
}
透過 Postman 使用 GET
方法存取 http://localhost:3000/orderCreated,在微服務應用程式的終端機會看到 1
:
NATS 是一套輕量、安全且高效的訊息傳遞系統,它採用 Publish/Subscribe 模式,以 Subject 作為溝通的介面。NATS 不僅提供一般 Publish/Subscribe 的方式傳遞訊息,還提供了 Request/Reply 的機制,甚至還設計了 Queue Groups 來實現負載平衡,如果有需要還可以使用 NATS JetStream 來使用更可靠的 QoS。NATS 提供了一定的彈性讓開發者可以根據不同應用場景來調整設置,整體而言,是一個不錯的訊息傳遞系統。
NestJS 提供了 NATS Transporter 讓我們可以用跟其他 Transporter 一樣的開發風格來使用 NATS,針對要設置 NATS Header 的情況,NestJS 運用 Record Builder 這個角色的定位設計了 NatsRecordBuilder
,讓開發者可以運用它來設置 NATS Header,如此一來,就不會破壞本來發送訊息的方式了。