iT邦幫忙

2024 iThome 鐵人賽

DAY 24
0
Software Development

從零開始構建能理解語義的 Linebot 架構系列 第 24

使用 Spring Boot 開發 Backend Bot Server: Kafka Consumer 客製化配置與訊息接收

  • 分享至 

  • xImage
  •  

概述

  • 本文將介紹在Spring中撰寫Kafka Consumer所需註冊的Beans及其設定。
  • Bot Server的Kafka Consumer實踐中,我們以關係圖的方式說明相關Bean的註冊與取用關係,幫助你更清晰地了解Beans是如何被設定與取用,以及它們最終如何被Spring Kafka執行的關聯性。

為什麼沒有使用Spring Boot的自動配置(Auto-Configure)?

根據官方說明,我們只需要在Bot Server加入以下的程式碼,就可以讓一個Consumer運作:

@SpringBootApplication
public class Application {

    @KafkaListener(id = "myId", topics = "topic1")
    public void listen(String in) {
        System.out.println(in);
    }

}
  • 因為Spring Boot的Auto-Configure功能,我們只需註冊@KafkaListener這個特化註解,Spring Boot會用預設值設定好全部的配置並執行Consumer。
  • 然而,這些預設設定無法滿足我們專案的需求。

JSON String

  • 當我們在AWS Lambda接收LINE Bot的Webhook並透過Kafka Producer寫入資料時,使用的格式為JSON。然而,Kafka預設並不知道應該如何將JSON String進行解序列化,因此需要額外的設置來指定解序列化的對象。
  • 簡單來說,在Spring Kafka中,我們需要指定ContainerFactory, ConsumerFactory,並且在ConsumerFactory 設定讀出來的Event中:
    • Key的Deserializer為: StringDeserializer
    • Value的Deserializer為: JsonDeserializer

客製化ContainerFactory及ConsumerFactory

  • 需要在標示了@EnableKafka, 及@Configuration的Class做設定
  • 使用關係為: ContainerFactory 這個Bean會被KafkaListener取用,而ContainerFactory會透過setConsumerFactory(),把我們註冊的另外一個Bean: ConsumerFactory 設置為Consumer用的設定。
  • 我們會在ConsumerFactory做基本的設定,以及指定Deserializer: StringDeserializer 和 JsonDeserializer

程式碼如下:

  • Class KafkaConsumerConfig
package com.cancerpio.nextpagelinebotserver;

import java.util.HashMap;
import java.util.Map;

import com.cancerpio.nextpagelinebotserver.model.UserData;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServer;
    @Value("${spring.kafka.consumer.groupId}")
    private String groupId;
    @Value("${spring.kafka.consumer.autoOffsetReset}")
    private String autoOffsetReset;

    // 設定一個客製化的ConsumerFactory並註冊給Spring: 因為我們需處理JSON格式的資料,所以這邊要指定Key和Value的Deserializer
    @Bean
    public ConsumerFactory<String, UserData> userDataConsumerFactory() {
	return new DefaultKafkaConsumerFactory<>(consumerProps(), new StringDeserializer(),
		new JsonDeserializer<>(UserData.class));
    }

    @Bean
    // 設定給@kafkaListener的ContainerFactory,並且指定ConsumerFactory為上面客製化後的ConsumerFactory
    // 需注意: @KafkaListener必須指定containerFactory為此Bean名稱: "userDataListenerContainerFactory"
    // 否則Spring會使用預設的containerFactory
    ConcurrentKafkaListenerContainerFactory<String, UserData> userDataListenerContainerFactory(
	    ConsumerFactory<String, UserData> consumerFactory) {
	ConcurrentKafkaListenerContainerFactory<String, UserData> factory = new ConcurrentKafkaListenerContainerFactory<>();
	factory.setConsumerFactory(userDataConsumerFactory());
	return factory;
    }

    private Map<String, Object> consumerProps() {
	Map<String, Object> props = new HashMap<>();
	props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServer);
	props.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
	props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
	props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
	props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.autoOffsetReset);
	// ...
	return props;
    }

}
  • 這邊我們配置了JsonDeserializer,並在ConsumerFactory的角括號<>中指定值的型態為我們自訂的Class: UserData。這個配置與前面提到的一樣,使得Jackson能夠在背後自動將從Kafka獲取的資料映射到UserData這個類別。

