iT邦幫忙

2024 iThome 鐵人賽

DAY 17
0

RabbitMQ是一個訊息佇列的服務,透過將訊息(DATA)非同步傳輸與建立多個佇列功能,以此功能實現資料的緩衝、訊息的分發以及解偶。

在系統規模大的情況下,適度使用這類服務能緩解系統的壓力。同類的產品還有:ActiveMQ、Kafka等等。

在此我們要提到Queue使用的通訊協定──AMQP,AMQP的主要架構如下:

  1. Producer (生產者、或者說訊息發送者)
  2. Exchange (交換器、透過交換器解偶生產者與消費者的關聯)
  3. Queue (佇列、訊息的存放區域)
  4. Consumer (消費者、或者說訊息接收者)
  5. Channel (通道、生產者與消費者的傳輸管道)
  6. Connection (連線)

基本上RabbitMQ的傳輸如下:

  1. 建立一個Connection(連線)
  2. 在Connection中建立Channel(通道)
  3. 建立一個Exchange做為資料的中繼站,再建立一個routingKey給Exchange辨認傳輸目標,建立好後發送方的準備就已經完成,發送方需要知道的細節並非發送到Consumer而是Exchange,實現了解偶。
  4. Consumer(消費者)想要接收到來自Producer的訊息的話,必須要建立Queue,作為訊息的接收點,再來,因為只有Queue無法連接上,所以會使用Queue與RoutingKey去綁定在Exchange中,這樣一來傳輸才能通。

AMQP 的優點

  • 可靠性高:AMQP在正常情況下、訊息是不會被丟失了,若是HTTP傳輸可能會因為Timeout等問題導致請求遺失,但AMQP 消息的確認機制,確保每條消息都能被正確處理。
  • 靈活的路由機制:交換機和隊列的組合使得消息路由變得非常靈活,適應不同應用場景的需求。
  • 可擴展性:AMQP 協議的架構設計使得它能夠在高並發、高吞吐量的情況下運行,適合於大型分佈式系統、意思是你隨時可以根據請求的多寡調整Queue的數量。

AMQP也未必都是好處,我們當然可以享受在AMQP下解偶的好處,但同時也帶來了一個問題,就是發送方因為未與接收方直接連接,導致其實他無法確認接收方是否真的有收到訊息、Producer只能有確保資料有成功發送到交換器。

而對消費者也是一樣的,Consumer沒有辦法確認Producer是否有發出請求,只能確認沒有從Exchange收到請求而已。

可以參考程式碼:

// 這是接收方的定義
public void sendOrderToQueue(Order order) {
    // 建立 RabbitMQ 連線和 Channel
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    
    String exchangeName = "order-exchange";
    String routingKey = "order.created";

    // 將訂單數據轉換為字串(可以是 JSON 格式)
    String message = convertOrderToJson(order);
    
    // 發送消息到交換器
    channel.basicPublish(exchangeName, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
}

// 這是接收方的定義
@Configuration
public class RabbitMQConfig {

    public static final String EXCHANGE_NAME = "order-exchange";
    public static final String QUEUE_NAME = "order-queue";
    public static final String ROUTING_KEY = "order.created";

    // 定義 Exchange
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }

    // 定義 Queue
    @Bean
    public Queue orderQueue() {
        return new Queue(QUEUE_NAME, true); // durable = true
    }

    // 定義 Binding (將 Queue 綁定到 Exchange 上,並使用 Routing Key)
    @Bean
    public Binding binding(Queue orderQueue, DirectExchange orderExchange) {
        return BindingBuilder.bind(orderQueue).to(orderExchange).with(ROUTING_KEY);
    }
}

// 這是接收到訊息的地方
// 監聽來自 "order-queue" 的消息
@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
public void receiveOrderMessage(String message) {
    // 假設消息是 JSON 格式,這裡可以將它轉回到 Order 對象
    Order order = convertJsonToOrder(message);

    // 處理訂單邏輯
    System.out.println("接收到的訂單: " + order);
}

另外,在Exchange部分,主要有四種發送訊息的方法:

  • Direct Exchange:Exchange根據RoutingKey將訊息發送到指定了Queue。
  • Fanout Exchange:忽視了RoutingKey,發送給Exchange下所有綁定Queue,用來廣播。
  • Topic Exchange:同樣使用RoutingKey進行判斷,只不過使用模糊比對,例如:”order.*”。
  • Headers Exchange:根據消息的 Header 屬性進行路由。

那今天的部分就先到這裡了,我們明天見。


上一篇
[DAY 24] Spring 常見快取種類
下一篇
[DAY 26] Spring RabbitMQ 常見問題
系列文
週日時在做什麼?有沒有空?可以來寫SpringBoot嗎?30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言