iT邦幫忙

2024 iThome 鐵人賽

DAY 17
0

上一輪介紹了基本的Kafka概念與Spring上Kafka的使用,這次我想要稍微提到Kafka的更多特性。

主要有關於兩件事:

  1. Kafka 怎麼保證訊息的可靠性
  2. Poison Pill 是什麼?

Kafka 怎麼保證訊息的可靠性

Kafka 中的每個分區(Partition)都有多個副本(Replicas),這些副本分布在不同的 Broker上。當 Producer發送訊息時,訊息就會寫入 Partition的Leader 副本,然後 Leader 負責將消息複製到 Follower 副本。這樣,即使某個 Broker 節點失效,其他副本仍然能保證數據的可用性。

提醒:副本數越多、資料的備援就越好,但同時系統的資訊消耗也會隨之上揚。

那你可能會問,假設Leader副本在同步Follower前就掛掉了怎麼辦?

其中一種做法就是設定ack=all策略,也就是說假設一個Partition中有一個Leader與三個Follower,Producer只有在四個Follower都收到訊息時,才會收到Broker的回應訊息。這意味著各個Follower必須要彼此同步是否收到訊息,造成了額外的開銷成本。

假設我們在生產者想要確認是否Broken有收到訊息,可以使用addCallback()異步取得結果

 ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);
        future.addCallback(result -> logger.info("Producer成功發送訊息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),
          ex -> logger.error("發送訊息失敗,原因:{}", ex.getMessage()));

在 Kafka 中,Consumer 的進度是由偏移量(Offset)來追踪的。每個 Consumer 組可以自行控制提交偏移量的時機,常見的策略是手動提交(manual commit)或自動提交(auto commit)。手動提交可以提供更細緻的控制,確保 Consumer 在確認消息已被正確處理後才提交偏移量,從而避免消息丟失或重複處理。

Poision Pill

Poison是Kafka會出現的錯誤情形,主要的發生原因是因為格式上出了問題導致。

一般來說的Producer 到 Consumer之間會透過將Java Object 序列化後傳輸,接著Consumer 反序列化後處理資料,但假如有一天反序列化的內容有問題導致無法反序列化成功,就會產生毒藥。

Consumer 會不停嘗試反序列化毒藥,程式會進入無窮迴圈,無法繼續訂閱訊息外,LOG 記錄被大量的、重複且無意義的錯誤訊息淹沒,最終導致儲存空間用盡。

解決方案:

使用ErrorHandlingDeserializer處理反序列化失敗問題。

///
application.properties
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.apache.kafka.common.serialization.StringDeserializer
///
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(KafkaTemplate<Object, Object> kafkaTemplate) {
        return new DeadLetterPublishingRecoverer(kafkaTemplate, 
            (ConsumerRecord<?, ?> r, Exception e) -> new TopicPartition(r.topic() + ".DLT", r.partition()));
    }

    @Bean
    public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer recoverer) {
        return new DefaultErrorHandler(recoverer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(DefaultErrorHandler errorHandler) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setCommonErrorHandler(errorHandler);
        return factory;
    }
}

另外也可以和RabbitMQ一樣設定一個DLX,專門處理那些異常的訊息,至少不要讓他繼續卡在Queue耗費系統資源。

@KafkaListener(topics = "input-topic", groupId = "my-group")
public void listen(String message) {
    try {
        // 處理消息邏輯
        processMessage(message);
    } catch (Exception e) {
        log.error("處理消息失敗,轉移到DLX", e);
        sendToDLX(message); 
    }
}

那今天的介紹就到此為止了,明天就會來到最後一篇記錄文章了,我們 明天見。

參考資源:

https://medium.com/@peiiun7887/spring-for-apache-kafka-實作-da47cbee1d34https://blog.csdn.net/w_monster/article/details/125394637


上一篇
[DAY 27] Spring Kafka 使用教學
下一篇
[DAY 29] 探秘Reactive Programming - RxJava
系列文
週日時在做什麼?有沒有空?可以來寫SpringBoot嗎?30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言