iT邦幫忙

2022 iThome 鐵人賽

DAY 14
0
自我挑戰組

Spring In Action系列 第 14

JmsTemplate & RabbitTemplate

  • 分享至 

  • xImage
  •  
  1. JmsTemplate

這邊會使用spring-boot-starter-artemis套件來實作,artemis是activemq新的實作版本。

在我們的Spring App上使用activemq時,還需要另外啟動activemq broker,我們app傳出去的訊息其實是會傳到這個broker上的mq,然後app可能透過receive的方法或者listener的設定去拿到broker mq上的訊息。這個broker其實也可以是Spring幫我們embeded在memory中的,但是這樣就變成只支援在當前的app上了,無法讓其他app去使用。

JmsTemplate會是我們從Spring App不管傳訊息或收訊息會用到的核心類別。

send

// Send raw messages
void send(MessageCreator messageCreator) throws JmsException;
void send(Destination destination, MessageCreator messageCreator) throws JmsException;
void send(String destinationName, MessageCreator messageCreator) throws JmsException;

// Send messages converted from objects
void convertAndSend(Object message) throws JmsException;
void convertAndSend(Destination destination, Object message) throws JmsException;
void convertAndSend(String destinationName, Object message) throws JmsException;
 
// Send messages converted from objects with post-processing
void convertAndSend(Object message, MessagePostProcessor postProcessor) throws JmsException;

void convertAndSend(Destination destination, Object message, MessagePostProcessor postProcessor) throws JmsException;

void convertAndSend(String destinationName, Object message,MessagePostProcessor postProcessor) throws JmsException;

有send()和convertAndSend()兩類方法:

  • send()

接受MessageCreator所return的Message物件當作要傳出去的訊息,也可放入Destination物件指定要傳到哪裡,如果不放Destination就代表要傳到default的destination(可在application.properties設定)

  • convertAndSend()

直接接受要傳出的物件,方法內會幫我們轉成Message物件傳出去。不過由於是由方法內部幫我們轉成Message的,若我們希望設定一些customize的參數在Message反而就不像send()方法的MessageCreator可以自定義,這時候就要再傳入MessagePostProcessor(吃入Message返回Message)來針對要傳出的Message預先處理後,再傳出去。

receive

Message receive() throws JmsException;
Message receive(Destination destination) throws JmsException;
Message receive(String destinationName) throws JmsException;
 
Object receiveAndConvert() throws JmsException;
Object receiveAndConvert(Destination destination) throws JmsException;
Object receiveAndConvert(String destinationName) throws JmsException;

當我們call receive方法時,Spring App就會主動去向broker拉該destination的訊息下來。但除了這種主動拉的方式外,其實還可以在Spring App設定Listener來主動監聽broker該destination是否有訊息在mq中了,若有就會自動拉進來。

listener

package demo.kitchen.messaging.jms.listener;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
 
import demo.SwordOrder;
import demo.kitchen.KitchenUI;
 
@Profile("jms-listener")
@Component
public class OrderListener {
  
  private KitchenUI ui;
 
  @Autowired
  public OrderListener(KitchenUI ui) {
    this.ui = ui;
  }
 
  @JmsListener(destination = "demo.order.queue")
  public void receiveOrder(SwordOrder order) {
    ui.displayOrder(order);
  }
  
}

以上範例代表會主動監聽demo.order.queue這個destination有沒有訊息。

這段嘗試使用RabbitMQ(RabbitTemplate)來實作messaging的功能,說實在跟ActiveMQ(JmsTemplate)概念根本完全一樣,只是他多了exchange的一層來去分派訊息給不同的mq,在JmsTemplate中只會指定Destination,而在RabbitTemplate要指定exchange和routing key,若不指定就會使用預設的值。

引入以下dependency即可在Spring App使用RabbitMQ:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

參考以下RabbitTemplate的send方法:

// Send raw messages
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) throws AmqpException;
 
// Send messages converted from objects
void convertAndSend(Object message) throws AmqpException;
void convertAndSend(String routingKey, Object message) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
 
// Send messages converted from objects with post-processing
void convertAndSend(Object message, MessagePostProcessor mPP) throws AmqpException;

void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;

void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;

send的範例code:

package demo.messaging;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import demo.Order;
 
@Service
public class RabbitOrderMessagingService implements OrderMessagingService {
  private RabbitTemplate rabbit;
 
  @Autowired
  public RabbitOrderMessagingService(RabbitTemplate rabbit) {
    this.rabbit = rabbit;
  }
 
  public void sendOrder(TacoOrder order) {
    MessageConverter converter = rabbit.getMessageConverter();
    MessageProperties props = new MessageProperties();
    Message message = converter.toMessage(order, props);
    rabbit.send("demo.order", message);
  }
}

以下為receive方法:

// Receive messages
Message receive() throws AmqpException;
Message receive(String queueName) throws AmqpException;
Message receive(long timeoutMillis) throws AmqpException;
Message receive(String queueName, long timeoutMillis) throws AmqpException;
 
// Receive objects converted from messages
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
Object receiveAndConvert(long timeoutMillis) throws AmqpException;
Object receiveAndConvert(String queueName, long timeoutMillis)
                                                    throws AmqpException;
 
// Receive type-safe objects converted from messages
<T> T receiveAndConvert(ParameterizedTypeReference<T> type)
                                                      throws AmqpException;
<T> T receiveAndConvert(
    String queueName, ParameterizedTypeReference<T> type)
                                                      throws AmqpException;
<T> T receiveAndConvert(
    long timeoutMillis, ParameterizedTypeReference<T> type)
                                                      throws AmqpException;
<T> T receiveAndConvert(String queueName, long timeoutMillis,
    ParameterizedTypeReference<T> type)
                                                      throws AmqpException;

receive範例code:

package demo.kitchen.messaging.rabbit;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
@Component
public class RabbitOrderReceiver {
  private RabbitTemplate rabbit;
  private MessageConverter converter;
 
  @Autowired
  public RabbitOrderReceiver(RabbitTemplate rabbit) {
    this.rabbit = rabbit;
    this.converter = rabbit.getMessageConverter();
  }
 
  public SwordOrder receiveOrder() {
    Message message = rabbit.receive("demo.order");
    return message != null
           ? (SwordOrder) converter.fromMessage(message)
           : null;
  }
}

當然也可以設定listener:

package demo.kitchen.messaging.rabbit.listener;
 
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import demo.SwordOrder;
import tacos.kitchen.KitchenUI;
 
@Component
public class OrderListener {
  
  private KitchenUI ui;
 
  @Autowired
  public OrderListener(KitchenUI ui) {
    this.ui = ui;
  }
 
  @RabbitListener(queues = "demo.order.queue")
  public void receiveOrder(SwordOrder order) {
    ui.displayOrder(order);
  }
  
}

上一篇
OAuth2
下一篇
KafkaTemplate
系列文
Spring In Action30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言