昨天已經將 JDBC Sink Connector 的測試程式撰寫完成了,今天主要會把 JDBC Sink Connector 打包成 Jar 檔案,然後使用 Ohara Manager 的 UI 把 RandomNumberSourceConnector 和 JDBCSinkConnector 的 jar 檔,加入到 Workspace 的 Plugin 內。之後使用 Pipeline 建立 RandNumberSourceConnector 把資料寫入到 Topic 內,然後再透過 JDBCSinkConnector 把 Topic 的資料寫入到資料庫內。
今天的實作使用 ohara 的版本為 0.8.0-SNAPSHOT。資料庫使用 MySQL 因此要準備 MySQL 的 JDBC jar 檔。
在實作之前需要安裝 MySQL 使用的作業系統為 CentOS,指令如下:
# wget http://repo.mysql.com/mysql-community-release-el7-5.noarch.rpm
# rpm -ivh mysql-community-release-el7-5.noarch.rpm
# yum install -y mysql-server
# systemctl start mysqld
# systemctl enable mysqld
修改能使用遠端的方式登入 MySQL database
# mysql -u root -p
mysql> create database database1;
mysql> create user 'user1'@'%' identified by '123456';
mysql> grant all privileges on *.* to 'user1'@'%';
mysql> flush privileges;
以上的指令主要是建立資料庫和使用者並且開權限給 user1 的使用者,這些設定主要目的是要讓 JDBC Sink Connector 能順利的連線到資料庫裡,以下是建立資料表的指令:
mysql> create table table1(column1 varchar(45), column2 varchar(45), column3 varchar(45));
建立三個欄位主要的目的是 RandNumberSourceConnector 程式,預設會建立三個欄位的亂數資料,然後 JDBC Sink Connector 沒有處理有關於 schema 的部份,就直接使用 RandNumberSourceConnector 預設欄位的名稱和欄位的數量。
當準備好寫入到目的端的資料庫之後,就可以啟動 Ohara Configurator 和 Ohara Manager 的 Container 服務,指令如下:
$ docker run --rm -p 12345:12345 -d oharastream/configurator:0.8.0-SNAPSHOT build/libs --hostname 192.168.56.104 --port 12345
$ docker run --rm -d -p 5050:5050 oharastream/manager:0.8.0-SNAPSHOT --port 5050 --configurator http://192.168.56.104:12345/v0
啟動了 Ohara Configurator 和 Ohara Manager 的服務之後,我們還要準備 Build 我們撰寫好的 RandNumberSourceConnector 和 JDBCSinkConnector Jar 檔,使用以下的指令:
$ gradle clean build -x test
Sample 的程式放在以下的連結裡:
https://github.com/jackyoh/ohara-connector-demo
也可以直接使用 git 的指令把程式碼 clone 下來並且 build 成 jar 檔。build 完的 jar 檔會放在 build/libs 資料夾下面的 ohara-connector-demo.jar
以下就可以使用 ohara manager 的 UI 部署 connector plugin 以及執行 pipeline 的部份, 步驟如下:
1.使用 Ohara Manager 建立 Node,建立完成的畫面如下:
2.建立 Workspace 並且把 ohara-connector-demo.jar 以及 mysql 的 jar 檔,上傳到 plugin 裡,畫面如下:
因為 MySQL 的 JDBC 的 jar 檔有 License 的問題,所以不會包進 Ohara 裡,所以要另外準備上傳到 Plugin 上,上傳完 Plugin 之後的畫面如下:
在 Workspace 的畫面上建立 Topic,topic 的名稱為 topic1
建立 Pipeline。Source 的部份選擇 idv.jack.ohara.RandomNumberSourceConnector,Sink 的部份選擇 idv.jack.ohara.JDBCSinkConnector,topic 選擇 topic1,建立完成的畫面如下:
從畫面上可以看到,我們在實作 JDBC Sink Connector 定義的 definition 方法的參數,會被載入到 JDBC Sink Connector 的畫面上,讓使用者的設定更簡單和方便。
Pipeline 執行的畫面如下:
執行的結果如下:
mysql> select * from table1;
+-------------+-------------+-------------+
| column1 | column2 | column3 |
+-------------+-------------+-------------+
| 304798398 | 304798398 | 304798398 |
| 1317316683 | 1317316683 | 1317316683 |
| -228679295 | -228679295 | -228679295 |
| 1186593524 | 1186593524 | 1186593524 |
| 1909877485 | 1909877485 | 1909877485 |
| -1363348393 | -1363348393 | -1363348393 |
| 463501600 | 463501600 | 463501600 |
| -434675897 | -434675897 | -434675897 |
| -505261521 | -505261521 | -505261521 |
| -1288489684 | -1288489684 | -1288489684 |
| -1930790977 | -1930790977 | -1930790977 |
| 2105369567 | 2105369567 | 2105369567 |
| 1004822954 | 1004822954 | 1004822954 |
| -1965264559 | -1965264559 | -1965264559 |
遇到的問題:
在使用 8.0.17 的 jdbc driver 會收到以下的 exception:
java.sql.SQLException: The server time zone value 'EDT' is unrecognized or represents more than one time zone. You must configure either the server or JDBC driver (via the serverTimezone configuration property) to use a more specifc time zone value if you want to utilize time zone support.
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:89)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:63)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:73)
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:76)
at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:827)
at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:447)
at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:237)
at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:199
解決的方法,只要在 db 的 connection string 加上 ?serverTimezone=UTC 就可以解決,如下:
jdbc:mysql://10.100.0.152:3306/database1?serverTimezone=UTC
今天總算已經將之前實作的 Source 和 Sink Connector 串起來了,並且也有顯示資料的結果了。明天會介紹有關於 Ohara 測試的部份。