往下介紹之前,先講解一下Scala中的Implicit Conversion(隱式轉換),因為Spark大量使用宣告方式,所以了解Implicit Conversion對看API文件或原始碼時很有幫助。但Implicit說起來有點抽象,還是看例子吧:
[Snippet.4]宣告一個 type parameterized的Class類別
scala> class ClassOne[T](val input: T) { }
defined class ClassOne
ClassOne
的類別,跟在其後的是[T]
,這在Scala中的講法是type parameterized,代表這個類別可以儲存不定類別的物件,這東西就先把它當作Java或C++中的泛型看待就可以了。(val input: T)
是宣告類別成員,Scala在Class宣告時可以直接宣告field memeber類別成員,範例中就是類別T
的input變數,然後還自帶getter與setter不用自己宣告,酷吧XD。都會宣告type parameterized的Class,那宣告一般Class就不在話下了:
scala> class ClassOneStr(val one: ClassOne[String]) {
def duplicatedString() = one.input + one.input
}
defined class ClassOneStr
scala> class ClassOneInt(val one: ClassOne[Int]) {
def duplicatedInt() = one.input.toString + one.input.toString
}
defined class ClassOneInt
duplicatedXXX
的簡易函式,一個重複Str,一個重複Int(為了輸出成字串所以透過toString轉型)。OK,有趣的事情來了,宣告implict 函式!
scala> implicit def toStrMethods(one: ClassOne[String]) = new ClassOneStr(one)
toStrMethods: (one: ClassOne[String])ClassOneStr
scala> implicit def toIntMethods(one: ClassOne[Int]) = new ClassOneInt(one)
toIntMethods: (one: ClassOne[Int])ClassOneInt
宣告兩個隱式轉換函式toStrMethods
與toIntMethods
,舉toStrMethods為例,我認為容易解讀的方式為以下:
(one: ClassOne[String]) = new ClassOneStr(one)
,這代表使用ClassOne[String]的物件時one,會隱密的幫你轉換成new ClassOneStr(one)的ClassOneStr物件,而且類別成員都幫你指定好了XD因此,如果我們有一個物件如下宣告:
scala> val oneStrTest= new ClassOne("test")
oneStrTest: ClassOne[String] = ClassOne@1d8e9d3e
oneStrTest使用時會幫你轉成ClassOneStr,這有啥好處勒?
這樣就能用ClassOneStr的duplicatedString()啦,並且保證型別是對的
scala> oneStrTest.duplicatedString
res0: String = testtest
驗證另外一種type是否能呼叫duplicatedString:
scala> val oneIntTest = new ClassOne(123)
oneIntTest: ClassOne[Int] = ClassOne@5617168c
scala> oneIntTest.duplicatedString
<console>:33: error: value duplicatedString is not a member of ClassOne[Int]
oneIntTest.duplicatedString
你看,若宣告為Int類別,呼叫duplicatedString就會報錯啦。
implicit conversion
來達成的,讓API呼叫端比較無感XDDouble RDD
Double類RDD有許多Int類沒有的好用函式(但結果可能為Double,所以為Double類XDDD)
用昨天的distIds當作例子吧(忘記這啥的趕快去看一下昨天的distinct)
scala> distIds
res9: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at distinct at <console>:35
scala> distIds.collect
res3: Array[String] = Array(80, 20, 15, 31, 17, 13, 77, 36, 16, 52, 30, 94)
STOP!別忘了現在distIds還是String類別,那該如何做勒?沒錯,就是用map轉換
scala> val distIdsNum = distIds.map(_.toInt)
distIdsNum: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at map at <console>:37
OK,來個描述統計連續技:
scala> distIdsNum.mean
res5: Double = 40.083333333333336
scala> distIdsNum.sum
res6: Double = 481.0
scala> distIdsNum.variance
res7: Double = 757.0763888888888
scala> distIdsNum.stdev
res8: Double = 27.515021150071625
另外若在大資料集上,求準確的sum與means的時間太長,有兩個特別的sumApprox
與meanApprox
action類別函式可以玩玩。
2018也還是有人看你的文章,感動吧! (目前正努力啃Java API,把你的30天改成Java版 XD)
不過本篇的範例統計連續技在Spark v2.2.1要改成
distIdsNum.rdd.mean, distIdsNum.rdd.sum, distIdsNum.rdd.variance, distIdsNum.rdd.stdev; 因為 distIds.map(_.toInt) 在 v2.2.1中的資料型態從 RDD[Int] 變成 Dataset[Int] 了 !
哈哈, 範例沒錯! 應該是說當時讀檔時是用 sc.textFile("xxx")讀入還是用spark.read.textFile("xxx")讀入的差異!本來以為v2.2.1後,spark-shell已經沒有sc了(因為官網的quick start是從spark.read.textFile("README.md")開始的),後來才發現spark-shell的歡迎畫面有列: Spark context available as 'sc'以及Spark session available as 'spark'!