本文會針對一些RabbitMQ的特性進行更多討論,主題如下:
今天我們可以假設一個情境,假設我們是一個電商網站,當消費者交易完成後,為了處理交易上的繁瑣事宜、結帳、開立發票、稅金巴拉巴拉的,我們希望這筆交易明細是在明天才發出的。除了使用排程處理,也可以發送延遲佇列去實現。
運作上,生產者可以在當下發出Delay Queue給Exchange,而Exchange負責管理要在甚麼時候將資訊送給Consumer,這段時間訊息會被暫放在Exchange之中,等到明天才寄送給消費者。
這個功能只要安裝一個額外的RabbitMQ Plugin就可以實現了、程式的設定如下。
// 定義 Delayed Exchange,使用插件提供的 x-delayed-message 類型
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct"); // 延遲消息基於 direct 路由
return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, args);
}
上一輪我們提到AMQP的解偶導致了傳輸可能遺失的問題,那在AMQP中主要遺失的狀況有
RabbitMQ有實作Transaction用來確保訊息處理的可靠性與一致性,他可以用來確保訊息正確無誤的發送到Broker(RabbitMQ的服務名稱),並在發生失敗的情況下Rollback,提供了Atomic原子性的特點。主要的流程是 開啟交易 > 發送訊息 > 提交交易,如果沒問題,就被確認。 > 有問題就rollback。程式碼如下:
// 開啟交易
channel.txSelect();
try {
String exchangeName = "my-exchange";
String routingKey = "my-routing-key";
String message = "Hello, RabbitMQ!";
// 發送消息
channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
// 提交交易
channel.txCommit();
} catch (Exception e) {
// 如果有錯誤,回滾交易
channel.txRollback();
e.printStackTrace();
}
雖然RabbitMQ Transaction可確保訊息的一致性,但要謹慎使用。Transasction 會帶來性能成本,因此應僅在必要時使用它們。此外,事務應盡可能小,以盡量減少它們對性能的影響。
RabbitMQ中,針對Producer發送到Broker的訊息確認,稱呼為Confirm,預設是關閉的,開啟後我們就可以確保一個訊息在Broker被收到之後,Publish可以收到這道訊息的Confirm。實際上可以參考程式碼
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 開啟 Confirm 模式
channel.confirmSelect();
String message = "Hello, RabbitMQ with Confirm!";
// 發送消息
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes("UTF-8"));
System.out.println("Sent: " + message);
// 確認消息
if (channel.waitForConfirms()) {
System.out.println("Message was confirmed by the broker.");
} else {
System.out.println("Message was not confirmed by the broker.");
}
} catch (Exception e) {
e.printStackTrace();
}
Producer可以透過兩種方法來確認訊息是否有發送到Broker
ACK機制用來確認訊息從Broker到Consumer的是否正確被消費,通常在Consumer處理一道訊息完成後,會內建觸發一個basic.ack()的方法,告知Broker此筆訊息已經被被正確處理。
假設回復異常、或是未收到ACK就會Broker進行錯誤處理,Consumer的ACK主要有三種模式:
這個可以透過SpringBoot的參數設定
spring.rabbitmq.listener.simple.acknowledge-mode=(manul,auto,none)
DLX 是指一個訊息在Queue中變成了Dead Message之後,他可以被重新發送道另外一個Exchage之中,這個Exchange就是一個DLX。Dead Message會發生的原因如下:
如果要定義一個DLX的話,要在RoutingKey設定
// 設置主佇列
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "my-dlx-exchange");
args.put("x-dead-letter-routing-key", "dlx-routing-key");
channel.queueDeclare("my-queue", true, false, false, args);
// 設置 DLX 佇列
channel.queueDeclare("my-dlx-queue", true, false, false, null);
channel.exchangeDeclare("my-dlx-exchange", "direct");
channel.queueBind("my-dlx-queue", "my-dlx-exchange", "dlx-routing-key");
那麼今天的部分就到這邊吧,我們明天見。
參考資源:
https://www.cnblogs.com/qiyongchu/p/15656853.html
https://yoziming.github.io/post/220204-gulimall-18-rabbitmq/
RabbitMQ的confirm、ack、transaction三个概念的理解_mq ack是什么意思-CSDN博客
死信交換 | RabbitMQ 繁體中文 (dev.org.tw)