iT邦幫忙

2017 iT 邦幫忙鐵人賽
DAY 14
Big Data

Spark 2.0 in Scala series, part 14

[Spark-Day14] (core API hands-on) Joining by Use Case

So far we have always worked on a single RDD and transformed it into the result we wanted. What if we need to process two or more RDDs at once? Doesn't that sound just like a join in a DB? Exactly. The topics coming up are RDD join, sorting, and grouping. These operations come in very handy at times. Continuing our earlier use case:

You have already run some analyses on the purchase list. Good job! But now we have another file, a product catalog, and we want to know:

  • the total sales amount for each product
  • which products nobody bought
  • some descriptive statistics about yesterday's customer purchases: avg, max, min, total amount, and so on

There are many ways to solve these tasks, but here we will tackle them with join, sorting, and grouping.

Remember the tranData we obtained earlier? If you have forgotten, here is a reminder XD:

scala> val tranFile =sc.textFile("data_transactions.txt")
tranFile: org.apache.spark.rdd.RDD[String] = data_transactions.txt MapPartitionsRDD[5] at textFile at <console>:24

scala> val tranData=tranFile.map(_.split("#"))
tranData: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[6] at map at <console>:26

With tranData in hand, first turn it into a key-value pairRDD to make it easier to work with. Here we pick the product ID as the key:

scala> val prodsFromTrans = tranData.map(tran=>(tran(3).toInt,tran)) ①
prodsFromTrans: org.apache.spark.rdd.RDD[(Int, Array[String])] = MapPartitionsRDD[7] at map at <console>:28

①tran(3) is the product ID field; converting it with toInt gives us an Int key we can join on later.

With prodsFromTrans in hand, we actually only need the product ID and the amount spent for now, so let's simplify our KV pairs:

scala> val totals =prodsFromTrans.mapValues(_(5).toDouble)
totals: org.apache.spark.rdd.RDD[(Int, Double)] = MapPartitionsRDD[8] at mapValues at <console>:30

Task: the total sales amount for each product

OK, we now have a (productID, amount) pairRDD. How do we get the total amount for each product? Right, aggregate it! The simplest way is reduceByKey (foldByKey, aggregateByKey, or groupByKey would work too):

scala> val totalsByProds=totals.reduceByKey(_+_)
totalsByProds: org.apache.spark.rdd.RDD[(Int, Double)] = ShuffledRDD[14] at reduceByKey at <console>:32

Take a look:

scala> totalsByProds.collect
res13: Array[(Int, Double)] = Array((88,46411.33), (4,63520.21000000001), (82,59612.96), (80,34805.73), (19,46486.0),...
scala> 
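To see what reduceByKey is doing conceptually, here is the same aggregation sketched with plain Scala collections (the sample pairs below are hypothetical, not the real transaction data):

```scala
// Hypothetical (productID, amount) pairs standing in for the totals pairRDD.
val sample = List((88, 100.0), (4, 50.0), (88, 25.0))

// reduceByKey(_ + _) is conceptually: group the pairs by key, then sum each group.
val summed = sample
  .groupBy(_._1)                                      // Map[Int, List[(Int, Double)]]
  .map { case (k, pairs) => (k, pairs.map(_._2).sum) } // Map[Int, Double]
```

On an RDD the merge additionally happens per partition before the shuffle and the partial sums are combined afterwards, which is why reduceByKey is cheaper than groupByKey for this kind of aggregation.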

OK, now the product department hands us a product catalog: LINK. The format looks like this:

1#ROBITUSSIN PEAK COLD NIGHTTIME COLD PLUS FLU#9721.89#10
2#Mattel Little Mommy Doctor Doll#6060.78#6
3#Cute baby doll, battery#1808.79#2
4#Bear doll#51.06#6
5#LEGO Legends of Chima#849.36#6
6#LEGO Castle#4777.51#10
7#LEGO Mixels#8720.91#1
8#LEGO Star Wars#7592.44#4
9#LEGO Lord of the Rings#851.67#2

The format is: productID#name#price#remaining quantity

Read it in and convert it to KV in one go (K is the product ID, V is the full record):

scala> val products = sc.textFile("data_products.txt"). ②
       map(line=>line.split("#")).
       map(prod=>(prod(0).toInt,prod))
products: org.apache.spark.rdd.RDD[(Int, Array[String])] = MapPartitionsRDD[28] at map at <console>:26

②Keying by prod(0).toInt gives the same Int product-ID key type as prodsFromTrans, so the two RDDs can be joined.

OK, we have the products pairRDD.
So how do we process two RDDs that share the same keys?


Join, left/rightOuterJoin, fullOuterJoin

Just like a DB, RDDs, and pairRDDs in particular, offer join, leftOuterJoin, rightOuterJoin, and fullOuterJoin.
Let's look at a small example.
First define two simple KV RDDs, a and b:

scala> val a = sc.parallelize(List((1,2),(3,4),(3,6)))
a: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[91] at parallelize at <console>:24

scala> val b = sc.parallelize(List((3,9)))
b: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[92] at parallelize at <console>:24

[Snippet.26] RDDs Join

scala> a.join(b).collect
res68: Array[(Int, (Int, Int))] = Array((3,(4,9)), (3,(6,9)))

Only keys present in both RDDs make it into the result.


[Snippet.27] RDD leftOuterJoin

scala> a.leftOuterJoin(b).collect
res25: Array[(Int, (Int, Option[Int]))] = Array((1,(2,None)), (3,(4,Some(9))), (3,(6,Some(9))))

The result is driven by the left side (a in the example); when b has no matching key, the value is None.


[Snippet.28] RDD rightOuterJoin

scala> a.rightOuterJoin(b).collect
res26: Array[(Int, (Option[Int], Int))] = Array((3,(Some(4),9)), (3,(Some(6),9)))

The mirror image: the right side (b in the example) drives the result, and when a has no matching key the value is None. In this example every key happens to match, so no None shows up XD


[Snippet.29] RDD fullOuterJoin

scala> a.fullOuterJoin(b).collect
res27: Array[(Int, (Option[Int], Option[Int]))] = Array((1,(Some(2),None)), (3,(Some(4),Some(9))), (3,(Some(6),Some(9))))

Keys from both sides are considered; when the other side has no matching key, its slot gets None.
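To make the matching logic concrete, here is a plain-Scala sketch of inner and left outer join semantics over the same a and b lists (just the pairing rule, ignoring partitioning and shuffles):

```scala
val a = List((1, 2), (3, 4), (3, 6))
val b = List((3, 9))

// Inner join: emit a pair only when the key exists on both sides.
val inner = for ((ka, va) <- a; (kb, vb) <- b; if ka == kb) yield (ka, (va, vb))

// leftOuterJoin: every left pair survives; unmatched right values become None.
val leftOuter = a.flatMap { case (k, v) =>
  val matches = b.filter(_._1 == k).map(_._2)
  if (matches.isEmpty) List((k, (v, Option.empty[Int])))
  else matches.map(w => (k, (v, Some(w))))
}
```

rightOuterJoin is the mirror image (swap which side gets wrapped in Option), and fullOuterJoin wraps both sides.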

By the way, Scala's Option/Some(value) works like Java's Optional: it exists to avoid NullPointerExceptions. The most common way to extract the value is getOrElse.
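A tiny example of pulling values out of the Options a join produces (the amounts are made up):

```scala
val matched:   Option[Double] = Some(46411.33) // a key found on both sides
val unmatched: Option[Double] = None           // a key with no match

// getOrElse returns the wrapped value, or the supplied default when empty.
val total   = matched.getOrElse(0.0)
val missing = unmatched.getOrElse(0.0)
```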

OK, back to the main task: how do we attach the product name to each purchased product?

scala> val totalsAndProds = totalsByProds.join(products)
totalsAndProds: org.apache.spark.rdd.RDD[(Int, (Double, Array[String]))] = MapPartitionsRDD[65] at join at <console>:36

Right, in this case join or leftOuterJoin both work (a purchased product can never be missing from the catalog, since the purchases are a subset of the catalog).

Check the result:

scala> totalsAndProds.first
res30: (Int, (Double, Array[String])) = (34,(62592.43000000001,Array(34, GAM X360 Assassins Creed 3, 6363.95, 9)))

Task: which products nobody bought

Which kind of join should we use? Think about it:

scala> val totalsWithMissingProds = totalsByProds.rightOuterJoin(products)
totalsWithMissingProds: org.apache.spark.rdd.RDD[(Int, (Option[Double], Array[String]))] = MapPartitionsRDD[71] at rightOuterJoin at <console>:36

Right, we can use rightOuterJoin (with the catalog as the right side) and then check which records have None in the totals slot.

Observe it first:

scala> totalsWithMissingProds.collect
res32: Array[(Int, (Option[Double], Array[String]))] = Array((34,(Some(62592.43000000001),Array(34, GAM X360 Assassins Creed 3, 6363.95, 9))), ...

OK, next filter on whether the first element of the value (the value is itself a (Option[Double], Array[String]) pair) is None:

scala> val missingProds = totalsWithMissingProds.
      filter(_._2._1==None). ①
      map(_._2._2) ②
missingProds: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[76] at map at <console>:40

scala> missingProds.first
res36: Array[String] = Array(20, LEGO Elves, 4589.79, 4)

①First the filter. What does _._2._1 mean? The first _ is a placeholder; then we go through two tuple levels: ._2 takes the value, and ._1 takes the first element of that value.
②Then map each filtered record to the product info by taking the value twice (_._2._2).
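To see those accessors in isolation, take one hypothetical element of totalsWithMissingProds:

```scala
// One hypothetical record: key 20, no sales total, the full catalog row.
val rec: (Int, (Option[Double], Array[String])) =
  (20, (None, Array("20", "LEGO Elves", "4589.79", "4")))

val noSales = rec._2._1 == None   // _._2._1: the Option[Double] sales total
val prodRow = rec._2._2           // _._2._2: the product record
```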

Check the result:

scala> missingProds.foreach(p=>println(p.mkString(", "))) ①
20, LEGO Elves, 4589.79, 4
63, Pajamas, 8131.85, 3
43, Tomb Raider PC, 2718.14, 1
3, Cute baby doll, battery, 1808.79, 2

①mkString turns an Array into a String using the separator you pass in, ", " in this example.
OK, the result shows there are four products nobody bought 囧. Done.


This task can also be solved with subtractByKey. Let's look at an example:
[Snippet.30] RDD subtractByKey

scala> val missingProds = products.subtractByKey(totalsByProds).values
missingProds: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[84] at values at <console>:36

It is quite intuitive: the caller's keys minus the argument's keys. subtractByKey keeps only the caller's pairs whose keys never appear in the argument RDD, which conceptually is exactly the set of products nobody bought.
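Here is a plain-Scala sketch of what subtractByKey keeps, using a hypothetical miniature catalog and totals:

```scala
// Hypothetical catalog and sales totals, both keyed by product ID.
val catalog = Map(3 -> "Cute baby doll", 4 -> "Bear doll", 20 -> "LEGO Elves")
val sold    = Map(4 -> 63520.21)

// catalog.subtractByKey(sold) keeps the caller's entries whose key
// never appears in the argument: the products nobody bought.
val unsold = catalog.filterNot { case (k, _) => sold.contains(k) }
```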


Besides the approaches above, there is also cogroup. Its standard definition:

cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]):
RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]

Looks a bit complicated? Basically you can pass in several RDDs (up to three others at the moment), and they all get grouped by key together; each value is a set of Iterable objects you can traverse. On to the example!

scala> val prodTotCogroup=totalsByProds.cogroup(products)
prodTotCogroup: org.apache.spark.rdd.RDD[(Int, (Iterable[Double], Iterable[Array[String]]))] = MapPartitionsRDD[86] at cogroup at <console>:36

scala> prodTotCogroup.collect
res55: Array[(Int, (Iterable[Double], Iterable[Array[String]]))] = Array((34,(CompactBuffer(62592.43000000001),CompactBuffer([Ljava.lang.String;@1bb0913a))), (52,(CompactBuffer(57708.119999999995),....

Finally, keeping only the entries whose totalsByProds side is empty gives the same result as before:

scala> prodTotCogroup.
    filter(_._2._1.isEmpty).
    foreach(x=>println(x._2._2.head.mkString(" "))) ①
63 Pajamas 8131.85 3
20 LEGO Elves 4589.79 4
43 Tomb Raider PC 2718.14 1
3 Cute baby doll, battery 1808.79 2

①x._2._2.head looks a bit complicated, but it just takes the value twice; since the Iterable holds only one element (the full product record), head gives us the whole thing XD

The earlier totalsAndProds can also be built from the cogroup result:

scala> val totalsAndProds = prodTotCogroup.
      filter(!_._2._1.isEmpty).
      map(x=>(x._2._2.head(0).toInt,(x._2._1.head,x._2._2.head)))

How does that work? What does head(0) mean? Think it through:

  • x._2._2.head(0): the product ID, the first element of the Array
  • x._2._1.head: the total amount
  • x._2._2.head: the product record
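The cogroup result itself can be sketched with plain collections (hypothetical miniature data): every key from either side appears once, paired with both value lists, and an empty list marks a side with no match:

```scala
// Hypothetical totals and catalog rows keyed by product ID.
val totalsMini = List((34, 62592.43))
val prodsMini  = List((34, "Assassins Creed 3"), (20, "LEGO Elves"))

// cogroup: for each key from either side, gather both sides' values.
val keys = (totalsMini.map(_._1) ++ prodsMini.map(_._1)).toSet
val cogrouped = keys.map { k =>
  k -> (totalsMini.filter(_._1 == k).map(_._2),
        prodsMini.filter(_._1 == k).map(_._2))
}.toMap

// Key 20 ends up with an empty totals list: a product nobody bought.
```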
