iT邦幫忙

第 11 屆 iThome 鐵人賽

DAY 10
1
  • Kafka HA on Kubernetes
    • Deploy kafka-ha
    • Kafka Introduction
    • kafka 基本使用
    • kafka operation scripts
    • 集群內部的 HA topology
    • 集群內部的 HA 設定,網路設定
    • Prometheus Metrics Exporter 很重要
    • 效能調校

由於我比較熟悉 GCP / GKE 的服務,這篇的操作過程都會以 GCP 平台作為範例,不過操作過程大體上是跨平台通用的。

寫文章真的是體力活,覺得我的文章還有參考價值,請左邊幫我點讚按個喜歡,右上角幫我按個追縱,底下歡迎留言討論。給我一點繼續走下去的動力。

對我的文章有興趣,歡迎到我的網站上 https://chechiachang.github.io 閱讀其他技術文章,有任何謬誤也請各方大德直接聯繫我,感激不盡。

Exausted Cat Face


摘要

  • 從 Zookeeper 獲取資訊
  • 取得並處理 topic
  • benchmark kafka

zookeeper

zookeeper 是 kafka 的分散式協調系統,在 kafka 上多個節點間需要協調的內容,例如:彼此節點的ID,位置與當前狀態,或是跨節點 topic 的設定與狀態。取名叫做 zookeeper 就是在協調混亂的分散式系統,,裡面各種不同種類的服務都要協調,象個動物園管
理員。Zookeeper 的官方文件 有更詳細的說明。

Kafka 的節點資訊,與當前狀態,是放在 zookeeper 上,我們可以透過以下指令取得

# 首先先取得 zkCli 的 cli,這個只有連進任何一台 zookeeper 內部都有
kubectl exec -it kafka-0-zookeeper-0 --container kafka-broker bash

# 由於是在 Pod 內部,直接 localhost 詢問本地
/usr/bin/zkCli.sh -server localhost:2181

