
## [Spark-Day9](core API in practice) Pair RDD-2

1. Give a bear doll to the customer with the most purchases (done)
2. Give a 5% discount on any transaction in which two or more Barbies were bought
3. Give a toothbrush to customers who bought five or more dictionaries
4. Give a pair of pajamas to the customer with the highest total spend

PS. All product IDs are known, and each gift must itself be recorded as a transaction with an amount of 0.

Barbie ID: 25

```scala
scala> transByCust = transByCust.mapValues(tran => { ①
         if (tran(3).toInt == 25 && tran(4).toInt > 1) ②
           tran(5) = (tran(5).toDouble * 0.95).toString ③
         tran ④
       })
```

① Pass in a function; inside it, the transaction record is named tran
② If the product ID (`tran(3).toInt == 25`) and the quantity (`tran(4).toInt > 1`) both match, proceed to ③
③ Update the total (apply the 5% discount)
④ Return the transaction record
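mapValues transforms only the value side of each pair. The same discount logic can be checked in plain Scala without a Spark cluster; the two sample records below are invented for illustration:

```scala
// Plain-Scala sketch of the discount step above (sample records are made up).
val sample = Map(
  51 -> Array("2015-03-30", "6:55 AM", "51", "25", "2", "100.00"), // two Barbies
  99 -> Array("2015-03-30", "7:39 PM", "99", "3", "1", "50.00")    // another product
)

val discounted = sample.map { case (cust, tran) =>
  if (tran(3).toInt == 25 && tran(4).toInt > 1)
    tran(5) = (tran(5).toDouble * 0.95).toString // apply the 5% discount in place
  (cust, tran)
}
```

Only customer 51's total changes (100.00 becomes 95.0); the non-Barbie record passes through untouched.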

```scala
scala> transByCust = transByCust.flatMapValues(tran => {
         if (tran(3).toInt == 81 && tran(4).toInt >= 5) { ①
           val cloned = tran.clone() ②
           cloned(5) = "0.00"; cloned(3) = "70"; cloned(4) = "1" ③
           List(tran, cloned) ④
         }
         else
           List(tran) ⑤
       })
```

① Check whether this record matches the condition
② If it does, clone the record (building a fresh String Array instead of cloning would also work)
③ Update the array values
④ Return a collection of elements; flatMapValues will flatten it afterwards, so both Arrays must be wrapped in a List
⑤ If the condition is not met, wrap only the original tran array
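The flattening behaviour can likewise be sketched in plain Scala with flatMap over a list of pairs (sample records invented for illustration; no Spark needed):

```scala
// Plain-Scala sketch of flatMapValues: a matching record yields two output
// records (the purchase plus the free toothbrush), others yield one.
val pairs = List(
  (23, Array("2015-03-30", "2:51 AM", "23", "81", "5", "60.00")), // five dictionaries
  (49, Array("2015-03-30", "9:18 AM", "49", "4", "1", "30.00"))
)

val withGifts = pairs.flatMap { case (cust, tran) =>
  if (tran(3).toInt == 81 && tran(4).toInt >= 5) {
    val cloned = tran.clone()
    cloned(5) = "0.00"; cloned(3) = "70"; cloned(4) = "1"
    List((cust, tran), (cust, cloned)) // both records survive the flatten
  } else
    List((cust, tran))
}
```

Two input pairs come out as three: customer 23 now carries both the dictionary purchase and a zero-amount toothbrush transaction.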

```scala
scala> val amouts = transByCust.mapValues(t => t(5).toDouble)
amouts: org.apache.spark.rdd.RDD[(Int, Double)] = MapPartitionsRDD[6] at mapValues at <console>:30
```

```scala
scala> amouts.collect
res3: Array[(Int, Double)] = Array((51,9506.21), (99,4107.59), (79,2987.22), (51,7501.89), (86,8370.2), (63,1023.57), (23,5892.41), (49,9298.18), (97,9462.89), (94,4199.15),  (59,5984.68), (8,1859.2)...
```

```scala
scala> val total = amouts.reduceByKey(_+_)
total: org.apache.spark.rdd.RDD[(Int, Double)] = ShuffledRDD[7] at reduceByKey at <console>:32
```

```scala
scala> total.collect
total: Array[(Int, Double)] = Array((34,77332.59), (52,58348.020000000004), (96,36928.57), (4,41801.35), (16,40696.020000000004), (82,58722.58), (66,52130.009999999995), (28,45534.299999999996), (54,36307.04) ...
```

```scala
scala> val total = amouts.reduceByKey(_+_).collect.sortBy(_._2)
total: Array[(Int, Double)] = Array((60,17333.71), (48,17949.850000000002), (30,19194.91), (67,23201.989999999998), (6,30549.28), (44,31061.99), (29,31389.32), (19,33422.229999999996)....
```

OK, so it looks like the #1 spender is our big customer ID 60 XD~ (`erratum: see the comments`)
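The erratum boils down to sort direction: sortBy(_._2) sorts ascending, so the head of the sorted array is actually the smallest total and the real top spender sits at the end. A quick check using a few totals taken from the outputs above:

```scala
// sortBy sorts ascending by default: the biggest total ends up last, not first.
val totals = Array((60, 17333.71), (48, 17949.85), (34, 77332.59))

val ascending  = totals.sortBy(_._2)     // (60, ...) is now the head, i.e. the SMALLEST
val topSpender = ascending.last          // equivalently: totals.maxBy(_._2)
```

So customer 60 is the smallest spender in this sample, and customer 34 is the one who should get the pajamas.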

```scala
scala> complTrans = complTrans :+ Array("2015-03-30", "11:59 PM", "76", "63", "1", "0.00")
```

The official API documentation describes `:+` as `def :+(elem: A): Array[A]`:

```scala
scala> val a = List(1)
a: List[Int] = List(1)

scala> val b = a :+ 2
b: List[Int] = List(1, 2)
```

```scala
scala> transByCust = transByCust.union(sc.parallelize(complTrans).map(t =>
         (t(2).toInt, t)))
```
• `sc.parallelize(complTrans)` turns complTrans into an RDD
• `.map(t => (t(2).toInt, t))` reshapes it into the same Pair RDD format as transByCust
• `union` merges the two RDDs
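The map step above can be checked without Spark at all, since it just keys each record array by the customer ID in column 2 (`sc.parallelize` and `union` are the only cluster-side calls):

```scala
// Key each record by customer ID (column index 2), as done before the union.
val complTrans = Array(
  Array("2015-03-30", "11:59 PM", "76", "63", "1", "0.00")
)
val keyed = complTrans.map(t => (t(2).toInt, t))
```

Each element is now a (customerId, record) pair, matching the shape of transByCust.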

### 3 comments

Mike
iT邦 Newbie Level 5 ‧ 2018-02-01 11:39:26

joechh iT邦 Newbie Level 5 ‧ 2018-02-01 16:42:16

Darwin Watterson
iT邦 Grad Student Level 3 ‧ 2018-03-22 10:47:29

By day 9 of studying this, the difference between spark.read.textFile("xxx") and sc.textFile("xxx") becomes obvious! The mapValues implementation `tran => { if(...) ...update the array values; return tran }` works fine when the file is read with sc, since everything stays an RDD. But read the file with spark and implement the same rdd.mapValues and you get:
error: type mismatch:
found : org.apache.spark.rdd.RDD[(Int, Array[String])]
required: org.apache.spark.sql.Dataset[(Int, Array[String])]

Darwin Watterson
iT邦 Grad Student Level 3 ‧ 2018-03-23 18:08:49

Posting the result from my Java rewrite: https://goo.gl/CKwGwb