iT邦幫忙

第 11 屆 iThome 鐵人賽

DAY 23
0
Software Development

用30天介紹 open source 專案 Ohara 系列 第 23

Day 23 撰寫 Sink Connector 測試程式

昨天已經把 JDBC Sink Connector 程式撰寫完成了,今天主要是寫測試程式的部份。測試的方法主要會先啟動 Zookeeper、Broker 和 Worker 的 MiniCluster,然後啟動 RandomNumberSourceConnector 自動產生亂數的資料寫入到 topic 裡,之後再透過 JDBCSinkConnector 把 topic 的資料寫入到 Database 裡。

測試使用的 Database 是使用 Embedded 的 database,在測試程式啟動的時侯才會把資料庫啟動起來,這樣的好處是不用連到外部的資料庫,單元測試程式連到外部的資料庫有以下的壞處:

  • 在執行測試前和測試後需要做清資料的動作
  • 如果不能連到資料庫上,就不能測試
  • 開發者或是 QA Server 頻繁的連到資料庫上,會因為資料庫連線負擔變大,測試會變很慢而且測試有可能 timeout exception

以下是 JDBCSinkConnector 的測試程式:

class TestJDBCSinkConnector extends Matchers {

  @Test
  def test(): Unit = {
    val db = Database.local()
    val client = DatabaseClient.builder.url(db.url()).user(db.user()).password(db.password()).build
    val tableName = "table1"
    val column1 = RdbColumn("a", "VARCHAR(45)", true)
    val column2 = RdbColumn("b", "VARCHAR(45)", false)
    client.createTable(tableName, Seq(column1, column2))

    val brokerNumber = 1
    val workerNumber = 1
    val zookeeper = Zookeepers.local(0)
    val brokers = Brokers.local(zookeeper, (1 to brokerNumber).map(x => 0).toArray)
    val workers = Workers.local(brokers, (1 to workerNumber).map(x => 0).toArray)

    val workerClient = WorkerClient(workers.connectionProps())
    val topicKey = TopicKey.of(CommonUtils.randomString(5), CommonUtils.randomString(5))
    val connectorKey1 = ConnectorKey.of(CommonUtils.randomString(5), CommonUtils.randomString(5))
    val connectorKey2 = ConnectorKey.of(CommonUtils.randomString(5), CommonUtils.randomString(5))

    val schema = Seq(Column.builder().name("a").dataType(DataType.STRING).order(1).build(),
                     Column.builder().name("b").dataType(DataType.STRING).order(1).build())

    //Start RandomNumberSourceConnector
    Await.result(workerClient
      .connectorCreator()
      .topicKey(topicKey)
      .connectorClass(classOf[RandomNumberSourceConnecotr])
      .numberOfTasks(1)
      .connectorKey(connectorKey1)
      .settings(Map())
      .columns(schema)
      .create(), 10 seconds)

    //Start JDBCSinkConnector
    Await.result(workerClient
      .connectorCreator()
      .topicKey(topicKey)
      .connectorClass(classOf[JDBCSinkConnector])
      .numberOfTasks(1)
      .connectorKey(connectorKey2)
      .settings(Map("db.tablename" -> tableName,
                    "db.url" -> db.url,
                    "db.username" -> db.user,
                    "db.password" -> db.password
      ))
      .columns(schema)
      .create(), 10 seconds)

    val consumer =
      Consumer
        .builder[Row, Array[Byte]]()
        .topicName(topicKey.topicNameOnKafka)
        .offsetFromBegin()
        .connectionProps(brokers.connectionProps)
        .keySerializer(Serializer.ROW)
        .valueSerializer(Serializer.BYTES)
        .build()


    val record = consumer.poll(java.time.Duration.ofSeconds(30), 3).asScala
    record.size >= 3 shouldBe true

    //Check database data
    val connection = client.connection
    val statement = connection.createStatement()
    val resultSet = statement.executeQuery(s"SELECT * FROM $tableName")
    var count = 0
    while(resultSet.next()) {
      count = count + 1
    }
    count >= 3 shouldBe true
    workers.close()
    brokers.close()
    zookeeper.close()
    statement.close()
    connection.close()
    client.close()
  }
}

以上的測試程式在執行到測試的方法時,會建立 Embedded MySQL 的 Database,因此在 build.gradle 需要加上 mysql jdbc 的 library。建立完資料庫之後會建立資料表,在這裡因為沒有處理 schema 的部份,所以 Source Connector 寫入到 topic 資料的欄位名稱要和資料表的欄位名稱相同,這樣 Source Connector 才能成功將資料寫入到 Database 裡。

建立完資料表之後就會啟動 Zookeeper、Broker 和 Worker 的 MiniCluster,並且會啟動RandomNumberSourceConnecotr 以及 JDBCSinkConnector 二個 Connector,一個 Connector 負責產生資料到 topic 另外一個 Connector 負責把資料寫入到資料庫的資料表裡。之後再連進資料表裡面去確認資料是否有成功寫入進去。

這個測試程式只是一個簡單的 Sample,程式還可以修改的更好,像是啟動 MiniCluster 可以放在 Before Class 裡,這樣就可以有多個 test case 的方法,不用重新啟動 MiniCluster。另外可以修改的地方是有關於 close 的部份,應該要用 try .... finally.... 的方式撰寫會比較好。

今天已經介紹了要如何撰寫 JDBC Sink Connector 測試程式的部份了,明天會介紹要如何的使用 Ohara manager 提供的 WebUI,把 JDBC Sink Connector 部署到 Worker Cluster 上,並且執行 Pipeline 來確認資料是否有寫入到資料庫上。


上一篇
Day 22 實作 Ohara 的 Sink Connector 介面
下一篇
Day 24 部署自定義的 Sink Connector
系列文
用30天介紹 open source 專案 Ohara 30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言