2017 iT 邦幫忙鐵人賽
DAY 17

Spark 2.0 in Scala series, part 17

[Spark-Day17] (Core API in Practice) Shared Variables

We have finally reached the last article of the core API in practice series: Accumulator and Broadcast. Let's get started!

Accumulator

An Accumulator can be shared across different executors, but each executor can only perform add operations on it. Accumulators are typically used to track statistics on the fly, such as sums and counts.


[Snippet.36] Creating an Accumulator
First, let's see how to create an accumulator object:

 acc = sc.accumulator(initialValue)
 or
 acc = sc.accumulator(initialValue,AccumulatorName)

Let's look at a simple example:

scala> val acc = sc.accumulator(0, "acc name") ①
warning: there were two deprecation warnings; re-run with -deprecation for details
acc: org.apache.spark.Accumulator[Int] = 0

scala>  val list = sc.parallelize(1 to 1000000) ②
list: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala>  list.foreach(x => acc.add(1)) ③

scala>  acc.value ④
res14: Int = 1000000

① Create an accumulator object
② Create an RDD with 1,000,000 elements
③ Increment the accumulator through the add method; since the RDD has 1,000,000 elements, add(1) is executed 1,000,000 times
④ Read the value with .value
Note that reading the value with .value is only possible on the Driver side; executors can only add.

What happens if we try to read the value on an executor?

scala>  list.foreach(x => acc.value)
16/12/31 13:33:59 ERROR Executor: Exception in task 5.0 in stage 8.0 (TID 50)
java.lang.UnsupportedOperationException: Can't read accumulator value in task
	at org.apache.spark.Accumulable.value(Accumulable.scala:117)
	at $line65.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply$mcVI$sp(<console>:29)
	at $line65.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:29)
	at $line65.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:29)
	...
    ...

Yep, it blows up XD. Don't forget that the executor side is write-only!
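
As a side note, the deprecation warnings in the REPL output above appear because Spark 2.0 introduced a new accumulator API (AccumulatorV2). A minimal sketch of the non-deprecated equivalent, assuming the usual sc from the shell:

 // Spark 2.0+ built-in LongAccumulator instead of the deprecated sc.accumulator
 val acc2 = sc.longAccumulator("acc name")
 sc.parallelize(1 to 1000000).foreach(_ => acc2.add(1))
 acc2.value   // read on the driver side: 1000000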

Next, let's look at an example that tracks the total number of blank lines in a file while also splitting every line into words:

scala> val file =sc.textFile("README.md")
file: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24

scala> val blankLines = sc.accumulator(0)
warning: there were two deprecation warnings; re-run with -deprecation for details
blankLines: org.apache.spark.Accumulator[Int] = 0

scala> val records = file.flatMap(line=>{ ①
      if(line == "") ②
        blankLines+=1 ③
      line.split(" ") ④
      })
records: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:28

scala> records.saveAsTextFile("output.txt") ⑤

scala> print("blank lines:"+blankLines.value)
blank lines:36

① Use flatMap together with the split in ④ to flatten the parsed words
②③ If the line is empty, increment blankLines by one
⑤ Save the result to a file

Let's check the output:

joechh@joechh:~/Scala/sparkIronMan/Spark/src/main/resources/day17$ ll output.txt/
總計 28
drwxrwxr-x 2 joechh joechh 4096 12月 31 13:12 ./
drwxrwxr-x 3 joechh joechh 4096 12月 31 13:12 ../
-rw-r--r-- 1 joechh joechh 1915 12月 31 13:11 part-00000
-rw-r--r-- 1 joechh joechh   24 12月 31 13:11 .part-00000.crc
-rw-r--r-- 1 joechh joechh 1913 12月 31 13:11 part-00001
-rw-r--r-- 1 joechh joechh   24 12月 31 13:11 .part-00001.crc
-rw-r--r-- 1 joechh joechh    0 12月 31 13:11 _SUCCESS
-rw-r--r-- 1 joechh joechh    8 12月 31 13:11 ._SUCCESS.crc
joechh@joechh:~/Scala/sparkIronMan/Spark/src/main/resources/day17$ 

The above is just a simple integer accumulator. What if I want an accumulator for a custom type? Of course that works too.


[Snippet.37] Creating a custom Accumulator
The simplest way to build a custom Accumulator is to implement an implicit object that extends AccumulableParam[R, T] (R is the accumulated type, T is the type of the values being added) and provide three functions:

  1. zero(initialValue: R): defines the accumulator's initial value, which is passed to every executor
  2. addInPlace(v1: R, v2: R): defines how two accumulated values are merged
  3. addAccumulator(v1: R, v2: T): defines how a new value is added into the Accumulator

Let's see an example:

scala> val rdd = sc.parallelize (1 to 100)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24

scala> import org.apache.spark.AccumulableParam ①
import org.apache.spark.AccumulableParam

scala> implicit object AvgAccParam extends AccumulableParam[(Int, Int), Int] { ②
        def zero(v:(Int, Int)) = (0, 0) ③
        def addInPlace(v1:(Int, Int), v2:(Int, Int)) = (v1._1+v2._1, v1._2+v2._2) ④
        def addAccumulator(v1:(Int, Int), v2:Int) = (v1._1+1, v1._2+v2) ⑤
      }
warning: there was one deprecation warning; re-run with -deprecation for details
defined object AvgAccParam

① Import AccumulableParam
② Define an AvgAccParam object that extends AccumulableParam
③ Define the zero value as the pair (0, 0)
④ Define addInPlace, which merges two accumulated (count, sum) pairs
⑤ Define how a new value is added into the Accumulator

Now that it is defined, let's try it out:

scala> val acc=sc.accumulable((0,0))
warning: there was one deprecation warning; re-run with -deprecation for details
acc: org.apache.spark.Accumulable[(Int, Int),Int] = (0,0)

scala> rdd.foreach(x=>acc+=x)

Check the accumulator's results:

scala> acc.value._1
res17: Int = 100

scala> acc.value._2
res18: Int = 5050

scala> val mean =acc.value._2.toDouble/acc.value._1
mean: Double = 50.5

If you'd rather not define your own Accumulator, you can also create a mutable accumulableCollection to hold the accumulated values. Let's look at an example:

scala> import scala.collection.mutable.MutableList
import scala.collection.mutable.MutableList

scala> val colacc=sc.accumulableCollection(MutableList[Int]())
warning: there was one deprecation warning; re-run with -deprecation for details
colacc: org.apache.spark.Accumulable[scala.collection.mutable.MutableList[Int],Int] = MutableList()

Update the accumulated values with the rdd defined earlier:

scala> rdd.foreach(x=>colacc+=x)

scala> colacc.value
res20: scala.collection.mutable.MutableList[Int] = MutableList(13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 1, 2, 3, 4, 5, 6, 7, 8, 9, ....)

In this example, every update becomes a new element of the collection, but note that the order in which values are written is non-deterministic.

Broadcast

Broadcast variables were already used early in this series, so I won't repeat how to use them here. Instead, here are a few things to keep in mind when working with broadcast variables (a short refresher sketch follows the list):

  1. The Driver creates the broadcast variable; executors can only read it and cannot modify it (the opposite of an accumulator)
  2. Large data that is repeatedly used by executors is a good candidate for broadcasting
  3. Any serializable object can be declared as a broadcast variable
  4. Always read the value through .value; referencing the original object directly causes the variable to be shipped to that executor again, losing the benefit of broadcasting
  5. When a broadcast variable is no longer needed, it can be removed with the destroy method, but by default Spark also cleans up broadcast variables that are no longer referenced
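
A minimal refresher sketch (the Map contents and the names lookup, bcLookup, and named are only illustrative):

 // Driver side: wrap a large, read-only lookup table in a broadcast variable
 val lookup = Map(1 -> "one", 2 -> "two", 3 -> "three")
 val bcLookup = sc.broadcast(lookup)

 // Task side: always access the data through .value
 val named = sc.parallelize(1 to 5).map(x => bcLookup.value.getOrElse(x, "unknown"))
 named.collect()

 // Clean up explicitly once the broadcast variable is no longer needed
 bcLookup.destroy()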


1 comment

Darwin Watterson
iT邦好手 1 級 ‧ 2018-04-20 10:39:22

A supplement on MutableList[Int]() under Scala:
remember to import collection.mutable._ first,
otherwise running val colacc=sc.accumulableCollection(MutableList[Int]())
will make the scala console tell you: error: not found: value MutableList
