iT邦幫忙

第 11 屆 iThome 鐵人賽

DAY 20
0
Software Development

用30天介紹 open source 專案 Ohara 系列 第 20

Day 20 撰寫 Source Connector 測試程式

  • 分享至 

  • xImage
  •  

延續昨天介紹實作 Ohara 提供的 Source Connector 介面的部份,今天主要會使用 Ohara 測試程式提供的 Worker MiniCluster 去測試 RandomNumberSourceConnector 的 connector 程式執行的結果是否正確。

使用 Worker MiniCluster 測試程式的主要好處是,在撰寫完 connector 程式之後就可以直接測試 connector 程式在 task 分散執行時,邏輯或是資料的結果是否正確。如果沒有 Worker MiniCluster 的話只能使用 Mock 的方式去模擬單一 task 的邏輯是否正確,測不到當 task 是分散時或是另外在建立一個 connector 時,原本的 connector 的資料是否正確,這要在 connector 部署到真實的測試環境上才會發現到問題,但是這會提高 Debug 的難度。因此建議要使用 Worker 的 MiniCluster 來撰寫測試程式,才會在開發階段就會發現 connector 程式的問題,像是 offset 的控制、資料的邏輯…等等的問題。

以下就是 RandomNumberSourcConnector 的測試程式:
TestRandomNumberSourceConnector.scala

import com.island.ohara.client.kafka.WorkerClient
import com.island.ohara.common.data.{Column, DataType, Row, Serializer}
import com.island.ohara.common.setting.{ConnectorKey, TopicKey}
import com.island.ohara.common.util.{CommonUtils, Releasable}
import com.island.ohara.kafka.Consumer
import com.island.ohara.testing.service.{Brokers, Workers, Zookeepers}
import org.junit.Test
import org.scalatest.Matchers

import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.concurrent.Await
class TestRandomNumberSourceConnector extends Matchers{
  @Test
  def test(): Unit = {
    val brokerNumber = 1
    val workerNumber = 1
    val zookeeper = Zookeepers.local(0)
    try {
      val brokers = Brokers.local(zookeeper, (1 to brokerNumber).map(x => 0).toArray)
      try {
        val workers = Workers.local(brokers, (1 to workerNumber).map(x => 0).toArray)
        try {
          val workerClient = WorkerClient(workers.connectionProps())
          val topicKey = TopicKey.of(CommonUtils.randomString(5), CommonUtils.randomString(5))
          val connectorKey = ConnectorKey.of(CommonUtils.randomString(5), CommonUtils.randomString(5))
          val schema = Seq(Column.builder().name("a").dataType(DataType.STRING).order(1).build())

          Await.result(workerClient
            .connectorCreator()
            .topicKey(topicKey)
            .connectorClass(classOf[RandomNumberSourceConnecotr])
            .numberOfTasks(1)
            .connectorKey(connectorKey)
            .settings(Map())
            .columns(schema)
            .create(), 10 seconds)

          val consumer =
            Consumer
              .builder[Row, Array[Byte]]()
              .topicName(topicKey.topicNameOnKafka)
              .offsetFromBegin()
              .connectionProps(brokers.connectionProps)
              .keySerializer(Serializer.ROW)
              .valueSerializer(Serializer.BYTES)
              .build()


          val record = consumer.poll(java.time.Duration.ofSeconds(30), 3).asScala
          record.size >= 3 shouldBe true
          println(s"Record size is: ${record.size}")
          println(s"Value is: ${record.head.key().get.cell(0).value()}")
        } finally Releasable.close(workers)
      } finally Releasable.close(brokers)
    } finally Releasable.close(zookeeper)
  }
}

測試程式主要是會啟動 Zookeeper、Broker 和 connector 的 MiniCluster 服務,然後使用 WorkerClient 連到 MiniCluster 的 Worker 上,之後就會開始執行 RandomNumberSourceConnecotr 的 connector 測試程式,產生資料,然後測試程式會啟動 consumer 在 30 秒之內要等待 3 筆以上的資料,之後就執行測試程式斷言的比較了。

Ohara 建立了以上的 MiniCluster 可以幫助開發者有效執行 connector 測試的部份,在目前的 Kakfa library 是沒有的功能,這也是使用 Ohara 的好處之一。

寫了測試之後才發現 Day 19 撰寫的程式碼是有問題的,因此修正後的程式碼放在,以下的連結裡:
https://github.com/jackyoh/ohara-connector-demo

主要修改了以下的問題:

  • Build.gradle 有些要匯入的 jar 檔 library 忘了匯入

  • RandomNumberSourceConnecotr 的 _start 方法,把 settings 的變數設定錯誤了,這樣在執行 connector 的 task 會出現 NullPointException

  • 在 RandomNumberTask 的 poll 方法,會每隔 5 秒去產生資料,如果使用 Thread.sleep 在測試執行完成之後 task 還在 sleep 就會收到 exit 1 的錯誤訊息

明天還會繼續介紹要如何的把寫完的 connector 程式,使用 ohara manager 的 UI 進行佈署的部份。


上一篇
Day 19 實作 Ohara 的 Source Connector 介面
下一篇
Day 21 部署自訂義的 Source Connector
系列文
用30天介紹 open source 專案 Ohara 30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言