設定完了,開始來看看Spark Streaming到底是啥吧!基本上寫Streaming我會比較習慣在IDE中,所以回到Intellij
+ Scala Plugin
+ SBT
吧。
我們目前已經看過兩個Spark的起手式(起始物件):
SparkContext
(暱稱sc)SparkSession
(暱稱ss)那SparkStreaming勒?那就是SparkStreamingContext
(暱稱ssc)囉
看看起手式需要哪些套件跟建立方式:
[Snippet.40]建立Spark Streaming物件
//~~套件段~~
import org.apache.spark._ ①
import org.apache.spark.streaming._ ②
//~~程式段~~
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") ③
val ssc = new StreamingContext(conf, Seconds(10)) ④
①②是執行SparkStreaming基本的元件
③建立SparkConf,這個好東西建立SparkContext、SparkSession、SparkStreamingContext都會用到,基本上就是個設定檔元件,先前在玩SparkSQL時都是用fluent style的寫法串在一起,這裡分開寫。另外這邊要注意的就是必須給超過1個core
, 因為一個core會被Streaming receiver用掉了,只給1core的話就沒有另外的core可以來實際處理串流事件了。
④建立一個ssc物件,這個地方有個重點是Seconds(10)
,這裡必須帶入一個Scala的Duration物件,描述時間長度的。而這寫法代表啥勒?代表Spark Streaming會以每秒處理一次的頻率批次處理串流
有沒有看到④裏面的批次
這兩個字~對啦!原本Spark跟MapReduce一樣都是處理批次作業的。但是Spark透過將串流切成一小段一小段
的批次,這樣就能用處理批次的方式處理串流啦XD~這就是微批次的方式。上個官方的微批次圖吧:
注意這與Storm等串流架構並不相同,架構師或設計師可能要注意這部份。微批次是否能滿足任務需求~
官方有個簡易的範例,但都是給片段的函式用法,直接拿來改寫成App玩玩吧:
package SparkIronMan
/**
* Created by joechh on 2016/12/6.
*/
import org.apache.spark._
import org.apache.spark.streaming._
object Day19_SimpleStreaming extends App {
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(5))
val lines = ssc.socketTextStream("localhost", 9999) ①
val words = lines.flatMap(_.split(" ")) ②
val pairs = words.map(word => (word, 1)) ③
val wordCounts = pairs.reduceByKey(_ + _) ④
wordCounts.print() ⑤
ssc.start() ⑥
ssc.awaitTermination() ⑦
//In another terminator, using #nc -lk 9999 to try the task!
}
① ssc支援從多個不同來源端
接收串流,像我們後續會講到從檔案(Local、HDFS、S3等)或是從外部系統(Kafka、Akka、Flume等),此範例直接用聽某個socketPort(聽在9999),並把內容當作文字接進來的socketTextStream
,
② 這個動作不陌生了吧?切開 + 攤平靠flatMap
③④ 轉成pairRDD
,接reduceByKey連續技,阿...這不就是經典(用到爛掉)的wordCount嗎XD
⑤ Streaming處理完了必須要有個輸出,print()
就是個建議的輸出方式,將輸出印在console上(預設印出RDD中的前10筆),就像SparkSQL的show
一樣,但真實情景常用的可能是寫入檔案
,或是foreachRDD轉入DB
之類的。
Spark Streaming會要求你將處理過的資訊丟到某個地方,如果不輸出(將print拿掉)會如何:
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:224)
噴錯~
⑥重點來了,Spark的lazy evaluation在Streaming一樣有用,就像是一般RDD的action類型操作一樣,即便以上都設定好了,你還另外必須執行start
才會開始運作,產生真正的Steaming物件(DStreams),然後啟動receivers並開始處理
⑦如果不讓Driver端的主Thread離開(exit),加個awaitTermination()
吧
執行下去吧!!
17/01/05 03:11:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
17/01/05 03:11:46 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Error connecting to localhost:9999
java.net.ConnectException: 連線被拒絕
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at java.net.Socket.connect(Socket.java:538)
....
立馬噴錯XD,為啥!!
喔喔,原來是因為socketTextStream預設那個port會有服務
了,也就是有程序咬住那個port
。現在根本沒有服務在上面阿XD~
開個terminator用netcat
指令送資料給某個port吧:
joechh@joechh:~$ nc -lk 9999
用netstat
指令驗證9999 port上面有程序了
joechh@joechh:~$ sudo netstat -tnlpn
恩,有了。再次執行Spark Streaming程式!
剛開始應該是類似這樣的畫面:
-------------------------------------------
Time: 1483557535000 ms
-------------------------------------------
-------------------------------------------
Time: 1483557540000 ms
-------------------------------------------
因為沒有收到任何東西麻~在nc那個terminator隨便打點東西進去
joechh@joechh:~$ nc -lk 9999
Hello, I am Joe
Spark Streaming is so cool
AAAA
AA AA AA
有東西了!
-------------------------------------------
Time: 1483557560000 ms
-------------------------------------------
(Joe,1)
(am,1)
(Hello,,1)
(I,1)
-------------------------------------------
Time: 1483557565000 ms
-------------------------------------------
-------------------------------------------
Time: 1483557570000 ms
-------------------------------------------
-------------------------------------------
Time: 1483557575000 ms
-------------------------------------------
(is,1)
(so,1)
(Streaming,1)
(cool,1)
(Spark,1)
(AAAA,1)
(AA,3)
想一下,如果不想玩wordCounting了,我想讓該筆事件中有含"error"才輸出到螢幕上
,怎麼做?
將中間WordCount換成:
val errorLines = lines.filter(_.contains("error"));
errorLines.print()
自己玩玩看吧:)