iT邦幫忙

2017 iT 邦幫忙鐵人賽
DAY 19
1
Big Data

Spark 2.0 in Scala系列 第 19

[Spark-Day19](Spark Streaming篇)Streaming初探

設定完了,開始來看看Spark Streaming到底是啥吧!基本上寫Streaming我會比較習慣在IDE中,所以回到Intellij + Scala Plugin + SBT吧。

我們目前已經看過兩個Spark的起手式(起始物件):

  • 一般RDD:SparkContext(暱稱sc)
  • SparkSQL:SparkSession(暱稱ss)

那SparkStreaming勒?那就是SparkStreamingContext(暱稱ssc)囉
看看起手式需要哪些套件跟建立方式:


[Snippet.40]建立Spark Streaming物件

//~~套件段~~
import org.apache.spark._ ①
import org.apache.spark.streaming._ ②
//~~程式段~~
  val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") ③
  val ssc = new StreamingContext(conf, Seconds(10)) ④

①②是執行SparkStreaming基本的元件
③建立SparkConf,這個好東西建立SparkContext、SparkSession、SparkStreamingContext都會用到,基本上就是個設定檔元件,先前在玩SparkSQL時都是用fluent style的寫法串在一起,這裡分開寫。另外這邊要注意的就是必須給超過1個core, 因為一個core會被Streaming receiver用掉了,只給1core的話就沒有另外的core可以來實際處理串流事件了。
④建立一個ssc物件,這個地方有個重點是Seconds(10),這裡必須帶入一個Scala的Duration物件,描述時間長度的。而這寫法代表啥勒?代表Spark Streaming會以每秒處理一次的頻率批次處理串流


Spark Streaming MicroBatch

有沒有看到④裏面的批次這兩個字~對啦!原本Spark跟MapReduce一樣都是處理批次作業的。但是Spark透過將串流切成一小段一小段的批次,這樣就能用處理批次的方式處理串流啦XD~這就是微批次的方式。上個官方的微批次圖吧:
http://ithelp.ithome.com.tw/upload/images/20170105/20103839s17rWkpMaN.png
注意這與Storm等串流架構並不相同,架構師或設計師可能要注意這部份。微批次是否能滿足任務需求~

官方有個簡易的範例,但都是給片段的函式用法,直接拿來改寫成App玩玩吧:

package SparkIronMan
/**
  * Created by joechh on 2016/12/6.
  */
import org.apache.spark._
import org.apache.spark.streaming._

object Day19_SimpleStreaming extends App {
  val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
  val ssc = new StreamingContext(conf, Seconds(5)) 
  val lines = ssc.socketTextStream("localhost", 9999) ①
  val words = lines.flatMap(_.split(" ")) ②
  val pairs = words.map(word => (word, 1)) ③
  val wordCounts = pairs.reduceByKey(_ + _) ④

  wordCounts.print() ⑤
  ssc.start() ⑥
  ssc.awaitTermination() ⑦
  //In another terminator, using #nc -lk 9999 to try the task!
}

① ssc支援從多個不同來源端接收串流,像我們後續會講到從檔案(Local、HDFS、S3等)或是從外部系統(Kafka、Akka、Flume等),此範例直接用聽某個socketPort(聽在9999),並把內容當作文字接進來的socketTextStream
② 這個動作不陌生了吧?切開 + 攤平靠flatMap
③④ 轉成pairRDD,接reduceByKey連續技,阿...這不就是經典(用到爛掉)的wordCount嗎XD
⑤ Streaming處理完了必須要有個輸出,print()就是個建議的輸出方式,將輸出印在console上(預設印出RDD中的前10筆),就像SparkSQL的show一樣,但真實情景常用的可能是寫入檔案,或是foreachRDD轉入DB之類的。
Spark Streaming會要求你將處理過的資訊丟到某個地方,如果不輸出(將print拿掉)會如何:

Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
	at scala.Predef$.require(Predef.scala:224)

噴錯~
⑥重點來了,Spark的lazy evaluation在Streaming一樣有用,就像是一般RDD的action類型操作一樣,即便以上都設定好了,你還另外必須執行start才會開始運作,產生真正的Steaming物件(DStreams),然後啟動receivers並開始處理
⑦如果不讓Driver端的主Thread離開(exit),加個awaitTermination()

執行下去吧!!

17/01/05 03:11:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
17/01/05 03:11:46 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Error connecting to localhost:9999
java.net.ConnectException: 連線被拒絕
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:589)
	at java.net.Socket.connect(Socket.java:538)
	....

立馬噴錯XD,為啥!!
喔喔,原來是因為socketTextStream預設那個port會有服務了,也就是有程序咬住那個port。現在根本沒有服務在上面阿XD~

開個terminator用netcat指令送資料給某個port吧:

joechh@joechh:~$ nc -lk 9999

netstat指令驗證9999 port上面有程序了

joechh@joechh:~$ sudo netstat -tnlpn

http://ithelp.ithome.com.tw/upload/images/20170105/201038399AiqMVRWIh.png
恩,有了。再次執行Spark Streaming程式!

剛開始應該是類似這樣的畫面:

-------------------------------------------
Time: 1483557535000 ms
-------------------------------------------

-------------------------------------------
Time: 1483557540000 ms
-------------------------------------------

因為沒有收到任何東西麻~在nc那個terminator隨便打點東西進去

joechh@joechh:~$ nc -lk 9999
Hello, I am Joe      
Spark Streaming is so cool
AAAA
AA AA AA

有東西了!

-------------------------------------------
Time: 1483557560000 ms
-------------------------------------------
(Joe,1)
(am,1)
(Hello,,1)
(I,1)
-------------------------------------------
Time: 1483557565000 ms
-------------------------------------------

-------------------------------------------
Time: 1483557570000 ms
-------------------------------------------

-------------------------------------------
Time: 1483557575000 ms
-------------------------------------------
(is,1)
(so,1)
(Streaming,1)
(cool,1)
(Spark,1)
(AAAA,1)
(AA,3)

想一下,如果不想玩wordCounting了,我想讓該筆事件中有含"error"才輸出到螢幕上,怎麼做?

將中間WordCount換成:

val errorLines = lines.filter(_.contains("error"));
errorLines.print()

自己玩玩看吧:)


上一篇
[Spark-Day18](Spark Streaming篇)HDFS、Kafka環境設定
下一篇
[Spark-Day20](Spark Streaming篇)Stateless Streaming by Use Case
系列文
Spark 2.0 in Scala30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言