Ohara 官方有提供了一些 Source Connector,像是 JDBC Source Connector、Ftp Source Connector、Perf Source Connector…等等的 Connector,如果使用者另外有其它需求也可以自已來寫程式實作 Ohara 的 Connector 介面,這樣的好處是寫完 connector 的程式並且打包成 jar 檔之後,就可以透過 ohara manager 的 UI 介面上傳到 ohara 的 Worker Cluster, 之後就可以透過 UI 的方式拉出自已實作的 Connector 程式並且執行 Pipeline。
如果是實作 Kafka 提供的 connector 介面,要自已手動做非常多的事,像是把 jar 檔佈署到 worker cluster 裡、設定 plugin dir 參數、使用 Kafka Worker 提供的 Restful API 來啟動自已實作的 connector…等等的工作。Ohara 的目標就是要讓開發者能更關注在connector 程式,處理資料邏輯的部份。
今天主要會介紹要如何實作 Ohara 提供的 Connector 介面,來實作亂數產生數字的 Source Connector,這個 Connector 名稱我就把它命名成 RandomNumberSourceConnecotr。
在實作程式之前需要設定 build.gradle 的檔案,如下:
apply plugin: 'scala'
sourceCompatibility = 1.8
repositories {
mavenCentral()
maven {
url "https://dl.bintray.com/oharastream/ohara"
}
}
tasks.withType(JavaCompile){
options.encoding = 'UTF-8'
}
tasks.withType(Javadoc){
options.encoding = 'UTF-8'
}
dependencies {
compile 'org.scala-lang:scala-library:2.12.9'
compile 'junit:junit:4.12'
compile 'com.island.ohara:ohara-client:0.7.1'
compile 'com.island.ohara:ohara-common:0.7.1'
compile 'com.island.ohara:ohara-kafka:0.7.1'
}
以上的 build.gradle 檔案,主要會匯入要實作 ohara connector 介面的 jar 檔
RandomNumberSourceConnecotr.scala
class RandomNumberSourceConnecotr extends RowSourceConnector {
private[perf] var settings: TaskSetting = _
override def _taskClass(): Class[_ <: RowSourceTask] = classOf[RandomNumberTask]
override def _taskSettings(maxTasks: Int): util.List[TaskSetting] = Seq.fill(maxTasks)(settings).asJava
override def _start(config: TaskSetting): Unit = {
this.settings = settings
}
override def _stop(): Unit = {
//Nothing
}
override protected def _definitions(): java.util.List[SettingDef] = Seq().asJava
}
以上的程式碼主要是 Connector 程式碼的進入點,它會先繼承 RowSourceConnector Ohara 定義的 Connector 介面,背後會去包裝 Kafka 的 Connector 介面, 之後實作 _taskClass、_taskSettings、_start、_definitions 方法,在這裡值得一提的是 _definitions 方法,它的做用主要是定義了有哪些設定的參數,並且設定給 connector 使用,如果有定義 _definitions 這樣 UI 才能自動的載入,實作 Connector 的設定參數畫面。
RandomNumberTask.scala
class RandomNumberTask extends RowSourceTask {
private[perf] var schema: Seq[Column] = _
private[this] var topics: Seq[String] = _
override def _start(settings: TaskSetting): Unit = {
this.topics = settings.topicNames().asScala
this.schema = settings.columns.asScala
}
override def _stop(): Unit = {
}
override def _poll(): util.List[RowSourceRecord] = {
Thread.sleep(5000)
val value = CommonUtils.randomString()
val row: Row = Row.of(
schema.sortBy(_.order).map { c =>
Cell.of(
c.name,
c.dataType match {
case DataType.BOOLEAN => false
case DataType.BYTE => ByteUtils.toBytes(value).head
case DataType.BYTES => ByteUtils.toBytes(value)
case DataType.SHORT => value.toShort
case DataType.INT => value.toInt
case DataType.LONG => value
case DataType.FLOAT => value.toFloat
case DataType.DOUBLE => value.toDouble
case DataType.STRING => value.toString
case _ => value
}
)
}: _*
)
val records: Seq[RowSourceRecord] = topics.map(RowSourceRecord.builder().row(row).topicName(_).build())
records.toList.asJava
}
}
以上的程式主要是定義在執行 connector 下的 task,在這裡的程式主要會定期的將亂數資料寫入到 Kafka Topic,另外 Ohara 也定義 Row 來當作回傳資料的格式類型,Row 下面定義了 DataType,因此 Ohara 的 Connector 程式資料有 Metadata 的概念。
今天已經簡單的介紹要如何撰寫 Ohara 的 source connector,在這裡只是簡單的情境,在實際的情況還要考慮很多東西,像是資料 offset 的管理、task reblance 時要做的處理…等等的議題,如果有興趣也可以參考以下的連結,來了解 connector 的實作。
https://ohara.readthedocs.io/en/latest/custom_connector.html
今天寫的程式還沒有做測試,因此明天還會繼續介紹有關於 connector 在 Ohara 測試的部份。
以下是今天實作的 Source Connector 連結:
https://github.com/jackyoh/ohara-connector-demo
以上的文章經過了 Day 20 的測試程式,發現以上的程式是有 bug。正確的程式碼已經在以下的連結修正了:
https://github.com/jackyoh/ohara-connector-demo