iT邦幫忙

第 11 屆 iT 邦幫忙鐵人賽

DAY 9
2
DevOps

其實我真的沒想過只是把服務丟上 kubernetes 就有這麼多問題只好來參加30天分享那些年我怎麼在 kubernetes 上踩雷各項服務系列 第 9

Kafka HA 3 - Run a basic app

  • Kafka HA on Kubernetes
    • Deploy kafka-ha
    • Kafka Introduction
    • kafka 基本使用
    • kafka utility scripts, topic operation, kafka benchmark
    • 集群內部的 HA topology
    • 集群內部的 HA 設定,網路設定
    • Prometheus Metrics Exporter 很重要
    • 效能調校

由於我比較熟悉 GCP / GKE 的服務,這篇的操作過程都會以 GCP 平台作為範例,不過操作過程大體上是跨平台通用的。

寫文章真的是體力活,覺得我的文章還有參考價值,請左邊幫我點讚按個喜歡,右上角幫我按個追縱,底下歡迎留言討論。給我一點
繼續走下去的動力。

對我的文章有興趣,歡迎到我的網站上 https://chechiachang.github.io 閱讀其他技術文章
,有任何謬誤也請各方大德直接聯繫我,感激不盡。

Exausted Cat Face


摘要

  • 在 Kubernetes 中連線 kafka
  • 使用 golang library 連線到 Kafka
  • 透過 kafka script 操作 kafka

kubernetes 中連線 kafka

先看一看 kafka pods

$ kubectl get pods --selector='app=kafka'

NAME        READY   STATUS    RESTARTS   AGE
kafka-1-0   1/1     Running   1          26d
kafka-1-1   1/1     Running   0          26d
kafka-1-2   1/1     Running   0          26d

$ kubectl get pods -l 'app=zookeeper'

NAME                  READY   STATUS    RESTARTS   AGE
kafka-1-zookeeper-0   1/1     Running   0          26d
kafka-1-zookeeper-1   1/1     Running   0          26d
kafka-1-zookeeper-2   1/1     Running   0          26d

$ kubectl get pods -l 'app=kafka-exporter'

NAME                               READY   STATUS    RESTARTS   AGE
kafka-1-exporter-88786d84b-z954z   1/1     Running   5          26d
kubectl describe pods kafka-1-0

Name:           kafka-1-0
Namespace:      default
Priority:       0
Node:           gke-chechiachang-pool-1-e4622744-wcq0/10.140.15.212
Labels:         app=kafka
                controller-revision-hash=kafka-1-69986d7477
                release=kafka-1
                statefulset.kubernetes.io/pod-name=kafka-1-0
