再來準備花幾天的時間提一下幾個Spark核心概念:
鍵值對(Key-Value)的結構被應用在許多地方,例如MapReduce、多種NoSQL的底層結構(HBase、Cassandra、Redis)、快取(Memcached),以及HashTable~XD等。Spark中也特地為了鍵值對資料,定義了一種RDD型別,:PairRDD
,並提供了一些專屬的函式。
直接來玩一些常用的函式吧,之後再以use case來思考如何應用。
不過玩之前也要先有個pairRDD才能玩阿XD~
[Snippet. 10]建立簡易pairRDD
建立pairRDD
最簡單的方式就是傳一個集合給sc.parallelize
,
其中每個元素都包含兩個值(也就是Tuple)
:
scala> val pair = sc.parallelize(List((1,3),(3,5),(3,9)))
pair: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[56] at parallelize at <console>:24
看到org.apache.spark.rdd.RDD[(Int, Int)]
就代表你建立了一個key與value皆為Int的pairRDD類了~不對阿,為啥輸出不是PairRDD勒?因為這邊是用implict conversion
,幫你外掛函式~
來玩一些pairRDD專屬函式吧:
[Snippet. 11]keys() & values()
分別抓出keys與values:
原始:{((1,3),(3,5),(3,9)}
scala> pair.keys.collect
res38: Array[Int] = Array(1, 3, 3)
scala> pair.values.collect
res39: Array[Int] = Array(3, 5, 9)
[Snippet. 12]groupByKey()
用key值分群:
原始:{((1,3),(3,5),(3,9)}
scala> pair.groupByKey.collect
res41: Array[(Int, Iterable[Int])] = Array((1,CompactBuffer(3)), (3,CompactBuffer(5, 9)))
CompactBuffer
是一個iterable的RDD集合物件,有機會會再多講一點,反正現在當作一個集合物件就對哩~
[Snippet. 13]mapValues(func)
對每個KV中的V執行某個func,但K值不變
原始:{((1,3),(3,5),(3,9)}
scala> pair.mapValues(x=>x+1).collect
res44: Array[(Int, Int)] = Array((1,4), (3,6), (3,10))
比較帥的寫法可用placeholder:
scala> pair.mapValues(_+1).collect
res45: Array[(Int, Int)] = Array((1,4), (3,6), (3,10))
[Snippet. 14]flatmapValues(func)
這就稍微複雜了一點了,flatmapValues有點類似mapValues,
但func一樣會對KV中的V的運算,但結果會是一個集合,而集合會被攤平,每個元素會搭配原本的K:
原始:{((1,3),(3,5),(3,9)}
scala> pair.flatMapValues(x=>(x to 6)).collect
res46: Array[(Int, Int)] = Array((1,3), (1,4), (1,5), (1,6), (3,5), (3,6))
簡單來說:
[Snippet. 15]reduceByKey(func)
V要ByKey組合,並傳入一個函式說明同key的V如何被處理:
原始:{((1,3),(3,5),(3,9)}
scala> pair.reduceByKey((x,y)=>x+y).collect
res47: Array[(Int, Int)] = Array((1,3), (3,14))
這就是MapReduce的Reduce阿阿阿~~!!
可以用placeholder改的炫炮一點:
scala> pair.reduceByKey(_+_).collect
res48: Array[(Int, Int)] = Array((1,3), (3,14))
(_+_)
就指定我要把reduce的兩個對象(累積值與新值)進行哪種處理(在此就是疊加起來變成新的累積值)
[Snippet. 16]CountByKey()
將同樣key的資料分群,然後對每個K收集到的元素count:
原始:{((1,3),(3,5),(3,9)}
scala> pair.countByKey
res52: scala.collection.Map[Int,Long] = Map(1 -> 1, 3 -> 2)
上圖代表K=1的元素有1個,K=3的元素有2個
這邊特別注意countByKey
是一個action操作,回傳的結果直接是一般集合
,要特別注意。
[Snippet. 17]sortByKey()
按key排序:
原始:{((1,3),(3,5),(3,9)}
scala> pair.sortByKey().collect
res50: Array[(Int, Int)] = Array((1,3), (3,5), (3,9))
假設有一份使用者消費紀錄(data_transactions.txt),內容如下:
2015-03-30#6:55 AM#51#68#1#9506.21 ①
2015-03-30#7:39 PM#99#86#5#4107.59
2015-03-30#11:57 AM#79#58#7#2987.22
2015-03-30#12:46 AM#51#50#6#7501.89 ②
2015-03-30#11:39 AM#86#24#5#8370.2
2015-03-30#10:35 AM#63#19#5#1023.57
2015-03-30#2:30 AM#23#77#7#5892.41
2015-03-30#7:41 PM#49#58#4#9298.18
...
...
其中欄位(以#為分隔符號)定義如下:
日期 #時間 #顧客ID #購買商品ID #數量 #總價
一筆交易(transaction)即形成一筆資料,也就是一個人可以有多筆資料(如①與②)
我們想要進行以下分析:
- 消費次數最多的贈予一支bear doll(已完成)
- 給予一次購買兩隻以上的Barbie交易結帳金額95%折的優待
- 購買五本字典的客戶贈予一支牙刷
- 贈予總消費金額最高的客戶一件睡衣
PS. 商品ID皆為已知,且贈予的物品也必須成為一筆金額為0的transaction
先把檔案讀進來,並處理成(客戶ID,交易紀錄)型式的KV:
scala> val transFile= sc.textFile("data_transactions.txt") ①
transFile: org.apache.spark.rdd.RDD[String] = data_transactions.txt MapPartitionsRDD[76] at textFile at <console>:24
scala> val transData = transFile.map(_.split("#")) ②
transData: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[77] at map at <console>:26
scala> var transByCust = transData.map(tran => (tran(2).toInt,tran)) ③
transByCust: org.apache.spark.rdd.RDD[(Int, Array[String])] = MapPartitionsRDD[78] at map at <console>:28
① 先用sc讀檔囉,檔案視情況要修改路徑
② 將每行字串切開成為矩陣(每行皆為一個陣列)
③ 從每個矩陣中的抽出顧客ID(也就是陣列中的第3個元素tran(2))
與陣列本身,並透過map成新的RDD。最終會成為一個[Int, Array]的pairRDD。
③ 另外注意,這邊我們宣告的方式是var transByCust
而不是val(value),這代表transByCust是mutable,也就是可以更新的,方便我們直接修改value而不用另外產生RDD*
*分散式環境下最好還是儘量以immutable的方式使用變數,只是這樣更新map就會麻煩一點。
先玩一下資料練練手:
總共有幾個不同的客?該怎麼做?
scala> transByCust.keys.distinct.count
res51: Long = 100
OK,進入正題,找消費次數最多的客戶該如何做?
思路:
CountByKey
啦~scala> transByCust.countByKey
res53: scala.collection.Map[Int,Long] = Map(69 -> 7, 88 -> 5, 5 -> 11, 10 -> 7, 56 -> 17, 42 -> 7, 24 -> 9, 37 -> 7, 25 -> 12, 52 -> 9, 14 -> 8, 20 -> 8, 46 -> 9, 93 -> 12, 57 -> 8, 78 -> 11, 29 -> 9, 84 -> 9, 61 -> 8, 89 -> 9, 1 -> 9, 74 -> 11, 6 -> 7, 60 -> 4, 85 -> 9, 28 -> 11, 38 -> 9, 70 -> 8, ....
scala>
似乎可以用,但要如何取得count數最多的那組的呢?
思路:因為一般Map
不能sortByKey(RDD的才能,但countByKey後就轉為一般Map了),所以要將Map轉成元素是KV的List,然後透過List的sort進行排序。
先看看用toSeq轉換集合型態
的效果
scala> transByCust.countByKey.toSeq
res1: Seq[(Int, Long)] = ArrayBuffer((69,7), (88,5), (5,11), (10,7), (56,17), (42,7), (24,9), (37,7), (25,12), (52,9), (14,8), (20,8), (46,9), (93,12), (57,8), (78,11), (29,9), (84,9), (61,8), (89,9), (1,9), (74,11), (6,7), (60,4), (85,9), (28,11), (38,9), (70,8), (21,13), (33,9), (92,8), ...
scala>
OK,加上SortBy
scala> transByCust.countByKey.toSeq.sortBy(_._2)
res2: Seq[(Int, Long)] = ArrayBuffer((60,4), (88,5), (48,5), (67,5), (36,5), (30,5), (19,6), (62,6), (69,7), (10,7), (42,7), (37,7), (6,7), (9,7), (73,7), (27,7), (12,7), (54,7), (80,7), (72,7), (14,8), (20,8)......
sortBy(_._2)
是炫炮的placeholder寫法,比較清楚的方式是sortBy(pair=>pair._2)pair._2
代表取KV中的V,若要取KV元素的K則寫成(KVvariable._1)的格式。
因為排序是ascending,所以我們取最後一筆
scala> transByCust.countByKey.toSeq.sortBy(pair=>pair._2).last
res4: (Int, Long) = (53,19)
scala>
OK,那要如何將這個結果導入變數勒?那當然是宣告兩個變數來分別接KV啦:
scala> val (customerID, purchNum) = transByCust.countByKey.toSeq.sortBy(pair=>pair._2).last
customerID: Int = 53
purchNum: Long = 19
得到customerID
啦,但這不是很精確的作法,因為最高可能不只一個,用last
可能會失真。但當示範還OK
因為贈品最後要更新到交易紀錄中,先把這些小資料暫存到一個一般集合物件(var型別方便修改)中:
scala> var complTrans = Array(Array("2015-03-30", "11:59 PM", "53", "4",
"1", "0.00"))
參照交易的欄位,建立一筆資料,其中:
customerID=53、bear doll ID為4、商品數量為1、金額為0