iT邦幫忙

2022 iThome 鐵人賽

DAY 15
0
自我挑戰組

Spring In Action系列 第 15

KafkaTemplate

  • 分享至 

  • xImage
  •  

這段也是在講messaging,不過採用了Apache Kafka的實作。

Kafka的架構採取cluster,代表不會只有一個broker在運作,而是有很多個broker在cluster之中,而當訊息的topic傳進來後,會將topic拆分成多個partition,各個broker之中會保存有各個分散的partition,而至於如何再將訊息給listener就由Kafka內部各個broker來決定。

若要使用Kafka,需引入以下dependency:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

以下為Kafka發送訊息的方法:

ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, 
                                        K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, 
                                        Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
 
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, 
                                               Long timestamp, K key, V data);

可以發現就沒有和JmsTemplate或RabbitTemplate一致了,沒有一定要包成Message,而是透過generic來指定要傳送的類別,不過topic這個屬性就還是和Jms或Rabbit類似。

sendDefault則是指會使用default的topic,可以在application.properties設定。

來看看實作的code:

package demo.messaging;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import demo.SwordOrder;
@Service
public class KafkaOrderMessagingService implements OrderMessagingService {
  
  private KafkaTemplate<String, SwordOrder> kafkaTemplate;
  
  @Autowired
  public KafkaOrderMessagingService( KafkaTemplate<String, SwordOrder> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
  }
  
  @Override
  public void sendOrder(SwordOrder order) {
    kafkaTemplate.send("demo.orders.topic", order);
  }
  
}

Kafka沒有receive的方法,一律使用listener:

package demo.kitchen.messaging.kafka.listener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import demo.Order;
import demo.kitchen.KitchenUI;
 
@Component
public class OrderListener {
 
  private KitchenUI ui;
 
  @Autowired
  public OrderListener(KitchenUI ui) {
    this.ui = ui;
  }
 
  @KafkaListener(topics="demo.orders.topic")
  public void handle(SwordOrder order) {
    ui.displayOrder(order);
  }
 
}

可多傳入ConsumerRecord來拿到Kafka相關的資訊:

@KafkaListener(topics="demo.orders.topic")
public void handle( SwordOrder order, ConsumerRecord<String, SwordOrder> record) {
  log.info("Received from partition {} with timestamp {}", 
      record.partition(), record.timestamp());
  
  ui.displayOrder(order);
}

也可使用老朋友Message:

@KafkaListener(topics="demo.orders.topic")
public void handle(Order order, Message<Order> message) {
  MessageHeaders headers = message.getHeaders();
  log.info("Received from partition {} with timestamp {}",
      headers.get(KafkaHeaders.RECEIVED_PARTITION_ID),
      headers.get(KafkaHeaders.RECEIVED_TIMESTAMP));
  ui.displayOrder(order);
}

上一篇
JmsTemplate & RabbitTemplate
下一篇
Intro of integration flow
系列文
Spring In Action30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言