Annotations:    kubernetes.io/limit-ranger: LimitRanger plugin set: cpu request for container kafka-broker
Status:         Running
IP:             10.12.6.178
Controlled By:  StatefulSet/kafka-1
Containers:
  kafka-broker:
    Image:         confluentinc/cp-kafka:5.0.1
    Port:          9092/TCP
    Host Port:     0/TCP
    Command:
      sh
      -exc
      unset KAFKA_PORT && \
      export KAFKA_BROKER_ID=${POD_NAME##*-} && \
      export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${POD_IP}:9092 && \
      exec /etc/confluent/docker/run

    Requests:
      cpu:      100m
    Liveness:   exec [sh -ec /usr/bin/jps | /bin/grep -q SupportedKafka] delay=30s timeout=5s period=10s #success=1 #failure=3
    Readiness:  tcp-socket :kafka delay=30s timeout=5s period=10s #success=1 #failure=3
    Environment:
      POD_IP:                                   (v1:status.podIP)
      POD_NAME:                                kafka-1-0 (v1:metadata.name)
      POD_NAMESPACE:                           default (v1:metadata.namespace)
      KAFKA_HEAP_OPTS:                         -Xmx4G -Xms1G
      KAFKA_ZOOKEEPER_CONNECT:                 kafka-1-zookeeper:2181
      KAFKA_LOG_DIRS:                          /opt/kafka/data/logs
      KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE:  false
      KAFKA_DEFAULT_REPLICATION_FACTOR:        3
      KAFKA_MESSAGE_MAX_BYTES:                 16000000
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR:  1
      KAFKA_JMX_PORT:                          5555
    Mounts:
      /opt/kafka/data from datadir (rw)
      /var/run/secrets/kubernetes.io/serviceaccount from default-token-2tm8c (ro)
Conditions:
Volumes:
  datadir:
    Type:       PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
    ClaimName:  datadir-kafka-1-0
    ReadOnly:   false
  default-token-2tm8c:
    Type:        Secret (a volume populated by a Secret)
    SecretName:  default-token-2tm8c
    Optional:    false

講幾個重點:

  • 這邊跑起來的是 kafka-broker,接收 producer 與 consumer 來的 request
  • 這邊用的是 statefulsets,不是完全無狀態的 kafka broker,而把 message 記在 datadir 上,降低故障重啟時可能遺失資料的
    風險。
  • 啟動時,把 kubernetes 指定的 pod name 塞進環境變數,然後作為當前 broker 的 ID
  • 沒有設定 Pod antiAffinity,所以有可能會啟三個 kafka 結果三個跑在同一台 node 上,這樣 node 故障就全死,沒有HA

Service & Endpoints

看一下 service 與 endpoints
zookeeper 與 exporter 我們這邊先掠過不談,到專章講高可用性與服務監測時,再來討論。

$ kubectl get service -l 'app=kafka'

NAME               TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)    AGE
kafka-1            ClusterIP   10.15.242.178   <none>        9092/TCP   26d
kafka-1-headless   ClusterIP   None            <none>        9092/TCP   26d

兩個 services

  • 一個是 cluster-ip service,有 single cluster IP 與 load-balance,DNS 會過 kube-proxy。
  • 一個是 headless service,DNS 沒有過 kube-proxy,而是由 endpoint controller 直接 address record,指向把符合 service selector 的 pod。適合做 service discovery,不會依賴於 kubernetes 的實現。

詳細說明在官方文件

簡單來說,kafka broker 會做 auto service discovery,我們可以使用 headless service。

客戶端(consumer & producer) 連入時,則使用 cluster-ip service,做 load balancing。

$ kubectl get endpoints -l 'app=kafka'

NAME                            ENDPOINTS                                                          AGE
kafka-1                         10.12.1.14:9092,10.12.5.133:9092,10.12.6.178:9092                  26d
kafka-1-headless                10.12.1.14:9092,10.12.5.133:9092,10.12.6.178:9092                  26d

Golang Example

附上簡單的 Golang 客戶端,完整 Github Repository 在這邊

package main

import (
  "context"
  "fmt"
  "strconv"
  "time"

  "github.com/segmentio/kafka-go" // 使用的套件
)

func main() {
  topic := "ticker" // 指定 message 要使用的 topic
  partition := 0 // 指定 partition,由於底下連線指定連線到 partition 的 leader,所以需要指定 partition
  kafkaURL := "kafka-0:9092" // 指定 kafkaURL,也可以透過 os.GetEnv() 從環境變數裡拿到。

  // producer 對指定 topic, partition 的 leader 產生連線
  producerConn, _ := kafka.DialLeader(context.Background(), "tcp", kafkaURL, topic, partition)
  // 程式結束最後把 connection 關掉。不關會造成 broker 累積大量 connection,需要等待 broker 端 timeout 才會釋放。
  defer producerConn.Close()

  //producerConn.SetWriteDeadline(time.Now().Add(10 * time.Second))
  // 使用 go routine 跑一個 subprocess for loop,一直產生 message 到 kafka topic,這邊的範例是每秒推一個秒數。
  go func() {
    for {
      producerConn.WriteMessages(
        kafka.Message{
          Value: []byte(strconv.Itoa(time.Now().Second())),
        },
      )
      time.Sleep(1 * time.Second)
    }
  }()

  // make a new reader that consumes from topic-A, partition 0
  r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:   []string{kafkaURL},
    Topic:     topic,
    Partition: 0,
    MinBytes:  10e2, // 1KB
    MaxBytes:  10e3, // 10KB
  })
  defer r.Close()
  //r.SetOffset(42)

  // 印出 reader 收到的 message
  for {
    m, err := r.ReadMessage(context.Background())
    if err != nil {
      break
    }
    fmt.Printf("%v message at offset %d: %s = %s\n", time.Now(), m.Offset, string(m.Key), string(m.Value))
  }

}

