iT邦幫忙

2025 iThome 鐵人賽

DAY 11
0

前言:Kafka

Kafka 是一個分散式的串流處理平台,它有別於傳統實現 AMQP 協議的消息隊列,有更強大的數據處理能力,它的獨特優勢在於:

  1. 分散式:可設置多分區並行處理來源數據,且水平擴展能力強
  2. 持久化:數據是保存在硬碟上的,一定時間內可訪問,不怕丟失數據
  3. 時序性:同一個分區內有時序性,可以幫助回溯

由於 Kafka 具備天然的分散式、可擴展的特性,使其吞吐量極高,可處理每秒百萬級的流量高峰,並且異步處理不影響主線程,這對我們的站內信、推送通知系統十分有幫助,一旦發出請求不用等待回應,馬上就能去做別的事情了,萬一發送失敗還可設計重試機制,通知信件也不怕丟失。

Kafka 基礎概念

在建立 Kafka 最高層次的理解時,我們可以把架構簡化再簡化,從 Kafka 三本柱(我自封的)開始認識並衍生出去更多的技術細節,這三個核心的組件構成了最基本的 Kafka 訂閱發佈機制,它們分別是 Broker、Producer 和 Consumer:

  • Kafka Broker:簡言之就是一個 Kafka 的伺服器節點,就像我們要用 Redis 前需要先在某台機器上啟動 Redis 一樣,Kafka 也需要一台可以運行的實體讓它可以作業、執行資料儲存跟消息轉發。一個 Kafka Cluster 會有多個 Broker 且可水平擴展,每個 Broker 負責儲存部分資料,如果我在本機用 Docker 運行起一個 Kafka 的 Docker Image 那我的 Mac 就是一個 Kafka Broker。
  • Kafka Producer:剛講到的 Broker 是 Server 端的角色,而 Producer 跟 Consumer 就是所謂的 Client 端。Producer 是產生、並發送消息給 Broker 的組件,它會發送給在 Broker 裡的某 Topic 的某 Partition(下一篇再細講,若不指定 Partition 的話,Kafka 會自動分配),像我們透過 SpringBoot 的 Kafka Template 發送消息的這個行為,就代表是一個 Kafka Producer。
  • Kafka Consumer:訂閱、監聽 Kafka Broker 的 Topic,並在其有新消息時,從 Topic 拿資料出來的 Client 端角色,SpringBoot 中使用 @KafkaListener 監聽,並打開 payload 就是一種 Consumer 的行為。

關於它們三個組件的進一步衍生,如 Topic、Partition、Offset、Consumer Group 等概念,到下篇再補充一點。

實作 Lab:環境建置

接下來實作環節我們要在 Docker Compose 的檔案中新增 Kafka 配置,讓我們的本地等等可以運行起 Kafka Broker 以供測試,並在 SpringBoot 專案中引入 Kafka Client 的配置,最後再做個簡單的 Pub/Sub 測試。

Docker Compose

為了測試時有一個本地的 Kafka Broker,所以我們還是需要用 Docker 運行起一個 Server,以下是 Docker Compose 的配置:

 kafka:
    image: confluentinc/cp-kafka:7.4.0
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:29093"
      KAFKA_LISTENERS: "PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_LOG_DIRS: "/tmp/kraft-combined-logs"
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"

配置好以後我們實際運行起容器測試看看 docker compose -f database.yaml up -d kafka (我的 Kafka 是跟 Redis 和 MySQL 共用一個 docker-compose.yaml),運行起來後看下狀態 docker ps ,看起來沒什麼問題:

CONTAINER ID   IMAGE                         COMMAND                  CREATED         STATUS         PORTS                                       NAMES
d02c9175a630   confluentinc/cp-kafka:7.4.0   "/etc/confluent/dock…"   4 minutes ago   Up 4 minutes   0.0.0.0:9092->9092/tcp, :::9092->9092/tcp   kafka
d15c6144388d   redis:6-alpine                "docker-entrypoint.s…"   6 days ago      Up 6 days      0.0.0.0:6379->6379/tcp, :::6379->6379/tcp   redis-cache-test
c467f8fed7dd   mysql:8.0                     "docker-entrypoint.s…"   6 weeks ago     Up 6 weeks     0.0.0.0:3306->3306/tcp, :::3306->3306/tcp   core-mysql

Spring Kafka Template

首先為我們的 SpringBoot 加上 Kafka Client 的 Maven 依賴:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

