iT邦幫忙

2024 iThome 鐵人賽

DAY 17
0

本文會針對一些RabbitMQ的特性進行更多討論,主題如下:

  • Delay-Queue 延遲佇列
  • RabbitMQ 的資料遺失問題
  • RabbitMQ Transaction
  • RabbitMQ ACK
  • RabbitMQ Confirm
  • DLX Dead-Letter-Exchange

Delay-Queue 延遲佇列 是什麼?怎麼做的?

今天我們可以假設一個情境,假設我們是一個電商網站,當消費者交易完成後,為了處理交易上的繁瑣事宜、結帳、開立發票、稅金巴拉巴拉的,我們希望這筆交易明細是在明天才發出的。除了使用排程處理,也可以發送延遲佇列去實現。

運作上,生產者可以在當下發出Delay Queue給Exchange,而Exchange負責管理要在甚麼時候將資訊送給Consumer,這段時間訊息會被暫放在Exchange之中,等到明天才寄送給消費者。

這個功能只要安裝一個額外的RabbitMQ Plugin就可以實現了、程式的設定如下。

    // 定義 Delayed Exchange,使用插件提供的 x-delayed-message 類型
    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");  // 延遲消息基於 direct 路由
        return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, args);
    }

上一輪我們提到AMQP的解偶導致了傳輸可能遺失的問題,那在AMQP中主要遺失的狀況有

  1. Producer 到 RabbitMQ Exchange 遺失
    • 未實作Publish Confirm
    • 訊息被 RabbitMQ Exchange 拒絕、訊息超過可接受長度或者 TTL (Time to Live) 設定有誤,導致過期。
  2. RabbitMQ Exchange訊息遺失,這邊的比較少發生
  3. RabbitMQ Exchange送到 Consumer端遺失
    • 消費者在處理訊息後未回覆確認消息給RabbitMQ,而RabbitMQ若沒有重新發送給其他的Listener的話就會消失。

RabbitMQ 的 Transaction (Producer)

RabbitMQ有實作Transaction用來確保訊息處理的可靠性與一致性,他可以用來確保訊息正確無誤的發送到Broker(RabbitMQ的服務名稱),並在發生失敗的情況下Rollback,提供了Atomic原子性的特點。主要的流程是 開啟交易 > 發送訊息 > 提交交易,如果沒問題,就被確認。 > 有問題就rollback。程式碼如下:

// 開啟交易
channel.txSelect();
try {
    String exchangeName = "my-exchange";
    String routingKey = "my-routing-key";
    String message = "Hello, RabbitMQ!";

    // 發送消息
    channel.basicPublish(exchangeName, routingKey, null, message.getBytes());

    // 提交交易
    channel.txCommit();
} catch (Exception e) {
    // 如果有錯誤,回滾交易
    channel.txRollback();
    e.printStackTrace();
}

雖然RabbitMQ Transaction可確保訊息的一致性,但要謹慎使用。Transasction 會帶來性能成本,因此應僅在必要時使用它們。此外,事務應盡可能小,以盡量減少它們對性能的影響。

RabbitMQ Confirm (Producer)

RabbitMQ中,針對Producer發送到Broker的訊息確認,稱呼為Confirm,預設是關閉的,開啟後我們就可以確保一個訊息在Broker被收到之後,Publish可以收到這道訊息的Confirm。實際上可以參考程式碼

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
     Channel channel = connection.createChannel()) {

    // 開啟 Confirm 模式
    channel.confirmSelect();

    String message = "Hello, RabbitMQ with Confirm!";
    
    // 發送消息
    channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes("UTF-8"));
    System.out.println("Sent: " + message);

    // 確認消息
    if (channel.waitForConfirms()) {
        System.out.println("Message was confirmed by the broker.");
    } else {
        System.out.println("Message was not confirmed by the broker.");
    }
} catch (Exception e) {
    e.printStackTrace();
}

Producer可以透過兩種方法來確認訊息是否有發送到Broker

  1. ConfirmCallBack()用來確認訊息是否成功發送到Exchange,異步方法。
  2. ReturnCallback()用來確認是否資料有抵達Broken但沒有對應的Queue
  3. waitForConfirms() 用來等待Broker的確認,同步方法。

RabbitMQ ACK (Consumer acknowledgement)

ACK機制用來確認訊息從Broker到Consumer的是否正確被消費,通常在Consumer處理一道訊息完成後,會內建觸發一個basic.ack()的方法,告知Broker此筆訊息已經被被正確處理。

假設回復異常、或是未收到ACK就會Broker進行錯誤處理,Consumer的ACK主要有三種模式:

  • AcknowledgeMode.NONE:自動確認模式,Broker不等待Consumer回復確認,直接將訊息從Queue中移除。可能導致訊息遺失。
  • AcknowledgeMode.MANUAL:手動確認模式,在每次訊息處理完成後,Consumer必須要回復Broker一個basic.ack()訊息,告知Broker這筆訊息已經成功消費,這樣Broker才會將訊息從Queue中移除。
    這個模式下提供三種方法
    • basic.ack() 用來回覆Broker訊息成功消費
    • basic.nack() 用來回覆Broker訊息處理失敗,會觸發重新發送。
    • basic.reject() 用來回覆Broker訊息處理被拒絕,不會觸發重新發送。
  • AcknowledgeMode.AUTO:自動處理,是預設值。由RabbitMQ自動判斷與訊息確認。

這個可以透過SpringBoot的參數設定

spring.rabbitmq.listener.simple.acknowledge-mode=(manul,auto,none)

Dead Letter Exchange DLX

DLX 是指一個訊息在Queue中變成了Dead Message之後,他可以被重新發送道另外一個Exchage之中,這個Exchange就是一個DLX。Dead Message會發生的原因如下:

  1. 訊息的TTL 已經到期,死在Queue裡面。
  2. Consumer手動確認情況下,觸發了basic.nack()或是basic.reject()方法,且不重寄送 ( requeue=false)
  3. Queue被塞到滿了。

如果要定義一個DLX的話,要在RoutingKey設定

// 設置主佇列
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "my-dlx-exchange");
args.put("x-dead-letter-routing-key", "dlx-routing-key");
channel.queueDeclare("my-queue", true, false, false, args);

// 設置 DLX 佇列
channel.queueDeclare("my-dlx-queue", true, false, false, null);
channel.exchangeDeclare("my-dlx-exchange", "direct");
channel.queueBind("my-dlx-queue", "my-dlx-exchange", "dlx-routing-key");

那麼今天的部分就到這邊吧,我們明天見。

參考資源:

https://www.cnblogs.com/qiyongchu/p/15656853.html

https://yoziming.github.io/post/220204-gulimall-18-rabbitmq/

RabbitMQ的confirm、ack、transaction三个概念的理解_mq ack是什么意思-CSDN博客

死信交換 | RabbitMQ 繁體中文 (dev.org.tw)


上一篇
[DAY 25] SpringBoot 簡易介紹RabbitMQ
下一篇
[DAY 27] Spring Kafka 使用教學
系列文
週日時在做什麼?有沒有空?可以來寫SpringBoot嗎?30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言