上一輪介紹了基本的Kafka概念與Spring上Kafka的使用,這次我想要稍微提到Kafka的更多特性。
主要有關於兩件事:
Kafka 中的每個分區(Partition)都有多個副本(Replicas),這些副本分布在不同的 Broker上。當 Producer發送訊息時,訊息就會寫入 Partition的Leader 副本,然後 Leader 負責將消息複製到 Follower 副本。這樣,即使某個 Broker 節點失效,其他副本仍然能保證數據的可用性。
提醒:副本數越多、資料的備援就越好,但同時系統的資訊消耗也會隨之上揚。
那你可能會問,假設Leader副本在同步Follower前就掛掉了怎麼辦?
其中一種做法就是設定ack=all策略,也就是說假設一個Partition中有一個Leader與三個Follower,Producer只有在四個Follower都收到訊息時,才會收到Broker的回應訊息。這意味著各個Follower必須要彼此同步是否收到訊息,造成了額外的開銷成本。
假設我們在生產者想要確認是否Broken有收到訊息,可以使用addCallback()異步取得結果
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);
future.addCallback(result -> logger.info("Producer成功發送訊息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),
ex -> logger.error("發送訊息失敗,原因:{}", ex.getMessage()));
在 Kafka 中,Consumer 的進度是由偏移量(Offset)來追踪的。每個 Consumer 組可以自行控制提交偏移量的時機,常見的策略是手動提交(manual commit
)或自動提交(auto commit
)。手動提交可以提供更細緻的控制,確保 Consumer 在確認消息已被正確處理後才提交偏移量,從而避免消息丟失或重複處理。
Poison是Kafka會出現的錯誤情形,主要的發生原因是因為格式上出了問題導致。
一般來說的Producer 到 Consumer之間會透過將Java Object 序列化後傳輸,接著Consumer 反序列化後處理資料,但假如有一天反序列化的內容有問題導致無法反序列化成功,就會產生毒藥。
Consumer 會不停嘗試反序列化毒藥,程式會進入無窮迴圈,無法繼續訂閱訊息外,LOG 記錄被大量的、重複且無意義的錯誤訊息淹沒,最終導致儲存空間用盡。
解決方案:
使用ErrorHandlingDeserializer處理反序列化失敗問題。
///
application.properties
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.apache.kafka.common.serialization.StringDeserializer
///
@EnableKafka
public class KafkaConsumerConfig {
@Bean
public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(KafkaTemplate<Object, Object> kafkaTemplate) {
return new DeadLetterPublishingRecoverer(kafkaTemplate,
(ConsumerRecord<?, ?> r, Exception e) -> new TopicPartition(r.topic() + ".DLT", r.partition()));
}
@Bean
public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer recoverer) {
return new DefaultErrorHandler(recoverer);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(DefaultErrorHandler errorHandler) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setCommonErrorHandler(errorHandler);
return factory;
}
}
另外也可以和RabbitMQ一樣設定一個DLX,專門處理那些異常的訊息,至少不要讓他繼續卡在Queue耗費系統資源。
@KafkaListener(topics = "input-topic", groupId = "my-group")
public void listen(String message) {
try {
// 處理消息邏輯
processMessage(message);
} catch (Exception e) {
log.error("處理消息失敗,轉移到DLX", e);
sendToDLX(message);
}
}
那今天的介紹就到此為止了,明天就會來到最後一篇記錄文章了,我們 明天見。
參考資源: