昨天已經把 JDBC Sink Connector 程式撰寫完成了,今天主要是寫測試程式的部份。測試的方法主要會先啟動 Zookeeper、Broker 和 Worker 的 MiniCluster,然後啟動 RandomNumberSourceConnector 自動產生亂數的資料寫入到 topic 裡,之後再透過 JDBCSinkConnector 把 topic 的資料寫入到 Database 裡。
測試使用的 Database 是使用 Embedded 的 database,在測試程式啟動的時侯才會把資料庫啟動起來,這樣的好處是不用連到外部的資料庫,單元測試程式連到外部的資料庫有以下的壞處:
以下是 JDBCSinkConnector 的測試程式:
class TestJDBCSinkConnector extends Matchers {
@Test
def test(): Unit = {
val db = Database.local()
val client = DatabaseClient.builder.url(db.url()).user(db.user()).password(db.password()).build
val tableName = "table1"
val column1 = RdbColumn("a", "VARCHAR(45)", true)
val column2 = RdbColumn("b", "VARCHAR(45)", false)
client.createTable(tableName, Seq(column1, column2))
val brokerNumber = 1
val workerNumber = 1
val zookeeper = Zookeepers.local(0)
val brokers = Brokers.local(zookeeper, (1 to brokerNumber).map(x => 0).toArray)
val workers = Workers.local(brokers, (1 to workerNumber).map(x => 0).toArray)
val workerClient = WorkerClient(workers.connectionProps())
val topicKey = TopicKey.of(CommonUtils.randomString(5), CommonUtils.randomString(5))
val connectorKey1 = ConnectorKey.of(CommonUtils.randomString(5), CommonUtils.randomString(5))
val connectorKey2 = ConnectorKey.of(CommonUtils.randomString(5), CommonUtils.randomString(5))
val schema = Seq(Column.builder().name("a").dataType(DataType.STRING).order(1).build(),
Column.builder().name("b").dataType(DataType.STRING).order(1).build())
//Start RandomNumberSourceConnector
Await.result(workerClient
.connectorCreator()
.topicKey(topicKey)
.connectorClass(classOf[RandomNumberSourceConnecotr])
.numberOfTasks(1)
.connectorKey(connectorKey1)
.settings(Map())
.columns(schema)
.create(), 10 seconds)
//Start JDBCSinkConnector
Await.result(workerClient
.connectorCreator()
.topicKey(topicKey)
.connectorClass(classOf[JDBCSinkConnector])
.numberOfTasks(1)
.connectorKey(connectorKey2)
.settings(Map("db.tablename" -> tableName,
"db.url" -> db.url,
"db.username" -> db.user,
"db.password" -> db.password
))
.columns(schema)
.create(), 10 seconds)
val consumer =
Consumer
.builder[Row, Array[Byte]]()
.topicName(topicKey.topicNameOnKafka)
.offsetFromBegin()
.connectionProps(brokers.connectionProps)
.keySerializer(Serializer.ROW)
.valueSerializer(Serializer.BYTES)
.build()
val record = consumer.poll(java.time.Duration.ofSeconds(30), 3).asScala
record.size >= 3 shouldBe true
//Check database data
val connection = client.connection
val statement = connection.createStatement()
val resultSet = statement.executeQuery(s"SELECT * FROM $tableName")
var count = 0
while(resultSet.next()) {
count = count + 1
}
count >= 3 shouldBe true
workers.close()
brokers.close()
zookeeper.close()
statement.close()
connection.close()
client.close()
}
}
以上的測試程式在執行到測試的方法時,會建立 Embedded MySQL 的 Database,因此在 build.gradle 需要加上 mysql jdbc 的 library。建立完資料庫之後會建立資料表,在這裡因為沒有處理 schema 的部份,所以 Source Connector 寫入到 topic 資料的欄位名稱要和資料表的欄位名稱相同,這樣 Source Connector 才能成功將資料寫入到 Database 裡。
建立完資料表之後就會啟動 Zookeeper、Broker 和 Worker 的 MiniCluster,並且會啟動RandomNumberSourceConnecotr 以及 JDBCSinkConnector 二個 Connector,一個 Connector 負責產生資料到 topic 另外一個 Connector 負責把資料寫入到資料庫的資料表裡。之後再連進資料表裡面去確認資料是否有成功寫入進去。
這個測試程式只是一個簡單的 Sample,程式還可以修改的更好,像是啟動 MiniCluster 可以放在 Before Class 裡,這樣就可以有多個 test case 的方法,不用重新啟動 MiniCluster。另外可以修改的地方是有關於 close 的部份,應該要用 try .... finally.... 的方式撰寫會比較好。
今天已經介紹了要如何撰寫 JDBC Sink Connector 測試程式的部份了,明天會介紹要如何的使用 Ohara manager 提供的 WebUI,把 JDBC Sink Connector 部署到 Worker Cluster 上,並且執行 Pipeline 來確認資料是否有寫入到資料庫上。