DAY 9
0
Big Data

## [Spark-Day9](core API實戰篇) Pair RDD-2

1. 消費次數最多的贈予一支bear doll（已完成）
2. 給予一次購買兩隻以上的Barbie交易結帳金額95%折的優待
3. 購買五本字典的客戶贈予一支牙刷
4. 贈予總消費金額最高的客戶一件睡衣

PS. 商品ID皆為已知，且贈予的物品也必須成為一筆金額為0的transaction

### Task.2 一次購買Barbie兩隻以上給予結帳金額95%優惠

Barbie ID:25

``````scala > transByCust = transByCust.mapValues(tran => { ①
if(tran(3).toInt == 25 && tran(4).toInt > 1) ②
tran(5) = (tran(5).toDouble * 0.95).toString ③
tran ④
})
``````

①傳入一個函式，交易紀錄在函式以tran表示
②若商品ID(`tran(3).toInt == 25`)與數量(`tran(4).toDouble > 1`)吻合，進入③
③修改結帳金額(打95折)
④回傳交易紀錄

### Task.3 購買五本字典的客戶贈予一支牙刷

``````scala> transByCust = transByCust.flatMapValues(tran => {
if(tran(3).toInt == 81 && tran(4).toInt >= 5) { ①
val cloned = tran.clone() ②
cloned(5) = "0.00"; cloned(3) = "70"; cloned(4) = "1"; ③
List(tran, cloned) ④
}
else
List(tran) ⑤
})
``````

①判斷此筆紀錄是否符合條件
②若符合條件則clone一筆紀錄,不clone自己建一個String Array也OK
③更新陣列值
④回傳一個元素的集合物件，之後會攤平，所以要用一個List將兩個Array wrapper起來
⑤不符合條件的話只能wrapper原始的tran陣列

### Task.4 贈予總消費金額最高的客戶一件睡衣

``````scala> val amouts = transByCust.mapValues(t=>t(5).toDouble)
amouts: org.apache.spark.rdd.RDD[(Int, Double)] = MapPartitionsRDD[6] at mapValues at <console>:30
``````

``````scala> amouts.collect
res3: Array[(Int, Double)] = Array((51,9506.21), (99,4107.59), (79,2987.22), (51,7501.89), (86,8370.2), (63,1023.57), (23,5892.41), (49,9298.18), (97,9462.89), (94,4199.15),  (59,5984.68), (8,1859.2)...
``````

``````scala> val total = amouts.reduceByKey(_+_)
total: org.apache.spark.rdd.RDD[(Int, Double)] = ShuffledRDD[7] at reduceByKey at <console>:32
``````

``````scala> total.collect
total: Array[(Int, Double)] = Array((34,77332.59), (52,58348.020000000004), (96,36928.57), (4,41801.35), (16,40696.020000000004), (82,58722.58), (66,52130.009999999995), (28,45534.299999999996), (54,36307.04) ...
``````

``````scala> val total = amouts.reduceByKey(_+_).collect.sortBy(_._2)
total: Array[(Int, Double)] = Array((60,17333.71), (48,17949.850000000002), (30,19194.91), (67,23201.989999999998), (6,30549.28), (44,31061.99), (29,31389.32), (19,33422.229999999996)....
``````

OK，看來第1名消費王就是我們的ID60大戶XD~

``````scala> complTrans = complTrans :+ Array("2015-03-30", "11:59 PM", "76",
"63", "1", "0.00")
``````

`:+`的用法在官網API文件描述為`def :+(elem: A): Array[A]`

``````scala> val a = List(1)
a: List[Int] = List(1)

scala> val b = a :+ 2
b: List[Int] = List(1, 2)
``````

``````scala> transByCust = transByCust.union(sc.parallelize(complTrans).map(t =>
(t(2).toInt, t)))
``````
• `sc.parallelize(complTrans)`將complTrans轉成RDD
• `.map(t =>(t(2).toInt, t))`將complTrans轉成符合transByCust的格式:PairRDD
• 透過`union`將合併兩個RDD

12 人訂閱