Connecting to localhost:2181
2019-09-25 15:02:36,089 [myid:] - INFO  [main:Environment@100] - Client environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT
2019-09-25 15:02:36,096 [myid:] - INFO  [main:Environment@100] - Client environment:host.name=kafka-0-zookeeper-0.kafka-0-zookeeper-headless.default.svc.cluster.local
2019-09-25 15:02:36,096 [myid:] - INFO  [main:Environment@100] - Client environment:java.version=1.8.0_131
2019-09-25 15:02:36,100 [myid:] - INFO  [main:Environment@100] - Client environment:java.vendor=Oracle Corporation
2019-09-25 15:02:36,100 [myid:] - INFO  [main:Environment@100] - Client environment:java.home=/usr/lib/jvm/java-8-openjdk-amd64/jre
2019-09-25 15:02:36,100 [myid:] - INFO  [main:Environment@100] - Client environment:java.class.path=/usr/bin/../build/classes:/usr/bin/../build/lib/*.jar:/usr/bin/../share/zookeeper/zookeeper-3.4.10.jar:/usr/bin/../share/zookeeper/slf4j-log4j12-1.6.1.jar:/usr/bin/../share/zookeeper/slf4j-api-1.6.1.jar:/usr/bin/../share/zookeeper/netty-3.10.5.Final.jar:/usr/bin/../share/zookeeper/log4j-1.2.16.jar:/usr/bin/../share/zookeeper/jline-0.9.94.jar:/usr/bin/../src/java/lib/*.jar:/usr/bin/../etc/zookeeper:
2019-09-25 15:02:36,100 [myid:] - INFO  [main:Environment@100] - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
2019-09-25 15:02:36,100 [myid:] - INFO  [main:Environment@100] - Client environment:java.io.tmpdir=/tmp
2019-09-25 15:02:36,100 [myid:] - INFO  [main:Environment@100] - Client environment:java.compiler=<NA>
2019-09-25 15:02:36,101 [myid:] - INFO  [main:Environment@100] - Client environment:os.name=Linux
2019-09-25 15:02:36,101 [myid:] - INFO  [main:Environment@100] - Client environment:os.arch=amd64
2019-09-25 15:02:36,101 [myid:] - INFO  [main:Environment@100] - Client environment:os.version=4.14.127+
2019-09-25 15:02:36,101 [myid:] - INFO  [main:Environment@100] - Client environment:user.name=zookeeper
2019-09-25 15:02:36,102 [myid:] - INFO  [main:Environment@100] - Client environment:user.home=/home/zookeeper
2019-09-25 15:02:36,102 [myid:] - INFO  [main:Environment@100] - Client environment:user.dir=/
2019-09-25 15:02:36,105 [myid:] - INFO  [main:ZooKeeper@438] - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@42110406
Welcome to ZooKeeper!
2019-09-25 15:02:36,160 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@1032] - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
JLine support is enabled
2019-09-25 15:02:36,374 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@876] - Socket connection established to localhost/127.0.0.1:2181, initiating session
2019-09-25 15:02:36,393 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@1299] - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x16d67baf1310001, negotiated timeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0]

取得 kafka broker 資料

# List root Nodes
$ ls /

[cluster, controller, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]

# Brokers 的資料節點
$ ls /brokers
[ids, topics, seqid]

# List /brokers/ids 得到三個 kafka broker
$ ls /brokers/ids
[0, 1, 2]

# 列出所有 topic 名稱
ls /brokers/topics
[ticker]

ticker 是上篇範利用到的 topic

簡單來說,zookeeper 存放這些狀態與 topic 的 metadata

  • 儲存核心的狀態與資料,特別是 broker 萬一掛掉,也還需要維持的資料
  • 協調工作,例如協助 broker 處理 quorum,紀錄 partition master 等
# 離開 zkCli
quit

Kafka

這邊一樣先連線進去一台 broker,取得 kafka binary

kubectl exec -it kafka-0-0 --container kafka-broker bash

 ls /usr/bin/ | grep kafka
kafka-acls
kafka-broker-api-versions
kafka-configs
kafka-console-consumer
kafka-console-producer
kafka-consumer-groups
kafka-consumer-perf-test
kafka-delegation-tokens
kafka-delete-records
kafka-dump-log
kafka-log-dirs
kafka-mirror-maker
kafka-preferred-replica-election
kafka-producer-perf-test
kafka-reassign-partitions
kafka-replica-verification
kafka-run-class
kafka-server-start
kafka-server-stop
kafka-streams-application-reset
kafka-topics
kafka-verifiable-consumer
kafka-verifiable-producer

很多工具,我們這邊只會看其中幾個

topic 資訊

Topic 的資訊,跟 zookeeper 要

# List topics
/usr/bin/kafka-topics --list --zookeeper kafka-0-zookeeper

ticker

操作 message

從 topic 取得 message

# This will create a new console-consumer and start consuming message to stdout
/usr/bin/kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic engine_topic_soundwave_USD \
--timeout 0 \
--from-beginning

如果 ticker 那個 example pod 還在執行,這邊就會收到 ticker 的每秒 message

如果沒有,也可以開啟另一個 broker 的連線

kubectl exec -it kafka-0-1 --container kafka-broker bash

# 使用 producer 的 console 連入,topic 把 message 塞進去
/usr/bin/kafka-console-producer \
--broker-list localhost:9092\
 --topic ticker

tick [enter]
tick [enter]

kafka-console-consumer 那個 terminal 就會收到 message

tick
tick

當然也可以使用 consumer group

# Use consumer to check ticker topics
/usr/bin/kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic ticker \
--group test

有做過上面的操作產生 consumer group,就可以透過 consumer API,取得 consumer group 狀態

# Check consumer group
/usr/bin/kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--group ticker \
--describe

Consumer group 'test' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
ticker          0          23              23              0               -               -               -

Topic 設定操作

Topic 設定文件 在此

這邊透過 kafka-configs 從 zookeeper 取得 topic 設定,這邊的 max.message.bytes,是這個 topic 每個 message 的最大上限。

/usr/bin/kafka-configs --zookeeper kafka-0-zookeeper:2181 --describe max.message.bytes --entity-type topics

Configs for topic '__consumer_offsets' are segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
Configs for topic 'ticker' are

__consumer__offsets 是系統的 topic ,紀錄目前 consumer 讀取的位置。

ticker 沒有設定,就是 producer 當初產生 topic 時沒有指定,使用 default 值

由於我們公司的使用情境常常會超過,所以可以檢查 producer app 那端送出的 message 大小,在比較這邊的設定。當然現在 ticker 的範例,只有一個 0-60 的數值,並不會超過。這個可以在 helm install 的時候,使用 value.yaml 傳入時更改。

不喜歡這個值,可以更改,這邊增加到 16MB

TOPIC=ticker

/usr/bin/kafka-configs \
  --zookeeper kafka-3-zookeeper:2181 \
  --entity-type topics \
  --alter \
  --entity-name ${TOPIC} \
  --add-config max.message.bytes=16000000

Benchmark

使用內建工具跑 benchmark

Producer

/usr/bin/kafka-producer-perf-test \
  --num-records 100 \
  --record-size 100 \
  --topic performance-test \
  --throughput 100 \
  --producer-props bootstrap.servers=kafka:9092 max.in.flight.requests.per.connection=5 batch.size=100 compression.type=none

100 records sent, 99.108028 records/sec (0.01 MB/sec), 26.09 ms avg latency, 334.00 ms max latency, 5 ms 50th, 70 ms 95th, 334 ms 99th, 334 ms 99.9th.

Consumer

/usr/bin/kafka-consumer-perf-test \
  --messages 100 \
  --broker-list=kafka:9092 \
  --topic performance-test \
  --group performance-test \
  --num-fetch-threads 1

上一篇
Kafka HA 3 - Run a basic app
下一篇
Kafka HA 5 - High Availability Topology
系列文
其實我真的沒想過只是把服務丟上 kubernetes 就有這麼多問題只好來參加30天分享那些年我怎麼在 kubernetes 上踩雷各項服務30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

1 則留言

0
wajika
iT邦新手 5 級 ‧ 2021-02-27 10:50:55

想看更多的使用内建工具的操作案例。

好奇问下,我看文章里使用kafka-producer-perf-test --producer-props加了一些参数,你使用这些参数时是怎么思考的?为什么要加?

上面的參數有些只是我測試時候留下的,並沒有太多意義

例如 max.in.flight.requests.per.connection 這個參數 default 就是為 5,就不需要設定

實務上,我們會希望 producer performance test 行為,越接近我們實務的案例越好,可以考慮使用上的情境,來進行模擬

我要留言

立即登入留言