MQTT 是一個基於 TCP/IP、極輕量的傳輸協定,具有可靠性高、擴展性高、占用頻寬小等特性,主要用於 物聯網(Internet of Things, IoT) 設備的訊息傳遞。
MQTT 採用 Publish/Subscribe 機制來交換訊息,會有 Subscriber、Publisher 與 Message Broker 這三個角色,只要 Subscriber 訂閱 Publisher 指定的 主題(Topic),當 Publisher 發送了一則該 Topic 的訊息時,Message Broker 會將訊息轉發給 Subscriber。
MQTT Topic 是 Publisher 與 Subscriber 溝通的介面,格式為 UTF-8 編碼的字串,需特別注意,總長度不得超過 65535 位元。它有一項特色是 階層(Level) 的概念,使用 斜線(/) 來表示資訊層級。下方是幾個 MQTT Topic 範例:
home/bedroom1/light
home/bedroom1/tv
home/bedroom2/audio
home/bedroom2/light
MQTT Topic 的 Level 設計是有意義的,試想如果 Subscriber 要訂閱某個資源下的所有資源,不太可能逐一訂閱,以上方 MQTT Topic 為例,如果要訂閱 home/bedroom1
下所有設備的資訊,就要訂閱 home/bedroom1/light
與 home/bedroom1/tv
,那如果home/bedroom1
下有上百種設備,用這種逐一訂閱的方式就顯得不切實際,所以 MQTT Topic 針對 Subscriber 設計了一些 萬用字元(Wildcards),讓 Subscriber 可以透過它們來減少逐一訂閱的成本。下方是使用 Whidcards 的 Topic:
home/+/light
home/bedroom1/#
會發現有 +
與 #
兩種特殊符號,它們的用途如下:
+
:該字元的階層可以是任何字元,以 home/+/light
來說,home/bedroom1/light
和 home/bedroom2/light
都符合篩選條件。#
:該字元的階層與下方所有階層可以是任何字元,以 home/bedroom1/#
來說,home/bedroom1/
和 home/bedroom1/tv
都符合篩選條件。MQTT 定義了三種 服務品質(Quality of Service) 來根據不同環境、資訊重要程度等因素調整傳輸品質,分別是 QoS0、QoS1 以及 QoS2。
QoS0 的品質下,訊息 最多傳輸一次,並不保證訊息會送達,這樣的好處是不用花費資源去檢查是否有正確送達,所以是三種品質中 最快的;但壞處就是遇到網路不穩、斷線等情況也不會因此重新發送訊息,造成 訊息遺失。
QoS1 的品質下,訊息 至少傳輸一次,保證訊息會送達,那是怎麼做到保證送達的呢?以 Publisher 與 Message Broker 來說,在 Publisher 送出訊息後,會等待 Message Broker 回覆以確認該訊息有正確送達,假如 Publisher 遲遲收不到回覆的話,將會再傳送一次,但這也就衍伸了 重複發送 的問題,因為無法保證 Message Broker 回覆的時候網路是穩定的。
QoS2 的品質下,訊息 確實傳輸一次,保證訊息會送達且不會重複發送,是三種品質中 最安全的,那是怎麼做到的呢?同樣以 Publisher 與 Message Broker 來說,在 Publisher 送出訊息後,會等待 Message Broker 回覆以確認訊息有正確送達,當 Publisher 收到回覆時,會再將 釋放訊息(Publish Release, PUBREL) 發送給 Message Broker,當 Message Broker 收到後,會將訊息發送給 Subscriber,並發送 發佈完成訊息(Publish Complete, PUBCOMP) 給 Publisher,此時 Publisher 會將暫存的訊息清除,完成整個傳輸過程。可以看到這整個過程需要較多的傳輸步驟,所以它是三種品質中 最慢的。
市面上有許多 MQTT Message Broker,像是:Mosquitto、EMQX、NanoMQ等,每一種都有其特點,在生產環境下可以選擇較適合的解決方案。這邊我們選擇使用由 Eclipse 基金會維護的 Mosquitto,它的特色是非常輕量、可以跨平台使用且很容易上手,非常適合作為入門的 MQTT Message Broker。
補充:此篇教學並不會很詳細解說 Mosquitto 的細節,主要只是使用 Mosquitto 當作 MQTT Message Broker,讓我們可以基於它來完成後續的教學,並從中實際體驗 MQTT 的運作方式,如果對 Mosquitto 有興趣的話,可以參考 官方文件,或是線上其他資源。
在終端機輸入下方指令以便從 Docker Hub 下載 Mosquitto 的 Docker Image:
$ docker pull eclipse-mosquitto
為了方便我們體驗 Mosquitto 的基礎用法,就不針對驗證的部分做處理,但 Mosquitto 在第二版之後預設不允許匿名連線,所以需要將相關參數打開,透過 Docker 相關指令可以讓我們抽換內部檔案,進而調整設定內容,先在本機產生名稱為 mosquitto.conf
的設定檔:
$ mkdir <FOLDER>
$ touch <FOLDER>/mosquitto.conf
接著,調整 mosquitto.conf
的內容,將 allow_anonymous
設為 true
,並設定 listener
為 1883
,表示要將服務掛在 1883
port,最後再將 protocol
設定為 mqtt
:
allow_anonymous true
listener 1883
protocol mqtt
透過下方指令將 Mosquitto 架設在 1883
port,並指定要抽換的檔案路徑:
$ docker run --name <NAME> -p 1883:1883 -v /<FOLDER>:/mosquitto/config -d eclipse-mosquitto
接著,我們可以進入該 Container 下指令,進而使用 Mosquitto 的相關指令來操作 Mosquitto。透過下方指令執行 Container 內的 shell:
$ docker exec -it <NAME> sh
如此一來,便可以透過 Mosquitto Command 來操作 Mosquitto。透過下方指令可以顯示幫助清單:
$ mosquitto -h
在讓兩個服務透過該機制進行交換訊息之前,可以先試著用 Mosquitto Command 來操作,首先,打開兩個終端機並進入 Container 中的 shell,在其中一個終端機輸入下方指令來訂閱名稱為 home/bedroom1/light
的 Topic:
$ mosquitto_sub -v -t home/bedroom1/light
補充:
-v
的用途是顯示所有詳細資訊,-t
即 Topic。
接著,透過另一個終端機輸入下方指令發送 Topic 為 home/bedroom1/light
的訊息:
$ mosquitto_pub -t home/bedroom1/light -m "Hello World"
此時 Subscriber 會收到訊息:
接著,來測試一下使用 Wildcards 進行訂閱,透過下方指令訂閱符合 home/+/light
的 Topic:
$ mosquitto_sub -v -t home/+/light
此時發送 Topic 為 home/bedroom2/light
的訊息:
$ mosquitto_pub -t home/bedroom2/light -m "Hello World"
Subscriber 會順利收到發送的訊息:
再來測試一下另一種 Wildcards,透過下方指令訂閱符合 home/bedroom1/#
的 Topic:
$ mosquitto_sub -v -t home/bedroom1/#
此時發送 Topic 為 home/bedroom1/light
的訊息:
$ mosquitto_pub -t home/bedroom1/light -m "Hello World"
Subscriber 會順利收到發送的訊息:
NestJS 實作了 MQTT Transporter,讓微服務應用程式可以用跟其他 Transporter 一樣的開發風格來使用 MQTT,如果有物聯網需求或是在網路較嚴苛的情況下,使用 MQTT 會是不錯的選擇。
要使用 MQTT Transporter 之前,需要先安裝下方套件:
$ npm install mqtt
補充:mqtt 是一套用於 Node.js 或瀏覽器的 MQTT 客戶端,有興趣可以參考 官方文件。
修改載入點 main.ts
的內容,將 transport
設定為 Transport.MQTT
,並根據架設的 MQTT Message Broker 位址來設定 url
的資訊,url
會以 mqtt://
開頭:
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
async function bootstrap() {
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.MQTT,
options: {
url: 'mqtt://localhost:1883',
}
},
);
await app.listen();
}
bootstrap();
上方範例在 options
只使用了 url
,事實上,MQTT Transporter 的 options
即 mqtt
的 options
,有興趣的話可以參考 mqtt 官方文件。
MQTT Transporter 也支援 Request-response 與 Event-based 訊息模式。下方是範例程式碼,在 AppController
內實作 sayHello
方法並套用 @MessagePattern
裝飾器,以及實作 listenDeviceEvent
方法並套用 @EventPattern
裝飾器:
import { Controller } from '@nestjs/common';
import { EventPattern, MessagePattern } from '@nestjs/microservices';
@Controller()
export class AppController {
@MessagePattern({ cmd: 'hello' })
sayHello(data: string) {
console.log(data);
return `Hello, ${data}`;
}
@EventPattern('home/+/devices/#')
listenDeviceEvent(device: { name: string; }) {
console.log(device);
}
}
使用 Mosquitto Command 來進行測試,透過下方指令發送 Topic 為 home/bedroom1/devices/light
的訊息:
$ mosquitto_pub -t home/bedroom1/devices/light -m '{ "name": "led" }'
此時在微服務應用程式的終端機會顯示發送的訊息,因為 Topic 有符合訂閱的篩選條件:
Pattern 設定為可序列化物件的話,要如何透過 Mosquitto Command 來發送訊息呢?這部分的處理方式跟 Redis CLI 是相同的,將該可序列化物件轉成 JSON 字串即可,以上方程式碼來說,我們要將 { cmd: 'hello' }
轉成字串 '{"cmd":"hello"}'
當作 Topic 名稱:
$ mosquitto_pub -t '{"cmd":"hello"}' -m "Hello World"
此時在微服務應用程式的終端機會顯示發送的訊息:
假如要取得該請求的相關資訊,比如:Topic 名稱,可以透過 @Ctx
裝飾器取得 MqttContext
。下方是範例程式碼:
import { Controller } from '@nestjs/common';
import {
Ctx,
EventPattern,
MessagePattern,
Payload,
MqttContext
} from '@nestjs/microservices';
@Controller()
export class AppController {
@EventPattern('home/+/devices/#')
listenDeviceEvent(
@Payload() device: { name: string },
@Ctx() ctx: MqttContext
) {
console.log(ctx.getTopic());
console.log(device);
}
}
使用 Mosquitto Command 進行測試,透過下方指令發送 Topic 為 home/bedroom1/devices/light
的訊息:
$ mosquitto_pub -t home/bedroom1/devices/light -m '{ "name": "led" }'
此時在微服務應用程式的終端機會顯示發送的訊息以及 Topic 名稱:
修改 AppModule
的內容,透過 ClientsModule
建立 MQTT Transporter 的 ClientProxy
:
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
// ...
@Module({
// ...
imports: [
ClientsModule.register([
{
name: 'MQTT_SERVICE',
transport: Transport.MQTT,
options: {
url: 'mqtt://localhost:1883',
}
}
])
]
})
export class AppModule {}
MQTT Transporter 也支援 Request-response 與 Event-based 訊息模式,所以可以使用 ClientProxy
的 send
與 emit
方法。下方是範例程式碼,修改 AppController
的內容,使用 @Inject
裝飾器注入 ClientProxy
,並設計 getHello
與 emitDeviceEvent
方法:
import { Inject, Controller, Get } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
@Controller()
export class AppController {
constructor(
@Inject('MQTT_SERVICE') private readonly mqttService: ClientProxy
) {}
@Get()
getHello() {
return this.mqttService.send({ cmd: 'hello' }, 'HAO');
}
@Get('emitDeviceEvent')
emitDeviceEvent() {
return this.mqttService.emit('home/bedroom1/devices/light', { name: 'led' });
}
}
透過 Postman 使用 GET
方法存取 http://localhost:3000,會看到下方的結果:
透過 Postman 使用 GET
方法存取 http://localhost:3000/emitDeviceEvent,在微服務應用程式的終端機會看到 { name: 'led' }
:
在某些情境下,可能會需要根據傳遞的訊息內容來調整相關配置,如:QoS、額外資訊等,針對這種情況,NestJS 設計了 Record Builder 這個角色,讓我們可以針對不同種 Transporter 來建立適合該 Transporter 的配置。以下方程式碼為例,透過 MqttRecordBuilder
設置 QoS 為 2
,並將建立好的 Record 當作資料讓 ClientProxy
來處理:
import { Inject, Controller, Get } from '@nestjs/common';
import { ClientProxy, MqttRecordBuilder } from '@nestjs/microservices';
@Controller()
export class AppController {
constructor(
@Inject('MQTT_SERVICE') private readonly mqttService: ClientProxy
) {}
@Get()
getHello() {
const record = new MqttRecordBuilder('HAO')
.setQoS(2)
.build();
return this.mqttService.send({ cmd: 'hello' }, record);
}
}
由於 MQTT 採用 Publish/Subscribe 機制,故需要透過兩個 Topic 來完成 Request-response 的運作,NestJS 會讓微服務應用程式訂閱指定 Topic 名稱的訊息,比如:devices/light
,在回覆訊息時,會傳送 Topic 為 devices/light/reply
的訊息,NestJS 客戶端會訂閱該 Topic 以獲取回應。
補充:針對 Publish/Subscribe 機制,NestJS 都是採用相同的概念來處理 Request-response,差異只在於實現的方法,像上一篇 Redis Pub/Sub 是用兩個通道來實現的,而 MQTT 則是兩個 Topic。
MQTT 是一種輕量且可靠的傳輸協定,它採用 Publish/Subscribe 模式,包括 Publisher、Subscriber 和 Message Broker 三個角色,以 Topic 作為溝通的介面。Topic 支援階層結構,並且可以使用 +
與 #
來簡化訂閱的方式。MQTT 還定義了三種 QoS 等級,以滿足不同環境下的需求。
NestJS 提供了 MQTT Transporter 讓我們可以輕鬆地在微服務架構中使用 MQTT 來進行通訊,並且與其他 Transporter 有相同的開發風格,縱使轉換成其他的 Transporter 也不會有太大的改動。針對不同訊息採用不同配置的情況,NestJS 設計了 Record Builder 這個角色把特定的配置方式封裝起來,盡可能保持一致的開發體驗,以 MQTT 來說,可以透過 MqttRecordBuilder
來調整 MQTT 相關的配置,如:QoS。