Kafka 是一個分散式串流平台,專為巨量資料流設計,可透過水平擴展的方式來提升乘載量,甚至可以把資料存放於硬碟中,提供擴展性、可靠性、持久性的串流服務。
Kafka 採用 Publish/Subscribe 機制,透過 Producer 將 事件(Event) 傳送給 Kafka Broker,Consumer 透過訂閱的方式接收來自 Producer 的 Event。
在 Kafka 的生態系中,傳送資料到 Kafka 後會以 Event 的形式存在,有時候會以 紀錄(Record) 或 訊息(Message) 來稱呼。每一個 Event 會由以下幾個部分組成:
主題(Topic) 是用來組織、分類與保存 Event 的,可以把它想像成是資料夾,而 Event 是裡面的檔案。
Topic 除了用來分類 Event 外,最值得一提的是 Consumer 可以在需要的時候從中取出 Event,與其他訊息系統不同,Event 不會在取出後刪除,開發者可以針對 Topic 設定 Event 的保留時間,超過時間就會被丟棄,而判斷的依據就是 Event 的 Timestamp。
Kafka 為了可擴展性設計了 Partition 機制,將 Topic 的資料分區存放,以利於分散在各個 Kafka Broker,如此一來,應用程式就可以同時從不同 Kafka Broker 寫入、讀取資料。
當有一筆 Event 分配到 Topic 時,會根據該 Event 的 key 來進行分區,如果沒有特別設定 key,則會由 Kafka 自動分配。Kafka 會依照 Event 的 Timestamp 來決定該 Event 在 Partition 內的 偏移量(offset),每一筆 Event 會根據上一筆 Event 的 offset 進行疊加,如果沒有上一筆則會以 0 當作 offset,如此一來,就可以透過 offset 確保 Event 的順序,讓 Consumer 依照該順序進行取出,達到保證順序的效果,不過不同 Partition 之間是無法保證順序的,這就表示如果有要照順序執行的 Event,就必須放在同一個 Partition 內,也就是說,這些 Event 必須使用相同的 key。
Kafka 針對 Consumer 的部分設計了群組的概念,讓多個 Consumer 共享一個 Group ID,並使用該 Group ID 訂閱一個 Topic 下的所有 Partition,如此一來,便可以將風險分散在多個 Consumer 上來提升容錯性。一個 Group 內的 Consumer 不會重複處理相同的 Partition,所以每一個 Partition 都只會由一個 Consumer 來處理,但如果是不同 Group 就不影響。
注意:一個 Consumer Group 內的 Consumer 數量不會超過 Partition 的數量,原因是多出來的 Consumer 並不會分配到工作;但也不一定要讓 Consumer 的數量與 Partition 一致,是可以少於 Partition 數量的。
由於 Consumer Group 內的 Consumer 都有可能因為某些原因發生變化,比如:新增 Consumer、某個 Consumer 停機等,這些情況下可能會導致新增的 Consumer 沒有分配到工作、停機造成部份 Partition 內的 Event 沒有被處理,所以 Kafka 設計了一套機制來 重新平衡(Rebalance) Consumer Group 進而解決上述問題。
Rebalance 需要有一個 協調者(Group Coordinator) 的角色來擔任仲介,每當有一個新的 Consumer 要加入某個 Consumer Group 時,會向 Group Coordinator 發送加入請求,進而發起 Rebalance,將部份 Partition 的處理工作分配給該 Consumer。
針對 Consumer 停機的部分,則是由每一個 Group 內的 Consumer 向 Group Coordinator 發送自己的 心跳(Heartbeat),讓 Group Coordinator 知道這個 Consumer 還活著,一旦失去心跳就會發起 Rebalance,將該 Consumer 負責的 Partition 交給其他 Consumer 來處理。
補充:Group Coordinator 是由某個 kafka Broker 來擔任。
前面有提到 Consumer 會依據 Event 的 offset 來取出 Event,那麽 Consumer 如果停機了,下次回線的時候要怎麼知道從哪個 offset 開始做起呢?又或是發生 Rebalance 的時候要怎麼知道該從何做起呢?針對這個問題,Kafka 設計了 提交偏移量(Commit Offset) 的機制,讓 Consumer 在處理完 Event 時主動 Commit offset,Kafka Broker 會將該 offset 保存起來,如此一來,就可以儲存最後一次完成的 Event 在哪個 offset。
補充:Kafka Client 函式庫通常會提供 自動提交(Auto Commit) 的功能,可以定時或定量 commit offset,是最簡單的處理方式,但還是建議依實際情況決定是否合適。
在終端機輸入下方指令以便從 Docker Hub 下載 Kafka 的 Docker Image:
$ docker pull bitnami/kafka
補充:從
3.7.0
開始,Apache 官方提供了 Kafka 的 Docker 版本,但該 Image 沒有將 Client 用的 Script 包進去,為了方便測試,這裡使用 bitnami 的版本。
在啟動 Kafka 之前,需要先建立 Docker Bridge Network,原因是待會會透過另一個 Container 來擔任 Client 進而操作 Kafka:
$ docker network create <NETWORK_NAME>
接下來就透過下方指令將 Kafka 架設在 9092
port 並運行於 <NETWORK_NAME>
上:
$ docker run --name <NAME> --hostname <HOSTNAME> \
--network <NETWORK_NAME> \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092,EXTERNAL://localhost:9094 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@<HOSTNAME>:9093 \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-d \
-p 9092:9092 \
-p 9094:9094 \
bitnami/kafka:latest
在讓兩個服務透過 Kafka 進行交換訊息之前,先試著用 bitnami/kafka 裡面的 Shell Script 來進行操作。
首先,透過下方指令執行 bitnami/kafka 內部的 kafka-topics.sh
來建立名為 demo
的 Topic:
$ docker run --network <NETWORK_NAME> --rm -it bitnami/kafka kafka-topics.sh --topic demo --create --bootstrap-server <HOSTNAME>:9092
建立完畢後,透過下方指令執行 bitnami/kafka 內部的 kafka-console-producer.sh
來啟動 Producer,並指定 demo
為要寫入的 Topic:
$ docker run --network <NETWORK_NAME> --rm -it bitnami/kafka kafka-console-producer.sh --topic demo --bootstrap-server <HOSTNAME>:9092
此時終端機會呈現可以輸入文字的狀態,這邊輸入 Hello World1
並按下 Enter
鍵,就會將該筆 Event 寫入到 Topic:
接著,打開新的終端機輸入下方指令執行 bitnami/kafka 內部的 kafka-console-consumer.sh
來啟動 Consumer,並指定 demo
為要訂閱的 Topic,這邊可以透過 --from-beginning
從頭開始讀取:
$ docker run --network <NETWORK_NAME> --rm -it bitnami/kafka kafka-console-consumer.sh --topic demo --from-beginning --bootstrap-server <HOSTNAME>:9092
此時終端機會收到剛剛從 Producer 發送的 Event:
如果這時候從 Producer 的終端機輸入 Hello World2
並送出的話,Consumer 的終端機也會順利收到該 Event:
NestJS 實作了 Kafka Transporter,讓微服務應用程式可以用跟其他 Transporter 一樣的開發風格來使用 Kafka。
要使用 Kafka Transporter 之前需要先安裝下方套件:
$ npm install kafkajs
補充:kafkajs 是一套用於 Node.js 的 Kafka 客戶端,有興趣可以參考 官方文件。
修改載入點 main.ts
的內容,將 transport
設定為 Transport.KAFKA
,並根據架設的 Kafka 位址來設定 brokers
的資訊:
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
async function bootstrap() {
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9094'],
},
},
},
);
await app.listen();
}
bootstrap();
上方範例在 options
只使用了 client
裡面的 brokers
,事實上,Kafka Transporter 的 options
內容十分豐富。下方列出在 Kafka Transporter 下的 options
較重要的設定:
clinet
:設置 Kafka Client,如:brokers
、clientId
等,詳細內容可以參考 kafkajs 相關文件。consumer
:設置 Consumer,如:groupId
、readUncommitted
等,詳細內容可以參考 kafkajs 相關文件。run
:設置 Consumer 處理訊息的方式,如:autoCommit
、eachBatchAutoResolve
等,詳細內容可以參考 kafkajs 相關文件。subscribe
:設置訂閱,目前只有 fromBeginning
的屬性,預設為 false
,詳細內容可以參考 kafkajs 相關文件。producer
:設置 Producer,如:createPartitioner
、allowAutoTopicCreation
等,詳細內容可以參考 kafkajs 相關文件。send
:設置 Producer 傳送訊息的方式,如:acks
、compression
等,詳細內容可以參考 kafkajs 相關文件。producerOnlyMode
:只擔任 Producer 角色,不需加入 Consumer Group。postfixId
:Client ID 的後綴,如果是微服務應用程式,預設使用 -server
,如果是客戶端,預設使用 -client
。Kafka Transporter 也支援 Request-response 與 Event-based 訊息模式。下方是範例程式碼,在 AppController
內實作 sayHello
方法並套用 @MessagePattern
裝飾器,以及實作 onOrderCreated
方法並套用 @EventPattern
裝飾器:
import { Controller } from '@nestjs/common';
import { EventPattern, MessagePattern } from '@nestjs/microservices';
@Controller()
export class AppController {
@MessagePattern('message.hello')
sayHello(data: string) {
console.log(data);
return `Hello, ${data}`;
}
@EventPattern('order.created')
onOrderCreated(order: { name: string; }) {
console.log(order);
}
}
注意:Kafka Transporter 不支援 Pattern 設定為可序列化物件,因為它會被用來當作是 Kafka Topic。
透過 bitnami/kafka 進行測試,透過下方指令啟動 Producer,並指定 Topic 為 order.created
:
$ docker run --network <NETWORK_NAME> --rm -it bitnami/kafka kafka-console-producer.sh --topic order.created --bootstrap-server <HOSTNAME>:9092
此時輸入下方訊息,並按下 Enter
鍵進行發送:
{"name":"Test"}
此時在微服務應用程式的終端機會顯示發送的訊息,且該訊息有正確地反序列化:
假如要取得該請求的相關資訊,比如:原始訊息,可以透過 @Ctx
裝飾器取得 KafkaContext
。下方是範例程式碼:
import { Controller } from '@nestjs/common';
import {
Ctx,
Payload,
EventPattern,
KafkaContext
} from '@nestjs/microservices';
@Controller()
export class AppController {
@EventPattern('order.created')
onOrderCreated(
@Payload() order: { name: string; },
@Ctx() ctx: KafkaContext
) {
console.log(ctx.getMessage());
console.log(order);
}
}
使用 bitnami/kafka 進行測試,透過下方指令啟動 Producer,並指定 Topic 為 order.created
:
$ docker run --network <NETWORK_NAME> --rm -it bitnami/kafka kafka-console-producer.sh --topic order.created --bootstrap-server <HOSTNAME>:9092
輸入下方訊息,並按下 Enter
鍵進行發送:
{"name":"Test"}
此時在微服務應用程式的終端機會顯示發送的訊息以及原始訊息:
Kafka 為了要確保 Consumer 還在運作,會持續監聽來自 Consumer 傳送的 Heartbeat,當超過一段時間沒有收到 Heartbeat,Kafka Broker 會認定該 Consumer 已經停止運作,進而產生 會話超時(Session Timeout)。
在大部分情況下,不太會發生 Session Timeout,但如果收到訊息後處理的速度過慢,有可能因此導致 Session Timeout 的情況發生,針對這個問題,我們可以透過 KafkaContext
來取得 heartbeat
函式,在緩慢的任務中間手動觸發 Heartbeat,進而降低 Session Timeout 的可能。以下方程式碼為例,透過 @Ctx
裝飾器取得 KafkaContext
並透過 getHeartbeat
方法取得 heartbeat
函式:
import { Controller } from '@nestjs/common';
import {
Ctx,
Payload,
EventPattern,
MessagePattern
} from '@nestjs/microservices';
const sleep = (milliSeconds: number) => {
return new Promise((resolve) => setTimeout(() => resolve(), milliSeconds));
};
@Controller()
export class AppController {
@MessagePattern('message.hello')
async sayHello(@Payload() data: string, @Ctx() ctx: KafkaContext) {
const heartbeat = ctx.getHeartbeat();
await sleep(15000);
heartbeat();
await sleep(15000);
console.log(data);
return `Hello, ${data}`;
}
}
使用 bitnami/kafka 進行測試,透過下方指令啟動 Producer,並指定 Topic 為 message.hello
:
$ docker run --network <NETWORK_NAME> --rm -it bitnami/kafka kafka-console-producer.sh --topic message.hello --bootstrap-server <HOSTNAME>:9092
輸入下方訊息,並按下 Enter
鍵進行發送:
HAO
等一陣子後,在微服務應用程式的終端機會顯示發送的訊息:
如果把 heartbeat
拿掉的話,重新測試一次會在微服務應用程式的終端機看到 Session Timeout 的錯誤訊息:
在某些情況下,可能會希望自行決定 Commit Offset 的時機,首先,調整 main.ts
的內容,將 autoCommit
設為 false
:
// ...
async function bootstrap() {
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.KAFKA,
options: {
// ...
run: {
autoCommit: false,
},
},
},
);
await app.listen();
}
bootstrap();
接著調整 Handler 的部份,透過 KafkaContext
取得 Consumer 實例,並使用該實例的 commitOffsets
方法來手動處理:
import { Controller } from '@nestjs/common';
import {
Ctx,
Payload,
EventPattern,
KafkaContext
} from '@nestjs/microservices';
@Controller()
export class AppController {
@EventPattern('order.created')
async onOrderCreated(
@Payload() order: { name: string; },
@Ctx() ctx: KafkaContext
) {
const { offset } = ctx.getMessage();
const partition = ctx.getPartition();
const topic = ctx.getTopic();
await ctx.getConsumer().commitOffsets([{ topic, offset, partition }]);
console.log(item);
}
}
Kafka Transporter 提供的 Client 與其他 Transporter 不同,它不使用 ClientProxy
,取而代之的是 ClientKafka
,不過整體的設定方式與使用方式沒有太大的變化。
修改 AppModule
的內容,透過 ClientsModule
建立 Kafka Transporter 的 ClientKafka
:
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
// ...
@Module({
// ...
imports: [
ClientsModule.register([
{
name: 'KAFKA_SERVICE',
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9094'],
},
}
}
])
]
})
export class AppModule {}
前面有提到 Kafka Transporter 也支援 Request-response 與 Event-based 訊息模式,可以透過 ClientKafka
的 send
與 emit
方法來實現。
注意:由於 Kafka Transporter 針對 Request-response 的部份有較特殊的處理,故拆成 Event-based 與 Request-response 兩個小章節進行介紹。
Event-based 與其他 Transporter 的實現方式相同,下方是範例程式碼,修改 AppController
的內容,使用 @Inject
裝飾器注入 ClientKafka
,並設計 onOrderCreated
方法:
import { Inject, Controller, Get } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
@Controller()
export class AppController {
constructor(
@Inject('KAFKA_SERVICE') private readonly kafkaService: ClientKafka
) {}
@Get('orderCreated')
onOrderCreated() {
return this.kafkaService.emit('order.created', { name: 'Test' });
}
}
透過 Postman 使用 GET
方法存取 http://localhost:3000/orderCreated,在微服務應用程式的終端機會看到 { name: 'Test' }
:
Kafka Transporter 為了要現雙向傳輸的 Request-response 模式,會需要額外建立一個回覆用的 Topic,並確保 Client 可以被分配到至少一個 Partition,而這個回覆用的 Topic 會以發送的 Topic 作為開頭,並以 .reply
作為結尾,以 message.hello
為例,對應的回覆 Topic 即 message.hello.reply
。
在實作方面需要特別留意,在與 Kafka Broker 建立連線前,需要透過 ClientKafka
的 subscribeToResponseOf
方法來訂閱回覆的 Topic。下方為範例程式碼,在 AppController
使用 @Inject
裝飾器注入 ClientKafka
,並實作 OnModuleInit
介面,在 onModuleInit
階段執行 subscribeToResponseOf
,並以要發送的 Topic 名稱作為參數:
import { Inject, Controller, Get, OnModuleInit } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
@Controller()
export class AppController implements OnModuleInit {
constructor(
@Inject('KAFKA_SERVICE') private readonly kafkaService: ClientKafka
) {}
onModuleInit() {
this.kafkaService.subscribeToResponseOf('message.hello');
}
}
接著,透過 bitnami/kafka 建立名稱為 message.hello.reply
的 Topic:
$ docker run --network <NETWORK_NAME> --rm -it bitnami/kafka kafka-topics.sh --topic message.hello.reply --create --bootstrap-server <HOSTNAME>:9092
最後,在 AppController
實作 sayHello
方法:
import { Inject, Controller, Get, OnModuleInit } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
@Controller()
export class AppController implements OnModuleInit {
constructor(
@Inject('KAFKA_SERVICE') private readonly kafkaService: ClientKafka
) {}
@Get()
sayHello() {
return this.kafkaService.send('message.hello', 'HAO');
}
onModuleInit() {
this.kafkaService.subscribeToResponseOf('message.hello');
}
}
透過 Postman 使用 GET
方法存取 http://localhost:3000,會看到下方的結果:
在某些情況下,可能會希望自訂 Headers 與 Key,實作的方式非常簡單,依照下方格式進行回傳即可:
interface KafkaOutgoingMessage<T = unknown> {
headers: Record<string, unknown>;
key: string;
value: T;
}
修改微服務應用程式的 AppController
,將 sayHello
的回傳值調整為 KafkaOutgoingMessage
,並設定 Headers customVersion
為 1
、Key 為輸入的字串:
import { Controller } from '@nestjs/common';
import { EventPattern, MessagePattern } from '@nestjs/microservices';
interface KafkaOutgoingMessage<T = unknown> {
headers: Record<string, unknown>;
key: string;
value: T;
}
@Controller()
export class AppController {
@MessagePattern('message.hello')
sayHello(data: string) {
const message: KafkaOutgoingMessage = {
headers: {
customVersion: '1',
},
key: data,
value: `Hello, ${data}`,
};
return message;
}
}
注意:由於是 Request-response 模式,故回傳時是將訊息寫入至回覆用的 Topic,以上方範例來說,就會是
message.hello.reply
。
此時用 bitnami/kafka 啟動 Consumer 進行觀察,透過指定 --property
的方式來決定要額外印出哪些資訊,這邊我們設置 print.key=true
以及 print.headers=true
來顯示 Key 與 headers 的資訊:
$ docker run --network <NETWORK_NAME> --rm -it bitnami/kafka kafka-console-consumer.sh --topic message.hello.reply --property print.key=true --property print.headers=true --bootstrap-server <HOSTNAME>:9092
透過 Postman 使用 GET
方法存取 http://localhost:3000,會在 Consumer 的終端機上看到帶有 Headers 與 Key 的資料:
回顧一下本篇的重點內容,一開始先針對 Kafka 進行了基礎的介紹,包含:Kafka 是什麼?Event/Topic/Partition 之間是什麼關係?Consumer Group 與 Rebalance 是如何運作的?Commit Offset 是如何運作的?
接著,透過 Docker 架設 Kafka Broker 並運用封裝好的 Shell Script 來操作 Kafka,包含:建立 Topic、產生 Event、訂閱 Event等。
對 Kafka 有一定程度理解以後,就開始用 Kafka Transporter 來建立基於 Kafka 的 NestJS 微服務應用程式,同時也建立基於 Kafka 的 NestJS Client。
在閱讀完本篇內容後,相信大家收穫滿滿,下一篇將會介紹目前很流行同時也是我很喜歡的 gRPC,敬請期待!