這邊可以使用 Dockerfile 包成一個 container image,然後丟上 kubernetes

我稍晚補一下 docker image 跟 deployment 方便大家操作好了。

或是懶人測試,直接 kubectl run 一個 golang base image 讓它 sleep,然後再連進去

kubectl run DEPLOYMENT_NAME --image=golang:1.13.0-alpine3.10 sleep 3600

kubectl exec -it POD_NAME sh
# 裡面沒有 Git 跟 vim 裝一下
apk add git vim

go get github.com/chechiachang/kafka-on-kubernetes

cd src/github.com/chechiachang/kafka-on-kubernetes/
vim main.go

go build .
./kafka-on-kubernetes

2019-09-24 14:20:46.872554693 +0000 UTC m=+9.154112787 message at offset 1:  = 46
2019-09-24 14:20:47.872563087 +0000 UTC m=+9.154121166 message at offset 2:  = 47
2019-09-24 14:20:48.872568848 +0000 UTC m=+9.154126926 message at offset 3:  = 48
2019-09-24 14:20:49.872574499 +0000 UTC m=+9.154132576 message at offset 4:  = 49
2019-09-24 14:20:50.872579957 +0000 UTC m=+9.154138032 message at offset 5:  = 50
2019-09-24 14:20:51.872588823 +0000 UTC m=+9.154146892 message at offset 6:  = 51
2019-09-24 14:20:52.872594672 +0000 UTC m=+9.154152748 message at offset 7:  = 52
2019-09-24 14:20:53.872599986 +0000 UTC m=+9.154158060 message at offset 8:  = 53

這樣就連上了,完成一個最簡單的使用範例。

這個例子太過簡單,上一篇講的 consumer group, partitions, offset 什麼設定全都沒用上。實務上這些都需要好好思考,並且依
據需求做調整設定。

Clean up

把測試用的 deployment 幹掉

kubectl delete deployment DEPLOYMENT_NAME

小結

  • 簡述 kafka 在 kubernetes 上運行的狀況,連線方法
  • Demo 一個小程式

上一篇
Kafka HA 2 - Introduction
下一篇
Kafka HA 4 - Run operation scripts
系列文
其實我真的沒想過只是把服務丟上 kubernetes 就有這麼多問題只好來參加30天分享那些年我怎麼在 kubernetes 上踩雷各項服務30

2 則留言

0
chichi
iT邦新手 5 級 ‧ 2019-09-25 08:04:25

或是攋人測試,直接 kubectl run 一個 golang base image 讓它 sleep,然後在連進去。
攋人 -> 懶人。

為何要做 sleep 3600 才能執行 exec 進去該 POD。

感謝協助修正~

k8s 跑一個 Pod,通常會指定一個 entrypoint,作為這個 Pod 的 main process。例如我起一個 API server,他的 main process 就是 serve 某個 port。

k8s 預設會監測這個 main process 作為 liveness check,如果 process 斷了,k8s 會判斷 main process 出問題,依照預設設定重啟整個 Pod。例如 API server main process 死了,可能就會想要自動重啟 Pod。

我們今天開這個 golang Pod,只需要 golang library,不希望裡面已經有 main process,或是因為 main process healthcheck 失敗讓我們的 Pod 被重啟,只是希望這個 Pod 一直開啟著讓我們開發或除錯,所以讓他一直睡。

這跟使用 Busybox在 K8s debug 時的用法一樣。

0
ccshih
iT邦新手 4 級 ‧ 2019-09-25 10:24:37

當客戶端在 k8s 外也能連上嗎?

這個範例不行

還需補上 ingress ,或是更改 service 成為 load balancer,去接 gcp load balancer

然後把 gcp node-level 的 firewall port 9092 打開

chichi iT邦新手 5 級 ‧ 2019-09-26 20:06:20 檢舉

了解!!

我要留言

立即登入留言