這段也是在講messaging,不過採用了Apache Kafka的實作。
Kafka的架構採取cluster,代表不會只有一個broker在運作,而是有很多個broker在cluster之中,而當訊息的topic傳進來後,會將topic拆分成多個partition,各個broker之中會保存有各個分散的partition,而至於如何再將訊息給listener就由Kafka內部各個broker來決定。
若要使用Kafka,需引入以下dependency:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
以下為Kafka發送訊息的方法:
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition,
K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition,
Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition,
Long timestamp, K key, V data);
可以發現就沒有和JmsTemplate或RabbitTemplate一致了,沒有一定要包成Message,而是透過generic來指定要傳送的類別,不過topic這個屬性就還是和Jms或Rabbit類似。
sendDefault則是指會使用default的topic,可以在application.properties設定。
來看看實作的code:
package demo.messaging;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import demo.SwordOrder;
@Service
public class KafkaOrderMessagingService implements OrderMessagingService {
private KafkaTemplate<String, SwordOrder> kafkaTemplate;
@Autowired
public KafkaOrderMessagingService( KafkaTemplate<String, SwordOrder> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Override
public void sendOrder(SwordOrder order) {
kafkaTemplate.send("demo.orders.topic", order);
}
}
Kafka沒有receive的方法,一律使用listener:
package demo.kitchen.messaging.kafka.listener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import demo.Order;
import demo.kitchen.KitchenUI;
@Component
public class OrderListener {
private KitchenUI ui;
@Autowired
public OrderListener(KitchenUI ui) {
this.ui = ui;
}
@KafkaListener(topics="demo.orders.topic")
public void handle(SwordOrder order) {
ui.displayOrder(order);
}
}
可多傳入ConsumerRecord來拿到Kafka相關的資訊:
@KafkaListener(topics="demo.orders.topic")
public void handle( SwordOrder order, ConsumerRecord<String, SwordOrder> record) {
log.info("Received from partition {} with timestamp {}",
record.partition(), record.timestamp());
ui.displayOrder(order);
}
也可使用老朋友Message:
@KafkaListener(topics="demo.orders.topic")
public void handle(Order order, Message<Order> message) {
MessageHeaders headers = message.getHeaders();
log.info("Received from partition {} with timestamp {}",
headers.get(KafkaHeaders.RECEIVED_PARTITION_ID),
headers.get(KafkaHeaders.RECEIVED_TIMESTAMP));
ui.displayOrder(order);
}