配置Kafka Listener

  • 我們還需要在一個@Component中配置@KafkaListener,並且在@KafkaListener中指定剛剛我們客製好的ContainerFactory及ConsumerFactory。相關程式片段如下:
  • Class KafkaConsumer
package com.cancerpio.nextpagelinebotserver.service;

import com.cancerpio.nextpagelinebotserver.model.MessageContent;
import com.cancerpio.nextpagelinebotserver.model.MessageContentTrainingLog;
import com.cancerpio.nextpagelinebotserver.model.OpenAIResponse;
import com.cancerpio.nextpagelinebotserver.model.UserData;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linecorp.bot.client.LineMessagingClient;
import com.linecorp.bot.client.LineSignatureValidator;
import com.linecorp.bot.model.ReplyMessage;
import com.linecorp.bot.model.message.TextMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.concurrent.ExecutionException;

@Component
class KafkaConsumer {
    @Autowired
    LineSignatureValidator lineSignatureValidator;
    @Autowired
    LineMessagingClient lineMessagingClient;
    @Autowired
    OpenAiApiService openAiApiService;
    @Autowired
    ObjectMapper objectMapper;
    @Autowired
    MongoTemplate mongoTemplate;

    // 設定@KafkaListener,並指定要監聽的topic和containerFactory
    // 這邊的containerFactory必須指定為我們在KafkaConsumerConfig中客製的"userDataListenerContainerFactory",來正確的處理JSON資料
    @KafkaListener(id = "${spring.kafka.consumer.id}", topics = "${spring.kafka.consumer.topic}", containerFactory = "userDataListenerContainerFactory")
    public void UserDataListener(UserData userData) {
        try {
            String replyToken = userData.getReplyToken();
            String userId = userData.getUserId();
            String text = userData.getText();
            String openAiResponse = openAiApiService.sendMessage(null, text);
            boolean storeStatus = saveTrainingDataToMongodb(openAiResponse, userId);
            String replyMessage = "Your message is: " + text + "\nResponse: " + openAiResponse + "\n storeStatus: " + storeStatus;
            lineMessagingClient.replyMessage(new ReplyMessage(replyToken, new TextMessage(replyMessage))).get()
                    .getMessage();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    ...
}
  • 可以看到,在標示@KafkaListener時,我們設定相關配置,並同時指定了UserDataListener這個Function,作為Consumer拿到訊息,並做完反序列化後的處理方法。
  • 根據上面的程式片段,UserDataListener在獲取Topic中的資訊後,會使用openAiApiService進行處理,並最終透過lineMessagingClient將結果回應給使用者。

Bot Server的Kafka Consumer實踐

  • 總的來說,上面提到的Beans是如何被設定與取用,以及它們最終如何被Spring Kafka執行的關聯性如下圖:
    https://ithelp.ithome.com.tw/upload/images/20241008/20105227Fl2Vr93IZZ.png
    圖: Kafka相關Bean的註冊與使用

總結

  • 本篇我們介紹了Kafka Consumer的程式片段,設置的要點如下:
  • 客製化ContainerFactory及ConsumerFactory
    • 在標示了@EnableKafka, 及@Configuration的Class中,設定兩個Bean:
      • ContainerFactory
        • 需要透過setConsumerFactory()指定我們設置的ConsumerFactory Bean
      • ConsumerFactory
        • 基本設定
        • 指定Key的Deserializer為: StringDeserializer
        • 指定Value的Deserializer為: JsonDeserializer
  • 配置Kafka Listener
    • @Component Class中指定Function為@KafkaListener
      • 需在@KafkaListener指定我們的ContainerFactory
      • 此Function為Consumer在獲取Topic中的資訊後要進行的後續邏輯。
  • 下一篇我們將繼續回到Spring Boot的專案面,了解如何管理依賴、打包和執行程式。

Citation

https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/listener-annotation.html


上一篇
使用 Spring Boot 開發 Backend Bot Server: Serialize 及 Deserialize
下一篇
使用 Spring Boot 開發 Backend Bot Server: Maven 管理專案與依賴的實踐筆記
系列文
從零開始構建能理解語義的 Linebot 架構30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言