iT邦幫忙

2017 iT 邦幫忙鐵人賽
DAY 9
2
Big Data

Spark 2.0 in Scala系列 第 9

[Spark-Day9](core API實戰篇) Pair RDD-2

繼續進行昨天未完成的課題
我們想要進行以下分析:

  1. 消費次數最多的贈予一支bear doll(已完成)
  2. 給予一次購買兩隻以上的Barbie交易結帳金額95%折的優待
  3. 購買五本字典的客戶贈予一支牙刷
  4. 贈予總消費金額最高的客戶一件睡衣

PS. 商品ID皆為已知,且贈予的物品也必須成為一筆金額為0的transaction


Task.2 一次購買Barbie兩隻以上給予結帳金額95%優惠

Barbie ID:25

思路:
找出交易紀錄中,商品ID=25數量>=2的紀錄,將結帳金額\*0.95,並更新回transByCust:
更新回transByCust等於修改KV中的V值對吧,該用昨天講的哪個方法呢?
可以考慮使用mapValues(func):(對每個KV中的V執行某個func,但K值不變):

scala > transByCust = transByCust.mapValues(tran => { ①
    if(tran(3).toInt == 25 && tran(4).toInt > 1) ②
        tran(5) = (tran(5).toDouble * 0.95).toString ③
    tran ④
})

①傳入一個函式,交易紀錄在函式以tran表示
②若商品ID(tran(3).toInt == 25)與數量(tran(4).toDouble > 1)吻合,進入③
③修改結帳金額(打95折)
④回傳交易紀錄
注意傳入的函式,處理方式是By每個KV中的V,所以撰寫函式時只需要考量如何處理好一個V即可。
因為之前將transByCust宣告成mutable的var,所以可以對裡面的元素進行修改操作。


Task.3 購買五本字典的客戶贈予一支牙刷

字典ID:81
牙刷:70
思路:這次不是用mapValues修改值了,有一種方式是用購買五本字典的客戶以上的客戶,然後用類似任務1的作法手動新增贈品的交易紀錄,但是任務1因為是找極值所以筆數不會多,但如果購買字典5本字典以上的人很多怎麼辦XDD,因此我們要想個自動新增交易紀錄(贈品)的方式:

scala> transByCust = transByCust.flatMapValues(tran => {
       if(tran(3).toInt == 81 && tran(4).toInt >= 5) { ①
           val cloned = tran.clone() ②
           cloned(5) = "0.00"; cloned(3) = "70"; cloned(4) = "1"; ③
           List(tran, cloned) ④
           }
       else
           List(tran) ⑤
       })

可以用flatMapValues,傳入一個會產生集合的函式,它可以讓一個元素變成多個元素,並且幫你攤平,等於由一筆紀錄變成多筆紀錄。
①判斷此筆紀錄是否符合條件
②若符合條件則clone一筆紀錄,不clone自己建一個String Array也OK
③更新陣列值
④回傳一個元素的集合物件,之後會攤平,所以要用一個List將兩個Array wrapper起來
⑤不符合條件的話只能wrapper原始的tran陣列
這題用了一個比較複雜的解法,但可以從範例中體會flatMapValues的用法


Task.4 贈予總消費金額最高的客戶一件睡衣

睡衣ID:63

因為每個人可能用多筆紀錄,該如何將每個人的紀錄各自加總呢?並且目前是String陣列,應該要轉換一下格式

先將紀錄轉換成簡單的(customerID,消費紀錄):

scala> val amouts = transByCust.mapValues(t=>t(5).toDouble)
amouts: org.apache.spark.rdd.RDD[(Int, Double)] = MapPartitionsRDD[6] at mapValues at <console>:30

沒錯,就是用mapValues

觀察一下:

scala> amouts.collect
res3: Array[(Int, Double)] = Array((51,9506.21), (99,4107.59), (79,2987.22), (51,7501.89), (86,8370.2), (63,1023.57), (23,5892.41), (49,9298.18), (97,9462.89), (94,4199.15),  (59,5984.68), (8,1859.2)...

進入正題,該如何將每個人的消費金額加總,類似DB中的GroupBy搭配SUM呢?

scala> val total = amouts.reduceByKey(_+_)
total: org.apache.spark.rdd.RDD[(Int, Double)] = ShuffledRDD[7] at reduceByKey at <console>:32

就是reduceByKey啦!

觀察一下:

scala> total.collect
total: Array[(Int, Double)] = Array((34,77332.59), (52,58348.020000000004), (96,36928.57), (4,41801.35), (16,40696.020000000004), (82,58722.58), (66,52130.009999999995), (28,45534.299999999996), (54,36307.04) ...

恩,要排序一下,還記得sortBy嗎:

scala> val total = amouts.reduceByKey(_+_).collect.sortBy(_._2)
total: Array[(Int, Double)] = Array((60,17333.71), (48,17949.850000000002), (30,19194.91), (67,23201.989999999998), (6,30549.28), (44,31061.99), (29,31389.32), (19,33422.229999999996)....

OK,看來第1名消費王就是我們的ID60大戶XD~(堪誤,請看留言)


新增一筆紀錄到前一天的建立的complTrans集合物件中

scala> complTrans = complTrans :+ Array("2015-03-30", "11:59 PM", "76",
"63", "1", "0.00")

:+的用法在官網API文件描述為 def :+(elem: A): Array[A]
也就是在:+左邊放上Array,右邊放上元素,則結果會將元素append在Array最後:

簡易範例:

scala> val a = List(1)
a: List[Int] = List(1)

scala> val b = a :+ 2
b: List[Int] = List(1, 2)

最後將complTrans寫入transByCust中,該如何做?

scala> transByCust = transByCust.union(sc.parallelize(complTrans).map(t =>
(t(2).toInt, t)))
  • sc.parallelize(complTrans)將complTrans轉成RDD
  • .map(t =>(t(2).toInt, t))將complTrans轉成符合transByCust的格式:PairRDD
  • 透過union將合併兩個RDD

上一篇
[Spark-Day8](core API實戰篇) Pair RDD-1
下一篇
[Spark-Day10](Scala番外篇) Currying
系列文
Spark 2.0 in Scala30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中
0
Mike
iT邦新手 5 級 ‧ 2018-02-01 11:39:26

感謝你的精采文章!
小小勘誤: totalsortBy(_._2) 之後應該是遞增排序,所以消費最高的 ID 應該是最後一筆 76 才對~

joechh iT邦新手 5 級 ‧ 2018-02-01 16:42:16 檢舉

是的,不好意思!!,
我現在覺得list.sortWith(_._2 > _._2).head._1
做成遞減然後取第一筆的Key(也就是ID)似乎更好

0
Darwin Watterson
iT邦好手 1 級 ‧ 2018-03-22 10:47:29

K到第九天就明顯感受到用spark.read.textFile("xxx")與sc.textFile("xxx")的差異了!mapValues方法的實作: tran => {if(...) ...修改陣列值 回傳tran} 用sc讀入的話因為都是RDD,所以正常運作!不過用spark讀入的話, rdd.mapValues實作的話:
error: type mismatch:
found : org.apache.spark.rdd.RDD[(Int, Array[String])]
required: org.apache.spark.sql.Dataset[(Int, Array[String])]
所以RDD有辦法轉Dataset嗎???

0
Darwin Watterson
iT邦好手 1 級 ‧ 2018-03-23 18:08:49

PO 一下我用Java改寫完跑出來的結果: https://goo.gl/CKwGwb
文件有列出哪些人符合95折, 獲得牙刷, 及 flatMapValue與union後transByCust總數的變化!

我要留言

立即登入留言