為了使節點 (ServiceBrokers) 能夠互相通訊連結,需要設置一個 Transporter
來處理通訊傳輸事件。Moleculer 支援常見的 Transporter 來作為中央的訊息 Broker,可以透過發布/訂閱的方式進行訊息傳遞,提供遠端節點一個可靠的資料交換方式。
Fig. 1. Message Broker
微服務通常會建立在多個節點,因此 Transporter 模組就顯得相當重要。當 Transporter 與其它節點進行通訊的時,它會傳輸事件、請求呼叫及響應處理等。若節點實例在多個不同節點上運作時,它將會根據規則進行節點間的負載平衡。由於運算邏輯與 Transporter 模組是分開的,因此 Transporter 是可替換的,它不會影響到程式碼的邏輯開發。
以下將介紹 Moleculer 框架中內建的 Transporter,其中細部設定筆者未測試或確認過的部分,本文將暫時保留官方提供的範例註解說明。
TCP 是一個無需相依套件,不須設定的 Transporter 。它使用 Gossip 協定[2] 來傳播節點狀態、服務清單及 Heartbeat 機制。它包含了一個整合的 UDP 探索功能,可以用來偵測網路上新增的節點或是離線的節點。
範例:快速使用
moleculer.config.js
module.exports = {
transporter: "TCP"
};
範例:完整的選項預設值
moleculer.config.js
module.exports = {
logger: true,
transporter: {
type: "TCP",
options: {
// 啟用 UDP 探索
udpDiscovery: true,
// Reusing UDP server socket
udpReuseAddr: true,
// UDP port
udpPort: 4445,
// UDP bind address (if null, bind on all interfaces)
udpBindAddress: null,
// UDP 發送間隔 (秒)
udpPeriod: 30,
// Multicast address.
udpMulticast: "239.0.0.0",
// Multicast TTL setting
udpMulticastTTL: 1,
// Send broadcast (Boolean, String, Array<String>)
udpBroadcast: false,
// TCP server port. Null or 0 means random port
port: null,
// Static remote nodes address list (when UDP discovery is not available)
urls: null,
// Use hostname as preffered connection address
useHostname: true,
// Gossip sending period in seconds
gossipPeriod: 2,
// Maximum enabled outgoing connections. If reach, close the old connections
maxConnections: 32,
// Maximum TCP packet size
maxPacketSize: 1 * 1024 * 1024
}
}
};
範例:陣列清單方式設定靜態節點列表
如果你的網路環境不允許使用 UDP 的話,請在 urls
選項提供節點清單(Host/IP 、連接埠、節點 ID)。
moleculer.config.js
module.exports = {
nodeID: "node-1",
logger: true,
transporter: {
type: "TCP",
options: {
udpDiscovery: false,
urls: [
"172.17.0.1:6000/node-1",
"172.17.0.2:6000/node-2",
"172.17.0.3:6000/node-3"
],
}
}
};
範例:逗號分隔方式設定節點列表,注意開頭需要加上 tcp://
moleculer.config.js
module.exports = {
nodeID: "node-1",
transporter: "tcp://172.17.0.1:6000/node-1,172.17.0.2:6000/node-2,172.17.0.3:6000/node-3"
};
範例:使用 JSON 檔案設定節點列表
moleculer.config.js
module.exports = {
nodeID: "node-1",
transporter: "file://./nodes.json"
};
nodes.json
[
"127.0.0.1:6001/client-1",
"127.0.0.1:7001/server-1",
"127.0.0.1:7002/server-2"
]
注意,不需要所有的遠端節點,只要至少有一個節點上線即可。例如創建一個無服務的節點,它僅作為 gossiper 而不做任何事,只是為了使用 gossip 協定分享其它節點的地址,但所有的節點都必須知道 gossiper 節點的地址,才能夠與其它所有節點進行通訊。
NATS 是一個敏捷、高效又彈性的開源訊息傳遞系統,適合用於邊緣運算、雲原生、混合部署的服務架構[3] 。
使用前請安裝 NATS 套件
npm install nats --save
。
範例:快速使用,預設連線為 nats://localhost:4222
moleculer.config.js
module.exports = {
transporter: "NATS"
};
範例:連線到遠端 NATS 服務器
moleculer.config.js
module.exports = {
transporter: "nats://user:pass@nats-server:4222"
};
範例:選項設定方式
moleculer.config.js
module.exports = {
transporter: {
type: "NATS",
options: {
servers: ["nats://localhost:4222"],
user: "admin",
pass: "password"
}
}
};
範例: TLS 設定方式
參數請參閱: https://github.com/nats-io/nats.js#tlsoptions
moleculer.config.js
const fs = require("fs");
module.exports = {
transporter: {
type: "NATS",
options: {
servers: ["nats://localhost:4222"]
tls: {
key: fs.readFileSync('./client-key.pem'),
cert: fs.readFileSync('./client-cert.pem'),
ca: [ fs.readFileSync('./ca.pem') ]
}
}
}
};
Redis 是一個開源的訊息 Broker[4] 。
使用前請安裝 ioredis 套件
npm install ioredis --save
。
範例:快速使用,預設連線為 redis://localhost:6379
moleculer.config.js
module.exports = {
transporter: "Redis"
};
範例:連線到 Redis 服務器
moleculer.config.js
module.exports = {
transporter: "redis://localhost:6379"
};
範例:安全連線到 Redis 服務器
moleculer.config.js
module.exports = {
transporter: "rediss://localhost:6379"
};
範例:選項設定方式
moleculer.config.js
module.exports = {
transporter: {
type: "Redis",
options: {
host: "redis-server",
db: 0
}
}
};
範例:Redis 多個節點集群
moleculer.config.js
module.exports = {
transporter: {
type: "Redis",
options: {
cluster: {
nodes: [
{ host: "127.0.0.1", port: 6380 },
{ host: "127.0.0.1", port: 6381 }
]
}
}
}
};
MQTT 是一個物聯網訊息傳輸標準[5] 。例如使用 Eclipse Mosquitto 這套輕量級的開源 MQTT 訊息 Broker [6] 。
使用前請安裝 MQTT 套件
npm install mqtt --save
。
範例:快速使用,預設連線為 mqtt://localhost:1883
moleculer.config.js
module.exports = {
transporter: "MQTT"
};
範例:連線到遠端 MQTT 服務器
moleculer.config.js
module.exports = {
transporter: "mqtt://mqtt-server:1883"
};
範例:安全連線到遠端 MQTT 服務器
moleculer.config.js
module.exports = {
transporter: "mqtts://mqtt-server:1883"
};
範例:選項設定方式
moleculer.config.js
module.exports = {
transporter: {
type: "MQTT",
options: {
host: "mqtt-server",
port: 1883,
// QoS 等級,詳情請參閱: https://github.com/mqttjs/MQTT.js#about-qos
qos: 0,
// 發送主題分割符號
topicSeparator: "."
}
}
};
AMQP 0.9 是一個訊息傳輸標準[7] 。例如使用 RabbitMQ [8]。
使用前請安裝 amqplib 套件
npm install amqplib --save
。
範例:快速使用,預設連線為 amqp://guest:guest@localhost:5672
moleculer.config.js
module.exports = {
transporter: "AMQP"
});
範例:連線到遠端 AMQP 服務器
moleculer.config.js
module.exports = {
transporter: "amqp://rabbitmq-server:5672"
};
範例:安全連線到遠端 AMQP 服務器
moleculer.config.js
module.exports = {
transporter: "amqps://rabbitmq-server:5672"
};
範例:選項設定方式
moleculer.config.js
module.exports = {
transporter: {
type: "AMQP",
options: {
url: "amqp://user:pass@rabbitmq-server:5672",
eventTimeToLive: 5000,
prefetch: 1,
socketOptions: {
servername: process.env.RABBIT_SERVER_NAME
}
// 設定後隊列若在 2 分鐘內(預設值)沒有任何客戶端連線將會被自動刪除
autoDeleteQueues: true
}
}
};
AMQP 1.0 是一個訊息傳輸標準[7] 。例如使用 ActiveMq[9] 或 RabbitMQ [8] + rabbitmq_amqp1_0 Plugin[10] 。
注意,這是一個實驗性 Transporter ,請不要在生產環境使用它!
使用前請安裝 rhea-promise 套件
npm install rhea-promise --save
。
範例:快速使用,預設連線為 amqp10://guest:guest@localhost:5672
moleculer.config.js
module.exports = {
transporter: "AMQP10"
});
範例:連線到遠端 AMQP 服務器
moleculer.config.js
module.exports = {
transporter: "amqp10://activemq-server:5672"
};
範例:選項設定方式
moleculer.config.js
module.exports = {
transporter: {
url: "amqp10://user:pass@activemq-server:5672",
eventTimeToLive: 5000,
heartbeatTimeToLive: 5000,
connectionOptions: { // rhea connection options https://github.com/amqp/rhea#connectoptions, example:
ca: "", // (if using tls)
servername: "", // (if using tls)
key: "", // (if using tls with client auth)
cert: "" // (if using tls with client auth)
},
queueOptions: {}, // rhea queue options https://github.com/amqp/rhea#open_receiveraddressoptions
topicOptions: {}, // rhea queue options https://github.com/amqp/rhea#open_receiveraddressoptions
messageOptions: {}, // rhea message specific options https://github.com/amqp/rhea#message
topicPrefix: "topic://", // RabbitMq uses '/topic/' instead, 'topic://' is more common
prefetch: 1
}
};
Kafka 是一個開源的分散式事件串流平台[11] 。
使用前請安裝 kafka-node 套件
npm install kafka-node --save
。
範例:連線至 Kafka 。
由於 Kafka 3.3 版以上會將 KRaft 模式標記為可生產使用,也預計在 3.4 版棄用 ZooKeeper 模式,因此你也可以提早試試不依賴 ZooKeeper 的新模式。
moleculer.config.js
module.exports = {
transporter: "kafka://192.168.51.29:2181"
};
範例:選項設定方式
moleculer.config.js
module.exports = {
transporter: {
type: "kafka",
options: {
host: "192.168.51.29:2181",
// KafkaClient 選項,請參閱: https://github.com/SOHU-Co/kafka-node#kafkaclient
client: {
zkOptions: undefined,
noAckBatchOptions: undefined,
sslOptions: undefined
},
// KafkaProducer 選項,請參閱: https://github.com/SOHU-Co/kafka-node#producerkafkaclient-options-custompartitioner
producer: {},
customPartitioner: undefined,
// ConsumerGroup 選項,請參閱: https://github.com/SOHU-Co/kafka-node#consumergroupoptions-topics
consumer: {
},
// 進階 `send` 選項,請參閱: https://github.com/SOHU-Co/kafka-node#sendpayloads-cb
publish: {
partition: 0,
attributes: 0
}
}
}
};
由於官方將棄用此方法,筆者就不做介紹了。
https://docs.nats.io/legacy/stan/intro
你也可以建立客製化的 Transporter 模組,官方建議可以參考 NATS Transporter 的原始碼[13] 來修改,再實作 connect
、 disconnect
、 subscribe
、 send
方法。
範例:建立客製化的 Transporter
my-transporter.js
const BaseTransporter = require("moleculer").Transporters.Base;
class MyTransporter extends BaseTransporter {
connect() { /*...*/ }
disconnect() { /*...*/ }
subscribe() { /*...*/ }
send() { /*...*/ }
}
module.exports = MyTransporter;
範例:使用客製化的 Transporter
moleculer.config.js
const MyTransporter = require("./my-transporter");
module.exports = {
transporter: new MyTransporter()
};
由於一些 Transporter 已內建了平衡器的解決方案 (例如 RabbitMQ
、 NATS
),因此你可以設定 disableBalancer: true
來禁用 Moleculer 的平衡器。
moleculer.config.js
module.exports = {
disableBalancer: true,
transporter: "nats://some-server:4222"
};
注意,如果禁用了 Moleculer 內建的平衡器方案,所有的請求事件都將會透過 Transporter 傳輸,包含
本地服務
請求也會透過 Transporter 發送。
Transporter 需要一個序列化模組來 序列化
與 反序列化
欲傳輸的資料封包, Moleculer 內建了許多序列化器,其中預設的序列化器是 JSON
。
範例:JSON 序列化器
moleculer.config.js
module.exports = {
nodeID: "server-1",
transporter: "NATS",
serializer: "JSON"
};
範例:Avro 序列化器[14]
使用前請安裝 avsc 套件
npm install avsc --save
。
moleculer.config.js
module.exports = {
serializer: "Avro"
};
範例:MsgPack 序列化器[15]
使用前請安裝 msgpack5 套件
npm install msgpack5 --save
。
moleculer.config.js
module.exports = {
serializer: "MsgPack"
};
範例:Notepack 序列化器[16]
使用前請安裝 notepack 套件
npm install notepack.io --save
。
moleculer.config.js
module.exports = {
serializer: "Notepack"
};
範例:ProtoBuf 序列化器[17]
使用前請安裝 protobufjs 套件
npm install protobufjs --save
。
moleculer.config.js
module.exports = {
serializer: "ProtoBuf"
};
範例:Thrift 序列化器[18]
使用前請安裝 Thrift 套件
npm install thrift --save
。
moleculer.config.js
module.exports = {
serializer: "Thrift"
};
範例:CBOR 序列化器[19]
CBOR (cbor-x) 是一個非常快的序列化器。
使用前請安裝 cbor-x 套件
npm install cbor-x --save
。
moleculer.config.js
module.exports = {
serializer: "CBOR"
};
你可以建立客製化序列化模組,官方建議可以參考 JSON 序列化器的原始碼[20] 來修改,再實作 serialize
、 deserialize
方法。
範例:建立客製化的序列化器
my-serializer
const BaseSerializer = require("moleculer").Serializers.Base;
class MySerializer extends BaseSerializer {
serialize(obj, type) { /*...*/ }
deserialize(buf, type) { /*...*/ }
}
module.exports = MySerializer;
範例:使用客製化的序列化器
moleculer.config.js
const MySerializer = require("./my-serializer");
module.exports = {
serializer: new MySerializer()
};
[1] Networking, https://moleculer.services/docs/0.14/networking.html
[2] Gossip protocol, https://en.wikipedia.org/wiki/Gossip_protocol
[3] NATS, https://nats.io/
[4] Redis, https://redis.io/
[5] MQTT, https://mqtt.org/
[6] Eclipse Mosquitto, https://mosquitto.org/
[7] AMQP, https://www.amqp.org/
[8] RabbitMQ, https://www.rabbitmq.com/
[9] ActiveMq, https://activemq.apache.org/
[10] AMQP 1.0 support for RabbitMQ, https://github.com/rabbitmq/rabbitmq-amqp1.0
[11] Apache Kafka, https://kafka.apache.org/
[12] STAN, https://docs.nats.io/legacy/stan/intro
[13] Moleculer NATS Transporter, https://github.com/moleculerjs/moleculer/blob/master/src/transporters/nats.js
[14] Avro, https://github.com/mtth/avsc
[15] MsgPack, https://github.com/mcollina/msgpack5
[16] notepack, https://github.com/darrachequesne/notepack
[17] ProtoBuf, https://developers.google.com/protocol-buffers/
[18] Apache Thrift, https://thrift.apache.org/
[19] cbor-x, https://github.com/kriszyp/cbor-x
[20] JSON serializer, https://github.com/moleculerjs/moleculer/blob/master/src/serializers/json.js