iT邦幫忙

第 11 屆 iThome 鐵人賽

DAY 22
0
Software Development

用30天介紹 open source 專案 Ohara 系列 第 22

Day 22 實作 Ohara 的 Sink Connector 介面

  • 分享至 

  • xImage
  •  

前三天已經介紹了要如何撰寫、測試和部署 Source Connector 的部份了,今天要介紹如何使用 Ohara 提供的 Sink Connector 介面,實作 JDBC Sink Connector 的部份。實作 JDBC Sink Connector 主要的目的是把收到 Topic 的資料,寫入到 RDB 的資料庫裡。以下是實作 JDBC Sink Connector 的程式碼。

JDBCSinkConnector.scala

class JDBCSinkConnector extends RowSinkConnector {
  private[this] var config: TaskSetting = _

  override protected def _start(config: TaskSetting): Unit = {
    this.config = config
  }

  override protected def _stop(): Unit = {
    // do nothing
  }

  override protected def _taskClass(): Class[_ <: RowSinkTask] = classOf[JDBCSinkTask]

  override protected def _taskSettings(maxTasks: Int): util.List[TaskSetting] = Seq.fill(maxTasks)(config).asJava

  override protected def _version: ConnectorVersion = ConnectorVersion.DEFAULT

  override protected def _definitions(): util.List[SettingDef] = Seq(
    SettingDef
      .builder()
      .displayName("jdbc url")
      .key("db.url")
      .documentation("connection to database url")
      .valueType(SettingDef.Type.STRING)
      .build(),
    SettingDef
      .builder()
      .displayName("jdbc user name")
      .key("db.username")
      .documentation("connection to database username")
      .valueType(SettingDef.Type.STRING)
      .build(),
    SettingDef
      .builder()
      .displayName("jdbc password")
      .key("db.password")
      .documentation("connection to database password")
      .valueType(SettingDef.Type.PASSWORD)
      .build(),
    SettingDef
      .builder()
      .displayName("table name")
      .key("db.tablename")
      .documentation("insert to table")
      .valueType(SettingDef.Type.STRING)
      .build()
  ).asJava
}

以上的程式碼主要是 JDBC Sink Connector 的部份,它和 RandNumber Source Connector 的程式寫法是類似的,差別在於繼承了 RowSinkConnector 並且也有實作了 _definitions 的方法,實作此方法的目的是可以把設定 JDBC 連線的資訊,載入到 Ohara manager UI 的畫面上,給使用者輸入 JDBC 連線設定。

JDBCSinkTask.scala

class JDBCSinkTask extends RowSinkTask {
  private[this] var connection: Connection = _
  private[this] var statement: Statement = _
  private[this] var tableName: String = _

  override def _start(config: TaskSetting): Unit = {
    val dbURL: String = config.stringValue("db.url")
    val dbUserName: String = config.stringValue("db.username")
    val dbPassword = config.stringValue("db.password")
    tableName = config.stringValue("db.tablename")
    connection = DriverManager.getConnection(dbURL, dbUserName, dbPassword)
    statement = connection.createStatement()
  }

  override def _stop(): Unit = {
    if (statement != null) statement.close()
    if (connection != null) connection.close()
  }

  override def _put(records: util.List[RowSinkRecord]): Unit = {
    var sqlPrefix = s"INSERT INTO ${tableName}"
    records.forEach(x => {
      val row: Row = x.row()
      val columns: String = row.cells().asScala.map(x => x.name).mkString("(", ",", ")")
      val values: String = row.cells().asScala.map(x => s"'${x.value}'").mkString("(", ",", ")")
      val sql = s"$sqlPrefix $columns VALUES $values"
      statement.executeUpdate(sql)
    })
  }
}

實作 SinkConnector 另外還需要去實作 RowSinkTask 的介面,主要會實作 _start、_stop和 _put 方法,_start 方法用來建立 JDBC 的 Connection 連線,_put 用來把收到 topic 的資料寫入到資料庫上,之後 _close 就會把 JDBC 的 Connection 關閉掉。

今天已經把 JDBC Sink Connector 程式撰寫完成了,但是還沒有寫測試程式,所以有可能程式會是錯誤的。因此明天會繼續再介紹要如何撰寫 JDBC Sink Connector 測試程式的部份,確保程式能夠正確的執行。

完整的程式碼可以參考以下的連結:
https://github.com/jackyoh/ohara-connector-demo

如果對客製化 connector 的實作有興趣的話,可以參考 Ohara 的文件,連結如下:
https://ohara.readthedocs.io/en/latest/custom_connector.html


上一篇
Day 21 部署自訂義的 Source Connector
下一篇
Day 23 撰寫 Sink Connector 測試程式
系列文
用30天介紹 open source 專案 Ohara 30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言