docker-compose範例: 在AWS EC2 上設定Advertised Listeners (含外網 / 內網Listener)
在之前的系列文章中,們已經介紹了 Kafka 的特性、角色與運作方式。講完了理論,這篇文章我們回到實作面,以我們的專案情境為例,說明如何使用 Docker 執行 Kafka。
透過Docker部署Kafka非常方便,Kafka的Docker Hub提供了現成的映像檔(Image),只要設定基本環境並啟動容器(Conatiner)即可。
然而,啟動Kafka很簡單,但要讓別人順利的連線到Kafka,這中間的眉眉角角可能比你我想像中的還多。我在設定過程中著實卡了一陣子,希望這篇文章可以幫大家少繞一點路。
如果你只是想讓遠端的Client 連上EC2上的 Kafka broker,只需要在docker-compose中的environment加上這一行:
kafka:
...
environment:
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://你的 EC2 IP 或外部主機名稱:9092
並且確保你的docker和EC2 security group開放 9092 port,一切搞定。
這個問題在Stack Overflow上有很多類似的討論,可能因為大家實在問了太多次了,Confluent官方還發表了一篇文章,專門來解釋這其中的原理。
在這篇文章中,我會整理如何讓遠端的 Producer / Consumer 連線到 EC2 上的 Kafka Broker,並解釋必須進行的相關設定。
注: 如果使用的是 Confluent Cloud,則不需要設定 listener,這篇文章只針對使用docker-compose執行
Kafka container的情況。
當Client嘗試連線到 Kafka broker時,Client實際上是先向Kafka取得 meta data(包括 Leader Partition 的 broker 地址),再進行資料傳輸。
Listener 的設定分為 Listeners
和 Advertised Listeners
兩部分
實作上,我們需要設定監聽0.0.0.0(監聽機器上所有interface)的Listener,以及配對的Advertised Listener。
再接著說明細節之前,我們先介紹Leader partition和Follower partition,這兩個在連線中會參與的重要角色。
當Client連接到Kafka時,首先需要取得的,就是Leader Partition所在Broker的連線資訊。
回到連線的過程,以下以我們專案的情境為例: 在Lambda Function中,使用KafkaJS實作的Producer。具體步驟如下:
圖: Client端先透過Initial connection,跟Broker拿到真正的連線方式(URL),對Kafka傳送訊息
1.Initial connection: Client Request (Connect by Host IP)
2.Initial connection: Server Response(Response: meta data)
3.Send Message
這邊要注意的是,我們在Client端指定的Kafka Host,只是一個可以拿到Meta data的位址。
Client可以透過步驟1~2 (Initial connection),去Broker Cluster取得Meta data。這份Meta Data裡面,才包含了實際要讀寫的Broker連線位址,也就是Leader Partition所在Broker位址。
而這個Meta Data的內容,要在Kafka Broker(server端)上透過advertised.listeners
或者KAFKA_ADVERTISED_LISTENERS設定
。如果沒設定的話,預設是回應broker所在的host name。
所以如果要讓Client正確讀寫遠端broker的資料,需要在Kafka Broker config設定好連線方式,不然就會看到明明Client端可以成功connect到Broker Cluster,卻又出現連線到localhost:9092失敗這種奇怪的事情。
Advertised Listener的HostName / IP: port
直接以一個docker-compose中的設定為例:
- KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
- KAFKA_ADVERTISED_LISTENERS=INTERNAL://localhost:9092,EXTERNAL://ec2–1x–1xx–1xx–1xx.ap-southeast-1.compute.amazonaws.com:9093
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT, EXTERNAL:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
這邊設定了兩個Listener,名字分別叫做INTERNAL以及EXTERNAL,他們都使用PLAINTEXT(表示沒有加密)做連線的protocol,並監聽所有的interface(0.0.0.0),並且分別監聽9092
和9093
port
當Client對broker的9092
port做initial connection時,broker會回應我們設定的localhost:9092。這是因為INTERNAL
這個Listener有設定同名的ADVERTISED_LISTENER。
而當Client對9093
port做initial connection時,同樣也會回應跟EXTERNAL
同名的 ADVERTISED_LISTENER的url: ec2–1x–1xx–1xx–1xx.ap-southeast-1.compute.amazonaws.com:9093
最後我們在KAFKA_INTER_BROKER_LISTENER_NAME指定使用INTERNAL作為broker之間溝通的Listener。
PLAINTEXT
(沒加密, 僅建議測試用), SSL
, SASL_PLAINTEXT
, SASL_SSL
指定要使用哪一個Listner來讓Broker彼此溝通
0.0.0.0
)。需注意下面的原理以及限制:
EC2的hostname是只有AWS內網可以resolved的Internal hostname
,會造成Client在Initial connection後,拿到的Meta data是一個連不到目標kafka broker的Hostname。因為Client端通常不在AWS內網裡面。docker-compose範例:在AWS EC2上設定Advertised Listeners: (單一Listener)
version: "2"
services:
zookeeper:
image: zookeeper:3.4
mem_limit: 104857600
container_name: zookeeper
restart: unless-stopped
ports:
# exposing for debug reason
- 2181:2181
volumes:
- "./zookeeper/data:/data"
- "./zookeeper/datalog:/datalog"
- "./zookeeper/zoo.cfg:/conf/zoo.cfg"
kafka:
image: 'bitnami/kafka:2.1.1'
hostname: localhost
container_name: kafka
ports:
- '9092:9092'
# expose the external port for external clients
- '9093:9093'
environment:
# Kafka defaults to the following jvm memory parameters which mean that kafka will allocate 1GB at startup and use a maximum of 1GB of memory
- KAFKA_HEAP_OPTS=-Xmx512m -Xms256m
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
# set advertised listeners of default listener to the hostname which can be resolved both internally and externally
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://EC2的external hostname或IP:9092
volumes:
- KAFKA_VOLUMES:/bitnami/kafka
depends_on:
- zookeeper
volumes:
KAFKA_VOLUMES:
docker-compose範例: 在AWS EC2 上設定Advertised Listeners (外網/內網Listener)
version: "2"
services:
zookeeper:
image: zookeeper:3.4
mem_limit: 104857600
container_name: zookeeper
restart: unless-stopped
ports:
# exposing for debug reason
- 2181:2181
volumes:
- "./zookeeper/data:/data"
- "./zookeeper/datalog:/datalog"
- "./zookeeper/zoo.cfg:/conf/zoo.cfg"
kafka:
image: 'bitnami/kafka:2.1.1'
hostname: localhost
container_name: kafka
ports:
- '9092:9092'
# expose the external port for external clients
- '9093:9093'
environment:
# Kafka defaults to the following jvm memory parameters which mean that kafka will allocate 1GB at startup and use a maximum of 1GB of memory
- KAFKA_HEAP_OPTS=-Xmx512m -Xms256m
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
# set listeners and advertised listeners for external (9093) and internal(9092)
- KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
- KAFKA_ADVERTISED_LISTENERS=INTERNAL://localhost:9092,EXTERNAL://EC2的external hostname或IP:9093
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT, EXTERNAL:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
volumes:
- KAFKA_VOLUMES:/bitnami/kafka
depends_on:
- zookeeper
volumes:
KAFKA_VOLUMES:
這邊要注意的是,除了要設定Listener對應的KAFKA_ADVERTISED_LISTENERS以外,
還必須設定KAFKA_INTER_BROKER_LISTENER_NAME,來告訴kafka,broker彼此之間的通訊要用哪一個listener。否則會出現下面的錯誤,即便你只設定了一個Listener也一樣:
ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
java.lang.IllegalArgumentException: requirement failed: inter.broker.listener.name must be a listener name defined in advertised.listeners. The valid options based on currently configured listeners are INTERNAL,EXTERNAL
使用Kafka提供的Producer程式,測試上面docker-compose的設定中,這兩個Port的連線:
docker exec -it kafka容器名稱 /opt/bitnami/kafka/bin/kafka-console-producer.sh --broker-list kafka_broker_ip:9093 -topic TEST_TOPIC
docker exec -it kafka容器名稱 /opt/bitnami/kafka/bin/kafka-console-producer.sh - broker-list kafka broker ip:9092 - topic TEST_TOPIC
...ERROR Error when sending message to topic TEST_TOPIC with key: null, value: 0 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
以上就是連線到Kafka的設定方式。設定完成後,就可以讓專案中的AWS Lambda (Producer) 和 Bot Server(Consumer)使用Kafka作為Message Queue了。
本文修改自作者的blog文章: [Kafka] 設定Listener / Advertised Listener:讓Client遠端連線到AWS EC2上的Kafka)