iT邦幫忙

2024 iThome 鐵人賽

DAY 17
0

上一輪我們介紹過RabbitMQ,這次我們來提提Kafka吧。Kafka與RabbitMQ一樣都提供了Message Queue 服務,但Kafka的功能更加適合大型系統的處理,資料吞吐量的表現也比RabbitMQ好,除此之外,Kafka還能提供 Streaming Processing與簡易的Databse功能,就讓我們一起來看看吧。

image.png

這是Kafka的架構圖,與RabbitMQ的關係類似。以下介紹Kafka的核心概念。

  • Event:在Kafka中一筆資料被稱做一個消息,在Kafka傳輸時通常還會傳入Key、Timestamp與Meta Data Header,這些全部傳輸的資料合起來叫做一個事件。
  • Topics:傳輸的訊息的類別名稱。
  • Partitions:Topic內切分多個Partitions,內部已序列方式存放了訊息(message)或紀錄(Record),Partition可以分散存放在不同機器中作為備援。
  • Producers 生產者,把訊息丟到Topic,與RabbitMQ相同
  • Consumers 消費者,從Topic讀取訊息,與RabbitMQ相同
  • Brokers 指單個服務,與RabbitMQ相同
  • Clusters 多個Broken連接在一起稱作Cluster,Cluster中存在一個Controller (動態建立),負責分配Partition與監控Broker狀態。
  • Retention Kafka設定可以存放在磁碟的時間,Kafka會把資料存在磁碟中,可以針對不同Topic設定不同Retention
  • Multiple Cluster 多叢集管理
  • Replicas:備份
  • Offset : 每個Partition中的資料是有順序性,每筆資料寫入一個partition時都會有一個的id稱之為offset。

Kafka的優勢有

  1. 可擴展性**(Scalability)一個 Topic 也能夠將不同 Partition 部署於不同 Broker 上。這種特殊架構讓 Kafka 更容易實現水平擴展,使用者只需在現有的基礎架構中加入新伺服器或資料中心就能處理更多資料。**
  2. 更大量的資料吞吐。
  3. 資料持久性,RabbitMQ一旦訊息被接收,通常就會被刪除,但Kafka預設使用磁碟儲存,更適合存放訊息。
  4. Partition 可帶來更大的資料可用性。

想要測試Kafka,可以參考官方的教學:https://kafka.apache.org/quickstart

以下簡單撰寫使用教學

  1. Apache Download Mirrors 安裝最新的Kafka
  2. 解壓縮後,啟動ZooKeeper Server
$ tar -xzf kafka_2.13-3.8.0.tgz
$ cd kafka_2.13-3.8.0
$ bin/zookeeper-server-start.sh config/zookeeper.properties
  1. 開啟另外一個Terminal,啟動Kafka Server
bin/kafka-server-start.sh config/server.properties

這樣本機就已經啟用Kafka了,接下來看SpringBoot的設定吧。首先是Maven安裝依賴。

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

接著在Application.properties定義生產者與消費者的序列化與反序列化工具。並訂立group.id給Consumer用來分群、以及auto-offset-reset=earliest 代表當Kafka Consumer啟動且在Group中找不到現有的偏移量時,在Kafka重新啟動或新加入Group時能從最早的資料開始處理。


# Kafka broker 地址
spring.kafka.bootstrap-servers=localhost:9092
# 消費者配置
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 生產者配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

接著定義Kafka 接收訊息的設定,這邊定義了接收訊息的TOPIC以及GroupId。

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "my_topic", groupId = "my-group")
    public void consume(String message) {
        System.out.println("接收到的訊息: " + message);
    }
}

最後加入Producer與Consumer。


@Service
public class KafkaProducer {

    private static final String TOPIC = "my_topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        kafkaTemplate.send(TOPIC, message);
    }
}

@RestController
public class KafkaController {

    @Autowired
    private KafkaProducer kafkaProducer;

    @GetMapping("/send")
    public String sendMessage(@RequestParam("message") String message) {
        kafkaProducer.sendMessage(message);
        return "已發送訊息: " + message;
    }
}

透過以上的設定,你應該可以自行建立出一個Kafka Server了

image.png

那麼今天的介紹就到這邊了,我們明天見吧。

參考資料:

Apache Kafka 介紹. Data really powers everything that we… | by Chi-Hsuan Huang | Medium

Apache Kafka

Apache Kafka 是什麼?核心元件、優勢、常見使用案例一次看! - 歐立威科技 (omniwaresoft.com.tw)

什麼是 Apache Kafka?| Google Cloud


上一篇
[DAY 26] Spring RabbitMQ 常見問題
下一篇
[DAY 28] Spring Kafka 小筆記
系列文
週日時在做什麼?有沒有空?可以來寫SpringBoot嗎?30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言