Kafka 是一個分散式的串流處理平台,它有別於傳統實現 AMQP 協議的消息隊列,有更強大的數據處理能力,它的獨特優勢在於:
由於 Kafka 具備天然的分散式、可擴展的特性,使其吞吐量極高,可處理每秒百萬級的流量高峰,並且異步處理不影響主線程,這對我們的站內信、推送通知系統十分有幫助,一旦發出請求不用等待回應,馬上就能去做別的事情了,萬一發送失敗還可設計重試機制,通知信件也不怕丟失。
在建立 Kafka 最高層次的理解時,我們可以把架構簡化再簡化,從 Kafka 三本柱(我自封的)開始認識並衍生出去更多的技術細節,這三個核心的組件構成了最基本的 Kafka 訂閱發佈機制,它們分別是 Broker、Producer 和 Consumer:
關於它們三個組件的進一步衍生,如 Topic、Partition、Offset、Consumer Group 等概念,到下篇再補充一點。
接下來實作環節我們要在 Docker Compose 的檔案中新增 Kafka 配置,讓我們的本地等等可以運行起 Kafka Broker 以供測試,並在 SpringBoot 專案中引入 Kafka Client 的配置,最後再做個簡單的 Pub/Sub 測試。
為了測試時有一個本地的 Kafka Broker,所以我們還是需要用 Docker 運行起一個 Server,以下是 Docker Compose 的配置:
kafka:
image: confluentinc/cp-kafka:7.4.0
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: "broker,controller"
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:29093"
KAFKA_LISTENERS: "PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092"
KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_LOG_DIRS: "/tmp/kraft-combined-logs"
CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"
配置好以後我們實際運行起容器測試看看 docker compose -f database.yaml up -d kafka
(我的 Kafka 是跟 Redis 和 MySQL 共用一個 docker-compose.yaml),運行起來後看下狀態 docker ps
,看起來沒什麼問題:
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
d02c9175a630 confluentinc/cp-kafka:7.4.0 "/etc/confluent/dock…" 4 minutes ago Up 4 minutes 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp kafka
d15c6144388d redis:6-alpine "docker-entrypoint.s…" 6 days ago Up 6 days 0.0.0.0:6379->6379/tcp, :::6379->6379/tcp redis-cache-test
c467f8fed7dd mysql:8.0 "docker-entrypoint.s…" 6 weeks ago Up 6 weeks 0.0.0.0:3306->3306/tcp, :::3306->3306/tcp core-mysql
首先為我們的 SpringBoot 加上 Kafka Client 的 Maven 依賴:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
接著在專案配置上 Kafka 伺服器的位址,用來跟 Kafka 建立連線,Kafka Client 會透過它找到叢集中其他的 Broker,這邊代表 Kafka Broker 會在本機 9092 的 Port,下面就是 Producer 跟 Consumer 它們的一些參數設置:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: all
retries: 3
consumer:
group-id: notification-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: earliest
enable-auto-commit: true
接下來我們看到 Java 裡面用來初始化 Kafka Client 物件的配置類,用途在於:
總之它就是為 Spring 跟 Kafka 之間搭起一個橋樑,不用每次要用的時候都要 new,而是交給 Spring 容器自動管理。但其實如果比較單純的專案架構,剛剛的 .yaml 文件就已經夠用了,SpringBoot 會在容器啟動時自動載入那些參數,KafkaConfig 是在需要更靈活、彈性的配置時使用的。
@Configuration
public class KafkaConfig {
@Value("${kafka.bootstrap-servers}")
private String bootStrapServers;
@Bean
public KafkaTemplate<String, String> KafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps= new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "play-ground");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
接著我們來實際測試一個 Kafka Producer 發送一個訊息給 Kafka Broker,然後 Consumer 監聽到並把訊息印出來的過程,首先我們簡單粗暴地創建一個 API:
kafkaTemplate.send("TEST", "kafka 666");
到 TEST 這個隨便亂設的 Topic。@RequiredArgsConstructor
@RequestMapping(value = "/notification")
@RestController
public class NotificationController {
private final KafkaTemplate<String, String> kafkaTemplate;
@GetMapping(value = "/kafka-test")
public ResponseEntity<String> kafkaTest() {
kafkaTemplate.send("TEST", "kafka 666");
return ResponseEntity.ok("kafka-test");
}
}
接著創建一個監聽的 KafkaConsumer:
@Component
public class KafkaConsumer {
@KafkaListener(topics = "TEST")
public void process(String content) {
System.out.println(content + " gogogo");
}
}
當我們運行起 SpringBoot 後,我們連到 Kafka Broker 一看就會發現,剛剛在 @KafkaListener 上的 Topic 已經被註冊進去了:
➜ ~ docker exec kafka kafka-topics --list --bootstrap-server localhost:9092
TEST
然後 Call 了 API 以後確實在終端上看到了文字訊息:
2025-09-01T21:46:55.297+08:00 INFO 36557 --- [playground-module] [dule-producer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=playground-module-producer-1] ProducerId set to 2 with epoch 0
kafka 666 gogogo
今天簡單複習了一下 Kafka 基礎的組件,包含 Broker、Producer、Consumer,然後實際在 SpringBoot 專案中配置了 Kafka Client,又簡單做了一個 Pub/Sub 的小測試,明天再補充一下 Kafka 相關的知識後,就要開始實作功能了。