We've finally reached the last article of the core API hands-on series: Accumulator and Broadcast. Let's get started!
An Accumulator can be shared across different executors, but each executor can only perform add operations on it. Accumulators are typically used to track statistics on the fly, such as sums and counts.
[Snippet.36] Creating an Accumulator
First, let's see how to create an accumulator object:
acc = sc.accumulator(initialValue)
or
acc = sc.accumulator(initialValue, AccumulatorName)
Here's a simple example:
scala> val acc = sc.accumulator(0, "acc name") ①
warning: there were two deprecation warnings; re-run with -deprecation for details
acc: org.apache.spark.Accumulator[Int] = 0
scala> val list = sc.parallelize(1 to 1000000) ②
list: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> list.foreach(x => acc.add(1)) ③
scala> acc.value ④
res14: Int = 1000000
① Create an accumulator object
② Create an RDD with 1,000,000 elements
③ Increment the accumulator through the add method; since the RDD has 1,000,000 elements, add(1) is executed 1,000,000 times
④ Read the value through .value
Note in particular that reading via .value is only allowed on the Driver side; executors can only add to the value.
What happens if an executor tries to read the value?
scala> list.foreach(x => acc.value)
16/12/31 13:33:59 ERROR Executor: Exception in task 5.0 in stage 8.0 (TID 50)
java.lang.UnsupportedOperationException: Can't read accumulator value in task
at org.apache.spark.Accumulable.value(Accumulable.scala:117)
at $line65.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply$mcVI$sp(<console>:29)
at $line65.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:29)
at $line65.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:29)
...
...
Yep, it blows up XD. Don't forget that the accumulator is write-only on the Executor side!
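By the way, the deprecation warnings in the REPL output appear because sc.accumulator is deprecated from Spark 2.x onward. As a side note, a minimal sketch of the same count written against the newer built-in API (assuming Spark 2.x, where sc.longAccumulator is the built-in replacement) could look like this:

// Spark 2.x built-in numeric accumulator (non-deprecated replacement for sc.accumulator)
val acc = sc.longAccumulator("acc name")
val list = sc.parallelize(1 to 1000000)
list.foreach(x => acc.add(1))   // executors still only add
acc.value                       // read on the Driver side: 1000000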
Next, let's look at a case that tracks the total number of blank lines in a file while also splitting every line into words:
scala> val file =sc.textFile("README.md")
file: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24
scala> val blankLines = sc.accumulator(0)
warning: there were two deprecation warnings; re-run with -deprecation for details
blankLines: org.apache.spark.Accumulator[Int] = 0
scala> val records = file.flatMap(line=>{ ①
if(line == "") ②
blankLines+=1 ③
line.split(" ") ④
})
records: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:28
scala> records.saveAsTextFile("output.txt") ⑤
scala> print("blank lines:"+blankLines.value)
blank lines:36
① Use a flatMap, together with ④'s split, to flatten the parsed words
②③ If the line is blank, increment blankLines by one
⑤ Save the result to a file
Let's check the output:
joechh@joechh:~/Scala/sparkIronMan/Spark/src/main/resources/day17$ ll output.txt/
總計 28
drwxrwxr-x 2 joechh joechh 4096 12月 31 13:12 ./
drwxrwxr-x 3 joechh joechh 4096 12月 31 13:12 ../
-rw-r--r-- 1 joechh joechh 1915 12月 31 13:11 part-00000
-rw-r--r-- 1 joechh joechh 24 12月 31 13:11 .part-00000.crc
-rw-r--r-- 1 joechh joechh 1913 12月 31 13:11 part-00001
-rw-r--r-- 1 joechh joechh 24 12月 31 13:11 .part-00001.crc
-rw-r--r-- 1 joechh joechh 0 12月 31 13:11 _SUCCESS
-rw-r--r-- 1 joechh joechh 8 12月 31 13:11 ._SUCCESS.crc
joechh@joechh:~/Scala/sparkIronMan/Spark/src/main/resources/day17$
The above is just a simple integer accumulator. What if I want to build an accumulator over a custom type? Of course you can!
[Snippet.37] Creating a custom Accumulator
The simplest way to create a custom Accumulator is to implement an implicit object that extends AccumulableParam and defines three functions: zero, addInPlace, and addAccumulator.
Let's look at the example:
scala> val rdd = sc.parallelize (1 to 100)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24
scala> import org.apache.spark.AccumulableParam ①
import org.apache.spark.AccumulableParam
scala> implicit object AvgAccParam extends AccumulableParam[(Int, Int), Int] { ②
def zero(v:(Int, Int)) = (0, 0) ③
def addInPlace(v1:(Int, Int), v2:(Int, Int)) = (v1._1+v2._1, v1._2+v2._2) ④
def addAccumulator(v1:(Int, Int), v2:Int) = (v1._1+1, v1._2+v2) ⑤
}
warning: there was one deprecation warning; re-run with -deprecation for details
defined object AvgAccParam
① Import AccumulableParam
② Declare an AvgAccParam object that extends AccumulableParam
③ Define the zero (initial) value as the (0, 0) pair
④ Define addInPlace, which merges two accumulated (count, sum) pairs
⑤ Define how a new value is added into the Accumulator
Now that it's defined, let's try it out:
scala> val acc=sc.accumulable((0,0))
warning: there was one deprecation warning; re-run with -deprecation for details
acc: org.apache.spark.Accumulable[(Int, Int),Int] = (0,0)
scala> rdd.foreach(x=>acc+=x)
Check the Accumulator's result:
scala> acc.value._1
res17: Int = 100
scala> acc.value._2
res18: Int = 5050
scala> val mean =acc.value._2.toDouble/acc.value._1
mean: Double = 50.5
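As a side note, AccumulableParam itself is what triggers the deprecation warnings above; from Spark 2.x on, the recommended way to build a custom accumulator is to extend AccumulatorV2. A minimal sketch of the same (count, sum) accumulator on that API (assuming Spark 2.x) might look like this:

import org.apache.spark.util.AccumulatorV2

// a (count, sum) accumulator built on the AccumulatorV2 API
class AvgAccumulator extends AccumulatorV2[Int, (Int, Int)] {
  private var count = 0
  private var sum = 0
  def isZero: Boolean = count == 0 && sum == 0
  def copy(): AvgAccumulator = {
    val acc = new AvgAccumulator
    acc.count = count
    acc.sum = sum
    acc
  }
  def reset(): Unit = { count = 0; sum = 0 }
  def add(v: Int): Unit = { count += 1; sum += v }   // add a new value
  def merge(other: AccumulatorV2[Int, (Int, Int)]): Unit = other match {
    case o: AvgAccumulator => count += o.count; sum += o.sum   // merge partial results
  }
  def value: (Int, Int) = (count, sum)
}

// usage sketch:
// val acc2 = new AvgAccumulator
// sc.register(acc2, "avg acc")
// sc.parallelize(1 to 100).foreach(x => acc2.add(x))
// acc2.value   // (100, 5050)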
If you don't want to define your own Accumulator, you can also create a mutable accumulableCollection to hold the accumulated values. Here's an example:
scala> val colacc=sc.accumulableCollection(MutableList[Int]())
warning: there was one deprecation warning; re-run with -deprecation for details
colacc: org.apache.spark.Accumulable[scala.collection.mutable.MutableList[Int],Int] = MutableList()
Update the accumulated values with the rdd defined earlier:
scala> rdd.foreach(x=>colacc+=x)
scala> colacc.value
res20: scala.collection.mutable.MutableList[Int] = MutableList(13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 1, 2, 3, 4, 5, 6, 7, 8, 9, ....)
In this example every value added becomes a new element of the collection, but note that the resulting order is not guaranteed.
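If you are on Spark 2.x, the non-deprecated built-in that covers the same use case is sc.collectionAccumulator; a quick sketch (assuming Spark 2.x):

// built-in collection accumulator; its value is a java.util.List with no order guarantee
val colacc2 = sc.collectionAccumulator[Int]("collection acc")
sc.parallelize(1 to 100).foreach(x => colacc2.add(x))
colacc2.value   // java.util.List[Int], elements in no particular order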
broadCast was already used early on in this ironman series, so I won't repeat its usage here. Instead, here are a few things to keep in mind when using broadCast variables:
- Any serializable object can be declared as a broadCast variable.
- Read a broadcast variable through .value; reading the original object directly causes the variable to be shipped to the Executor again, losing the benefit of broadCast.
- A broadcast variable can be removed explicitly with the destroy function, but by default Spark will also clean up broadCast variables that are no longer used (no longer referenced).
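To make the .value point concrete, here is a small sketch (the lookup Map and variable names are just illustrative):

// a lookup table we want available on every executor
val lookup = Map(1 -> "a", 2 -> "b", 3 -> "c")
val bcLookup = sc.broadcast(lookup)              // shipped once per executor

sc.parallelize(1 to 3)
  .map(k => bcLookup.value.getOrElse(k, "?"))    // read it via .value inside tasks
  .collect()

// referencing `lookup` directly inside the closure would serialize it with every task,
// which is exactly the "loses the benefit of broadCast" case above

bcLookup.destroy()                               // explicitly release it when no longer needed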
A supplementary note on MutableList[Int]() in Scala:
Remember to import collection.mutable._ first; otherwise, running val colacc=sc.accumulableCollection(MutableList[Int]())
will make the scala console tell you: error: not found: value MutableList
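For reference, the working sequence in spark-shell would be (still the deprecated accumulableCollection API, as in the example above):

import collection.mutable._   // brings MutableList into scope
val colacc = sc.accumulableCollection(MutableList[Int]())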