接著在專案配置上 Kafka 伺服器的位址,用來跟 Kafka 建立連線,Kafka Client 會透過它找到叢集中其他的 Broker,這邊代表 Kafka Broker 會在本機 9092 的 Port,下面就是 Producer 跟 Consumer 它們的一些參數設置:

  • Producer 相關:
    • key-serializer/value-serializer:定義如何把 Java 物件序列化成位元組再傳給 Kafka,這裡使用 StringSerializer,把 String 的 key-value 轉換成 byte[] 的工具。
    • acks:定義 Producer 發送消息後,要等多少 Broker 確認才算成功。
      • all 代表 leader 及所有的 ISR (in-sync replicas) 都確認後才算成功 → 最安全但延遲最高。
      • 0 不等回覆 → 最快,但容易丟掉資料。
      • 1 只等 leader 回覆 → 折衷,有可會有少量丟失。
    • retries:訊息發送失敗時的重試次數。
  • Consumer 相關:
    • group-id:消費者群組的 id。Kafka 會依照 Group 協調 Partition 的分配,確保同一個 Group 裡的 Consumer 不會重複消費同一條訊息。如果多個 Consumer 都屬於這組,都會共同分擔 Topic 的 Partition。
    • key-deserializer/value-deserializer:定義如何把 Broker 回傳的位元組資料反序列化成 Java 物件,這裡是 StringDeserializer,key-value 都會被還原成字串。
    • auto-offset-reset:當 Consumer 的 Group 沒有 offset 紀錄,或 offset 已超出範圍時的行為。
      • earliest:從最舊的訊息開始讀。
      • latest:從最新的訊息開始讀(常用於一般即時消費場景)。
      • none:若無有效 offset 就丟錯。
    • enable-auto-commit:是否自動提交 offset(預設 true),false 需手動 commit,方便確保業務邏輯真的處理完才提交,避免訊息丟失或重複消費。
kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all
      retries: 3
    consumer:
      group-id: notification-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: true

接下來我們看到 Java 裡面用來初始化 Kafka Client 物件的配置類,用途在於:

  • 建立 KafkaTemplate,它是 Spring 提供的工具類,讓我們可以很方便的發送訊息到 Kafka,它透過 producerFactory 建立。
  • 建立 Producer 跟 Consumer 工廠(設定序列化、反序列化方式、群組 ID 等)。
  • 建立 Listener 容器工廠,讓 @KafkaListener 可以運作。

總之它就是為 Spring 跟 Kafka 之間搭起一個橋樑,不用每次要用的時候都要 new,而是交給 Spring 容器自動管理。但其實如果比較單純的專案架構,剛剛的 .yaml 文件就已經夠用了,SpringBoot 會在容器啟動時自動載入那些參數,KafkaConfig 是在需要更靈活、彈性的配置時使用的。

@Configuration
public class KafkaConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootStrapServers;

    @Bean
    public KafkaTemplate<String, String> KafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps= new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "play-ground");
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

測試:Producer → Consumer

接著我們來實際測試一個 Kafka Producer 發送一個訊息給 Kafka Broker,然後 Consumer 監聽到並把訊息印出來的過程,首先我們簡單粗暴地創建一個 API:

  • 並注入 KafkaTemplate。
  • 在 Call API 的同時發送訊息出去:kafkaTemplate.send("TEST", "kafka 666"); 到 TEST 這個隨便亂設的 Topic。
@RequiredArgsConstructor
@RequestMapping(value = "/notification")
@RestController
public class NotificationController {

    private final KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping(value = "/kafka-test")
    public ResponseEntity<String> kafkaTest() {
        kafkaTemplate.send("TEST", "kafka 666");
        return ResponseEntity.ok("kafka-test");
    }
}

接著創建一個監聽的 KafkaConsumer:

@Component
public class KafkaConsumer {

    @KafkaListener(topics = "TEST")
    public void process(String content) {
        System.out.println(content + " gogogo");
    }
}

當我們運行起 SpringBoot 後,我們連到 Kafka Broker 一看就會發現,剛剛在 @KafkaListener 上的 Topic 已經被註冊進去了:

➜  ~ docker exec kafka kafka-topics --list --bootstrap-server localhost:9092

TEST

然後 Call 了 API 以後確實在終端上看到了文字訊息:

2025-09-01T21:46:55.297+08:00  INFO 36557 --- [playground-module] [dule-producer-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=playground-module-producer-1] ProducerId set to 2 with epoch 0
kafka 666 gogogo

總結

今天簡單複習了一下 Kafka 基礎的組件,包含 Broker、Producer、Consumer,然後實際在 SpringBoot 專案中配置了 Kafka Client,又簡單做了一個 Pub/Sub 的小測試,明天再補充一下 Kafka 相關的知識後,就要開始實作功能了。


上一篇
Day 10 | 推送系統實作前導&十日回顧
下一篇
Day 12 | Kafka 基礎知識補充 | Topic, Partition, Consumer Group
系列文
系統設計一招一式:最基本的功練到爛熟就是殺手鐧,從單體架構到分布式系統的 Lab 實作筆記12
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言