$ curl 'http://127.0.0.1:8083/connectors'
[]
$ curl -X POST -H 'Content-Type: application/json' -i 'http://127.0.0.1:8083/connectors' \
--data \
'{
"name":"test-upload-source-mysql",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mysql://127.0.0.1:3306/kafka_store?user=root&password=<database-password>",
"table.whitelist":"source_users",
"incrementing.column.name": "id",
"mode":"incrementing",
"topic.prefix": "test-mysql-"}
}'
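The config parameters are:
connector.class: the connector implementation to run, here the JDBC source connector
connection.url: the JDBC connection string for the source database
table.whitelist: the tables to copy
mode: how new rows are detected; incrementing relies on a strictly increasing column
incrementing.column.name: the column used to detect new rows
topic.prefix: the prefix prepended to the table name to form the topic name
If the connector is created successfully, the following response is returned: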
HTTP/1.1 201 Created
Date: Sun, 29 Aug 2021 10:11:24 GMT
Location: http://127.0.0.1:8083/connectors/test-upload-source-mysql
Content-Type: application/json
Content-Length: 377
Server: Jetty(9.4.39.v20210325)
{
"name":"test-upload-source-mysql",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mysql://127.0.0.1:3306/kafka_store?user=root&password=<database-password>",
"table.whitelist":"source_users",
"incrementing.column.name":"id",
"mode":"incrementing",
"topic.prefix":"test-mysql-",
"name":"test-upload-source-mysql"
},
"tasks":[],
"type":"source"
}
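To make `"mode":"incrementing"` more concrete, here is a minimal Python sketch of the idea, not the connector's actual code: each poll returns only the rows whose `id` is greater than the last value seen. The function `poll_incrementing` and the in-memory `table` are hypothetical names used purely for illustration.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def poll_incrementing(rows: List[Row], last_id: int, column: str = "id") -> Tuple[List[Row], int]:
    """Return rows whose `column` exceeds last_id (ascending) and the new offset."""
    new_rows = sorted(
        (row for row in rows if row[column] > last_id),
        key=lambda row: row[column],
    )
    new_offset = new_rows[-1][column] if new_rows else last_id
    return new_rows, new_offset


# In-memory stand-in for the source_users table (illustrative data only).
table: List[Row] = [
    {"id": 1, "username": "alice"},
    {"id": 2, "username": "bob"},
]

first_batch, offset = poll_incrementing(table, last_id=0)   # picks up ids 1 and 2
table.append({"id": 3, "username": "carol"})
second_batch, offset = poll_incrementing(table, offset)     # picks up only id 3
table[0]["username"] = "alice-renamed"                      # an UPDATE, id unchanged
third_batch, offset = poll_incrementing(table, offset)      # empty: the update is not seen
```

The last poll illustrates a known limitation of incrementing mode: updates to existing rows do not change `id`, so they are not picked up.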
$ curl http://127.0.0.1:8083/connectors
["test-upload-source-mysql"]
$ curl http://127.0.0.1:8083/connectors/test-upload-source-mysql/status
{
"name":"test-upload-source-mysql",
"connector":{
"state":"RUNNING",
"worker_id":"192.168.133.118:8083"
},
"tasks":[
{
"id":0,
"state":"RUNNING",
"worker_id":"192.168.133.118:8083"
}
],
"type":"source"
}
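A status response like the one above can also be checked programmatically. The following is a rough sketch using only the Python standard library; `fetch_status` and `unhealthy_parts` are hypothetical helper names, and the base URL is assumed to be the worker address used in this article.

```python
import json
from typing import Any, Dict, List
from urllib.request import urlopen


def fetch_status(base_url: str, name: str) -> Dict[str, Any]:
    """GET /connectors/{name}/status and decode the JSON body."""
    with urlopen(f"{base_url}/connectors/{name}/status") as response:
        return json.load(response)


def unhealthy_parts(status: Dict[str, Any]) -> List[str]:
    """List the connector and/or tasks whose state is not RUNNING."""
    problems = []
    if status["connector"]["state"] != "RUNNING":
        problems.append(f"connector: {status['connector']['state']}")
    for task in status.get("tasks", []):
        if task["state"] != "RUNNING":
            problems.append(f"task {task['id']}: {task['state']}")
    return problems


# Illustrative status document: the connector is fine but its only task failed.
example_status = {
    "name": "test-upload-source-mysql",
    "connector": {"state": "RUNNING", "worker_id": "192.168.133.118:8083"},
    "tasks": [{"id": 0, "state": "FAILED", "worker_id": "192.168.133.118:8083"}],
}
problems = unhealthy_parts(example_status)
```

Against a live worker, the call would look like `unhealthy_parts(fetch_status("http://127.0.0.1:8083", "test-upload-source-mysql"))`; an empty list means everything reports RUNNING.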
The REST API endpoints are summarized below:
| Method | REST API | Description |
| --- | --- | --- |
| GET | /connectors | List all active connectors |
| POST | /connectors | Create a new connector |
| GET | /connectors/{name} | Get information about the specified connector |
| GET | /connectors/{name}/config | Get the configuration of the specified connector |
| PUT | /connectors/{name}/config | Update the configuration of the specified connector |
| GET | /connectors/{name}/status | Get the current state of the specified connector (running, paused, failed); if an error occurred, the error details are included |
| GET | /connectors/{name}/tasks | List the tasks currently running for the specified connector |
| GET | /connectors/{name}/tasks/{taskId}/status | Get the status of the specified task of the connector |
| PUT | /connectors/{name}/pause | Pause the specified connector and its tasks |
| PUT | /connectors/{name}/resume | Resume a paused connector |
| POST | /connectors/{name}/restart | Restart a connector |
| POST | /connectors/{name}/tasks/{taskId}/restart | Restart a task |
| DELETE | /connectors/{name} | Delete a connector, stopping all of its tasks and removing its configuration |
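The endpoints in the table map naturally onto a few small helpers. The sketch below only builds the requests and does not send them; the helper names (`connect_request`, `pause`, `restart_task`, `update_config`) are hypothetical, and `BASE_URL` is assumed to be the worker address used in this article.

```python
import json
from typing import Any, Dict, Optional
from urllib.parse import quote
from urllib.request import Request

BASE_URL = "http://127.0.0.1:8083"  # assumed Connect worker address


def connect_request(method: str, path: str, body: Optional[Dict[str, Any]] = None) -> Request:
    """Build (but do not send) a request for the Kafka Connect REST API."""
    data = json.dumps(body).encode("utf-8") if body is not None else None
    headers = {"Content-Type": "application/json"} if data is not None else {}
    return Request(BASE_URL + path, data=data, headers=headers, method=method)


def pause(name: str) -> Request:
    return connect_request("PUT", f"/connectors/{quote(name, safe='')}/pause")


def restart_task(name: str, task_id: int) -> Request:
    return connect_request("POST", f"/connectors/{quote(name, safe='')}/tasks/{task_id}/restart")


def update_config(name: str, config: Dict[str, str]) -> Request:
    # PUT .../config takes the bare config map, without the
    # {"name": ..., "config": ...} wrapper used by POST /connectors.
    return connect_request("PUT", f"/connectors/{quote(name, safe='')}/config", config)
```

To actually send one of these, pass the returned object to `urllib.request.urlopen`, for example `urlopen(pause("test-upload-source-mysql"))`.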
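A healthy connector sends newly inserted MySQL rows to the topic test-mysql-source_users (topic.prefix followed by the table name); you can read them with a consumer.
First, insert a few rows into the database: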
INSERT INTO source_users(`username`, `nickname`) VALUES('小熊維尼', 'polar bear');
INSERT INTO source_users(`username`, `nickname`) VALUES('大谷翔平', '笑死');
INSERT INTO source_users(`username`, `nickname`) VALUES('鄧不利多', '校長');
$ kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test-mysql-source_users --from-beginning
{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"username"
},
{
"type":"string",
"optional":false,
"field":"nickname"
}
],
"optional":false,
"name":"source_users"
},
"payload":{
"id":1,
"username":"小熊維尼",
"nickname":"polar bear"
}
}
{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"username"
},
{
"type":"string",
"optional":false,
"field":"nickname"
}
],
"optional":false,
"name":"source_users"
},
"payload":{
"id":2,
"username":"大谷翔平",
"nickname":"笑死"
}
}
{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"username"
},
{
"type":"string",
"optional":false,
"field":"nickname"
}
],
"optional":false,
"name":"source_users"
},
"payload":{
"id":3,
"username":"鄧不利多",
"nickname":"校長"
}
}
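Each message above wraps the row in a `schema` / `payload` envelope, which is the shape the JSON converter produces when schemas are enabled (an assumption about the worker configuration, which this article does not show). A minimal sketch of unpacking one message in Python follows; `split_envelope` is a hypothetical helper name.

```python
import json
from typing import Any, Dict


def split_envelope(message: str) -> Dict[str, Any]:
    """Split one consumed JSON message into its payload and a field -> type map."""
    record = json.loads(message)
    field_types = {f["field"]: f["type"] for f in record["schema"]["fields"]}
    return {"payload": record["payload"], "types": field_types}


# The first message from the consumer output, written on one line.
message = (
    '{"schema":{"type":"struct","fields":['
    '{"type":"int32","optional":false,"field":"id"},'
    '{"type":"string","optional":false,"field":"username"},'
    '{"type":"string","optional":false,"field":"nickname"}],'
    '"optional":false,"name":"source_users"},'
    '"payload":{"id":1,"username":"小熊維尼","nickname":"polar bear"}}'
)
result = split_envelope(message)
```

The `types` map is what lets a downstream consumer, such as a JDBC sink, know the column types without inspecting the values.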
At this point the source connector has been created and configured, and the data is being synced to the Kafka topic. Next, create the sink connector:
$ curl -X POST -H 'Content-Type: application/json' -i 'http://127.0.0.1:8083/connectors' \
--data \
'{"name":"test-download-to-mysql","config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://127.0.0.1:3306",
"connection.user":"root",
"connection.password":"<database-password>",
"topics":"test-mysql-source_users",
"auto.create":"false",
"insert.mode": "upsert",
"pk.mode":"record_value",
"pk.fields":"id",
"table.name.format": "target_database.target_users"}}'
HTTP/1.1 201 Created
Date: Mon, 30 Aug 2021 09:28:58 GMT
Location: http://127.0.0.1:8083/connectors/test-download-to-mysql
Content-Type: application/json
Content-Length: 444
Server: Jetty(9.4.39.v20210325)
{"name":"test-download-to-mysql","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","connection.url":"jdbc:mysql://127.0.0.1:3306","connection.user":"root","connection.password":"<database-password>","topics":"test-mysql-source_users","auto.create":"false","insert.mode":"upsert","pk.mode":"record_value","pk.fields":"id","table.name.format":"target_database.target_users","name":"test-download-to-mysql"},"tasks":[],"type":"sink"}
Parameters:
name: the name of the connector to create
config: the configuration of the new connector
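The combination of `insert.mode` set to `upsert`, `pk.mode` set to `record_value`, and `pk.fields` set to `id` means the primary key is taken from the `id` field of each record's value, and a record whose key already exists overwrites that row instead of adding a duplicate. The sketch below imitates that behaviour with an in-memory dictionary; it is an illustration of the semantics under those assumptions, not the connector's implementation, and `upsert` and `target_users` are hypothetical names.

```python
from typing import Any, Dict, List, Tuple

Record = Dict[str, Any]


def upsert(table: Dict[Tuple[Any, ...], Record], record: Record, pk_fields: List[str]) -> None:
    """Insert the record, or overwrite the existing row with the same primary key."""
    key = tuple(record[field] for field in pk_fields)
    table[key] = dict(record)


target_users: Dict[Tuple[Any, ...], Record] = {}
records = [
    {"id": 1, "username": "alice", "nickname": "al"},
    {"id": 2, "username": "bob", "nickname": "bobby"},
    {"id": 1, "username": "alice", "nickname": "ally"},  # same id, newer value
]
for record in records:
    upsert(target_users, record, pk_fields=["id"])
```

After the loop the table holds two rows, and the row with `id` 1 carries the newer nickname. This is also why replaying the same topic into the sink does not produce duplicate rows.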
Once the connector is created, you can inspect it through the REST API in the same way:
$ curl http://127.0.0.1:8083/connectors/test-download-to-mysql/config
{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"table.name.format":"target_database.target_users",
"connection.password":"<database-password>",
"connection.user":"root",
"topics":"test-mysql-source_users",
"name":"test-download-to-mysql",
"auto.create":"false",
"connection.url":"jdbc:mysql://127.0.0.1:3306",
"insert.mode":"upsert",
"pk.mode":"record_value",
"pk.fields":"id"
}
$ curl http://127.0.0.1:8083/connectors/test-download-to-mysql
{
"name":"test-download-to-mysql",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://127.0.0.1:3306",
"connection.user":"root",
"connection.password":"<database-password>",
"topics":"test-mysql-source_users",
"auto.create":"false",
"insert.mode":"upsert",
"pk.mode":"record_value",
"pk.fields":"id",
"table.name.format":"target_database.target_users",
"name":"test-download-to-mysql"
},
"tasks":[
],
"type":"sink"
}
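The response above still shows an empty `tasks` list, which can happen shortly after a connector is created. One way to wait for it to come up is to poll `/connectors/{name}/status` until the connector and its tasks report RUNNING. The sketch below takes the status lookup as a callable so it can be tried without a live worker; `wait_until_running` and its parameters are hypothetical names.

```python
import time
from typing import Any, Callable, Dict


def wait_until_running(
    fetch_status: Callable[[], Dict[str, Any]],
    attempts: int = 10,
    delay_seconds: float = 1.0,
) -> bool:
    """Poll until the connector and at least one task report RUNNING."""
    for attempt in range(attempts):
        status = fetch_status()
        tasks = status.get("tasks", [])
        connector_ok = status.get("connector", {}).get("state") == "RUNNING"
        if connector_ok and tasks and all(t["state"] == "RUNNING" for t in tasks):
            return True
        if attempt < attempts - 1:
            time.sleep(delay_seconds)  # back off before the next poll
    return False
```

In practice `fetch_status` would wrap an HTTP GET against the status endpoint and decode the JSON body; a `False` result after all attempts is a cue to read the status response for error details.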
mysql> use target_database;
Database changed
mysql> select * from target_users;
+----+--------------+------------+
| id | username | nickname |
+----+--------------+------------+
| 1 | 小熊維尼 | polar bear |
| 2 | 大谷翔平 | 笑死 |
| 3 | 鄧不利多 | 校長 |
+----+--------------+------------+
3 rows in set (0.00 sec)
That concludes today's exercise. To remove the connectors created above, call the REST API again:
$ curl -X DELETE 'http://127.0.0.1:8083/connectors/test-upload-source-mysql'
$ curl -X DELETE 'http://127.0.0.1:8083/connectors/test-download-to-mysql'