iT邦幫忙

2021 iThome 鐵人賽

DAY 11
0
Modern Web

『卡夫卡的藏書閣』- 程序猿必須懂的Kafka開發與實作系列 第 11

卡夫卡的藏書閣【Book11】- Kafka Connect 2

Step3. 新增 Source connector

  • 可以查看一下當前的 connector
$ curl 'http://127.0.0.1:8083/connectors'
[]
  • 接著要新增 source connector:
$ curl -X POST -H 'Content-Type: application/json' -i 'http://127.0.0.1:8083/connectors' \
--data \
'{
"name":"test-upload-source-mysql",
"config":{
    "connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url":"jdbc:mysql://127.0.0.1:3306/kafka_store?user=root&password=資料庫密碼",
    "table.whitelist":"source_users",
    "incrementing.column.name": "id",
    "mode":"incrementing",
    "topic.prefix": "test-mysql-"}
}'

這邊說明 config 參數:

  • name:指定新增 connector 的名稱
  • config:指定 connector 的設定資訊
    • connector.class:使用哪個 connector 類別
    • connection.url:連結 Mysql 的 url
    • table.whitelist:下載哪些表格
    • incrementing.column.name:增長的欄位名稱
    • mode:指定 connector 的模式
    • topic.prefix:Kafka會新增一個 Topic,這邊是指令該 Topic 的前綴,最後產生的名稱會是前綴加上表格名稱,Ex. test-mysql-source_users

建立成功會出現以下訊息:

HTTP/1.1 201 Created
Date: Sun, 29 Aug 2021 10:11:24 GMT
Location: http://127.0.0.1:8083/connectors/test-upload-source-mysql
Content-Type: application/json
Content-Length: 377
Server: Jetty(9.4.39.v20210325)

{
   "name":"test-upload-source-mysql",
   "config":{
      "connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
      "connection.url":"jdbc:mysql://127.0.0.1:3306/kafka_store?user=root&password=資料庫密碼",
      "table.whitelist":"source_users",
      "incrementing.column.name":"id",
      "mode":"incrementing",
      "topic.prefix":"test-mysql-",
      "name":"test-upload-source-mysql"
   },
   "tasks":[],
   "type":"source"
}
  • 查看當前建立的 connectors
$ curl http://127.0.0.1:8083/connectors
["test-upload-source-mysql"]
  • 查看 connector 運行狀態
$ curl http://127.0.0.1:8083/connectors/test-upload-source-mysql/status
{
   "name":"test-upload-source-mysql",
   "connector":{
      "state":"RUNNING",
      "worker_id":"192.168.133.118:8083"
   },
   "tasks":[
      {
         "id":0,
         "state":"RUNNING",
         "worker_id":"192.168.133.118:8083"
      }
   ],
   "type":"sink"
}
  • 這邊整理 REST API
    | Method | REST API | 說明 |
    | —————— | ——————- | ———- |
    | GET | /connectors | 取得所有正在運作中的 connector |
    | POST | /connectors | 新增一個 connector |
    | GET | /connectors/{name} | 取得指定 connector 的資訊 |
    | GET | /connectors/{name}/config | 取得指定 connector 的設定資訊 |
    | PUT | /connectors/{name}/config | 修改指定 connector 的設定資訊 |
    | GET | /connectors/{name}/status | 取得指定 connector 的運行狀態(運行中、停止、失敗),如果有發生錯誤,也會顯示具體的錯誤資訊 |
    | GET | /connectors/{name}/tasks | 取得指定 connector 運行中的 task |
    | GET | /connectors/{name}/tasks/{tasksId}/status | 取得指定 connector 指令的 task 狀態 |
    | PUT | /connectors/{name}/pause | 暫停指定的 connector 和它的 task |
    | PUT | /connectors/{name}/resume | 恢復一個暫停中的 connector |
    | POST | /connectors/{name}/restart | 重新啟動一個 connector |
    | POST | /connectors/{name}/tasks/{taskID}/restart | 重新啟動一個 task |
    | DELETE | /connectors/{name} | 刪除一個 connector,停止它的所有 task 並且刪除相關 config |

  • 正常運作的 connect 會將 mysql 改變的資料送給 topic test-mysql-sink_users,可以用 consumer 去看資料內容

  • 在資料庫新增幾筆資料

INSERT INTO source_users(`username`, `nickname`) VALUES('小熊維尼', 'polar bear');
INSERT INTO source_users(`username`, `nickname`) VALUES('大谷翔平', '笑死');
INSERT INTO source_users(`username`, `nickname`) VALUES('鄧不利多', '校長');
  • 開啟 consumer,可以看到剛剛 mysql 新增的資料已 JSON 格式存在 topic中。
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test-mysql-source_users --from-beginning

