RabbitMQ是一個訊息佇列的服務,透過將訊息(DATA)非同步傳輸與建立多個佇列功能,以此功能實現資料的緩衝、訊息的分發以及解偶。
在系統規模大的情況下,適度使用這類服務能緩解系統的壓力。同類的產品還有:ActiveMQ、Kafka等等。
在此我們要提到Queue使用的通訊協定──AMQP,AMQP的主要架構如下:
基本上RabbitMQ的傳輸如下:
AMQP也未必都是好處,我們當然可以享受在AMQP下解偶的好處,但同時也帶來了一個問題,就是發送方因為未與接收方直接連接,導致其實他無法確認接收方是否真的有收到訊息、Producer只能有確保資料有成功發送到交換器。
而對消費者也是一樣的,Consumer沒有辦法確認Producer是否有發出請求,只能確認沒有從Exchange收到請求而已。
可以參考程式碼:
// 這是接收方的定義
public void sendOrderToQueue(Order order) {
// 建立 RabbitMQ 連線和 Channel
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "order-exchange";
String routingKey = "order.created";
// 將訂單數據轉換為字串(可以是 JSON 格式)
String message = convertOrderToJson(order);
// 發送消息到交換器
channel.basicPublish(exchangeName, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
}
// 這是接收方的定義
@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE_NAME = "order-exchange";
public static final String QUEUE_NAME = "order-queue";
public static final String ROUTING_KEY = "order.created";
// 定義 Exchange
@Bean
public DirectExchange orderExchange() {
return new DirectExchange(EXCHANGE_NAME);
}
// 定義 Queue
@Bean
public Queue orderQueue() {
return new Queue(QUEUE_NAME, true); // durable = true
}
// 定義 Binding (將 Queue 綁定到 Exchange 上,並使用 Routing Key)
@Bean
public Binding binding(Queue orderQueue, DirectExchange orderExchange) {
return BindingBuilder.bind(orderQueue).to(orderExchange).with(ROUTING_KEY);
}
}
// 這是接收到訊息的地方
// 監聽來自 "order-queue" 的消息
@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
public void receiveOrderMessage(String message) {
// 假設消息是 JSON 格式,這裡可以將它轉回到 Order 對象
Order order = convertJsonToOrder(message);
// 處理訂單邏輯
System.out.println("接收到的訂單: " + order);
}
另外,在Exchange部分,主要有四種發送訊息的方法:
那今天的部分就先到這裡了,我們明天見。