延續昨天介紹實作 Ohara 提供的 Source Connector 介面的部份,今天主要會使用 Ohara 測試程式提供的 Worker MiniCluster 去測試 RandomNumberSourceConnector 的 connector 程式執行的結果是否正確。
使用 Worker MiniCluster 測試程式的主要好處是,在撰寫完 connector 程式之後就可以直接測試 connector 程式在 task 分散執行時,邏輯或是資料的結果是否正確。如果沒有 Worker MiniCluster 的話只能使用 Mock 的方式去模擬單一 task 的邏輯是否正確,測不到當 task 是分散時或是另外在建立一個 connector 時,原本的 connector 的資料是否正確,這要在 connector 部署到真實的測試環境上才會發現到問題,但是這會提高 Debug 的難度。因此建議要使用 Worker 的 MiniCluster 來撰寫測試程式,才會在開發階段就會發現 connector 程式的問題,像是 offset 的控制、資料的邏輯…等等的問題。
以下就是 RandomNumberSourcConnector 的測試程式:
TestRandomNumberSourceConnector.scala
import com.island.ohara.client.kafka.WorkerClient
import com.island.ohara.common.data.{Column, DataType, Row, Serializer}
import com.island.ohara.common.setting.{ConnectorKey, TopicKey}
import com.island.ohara.common.util.{CommonUtils, Releasable}
import com.island.ohara.kafka.Consumer
import com.island.ohara.testing.service.{Brokers, Workers, Zookeepers}
import org.junit.Test
import org.scalatest.Matchers
import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.concurrent.Await
class TestRandomNumberSourceConnector extends Matchers{
@Test
def test(): Unit = {
val brokerNumber = 1
val workerNumber = 1
val zookeeper = Zookeepers.local(0)
try {
val brokers = Brokers.local(zookeeper, (1 to brokerNumber).map(x => 0).toArray)
try {
val workers = Workers.local(brokers, (1 to workerNumber).map(x => 0).toArray)
try {
val workerClient = WorkerClient(workers.connectionProps())
val topicKey = TopicKey.of(CommonUtils.randomString(5), CommonUtils.randomString(5))
val connectorKey = ConnectorKey.of(CommonUtils.randomString(5), CommonUtils.randomString(5))
val schema = Seq(Column.builder().name("a").dataType(DataType.STRING).order(1).build())
Await.result(workerClient
.connectorCreator()
.topicKey(topicKey)
.connectorClass(classOf[RandomNumberSourceConnecotr])
.numberOfTasks(1)
.connectorKey(connectorKey)
.settings(Map())
.columns(schema)
.create(), 10 seconds)
val consumer =
Consumer
.builder[Row, Array[Byte]]()
.topicName(topicKey.topicNameOnKafka)
.offsetFromBegin()
.connectionProps(brokers.connectionProps)
.keySerializer(Serializer.ROW)
.valueSerializer(Serializer.BYTES)
.build()
val record = consumer.poll(java.time.Duration.ofSeconds(30), 3).asScala
record.size >= 3 shouldBe true
println(s"Record size is: ${record.size}")
println(s"Value is: ${record.head.key().get.cell(0).value()}")
} finally Releasable.close(workers)
} finally Releasable.close(brokers)
} finally Releasable.close(zookeeper)
}
}
測試程式主要是會啟動 Zookeeper、Broker 和 connector 的 MiniCluster 服務,然後使用 WorkerClient 連到 MiniCluster 的 Worker 上,之後就會開始執行 RandomNumberSourceConnecotr 的 connector 測試程式,產生資料,然後測試程式會啟動 consumer 在 30 秒之內要等待 3 筆以上的資料,之後就執行測試程式斷言的比較了。
Ohara 建立了以上的 MiniCluster 可以幫助開發者有效執行 connector 測試的部份,在目前的 Kakfa library 是沒有的功能,這也是使用 Ohara 的好處之一。
寫了測試之後才發現 Day 19 撰寫的程式碼是有問題的,因此修正後的程式碼放在,以下的連結裡:
https://github.com/jackyoh/ohara-connector-demo
主要修改了以下的問題:
Build.gradle 有些要匯入的 jar 檔 library 忘了匯入
RandomNumberSourceConnecotr 的 _start 方法,把 settings 的變數設定錯誤了,這樣在執行 connector 的 task 會出現 NullPointException
在 RandomNumberTask 的 poll 方法,會每隔 5 秒去產生資料,如果使用 Thread.sleep 在測試執行完成之後 task 還在 sleep 就會收到 exit 1 的錯誤訊息
明天還會繼續介紹要如何的把寫完的 connector 程式,使用 ohara manager 的 UI 進行佈署的部份。