“In man's struggle against the world, bet on the world.”
― Franz Kafka
Kafka MirrorMaker
是 Kafka
提供的一個工具去讓你可以將來源叢集 ( Cluster ) 的資料同步到目標叢集,基本上的做法很單純就是利用消費者去消費來源叢集指定的Topic
,再使用生產者去新增到目標叢集,藉此達到資料同步的目的。比較常見的使用場景是災害備援,將資料鏡像複製到另外一個備份的資料中心當備援,或是將資料鏡像後提供給不同消費者去使用,這樣可以提高不少的吞吐量和容錯度。
目前環境:
1個 zookeeper: localhost:2181
3個 broker: localhost:9092, :9093, :9094
準備工作:
新建一個 zookeeper
使用 localhost:2182
新建3個 broker
使用 localhost:9192, 9193, 9194
$ cd /usr/local/etc/kafka
$ cp zookeeper.properties zookeeper-2.properties
$ vim zookeeper-2.properties
修改內容如下:
dataDir=/usr/local/var/lib/zookeeper-2
clientPort=2182
broker.id=200
listeners=PLAINTEXT://localhost:9192
log.dirs=/usr/local/var/lib/kafka-logs/broker200
zookeeper.connect=localhost:2182
broker.id=201
listeners=PLAINTEXT://localhost:9193
log.dirs=/usr/local/var/lib/kafka-logs/broker201
zookeeper.connect=localhost:2182
broker.id=202
listeners=PLAINTEXT://localhost:9194
log.dirs=/usr/local/var/lib/kafka-logs/broker202
zookeeper.connect=localhost:2182
$ kafka-server-start /usr/local/etc/kafka/mirror-server-1.properties
$ kafka-server-start /usr/local/etc/kafka/mirror-server-2.properties
$ kafka-server-start /usr/local/etc/kafka/mirror-server-3.properties
查看 Broker 詳細資料,可以看到 Broker200 的 cluster.id
是跟原本broker1
是不一樣的,因為是連到不同Zookeeper
$ grep --color 'log.dirs' mirror-server-1.properties
log.dirs=/usr/local/var/lib/kafka-logs/broker200
$ cd /usr/local/var/lib/kafka-logs/broker200
$ cat meta.properties
#
#Fri Sep 03 12:08:56 CST 2021
broker.id=200
version=0
cluster.id=jwCSNMp4RLGf9aQg__VFXw
mirror-consumer.properties
和mirror-producer.properties
$ cp consumer.properties mirror-consumer.properties
修改內容如下:
bootstrap.servers=localhost:9192,localhost:9193,locaslhost:9194
group.id=test-MirrorMaker-group
exclude.internal.topics=true
mirror.topics.whitelist=app_log
client.id=mirror_maker_consumer
$ cp producer.properties mirror-producer.properties
修改內容如下:
bootstrap.servers=localhost:9192,localhost:9193,localhost:9194
acks=1
batch.size=100
client.id=mirror_maker_producer
在目標Cluster
建立跟來源Cluster
一樣的Topic
另外,如果你的消費者在目標叢集是平行併發去消費的話,那很重要的話是,你必須要記得要新增建立跟目標叢集一樣數量的分區 ( partition ),舉例來說,如果你的主題名稱為 keyin-logs
的主題擁有六個分區在目標叢集,要確保你的叢集也有六個分區,如果你的目標叢集未預先建立好分區,那 mirrormaker
會嘗試去建議主題,目前叢集會依據設定檔的 num.partitions
參數和 num.replicas
參數去建立分區,但這個數量可能就不會符合使用者希望的分區數量
$ kafka-topics --create --zookeeper 127.0.0.1:2182 --replication-factor 2 --partitions 3 --topic topicWithThreeBroker
MirrorMaker
通常建議在目標叢集上使用 MirrorMaker
以下為指令 MirrorMaker
的相關參數
$ kafka-mirror-maker
可選參數 說明
------ -----------
--abort.on.send.failure <String: Stop 噴錯失敗的時候退出 mirror maker
the entire mirror maker when a send (預設值: true)
failure occurs>
--consumer.config <String: config file> 消費者設定檔
--consumer.rebalance.listener <String: 消費者重新平衡監聽器
A custom rebalance listener of type
ConsumerRebalanceListener>
--message.handler <String: A custom 處理生產者跟消費者間的訊息處理器
message handler of type
MirrorMakerMessageHandler>
--message.handler.args <String: Mirror Maker 使用的訊息處理器
Arguments passed to message handler
constructor.>
--num.streams <Integer: Number of 消費者串流的數量
threads> (預設值: 1)
--offset.commit.interval.ms <Integer: 提交偏移量的間隔時間
offset commit interval in (預設值: 60000毫秒)
millisecond>
--producer.config <String: config file> Embedded producer config.
--rebalance.listener.args <String: Arguments used by custom rebalance
Arguments passed to custom rebalance listener for mirror maker consumer.
listener constructor as a string.>
--version 顯示 Kafka 的版本
--whitelist <String: Java regex 指定要進行鏡像的 topic,這個參數支援 Java 的正則表達式,可以使用`--whitelist 'topicA|topicB`,或是`--whitelist '*'`
(String)>