iT邦幫忙

2021 iThome 鐵人賽

DAY 10
1
Modern Web

『卡夫卡的藏書閣』- 程序猿必須懂的Kafka開發與實作系列 第 10

卡夫卡的藏書閣【Book10】- Kafka Connect 1

“Books are a narcotic.”
― Franz Kafka
突然想起了恐龍書


Kafka Connect 是一個可靠的、可隨情境擴增或是縮減的資料傳輸工具,用來處理 Kafka 跟其他資料儲存系統間的資料傳輸,透過定義 connector 可以輕易地從 Kafka 傳入和傳出大量資料。

Kafka Connect 可以消化掉整個資料庫、搜集你app server的資料集,放入到 Kafka topics 中,確保資料在低延遲下可以被取用。匯出方面,Connector 可以將 Kafka topics 的資料傳送給 Elasticsearch 或是離線分析用的Hadoop系統。

Kafka Connect 可以分為匯入的 source connector 跟匯出 sink connector,目前可以支援的範圍很廣,你可以將 Microsoft SQL Server、MQTT、Java JDBC、IMB MQ、salesforce、JSON檔案、poster Sql、CSV檔案、Mysql...等資料透過 source connector 匯入 Kafka topic,再透過 sink connector 將資料匯出到 Google BigQuery、hadoop、Amazon S3、elasticsearch、snowflake、ORACLE、各類DB...等。

坐而言不如起而行,今天會帶大家簡單實作一個小練習,我們將會透過 Kafka Connect 讓兩個資料庫資料對接,做到類似ETL的功能,主要分為三個步驟:

  1. 設定來源資料庫跟目標資料庫
  2. 下載所需的 connect 和設定 Kafka connect-distributed
  3. 新增 Source connector
  4. 新增 Sink connector

Step 1: 設定來源資料庫跟目標資料庫

首先,在本地的 Mysql 創建一個來源資料庫跟目標資料庫

mysql> create database `source_database` default character set utf8mb4 collate utf8mb4_unicode_ci;
Query OK, 1 row affected (0.00 sec)

mysql> create database `target_database` default character set utf8mb4 collate utf8mb4_unicode_ci;
Query OK, 1 row affected (0.01 sec)

在來源資料庫 source_database 和 target_database 各創建一張表當作資料來源

mysql> use source_database;
Database changed
mysql> CREATE TABLE `source_users` (
`id` INT(11) NOT NULL AUTO_INCREMENT,
`username` VARCHAR(20) NOT NULL,   
`nickname` VARCHAR(20) NOT NULL,   
PRIMARY KEY (`id`) 
) ENGINE=INNODB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
Query OK, 0 rows affected (0.05 sec)

mysql> use target_database;
Database changed
mysql> CREATE TABLE `target_users` (
  `id` INT(11) NOT NULL AUTO_INCREMENT,
  `username` VARCHAR(20) NOT NULL,
  `nickname` VARCHAR(20) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
Query OK, 0 rows affected (0.03 sec)

Step 2: 下載所需的 connect 和設定 Kafka connect-distributed

  • 下載 Confluent 的 JDBC connect
    • 將檔案下載到 Kafka Server 所在的機器
  • 因為今天練習是用 Mysql,所以還需要下載 maven mysql connect (Maven Repository: mysql » mysql-connector-java)
    • wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.20/mysql-connector-java-8.0.20.jar
  • 解壓縮 unzip confluentinc-kafka-connect-jdbc-10.2.1.zip
  • 創建資料夾 mkdir -p kafkaConnect/lib
  • 將剛剛下載的 jdbc 檔案都移到資料夾中,mysql 的 connector 移到 lib 資料夾中
mv confluentinc-kafka-connect-jdbc-10.2.1 kafkaConnect/
mv mysql-connector-java-8.0.20.jar kafkaConnect/lib/

我們今天是採用 connector 的 distributed 模式,另外還有 standalone 模式,官方建議線上採用 distributed 的模式,因為可擴增性、可用性和管理等各方面都更佳

  • 需要修改相對應的設定檔 connect-distributed.properties
bootstrap.servers=127.0.0.1:9092 # 指到 Broker Server 的 IP 位子
group.id=connect-cluster
rest.port=8083 # REST 介面監聽的 port,預設是8083,順便一題如果你是用 connect 的 standalone 模式,預設 port 是 8084。
plugin.path=/usr/local/etc/kafkaConnect # 剛剛創建資料夾的絕對路徑
  • 執行 kafka connect
    • connect-distributed /usr/local/etc/kafka/connect-distributed.properties
  • 啟動完成後,可以查看目前的 connector 的 plugin
$ curl 'http://127.0.0.1:8083/connector-plugins'
[
   {
      "class":"io.confluent.connect.jdbc.JdbcSinkConnector",
      "type":"sink",
      "version":"10.2.1"
   },
   {
      "class":"io.confluent.connect.jdbc.JdbcSourceConnector",
      "type":"source",
      "version":"10.2.1"
   },
   {
      "class":"org.apache.kafka.connect.file.FileStreamSinkConnector",
      "type":"sink",
      "version":"2.8.0"
   },
   {
      "class":"org.apache.kafka.connect.file.FileStreamSourceConnector",
      "type":"source",
      "version":"2.8.0"
   },
   {
      "class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
      "type":"source",
      "version":"1"
   },
   {
      "class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
      "type":"source",
      "version":"1"
   },
   {
      "class":"org.apache.kafka.connect.mirror.MirrorSourceConnector",
      "type":"source",
      "version":"1"
   }
]

上一篇
卡夫卡的藏書閣【Book9】- Kafka Partition Reassign
下一篇
卡夫卡的藏書閣【Book11】- Kafka Connect 2
系列文
『卡夫卡的藏書閣』- 程序猿必須懂的Kafka開發與實作30

尚未有邦友留言

立即登入留言