DAY 8
Big Data

## [Spark-Day8](core API實戰篇) Pair RDD-1

• PairRDD
• Partition
• Aggregation類操作
• RDD相依性
• 累加器與廣播變數

### Spark PairRDD

[Snippet. 10]建立簡易pairRDD

``````scala> val pair = sc.parallelize(List((1,3),(3,5),(3,9)))
pair: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[56] at parallelize at <console>:24
``````

[Snippet. 11]keys() & values()

``````scala> pair.keys.collect
res38: Array[Int] = Array(1, 3, 3)

scala> pair.values.collect
res39: Array[Int] = Array(3, 5, 9)
``````

[Snippet. 12]groupByKey()

``````scala> pair.groupByKey.collect
res41: Array[(Int, Iterable[Int])] = Array((1,CompactBuffer(3)), (3,CompactBuffer(5, 9)))
``````

`CompactBuffer`是一個iterable的RDD集合物件，有機會會再多講一點，反正現在當作一個集合物件就對哩~

[Snippet. 13]mapValues(func)

``````scala> pair.mapValues(x=>x+1).collect
res44: Array[(Int, Int)] = Array((1,4), (3,6), (3,10))
``````

``````scala> pair.mapValues(_+1).collect
res45: Array[(Int, Int)] = Array((1,4), (3,6), (3,10))
``````

[Snippet. 14]flatmapValues(func)

``````scala> pair.flatMapValues(x=>(x to 6)).collect
res46: Array[(Int, Int)] = Array((1,3), (1,4), (1,5), (1,6), (3,5), (3,6))
``````

• (1,3)變成[(1,3),(1,4),(1,5),(1,6)]
• (3,5)變成[(3,5),(3,6)]
• (3,9)為空(9超過6了)
然後再將(1,3),(1,4),(1,5),(1,6),(3,5),(3,6)攤平收集起來成為一個結果

[Snippet. 15]reduceByKey(func)
V要ByKey組合，並傳入一個函式說明同key的V如何被處理：

``````scala> pair.reduceByKey((x,y)=>x+y).collect
res47: Array[(Int, Int)] = Array((1,3), (3,14))
``````

``````scala> pair.reduceByKey(_+_).collect
res48: Array[(Int, Int)] = Array((1,3), (3,14))
``````

`(_+_)`就指定我要把reduce的兩個對象(累積值與新值)進行哪種處理(在此就是疊加起來變成新的累積值)

[Snippet. 16]CountByKey()

``````scala> pair.countByKey
res52: scala.collection.Map[Int,Long] = Map(1 -> 1, 3 -> 2)
``````

[Snippet. 17]sortByKey()

``````scala> pair.sortByKey().collect
res50: Array[(Int, Int)] = Array((1,3), (3,5), (3,9))
``````

## use Case（購買行為資料分析）:

``````2015-03-30#6:55 AM#51#68#1#9506.21 ①
2015-03-30#7:39 PM#99#86#5#4107.59
2015-03-30#11:57 AM#79#58#7#2987.22
2015-03-30#12:46 AM#51#50#6#7501.89 ②
2015-03-30#11:39 AM#86#24#5#8370.2
2015-03-30#10:35 AM#63#19#5#1023.57
2015-03-30#2:30 AM#23#77#7#5892.41
2015-03-30#7:41 PM#49#58#4#9298.18
...
...
``````

1. 消費次數最多的贈予一支bear doll（已完成）
2. 給予一次購買兩隻以上的Barbie交易結帳金額95%折的優待
3. 購買五本字典的客戶贈予一支牙刷
4. 贈予總消費金額最高的客戶一件睡衣

PS. 商品ID皆為已知，且贈予的物品也必須成為一筆金額為0的transaction

``````scala> val transFile= sc.textFile("data_transactions.txt") ①
transFile: org.apache.spark.rdd.RDD[String] = data_transactions.txt MapPartitionsRDD[76] at textFile at <console>:24

scala> val transData = transFile.map(_.split("#")) ②
transData: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[77] at map at <console>:26

scala> var transByCust = transData.map(tran => (tran(2).toInt,tran)) ③
transByCust: org.apache.spark.rdd.RDD[(Int, Array[String])] = MapPartitionsRDD[78] at map at <console>:28
``````

① 先用sc讀檔囉，檔案視情況要修改路徑
② 將每行字串切開成為矩陣(每行皆為一個陣列)
③ 從每個矩陣中的抽出顧客ID(也就是陣列中的第3個元素`tran(2)）`與陣列本身，並透過map成新的RDD。最終會成為一個[Int, Array]的pairRDD。
③ 另外注意，這邊我們宣告的方式是`var transByCust`而不是val(value)，這代表transByCust是mutable，也就是可以更新的，方便我們直接修改value而不用另外產生RDD*

*分散式環境下最好還是儘量以immutable的方式使用變數，只是這樣更新map就會麻煩一點。

``````scala> transByCust.keys.distinct.count
res51: Long = 100
``````

OK，進入正題，找消費次數最多的客戶該如何做？

• 將同樣key的資料放在一起，然後count，類似DB的groupBy加count，能用先前介紹的哪個函式嗎？
• 當然是`CountByKey`啦~
``````scala> transByCust.countByKey
res53: scala.collection.Map[Int,Long] = Map(69 -> 7, 88 -> 5, 5 -> 11, 10 -> 7, 56 -> 17, 42 -> 7, 24 -> 9, 37 -> 7, 25 -> 12, 52 -> 9, 14 -> 8, 20 -> 8, 46 -> 9, 93 -> 12, 57 -> 8, 78 -> 11, 29 -> 9, 84 -> 9, 61 -> 8, 89 -> 9, 1 -> 9, 74 -> 11, 6 -> 7, 60 -> 4, 85 -> 9, 28 -> 11, 38 -> 9, 70 -> 8, ....
scala>
``````

``````scala> transByCust.countByKey.toSeq
res1: Seq[(Int, Long)] = ArrayBuffer((69,7), (88,5), (5,11), (10,7), (56,17), (42,7), (24,9), (37,7), (25,12), (52,9), (14,8), (20,8), (46,9), (93,12), (57,8), (78,11), (29,9), (84,9), (61,8), (89,9), (1,9), (74,11), (6,7), (60,4), (85,9), (28,11), (38,9), (70,8), (21,13), (33,9), (92,8), ...
scala>
``````

OK，加上SortBy

``````scala> transByCust.countByKey.toSeq.sortBy(_._2)
res2: Seq[(Int, Long)] = ArrayBuffer((60,4), (88,5), (48,5), (67,5), (36,5), (30,5), (19,6), (62,6), (69,7), (10,7), (42,7), (37,7), (6,7), (9,7), (73,7), (27,7), (12,7), (54,7), (80,7), (72,7), (14,8), (20,8)......
``````

`sortBy(_._2)`是炫炮的placeholder寫法，比較清楚的方式是sortBy(pair=>pair._2)
`pair._2`代表取KV中的V，若要取KV元素的K則寫成(KVvariable._1)的格式。

``````scala> transByCust.countByKey.toSeq.sortBy(pair=>pair._2).last
res4: (Int, Long) = (53,19)

scala>
``````

OK，那要如何將這個結果導入變數勒？那當然是宣告兩個變數來分別接KV啦:

``````scala> val (customerID, purchNum) = transByCust.countByKey.toSeq.sortBy(pair=>pair._2).last
customerID: Int = 53
purchNum: Long = 19
``````

``````scala> var complTrans = Array(Array("2015-03-30", "11:59 PM", "53", "4",
"1", "0.00"))
``````

customerID=53、bear doll ID為4、商品數量為1、金額為0

