Now that we've covered Joining, let's look at Sorting and Grouping. We already saw the high-level Grouping API a few days ago; today we'll cover combineByKey, the low-level general-purpose aggregation function, and finish off the task left over from yesterday!
Remember totalsAndProds from yesterday?
scala> totalsAndProds.first
res7: (Int, (Double, Array[String])) = (34,(62592.43000000001,Array(34, GAM X360 Assassins Creed 3, 6363.95, 9)))
What if we want to sort by product name? On RDDs (plain RDDs and pair RDDs alike), the three most common sorting functions are sortBy, sortByKey, and repartitionAndSortWithinPartitions.
Personally I find sortBy the easiest to use XD:
[Snippet.30] sortBy
scala> val sortedProds=totalsAndProds.sortBy(_._2._2(1)).collect
sortedProds: Array[(Int, (Double, Array[String]))] = Array((90,(48601.89,Array(90, AMBROSIA TRIFIDA POLLEN, 5887.49, 1))), (94,(31049.07,Array(94, ATOPALM MUSCLE AND JOINT, 1544.25, 7))), (87,(26047.72,Array(87, Acyclovir, 6252.58, 4))), (79,(50917.700000000004,Array(79, Alphanate, 4218.17, 4))), (83,(314...
Just point it at the field you want to sort on. The default is ascending: lexicographic for strings, smallest-to-largest for numbers. To sort in descending order, pass in a boolean:
//descending
scala> val sortedProds=totalsAndProds.sortBy(_._2._2(1),false).collect
sortedProds: Array[(Int, (Double, Array[String]))] = Array((90,(48601.89,Array(90, AMBROSIA TRIFIDA POLLEN, 5887.49, 1))), (94,(31049.07,Array(94, ATOPALM MUSCLE AND JOINT, 1544.25, 7))), (87,(26047.72,Array(87, Acyclovir, 6252.58, 4))), (79,(50917.700000000004,Array(79, Alphanate, 4218.17, 4))), (83,(314...
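Since the sort key is just a selector function, you can sort on any field. For instance, a quick sketch of my own (not from the original run) that orders by the spending total instead of the product name:
// Hypothetical: order by the Double total in the value instead of the
// product name -- same sortBy, different field selector
val sortedByTotal = totalsAndProds.sortBy(_._2._1).collect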
[Snippet.31] sortByKey
And sortByKey? As the name implies it needs a key, so it works on pair RDDs. Let's build a simple example:
//sortByKey
scala> val a =sc.parallelize(List("joe","doris","john","vis","mary"))
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[28] at parallelize at <console>:24
scala> val b =sc.parallelize(1 to 5)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[29] at parallelize at <console>:24
scala> val simpleKV=a.zip(b) ①
simpleKV: org.apache.spark.rdd.RDD[(String, Int)] = ZippedPartitionsRDD2[30] at zip at <console>:28
scala> simpleKV.collect
res3: Array[(String, Int)] = Array((joe,1), (doris,2), (john,3), (vis,4), (mary,5))
scala> simpleKV.sortByKey().collect ②
res5: Array[(String, Int)] = Array((doris,2), (joe,1), (john,3), (mary,5), (vis,4))
scala> simpleKV.sortByKey(false).collect ③
res6: Array[(String, Int)] = Array((vis,4), (mary,5), (john,3), (joe,1), (doris,2))
① zip (the RDD version of Scala's zip) stitches two collections together element by element, which makes it an easy way to produce K-V pairs. Note that RDD.zip requires both RDDs to have the same number of partitions and the same number of elements in each partition.
② Sort by key; the default is again lexicographic.
③ Descending.
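Incidentally, on a pair RDD, sortByKey() gives the same ordering as sortBy with a key selector; a quick check of my own:
// Equivalent to simpleKV.sortByKey(): select the key as the sort field
simpleKV.sortBy(_._1).collect  // Array((doris,2), (joe,1), (john,3), (mary,5), (vis,4))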
One more thing to watch out for: if you want to sort, the elements themselves must be sortable, that is, comparable. Can you define your own comparison logic? That's a very common need: a custom comparator! Much like Java, Scala provides two traits (similar to Java interfaces) for this, Ordered and Ordering. Let's see how to use them. The contract when two elements are compared: return a positive number if this one is greater (under whatever definition of greater), a negative number if the other one is greater, and 0 if they are equal.
[Snippet.32] Defining a class that implements Ordered
Take a simple Employee class as an example, with a single lastName field of type String:
scala> case class Employee(lastName: String) extends Ordered[Employee] {
override def compare(that: Employee) =this.lastName.compare(that.lastName)}
defined class Employee
When implementing Ordered you must implement compare: override def compare(that: Employee) = this.lastName.compare(that.lastName). The example simply borrows String's own comparison and returns its result~
Let's build a pair RDD of employees to play with, with Employee as K and Int as V:
scala> val employees= sc.parallelize(List((Employee("joe"),30),(Employee("doris"),20),(Employee("john"),10)))
employees: org.apache.spark.rdd.RDD[(Employee, Int)] = ParallelCollectionRDD[51] at parallelize at <console>:28
scala> employees.sortByKey().collect
res18: Array[(Employee, Int)] = Array((Employee(doris),20), (Employee(joe),30), (Employee(john),10))
OK, the result checks out: sorted lexicographically by lastName.
What if we want a different comparison rule, say by the length of lastName (shortest first)?
scala> case class Employee(lastName: String) extends Ordered[Employee] {
override def compare(that: Employee) =this.lastName.length - that.lastName.length}
defined class Employee
You can see the compare implementation has been swapped out for a length comparison. Give it a spin:
scala> val employees= sc.parallelize(List((Employee("joe"),30),(Employee("doris"),20),(Employee("john"),10)))
employees: org.apache.spark.rdd.RDD[(Employee, Int)] = ParallelCollectionRDD[58] at parallelize at <console>:28
scala> employees.sortByKey().collect
res21: Array[(Employee, Int)] = Array((Employee(joe),30), (Employee(john),10), (Employee(doris),20))
OK, now it sorts from shortest to longest!
What if we're handed an external class that, for whatever reason, we can't modify (i.e. we can't make it extend Ordered)? Is there another way to customize the sort order? Yes: through an implicit Ordering. Read on:
[Snippet.33] Defining an implicit Ordering
Define a simple pair RDD with String keys:
scala> val simpleKV =sc.parallelize(List(("zz",4),("mary",5),("john",3),("joe",1),("doris",2)))
simpleKV: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[37] at parallelize at <console>:24
scala> simpleKV.sortByKey().collect
res1: Array[(String, Int)] = Array((doris,2), (joe,1), (john,3), (mary,5), (zz,4))
By default, Strings compare lexicographically, but I don't want to modify the String class. How do we get them to sort by length instead?
scala> implicit val simpleOrdering:Ordering[String] = Ordering.by(_.length)
simpleOrdering: Ordering[String] = scala.math.Ordering$$anon$9@5d965dbd
That's it, simple! Define an implicit Ordering[String] value, and whenever sortByKey needs an Ordering for the String keys, the compiler supplies this one, which compares by Ordering.by(_.length), i.e. by length in this example. Check the result:
scala> simpleKV.sortByKey().collect
res2: Array[(String, Int)] = Array((zz,4), (joe,1), (mary,5), (john,3), (doris,2))
The name of the implicit (simpleOrdering here) usually doesn't matter, since it's resolved implicitly anyway XD~
What about a requirement to group by key and then sort each group's values? There are several ways to get that result, but the best solution is a bit involved. Roughly, the flow uses repartitionAndSortWithinPartitions, which cuts down network IO by letting each partition sort its own data during the shuffle XD. See the sketch below.
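A minimal sketch of that idea (mine, not from the original post), assuming the transByCust pair RDD from yesterday (re-shown further down): promote the sort field into the key, partition on the customer id alone, and let the shuffle sort within each partition.
import org.apache.spark.Partitioner

// Partition only by the customer-id half of the composite key, so all
// of one customer's records land in the same partition.
class CustPartitioner(parts: Int) extends Partitioner {
  def numPartitions: Int = parts
  def getPartition(key: Any): Int = key match {
    case (custId: Int, _) => java.lang.Math.floorMod(custId.hashCode, parts)
  }
}

// Promote the amount into the key; the implicit tuple ordering
// (customer id first, then amount) drives the within-partition sort.
val byCustAmount = transByCust
  .map { case (custId, tran) => ((custId, tran(5).toDouble), tran) }
  .repartitionAndSortWithinPartitions(new CustPartitioner(4))

Next up is combineByKey, the general-purpose aggregation function that many of the other aggregation functions use under the hood. Its full signature: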
def combineByKey[C](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K, C)]
The three you most often need to supply yourself are createCombiner, mergeValue, and mergeCombiners:
createCombiner: converts the first value seen for a key (type V) into another type C, somewhat like map plus an initial value.
mergeValue: folds each subsequent V into the accumulated C (V into C), same idea as aggregate; this happens within a single partition.
mergeCombiners: merges the C values across partitions; nothing unfamiliar by now. (The remaining parameters all have defaults or simpler overloads, so you rarely need to touch them.)
Let's start with a simple example that computes the average of the values for each key:
First, define a simple pair RDD:
scala> val data =sc.parallelize(List(("a",1),("a",2),("a",3),("b",10),("b",20)))
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[12] at parallelize at <console>:24
[Snippet.34] combineByKey usage example
scala> val res= data.combineByKey(
(v)=>(v,1), ①
(acc:(Int,Int),v)=>(acc._1+v,acc._2+1), ②
(acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2)) ③
res: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[13] at combineByKey at <console>:26
scala> res.map{case(key,value)=>(key,value._1/value._2.toFloat)}.collect ④
res4: Array[(String, Float)] = Array((a,2.0), (b,15.0))
① Define createCombiner: turn the V into a (sum, count) pair.
② Define mergeValue: walk the partition and fold each V into the pair; from the way it accumulates you can see the first slot tracks the running total and the second slot the element count.
③ Define mergeCombiners: add up the per-partition pairs.
④ Transform the V of res, which is itself a (sum, count) pair, into an average via map. It's also worth studying the Scala pattern-match syntax here, plus the value._2.toFloat that converts the result to floating point.
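For comparison, here's a sketch of mine (not from the post) of the same average using aggregateByKey, which is itself built on combineByKey; the zero value (0, 0) plays the role of createCombiner's initial state:
// Hypothetical equivalent with aggregateByKey: a zero value instead of
// createCombiner, then per-partition and cross-partition merge functions
val res2 = data.aggregateByKey((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),  // plays the mergeValue role
  (a, b) => (a._1 + b._1, a._2 + b._2))  // plays the mergeCombiners role
res2.map { case (k, (sum, cnt)) => (k, sum / cnt.toFloat) }.collect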
Back to the task we left unfinished. From the example above you can see that avg breaks down into total & count, and total happens to match the total-amount requirement XD~ Think about it first: how would you build the createCombiner for total/max/min/count (where max/min refer to the per-unit purchase price)? And the target RDD? Looking at the requirements, the transByCust we used before will do. Remember transByCust? As usual, here's the cheat sheet XD (① read the raw file, ② split each line on #, ③ key each transaction by the customer ID in field 2):
scala> val transFile= sc.textFile("data_transactions.txt") ①
transFile: org.apache.spark.rdd.RDD[String] = data_transactions.txt MapPartitionsRDD[76] at textFile at <console>:24
scala> val transData = transFile.map(_.split("#")) ②
transData: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[77] at map at <console>:26
scala> var transByCust = transData.map(tran => (tran(2).toInt,tran)) ③
transByCust: org.apache.spark.rdd.RDD[(Int, Array[String])] = MapPartitionsRDD[78] at map at <console>:28
Figured out how to transform V? Naturally, we open up 4 fields for min/max/count/total XDD, in other words we convert the original Array into a tuple of those 4 fields. How do you create a 4-field tuple in Scala? Just write one, and it becomes a Tuple4!
scala> val testTuple = (1,2,3,4)
testTuple: (Int, Int, Int, Int) = (1,2,3,4)
scala> testTuple
res8: (Int, Int, Int, Int) = (1,2,3,4)
scala> testTuple._1
res9: Int = 1
scala> testTuple._2
res10: Int = 2
scala> testTuple._3
res11: Int = 3
scala> testTuple._4
res12: Int = 4
OK, let's build it:
scala> def createComb = (t:Array[String]) => { ①
val total = t(5).toDouble ②
val q = t(4).toInt ③
(total/q, total/q, q, total) ④
}
createComb: Array[String] => (Double, Double, Int, Double)
① transByCust's V is of type Array[String]; sure enough, we take it as a parameter named t.
② A variable total takes element t(5) and converts the type (it was a String, after all).
③ A variable q (for quantity) takes t(4), the item count, and converts it too.
④ Return a Tuple4. The initial value covers only one record, so for now total/q stands in for both the min and max unit price.
Good, the most important step is done; the rest is easy~ How should mergeValue look?
scala> def mergeVal:((Double,Double,Int,Double),Array[String]) => ①
(Double,Double,Int,Double) =
{ case((mn,mx,c,tot),t) => { ②
val total = t(5).toDouble ③
val q = t(4).toInt ④
(scala.math.min(mn,total/q),scala.math.max(mx,total/q),c+q,tot+total) ⑤
}}
mergeVal: ((Double, Double, Int, Double), Array[String]) => (Double, Double, Int, Double)
① Don't forget the accumulator is the Tuple4, while the values still being folded in are Array[String].
② The case syntax at work; we used it in the earlier examples, and basically only this one pattern will ever match XD.
③④ As before, pull each record's amount and quantity out of the Array.
⑤ Emit a new Tuple4; take note of how min/max are applied.
mergeValue walks within a single partition. Next, how do we merge each partition's Tuple4? Think it over:
scala> def mergeComb:((Double,Double,Int,Double),(Double,Double,Int,Double))=>
(Double,Double,Int,Double) =
{ case((mn1,mx1,c1,tot1),(mn2,mx2,c2,tot2)) =>
(scala.math.min(mn1,mn2),scala.math.max(mx1,mx2),c1+c2,tot1+tot2) }
mergeComb: ((Double, Double, Int, Double), (Double, Double, Int, Double)) => (Double, Double, Int, Double)
I won't walk through this one step by step; too easy~
Time to put our hard-earned combineByKey to work!
scala> val avgByCust = transByCust.combineByKey(createComb, mergeVal, mergeComb).
mapValues({case(mn,mx,cnt,tot) => (mn,mx,cnt,tot,tot/cnt)}) ①
avgByCust: org.apache.spark.rdd.RDD[(Int, (Double, Double, Int, Double, Double))] = MapPartitionsRDD[9] at mapValues at <console>:38
① While we're at it, convert the Tuple4 into a Tuple5. Can you name all 5 fields? XD~
Show it off~
scala> avgByCust.collect()
res0: Array[(Int, (Double, Double, Int, Double, Double))] = Array((34,(3.942,2212.7425,82,77332.59,943.0803658536585)), (96,(856.2885714285715,4975.08,57,36928.57,647.869649122807)), (4,(63.18222222222222,6614.49,58,41801.35,720.7129310344827)), (16,(359.59875,3220.685,43,40696.020000000004,946.419069767442)), (82,(78.32333333333334,5932.36,66,58722.58,889.7360606060606))....
There it is, in the format (customer ID, (lowest unit price, highest unit price, quantity, total amount, average)). You might want to write this RDD out to a file. How?
scala> avgByCust.map{ case (id, (min, max, cnt, tot, avg)) =>
"%d#%.2f#%.2f#%d#%.2f#%.2f".format(id, min, max, cnt, tot, avg)}.
saveAsTextFile("output-avgByCust")
"%d#%.2f#%.2f#%d#%.2f#%.2f"
是String format的用法,這就留給各位去玩囉~
存檔勒?呼叫saveAsTextFile(PATH)
即可。
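A quick hypothetical taste of format: %d consumes an integer, while %.2f rounds a Double to two decimal places.
// Hypothetical mini-demo of the format conversions used above
"%d#%.2f".format(3, 1.5)  // yields "3#1.50"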
PS: the # is just a delimiter, don't let it fool you XD~
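If you want to sanity-check the output, here's one more sketch of mine that reads the files back and splits on that same delimiter:
// Hypothetical round trip: reload the saved part files and recover
// the six #-delimited fields per customer (all Strings at this point)
val reloaded = sc.textFile("output-avgByCust").map(_.split("#"))
reloaded.first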