iT邦幫忙

2019 iT 邦幫忙鐵人賽

DAY 3
0
AI & Data

Streaming data process with Apache Flink系列 第 3

[Day3] Apache Flink Scala REPL

  • 分享至 

  • xImage
  •  

昨天我們執行了官方提供的範例程式, 透過CLI進行了flink job submit, 並看到了結果。如果在開發的過程中需要測試語法, 除了寫成scala檔包成JAR檔執行之外, 是否有更方便的方式呢?Flink提供了類似spark-shell的工具, 今天先來介紹這個工具, 後續要測試API的話會在這個工具底下進行。

執行Scala REPL

./bin/start-scala-shell.sh local

執行成功後會顯示下面的畫面
https://ithelp.ithome.com.tw/upload/images/20181011/20105229T63eRfGPkG.png
這個工具支援了Batch/Streaming, 因此在啟動之後, 兩種的ExecutionEnvironments會自動的建立起來,分別是"benv"和"senv", 有點類似spark-shell一樣啟動之後會幫你把sc(SparkContext)已經建立起來。會有簡單的範例如下圖:
https://ithelp.ithome.com.tw/upload/images/20181011/201052292GZpv4Rs5p.png

寫一個Streaming的程式透過Streaming API吧

  1. 接續昨天的Quick Start, 啟用port 9000輸入一些資料做資料源頭
  2. 在shell裡面建立data pipeline
//load data from socket
val textStreaming = senv.socketTextStream("localhost", 9000) 
//data transform with mapreduce
val countsStreaming = textStreaming.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.keyBy(0).sum(1) 
countsStreaming.print()
//run the programe
senv.execute("Streaming Wordcount")

3.執行結果如下
https://ithelp.ithome.com.tw/upload/images/20181011/20105229D7Rny9ZRkp.png

其他應用

  1. 增添JAR檔, 在執行shell的時候給予參數, 如:
bin/start-scala-shell.sh [local | remote <host> <port> | yarn] --addclasspath <path/to/jar.jar>
  1. 從上述的code也可以看到, 如果想連接其他cluster或是yarn的資源的話可以在執行shell的時候給予相對應的參數。

上一篇
[Day2] Apache Flink Quick-Start
系列文
Streaming data process with Apache Flink3
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言