{
   "schema":{
      "type":"struct",
      "fields":[
         {
            "type":"int32",
            "optional":false,
            "field":"id"
         },
         {
            "type":"string",
            "optional":false,
            "field":"username"
         },
         {
            "type":"string",
            "optional":false,
            "field":"nickname"
         }
      ],
      "optional":false,
      "name":"source_users"
   },
   "payload":{
      "id":1,
      "username":"小熊維尼",
      "nickname":"polar bear"
   }
}{
   "schema":{
      "type":"struct",
      "fields":[
         {
            "type":"int32",
            "optional":false,
            "field":"id"
         },
         {
            "type":"string",
            "optional":false,
            "field":"username"
         },
         {
            "type":"string",
            "optional":false,
            "field":"nickname"
         }
      ],
      "optional":false,
      "name":"source_users"
   },
   "payload":{
      "id":2,
      "username":"大谷翔平",
      "nickname":"笑死"
   }
}{
   "schema":{
      "type":"struct",
      "fields":[
         {
            "type":"int32",
            "optional":false,
            "field":"id"
         },
         {
            "type":"string",
            "optional":false,
            "field":"username"
         },
         {
            "type":"string",
            "optional":false,
            "field":"nickname"
         }
      ],
      "optional":false,
      "name":"source_users"
   },
   "payload":{
      "id":3,
      "username":"鄧不利多",
      "nickname":"校長"
   }
}

到這邊 source connector 已經新增、設定成功,資料也有同步到 Kafka topic了,接下來要新增 sink connector

Step 4: 新增 Sink Connector

curl -X POST -H 'Content-Type: application/json' -i 'http://127.0.0.1:8083/connectors' \
--data \
'{"name":"test-download-to-mysql","config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://127.0.0.1:3306/target_database",
"connection.user":"root",
"connection.password":"ifalo.net",
"topics":"test-mysql-source_users",
"auto.create":"false",
"insert.mode": "upsert",
"pk.mode":"record_value",
"pk.fields":"id",
"table.name.format": "target_users"}}'

HTTP/1.1 201 Created
Date: Mon, 30 Aug 2021 09:28:58 GMT
Location: http://127.0.0.1:8083/connectors/test-download-to-mysql
Content-Type: application/json
Content-Length: 444
Server: Jetty(9.4.39.v20210325)

{"name":"test-download-to-mysql","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","connection.url":"jdbc:mysql://127.0.0.1:3306","connection.user":"root","connection.password":"ifalo.net","topics":"test-mysql-source_users","auto.create":"false","insert.mode":"upsert","pk.mode":"record_value","pk.fields":"id","table.name.format":"target_database.target_users","name":"test-download-to-mysql"},"tasks":[],"type":"sink"}

參數說明:

  • name:指定新增 connector 的名稱

  • config:新增 connector 的設定資訊

    • connector.class:使用哪個 connector 類別
    • connection.url:Mysql 連接的 url
    • topics:從哪個 topic 讀取資料
    • auto.create:是否自動新建表格
    • insert.mode:寫入的模式,這邊選用 upsert
    • pk.mode:選擇主鍵模式 record_value 從訊息的 value 中取得資料
    • pk.fields:pk 欄位名稱
    • table.name.format:指定輸出到資料庫哪個表格
  • 建立完成後,一樣可以查看設定和狀態

$ curl http://127.0.0.1:8083/connectors/test-download-to-mysql/config 
{
   "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
   "table.name.format":"target_database.target_users",
   "connection.password":"資料庫密碼",
   "connection.user":"root",
   "topics":"test-mysql-source_users",
   "name":"test-download-to-mysql",
   "auto.create":"false",
   "connection.url":"jdbc:mysql://127.0.0.1:3306",
   "insert.mode":"upsert",
   "pk.mode":"record_value",
   "pk.fields":"id"
}

$ curl http://127.0.0.1:8083/connectors/test-download-to-mysql/status
{
   "name":"test-download-to-mysql",
   "config":{
      "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url":"jdbc:mysql://127.0.0.1:3306",
      "connection.user":"root",
      "connection.password":"資料庫密碼",
      "topics":"test-mysql-source_users",
      "auto.create":"false",
      "insert.mode":"upsert",
      "pk.mode":"record_value",
      "pk.fields":"id",
      "table.name.format":"target_database.target_users",
      "name":"test-download-to-mysql"
   },
   "tasks":[
      
   ],
   "type":"sink"
}
  • 確認狀態是運行中好,查看是否成功同步 source_database 的資料
mysql> use target_database;
Database changed
mysql> select * from target_users;
+----+--------------+------------+
| id | username     | nickname   |
+----+--------------+------------+
|  1 | 小熊維尼     | polar bear |
|  2 | 大谷翔平     | 笑死       |
|  3 | 鄧不利多     | 校長       |
+----+--------------+------------+
3 rows in set (0.00 sec)

今天的練習到此結束,如果想要將剛剛建立的 connector 刪掉,一樣是呼叫 REST API即可

$ curl -X DELETE 'http://127.0.0.1:8083/connectors/test-upload-source-mysql'
$ curl -X DELETE 'http://127.0.0.1:8083/connectors/test-download-to-mysql'

上一篇
卡夫卡的藏書閣【Book10】- Kafka Connect 1
下一篇
卡夫卡的藏書閣【Book12】- KafkaJS 安裝
系列文
『卡夫卡的藏書閣』- 程序猿必須懂的Kafka開發與實作30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言