Spark 2.0 in Scala series, part 15

[Spark-Day15] (core API in practice) Sorting, Grouping by Use Case

Now that we've covered joining, let's look at sorting and grouping. We already saw the high-level grouping APIs a few days ago; today we'll cover combineByKey, a general-purpose low-level aggregation function, and finish yesterday's leftover task!

Sorting

Remember totalsAndProds from yesterday?

scala> totalsAndProds.first
res7: (Int, (Double, Array[String])) = (34,(62592.43000000001,Array(34, GAM X360 Assassins Creed 3, 6363.95, 9)))

What if we want to sort by product name? On RDDs (plain RDDs and pairRDDs alike), the three most common sorting functions are:

  • sortBy
  • sortByKey
  • repartitionAndSortWithinPartitions

Personally I find sortBy the easiest to use XD:


[Snippet.30] sortBy

scala> val sortedProds=totalsAndProds.sortBy(_._2._2(1)).collect
sortedProds: Array[(Int, (Double, Array[String]))] = Array((90,(48601.89,Array(90, AMBROSIA TRIFIDA POLLEN, 5887.49, 1))), (94,(31049.07,Array(94, ATOPALM MUSCLE AND JOINT, 1544.25, 7))), (87,(26047.72,Array(87, Acyclovir, 6252.58, 4))), (79,(50917.700000000004,Array(79, Alphanate, 4218.17, 4))), (83,(314...

Just point it at the field to sort by. The default is ascending: lexicographic for strings, smallest to largest for numbers.

For descending order, just pass in a boolean (false) as the second argument:

//descending
scala> val sortedProds=totalsAndProds.sortBy(_._2._2(1),false).collect

[Snippet.31] sortByKey
And sortByKey? As the name says, there has to be a key, so it's used on pairRDDs.
Let's build a simple example:

//sortByKey
scala> val a =sc.parallelize(List("joe","doris","john","vis","mary"))
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[28] at parallelize at <console>:24

scala> val b =sc.parallelize(1 to 5)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[29] at parallelize at <console>:24

scala> val simpleKV=a.zip(b) ①
simpleKV: org.apache.spark.rdd.RDD[(String, Int)] = ZippedPartitionsRDD2[30] at zip at <console>:28

scala> simpleKV.collect
res3: Array[(String, Int)] = Array((joe,1), (doris,2), (john,3), (vis,4), (mary,5))

scala> simpleKV.sortByKey().collect ②
res5: Array[(String, Int)] = Array((doris,2), (joe,1), (john,3), (mary,5), (vis,4))

scala> simpleKV.sortByKey(false).collect ③
res6: Array[(String, Int)] = Array((vis,4), (mary,5), (john,3), (joe,1), (doris,2))

① zip (mirroring Scala's collection zip) pairs up the elements of two RDDs, which is a handy way to produce key-value pairs; see the note right after this list
② sorting by key; the default is again lexicographic order
③ reversed order
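
One caveat worth knowing about zip: it requires the two RDDs to have the same number of partitions and the same number of elements in each partition. A minimal alternative sketch (the names and simpleKV2 names are just illustrative) that sidesteps this when you only need running numbers as values:

// zipWithIndex works on a single RDD, so zip's partition/element-count
// constraint doesn't apply; it yields (element, 0-based Long index) pairs.
val names = sc.parallelize(List("joe", "doris", "john", "vis", "mary"))
val simpleKV2 = names.zipWithIndex()
  .map { case (name, i) => (name, i.toInt + 1) }   // shift to 1-based Int values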

One more thing to keep in mind: to sort, the elements themselves must be sortable (comparable). Can we define our own comparison logic? That's a very common requirement: a custom comparator! Similar to Java, Scala provides two traits (roughly Java interfaces) for this: Ordered and Ordering. When comparing two elements, compare returns a positive number when this element is larger (under whatever definition of "larger" you pick), a negative number when the other one is larger, and 0 when they're equal. Let's see how to use them; a quick standalone illustration first, then the RDD snippets.
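
A tiny plain-Scala illustration of that contract (the Money class here is hypothetical, purely for demonstration):

// Extending Ordered[Money] only requires compare; it also gives you <, <=, >, >= for free.
case class Money(amount: Int) extends Ordered[Money] {
  // positive if this > that, negative if this < that, 0 if equal
  override def compare(that: Money): Int = this.amount.compare(that.amount)
}

assert(Money(5).compare(Money(3)) > 0)
assert(Money(2) < Money(9))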


[Snippet.32] Defining a class that implements Ordered
Take a simple Employee class as an example, with a single lastName field of type String:

scala> case class Employee(lastName: String) extends Ordered[Employee] {
          override def compare(that: Employee) =this.lastName.compare(that.lastName)}
defined class Employee

When implementing Ordered you must implement compare:
override def compare(that: Employee) = this.lastName.compare(that.lastName)
This implementation simply borrows String's own comparison and returns its result~

Let's build an employees pairRDD to play with, with Employee as K and Int as V:

scala> val employees= sc.parallelize(List((Employee("joe"),30),(Employee("doris"),20),(Employee("john"),10)))
employees: org.apache.spark.rdd.RDD[(Employee, Int)] = ParallelCollectionRDD[51] at parallelize at <console>:28

scala> employees.sortByKey().collect
res18: Array[(Employee, Int)] = Array((Employee(doris),20), (Employee(joe),30), (Employee(john),10))

OK, the result is as expected: sorted lexicographically by lastName.

What if we want a different comparison rule, say by the length of lastName (shortest to longest)?

scala> case class Employee(lastName: String) extends Ordered[Employee] {
          override def compare(that: Employee) =this.lastName.length - that.lastName.length}
defined class Employee

You can see the compare implementation has been swapped out: it now compares lengths.

Let's try it:

scala> val employees= sc.parallelize(List((Employee("joe"),30),(Employee("doris"),20),(Employee("john"),10)))
employees: org.apache.spark.rdd.RDD[(Employee, Int)] = ParallelCollectionRDD[58] at parallelize at <console>:28

scala> employees.sortByKey().collect
res21: Array[(Employee, Int)] = Array((Employee(joe),30), (Employee(john),10), (Employee(doris),20))

OK, now it's sorted from shortest to longest!

Now suppose I'm handed an object from somewhere else that, for some reason, I can't modify (i.e. I can't make it extend Ordered). Is there another way to define a custom sort order? Yes: with an implicit Ordering. Read on:


[Snippet.33] Defining an implicit Ordering
Define a simple pairRDD with K of type String:

scala> val simpleKV =sc.parallelize(List(("zz",4),("mary",5),("john",3),("joe",1),("doris",2)))
simpleKV: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[37] at parallelize at <console>:24

scala> simpleKV.sortByKey().collect
res1: Array[(String, Int)] = Array((doris,2), (joe,1), (john,3), (mary,5), (zz,4))

OK, String's default comparison is lexicographic, but I don't want to touch the String class. How do I make it sort by string length?

scala> implicit val simpleOrdering:Ordering[String] = Ordering.by(_.length)
simpleOrdering: Ordering[String] = scala.math.Ordering$$anon$9@5d965dbd

That's it, simple! We declare an implicit Ordering[String]; whenever sortByKey needs an Ordering for String it picks this one up implicitly and sorts by Ordering.by(_.length), i.e. by length in this example. Let's look at the result:

scala> simpleKV.sortByKey().collect
res2: Array[(String, Int)] = Array((zz,4), (joe,1), (mary,5), (john,3), (doris,2))

The name of the implicit value (simpleOrdering here) usually doesn't matter, since it's resolved implicitly anyway XD~


The Secondary Sort problem

What if the requirement is to group ByKey and then sort the values within each group?

There are several ways to handle this, but the best solution is a bit involved. The flow goes roughly like this (a sketch follows the list):

  1. First map the RDD[(K,V)] into RDD[((K,V), null)]
  2. Use a custom partitioner so that composite keys (K,V) with the same K end up in the same partition
  3. Since everything with the same K now lives in the same partition, call repartitionAndSortWithinPartitions to keep network IO down: each partition sorts its own data XD
  4. Map the RDD back from RDD[((K,V), null)] to RDD[(K,V)]
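
A minimal sketch of those four steps, assuming pairs of (String, Int); the KeyOnlyPartitioner and secondarySort names are just illustrative:

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Partition by the K part of the composite (K, V) key only, so all elements
// with the same K land in the same partition.
class KeyOnlyPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key match {
    case (k: String, _) => (k.hashCode % numPartitions + numPartitions) % numPartitions
  }
}

def secondarySort(pairs: RDD[(String, Int)], partitions: Int): RDD[(String, Int)] =
  pairs
    .map(kv => (kv, null))                                                   // 1. RDD[((K,V), null)]
    .repartitionAndSortWithinPartitions(new KeyOnlyPartitioner(partitions))  // 2.+3. partition by K, sort by (K,V)
    .map(_._1)                                                               // 4. back to RDD[(K,V)]

The composite keys are sorted as whole (K, V) tuples (K first, then V), so within each partition the values of every key come out in order.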

Grouping with combineByKey

Next up is the general-purpose combineByKey, the function many of the other aggregation functions are built on. Its full signature is:

def combineByKey[C](createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null): RDD[(K, C)]

The three functions you most often need to supply yourself are createCombiner, mergeValue, and mergeCombiners:

  • createCombiner: converts the first value (of type V) seen for each key into another type C; think of it as a map plus an initial value
  • mergeValue: folds each subsequent V into the accumulator of type C (V into C), much like aggregate; this happens within a single partition
  • mergeCombiners: merges the accumulators across partitions; this should be familiar by now

Let's start with a simple example: computing the average of the values for each key.
First define a simple pairRDD:

scala> val data =sc.parallelize(List(("a",1),("a",2),("a",3),("b",10),("b",20)))
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[12] at parallelize at <console>:24

[Snippet.34] A combineByKey example

scala> val res= data.combineByKey(
      (v)=>(v,1),  ①
      (acc:(Int,Int),v)=>(acc._1+v,acc._2+1), ②
      (acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2)) ③
res: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[13] at combineByKey at <console>:26

scala> res.map{case(key,value)=>(key,value._1/value._2.toFloat)}.collect ④
res4: Array[(String, Float)] = Array((a,2.0), (b,15.0))

① defines createCombiner: turns the first V into a (sum, count) pair, i.e. (v, 1)
② defines mergeValue: walks the rest of the partition, folding each V into the pair; from the way it accumulates you can see the first element keeps the running sum and the second the count
③ defines mergeCombiners: adds up the per-partition results
④ maps each value of res, which is itself a (sum, count) pair, into an average; also a good chance to study Scala pattern matching, plus value._2.toFloat to turn the result into a floating-point number
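
Since combineByKey underlies the other aggregation functions, the same per-key average can also be written with aggregateByKey; a minimal sketch for comparison (the avg name is just illustrative):

// The zero value plays the role of createCombiner, seqOp of mergeValue, combOp of mergeCombiners.
val avg = data
  .aggregateByKey((0, 0))(
    (acc, v) => (acc._1 + v, acc._2 + 1),      // within a partition: accumulate (sum, count)
    (a, b) => (a._1 + b._1, a._2 + b._2))      // across partitions: merge (sum, count) pairs
  .mapValues { case (sum, cnt) => sum / cnt.toFloat }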

Task: descriptive statistics of customer purchases: avg, max, min, and total amount

Back to the final task we left off earlier. From the example above you can see that avg can be derived from total & count, and total also happens to satisfy the "total amount" requirement XD~ Think about it first: how would you build createCombiner for total/max/min/count (max/min here mean the unit purchase price)? And which RDD do we target? Looking at the requirements, transByCust from before is all we need. Remember transByCust? As usual, here's the cheat sheet XD:

scala> val transFile= sc.textFile("data_transactions.txt") ①
transFile: org.apache.spark.rdd.RDD[String] = data_transactions.txt MapPartitionsRDD[76] at textFile at <console>:24

scala> val transData = transFile.map(_.split("#")) ②
transData: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[77] at map at <console>:26

scala> var transByCust = transData.map(tran => (tran(2).toInt,tran)) ③
transByCust: org.apache.spark.rdd.RDD[(Int, Array[String])] = MapPartitionsRDD[78] at map at <console>:28

Figured out how to transform V? Right, just open 4 fields for min/max/count/total XDD, i.e. turn the original Array into those 4 fields. How do you get a 4-field tuple in Scala? Just write one XD~ it becomes a Tuple4 for you!

scala> val testTuple = (1,2,3,4)
testTuple: (Int, Int, Int, Int) = (1,2,3,4)

scala> testTuple
res8: (Int, Int, Int, Int) = (1,2,3,4)

scala> testTuple._1
res9: Int = 1

scala> testTuple._2
res10: Int = 2

scala> testTuple._3
res11: Int = 3

scala> testTuple._4
res12: Int = 4

OK, let's write it:

scala> def createComb = (t:Array[String]) => { ①
      val total = t(5).toDouble ②
      val q = t(4).toInt ③
      (total/q, total/q, q, total) ④
     |}
createComb: Array[String] => (Double, Double, Int, Double)

① transByCust's V is an Array[String]; declare it as a parameter t of that type
② a total variable takes element t(5) and converts it (it was a String, after all)
③ a q (quantity) variable takes t(4), the product quantity, and converts it
④ return a Tuple4; since the initial value covers only a single record, total/q stands in for both the min and max unit price for now

Good, the most important step is done; the rest is easy~ How should mergeValue look?

scala> def mergeVal:((Double,Double,Int,Double),Array[String]) =>  ①
      (Double,Double,Int,Double) =
          { case((mn,mx,c,tot),t) => { ②
            val total = t(5).toDouble ③
            val q = t(4).toInt ④
            (scala.math.min(mn,total/q),scala.math.max(mx,total/q),c+q,tot+total) ⑤
      }}
mergeVal: ((Double, Double, Int, Double), Array[String]) => (Double, Double, Int, Double)

① don't forget the accumulator is a Tuple4, while the V being folded in is still an Array[String]
② the case syntax we used in the earlier snippet; there's basically only this one pattern to match XD
③④ as before, read the amount and quantity of each record out of the Array
⑤ emit a new Tuple4; note how min/max are written

mergeValue walks through a single partition; next, how do we merge the Tuple4s coming from each partition? Think it over:

scala> def mergeComb:((Double,Double,Int,Double),(Double,Double,Int,Double))=>
      (Double,Double,Int,Double) =
      { case((mn1,mx1,c1,tot1),(mn2,mx2,c2,tot2)) =>
      (scala.math.min(mn1,mn2),scala.math.max(mx1,mx2),c1+c2,tot1+tot2) }
mergeComb: ((Double, Double, Int, Double), (Double, Double, Int, Double)) => (Double, Double, Int, Double)

I won't go through this one step by step, it's too easy~

Time to wire our hard-earned functions into combineByKey!

scala> val avgByCust = transByCust.combineByKey(createComb, mergeVal, mergeComb).
      mapValues({case(mn,mx,cnt,tot) => (mn,mx,cnt,tot,tot/cnt)}) ①
avgByCust: org.apache.spark.rdd.RDD[(Int, (Double, Double, Int, Double, Double))] = MapPartitionsRDD[9] at mapValues at <console>:38

① while we're at it, mapValues turns the Tuple4 into a Tuple5; can you tell what the 5 fields are XD~

Let's show it~

scala> avgByCust.collect()
res0: Array[(Int, (Double, Double, Int, Double, Double))] = Array((34,(3.942,2212.7425,82,77332.59,943.0803658536585)),  (96,(856.2885714285715,4975.08,57,36928.57,647.869649122807)), (4,(63.18222222222222,6614.49,58,41801.35,720.7129310344827)), (16,(359.59875,3220.685,43,40696.020000000004,946.419069767442)), (82,(78.32333333333334,5932.36,66,58722.58,889.7360606060606))....

There it is; the format is (customer ID, (min unit price, max unit price, quantity, total amount, average)). It might be worth writing this RDD out to a file; how do we do that?

scala> avgByCust.map{ case (id, (min, max, cnt, tot, avg)) =>
      "%d#%.2f#%.2f#%d#%.2f#%.2f".format(id, min, max, cnt, tot, avg)}.
      saveAsTextFile("output-avgByCust")

"%d#%.2f#%.2f#%d#%.2f#%.2f"是String format的用法,這就留給各位去玩囉~
存檔勒?呼叫saveAsTextFile(PATH)即可。

PS: the # is just a field separator, don't be fooled XD~
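
If you ever need to read those lines back in, a minimal sketch (assuming the same output-avgByCust path and the # separator above; the reloaded name is just illustrative):

// Parse each line id#min#max#cnt#tot#avg back into the (Int, Tuple5) shape of avgByCust.
val reloaded = sc.textFile("output-avgByCust")
  .map(_.split("#"))
  .map(f => (f(0).toInt,
    (f(1).toDouble, f(2).toDouble, f(3).toInt, f(4).toDouble, f(5).toDouble)))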

