iT邦幫忙

2017 iT 邦幫忙鐵人賽
DAY 8
2
Big Data

Spark 2.0 in Scala系列 第 8

[Spark-Day8](core API實戰篇) Pair RDD-1

  • 分享至 

  • twitterImage
  •  

再來準備花幾天的時間提一下幾個Spark核心概念:

  • PairRDD
  • Partition
  • Aggregation類操作
  • RDD相依性
  • 累加器與廣播變數

Spark PairRDD

鍵值對(Key-Value)的結構被應用在許多地方,例如MapReduce、多種NoSQL的底層結構(HBase、Cassandra、Redis)、快取(Memcached),以及HashTable~XD等。Spark中也特地為了鍵值對資料,定義了一種RDD型別,:PairRDD,並提供了一些專屬的函式。

直接來玩一些常用的函式吧,之後再以use case來思考如何應用。
不過玩之前也要先有個pairRDD才能玩阿XD~


[Snippet. 10]建立簡易pairRDD

建立pairRDD最簡單的方式就是傳一個集合給sc.parallelize
其中每個元素都包含兩個值(也就是Tuple)

scala> val pair = sc.parallelize(List((1,3),(3,5),(3,9)))
pair: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[56] at parallelize at <console>:24

看到org.apache.spark.rdd.RDD[(Int, Int)]就代表你建立了一個key與value皆為Int的pairRDD類了~不對阿,為啥輸出不是PairRDD勒?因為這邊是用implict conversion,幫你外掛函式~

來玩一些pairRDD專屬函式吧:


[Snippet. 11]keys() & values()
分別抓出keys與values:
原始:{((1,3),(3,5),(3,9)}

scala> pair.keys.collect
res38: Array[Int] = Array(1, 3, 3)

scala> pair.values.collect
res39: Array[Int] = Array(3, 5, 9)

[Snippet. 12]groupByKey()
用key值分群:
原始:{((1,3),(3,5),(3,9)}

scala> pair.groupByKey.collect
res41: Array[(Int, Iterable[Int])] = Array((1,CompactBuffer(3)), (3,CompactBuffer(5, 9)))

CompactBuffer是一個iterable的RDD集合物件,有機會會再多講一點,反正現在當作一個集合物件就對哩~


[Snippet. 13]mapValues(func)
對每個KV中的V執行某個func,但K值不變
原始:{((1,3),(3,5),(3,9)}

scala> pair.mapValues(x=>x+1).collect
res44: Array[(Int, Int)] = Array((1,4), (3,6), (3,10))

比較帥的寫法可用placeholder:

scala> pair.mapValues(_+1).collect
res45: Array[(Int, Int)] = Array((1,4), (3,6), (3,10))

[Snippet. 14]flatmapValues(func)
這就稍微複雜了一點了,flatmapValues有點類似mapValues,
但func一樣會對KV中的V的運算,但結果會是一個集合,而集合會被攤平,每個元素會搭配原本的K:
原始:{((1,3),(3,5),(3,9)}

scala> pair.flatMapValues(x=>(x to 6)).collect
res46: Array[(Int, Int)] = Array((1,3), (1,4), (1,5), (1,6), (3,5), (3,6))

簡單來說:

  • (1,3)變成[(1,3),(1,4),(1,5),(1,6)]
  • (3,5)變成[(3,5),(3,6)]
  • (3,9)為空(9超過6了)
    然後再將(1,3),(1,4),(1,5),(1,6),(3,5),(3,6)攤平收集起來成為一個結果

[Snippet. 15]reduceByKey(func)
V要ByKey組合,並傳入一個函式說明同key的V如何被處理:
原始:{((1,3),(3,5),(3,9)}

scala> pair.reduceByKey((x,y)=>x+y).collect
res47: Array[(Int, Int)] = Array((1,3), (3,14))

這就是MapReduce的Reduce阿阿阿~~!!
可以用placeholder改的炫炮一點:

scala> pair.reduceByKey(_+_).collect
res48: Array[(Int, Int)] = Array((1,3), (3,14))

(_+_)就指定我要把reduce的兩個對象(累積值與新值)進行哪種處理(在此就是疊加起來變成新的累積值)


[Snippet. 16]CountByKey()
將同樣key的資料分群,然後對每個K收集到的元素count:
原始:{((1,3),(3,5),(3,9)}

scala> pair.countByKey
res52: scala.collection.Map[Int,Long] = Map(1 -> 1, 3 -> 2)

上圖代表K=1的元素有1個,K=3的元素有2個
這邊特別注意countByKey是一個action操作,回傳的結果直接是一般集合,要特別注意。


[Snippet. 17]sortByKey()
按key排序:
原始:{((1,3),(3,5),(3,9)}

scala> pair.sortByKey().collect
res50: Array[(Int, Int)] = Array((1,3), (3,5), (3,9))

use Case(購買行為資料分析):

假設有一份使用者消費紀錄(data_transactions.txt),內容如下:

2015-03-30#6:55 AM#51#68#1#9506.21 ①
2015-03-30#7:39 PM#99#86#5#4107.59
2015-03-30#11:57 AM#79#58#7#2987.22 
2015-03-30#12:46 AM#51#50#6#7501.89 ②
2015-03-30#11:39 AM#86#24#5#8370.2
2015-03-30#10:35 AM#63#19#5#1023.57
2015-03-30#2:30 AM#23#77#7#5892.41
2015-03-30#7:41 PM#49#58#4#9298.18
...
...

其中欄位(以#為分隔符號)定義如下:
日期 #時間 #顧客ID #購買商品ID #數量 #總價
一筆交易(transaction)即形成一筆資料,也就是一個人可以有多筆資料(如①與②)

我們想要進行以下分析:

  1. 消費次數最多的贈予一支bear doll(已完成)
  2. 給予一次購買兩隻以上的Barbie交易結帳金額95%折的優待
  3. 購買五本字典的客戶贈予一支牙刷
  4. 贈予總消費金額最高的客戶一件睡衣

PS. 商品ID皆為已知,且贈予的物品也必須成為一筆金額為0的transaction


Task.1 搜尋消費最多的使用者,額外贈予一支bear doll(ID=4)

先把檔案讀進來,並處理成(客戶ID,交易紀錄)型式的KV:

scala> val transFile= sc.textFile("data_transactions.txt") ①
transFile: org.apache.spark.rdd.RDD[String] = data_transactions.txt MapPartitionsRDD[76] at textFile at <console>:24

scala> val transData = transFile.map(_.split("#")) ②
transData: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[77] at map at <console>:26

scala> var transByCust = transData.map(tran => (tran(2).toInt,tran)) ③
transByCust: org.apache.spark.rdd.RDD[(Int, Array[String])] = MapPartitionsRDD[78] at map at <console>:28

① 先用sc讀檔囉,檔案視情況要修改路徑
② 將每行字串切開成為矩陣(每行皆為一個陣列)
③ 從每個矩陣中的抽出顧客ID(也就是陣列中的第3個元素tran(2))與陣列本身,並透過map成新的RDD。最終會成為一個[Int, Array]的pairRDD。
③ 另外注意,這邊我們宣告的方式是var transByCust而不是val(value),這代表transByCust是mutable,也就是可以更新的,方便我們直接修改value而不用另外產生RDD*

*分散式環境下最好還是儘量以immutable的方式使用變數,只是這樣更新map就會麻煩一點。

先玩一下資料練練手:
總共有幾個不同的客?該怎麼做?

scala> transByCust.keys.distinct.count
res51: Long = 100

OK,進入正題,找消費次數最多的客戶該如何做?
思路:

  • 將同樣key的資料放在一起,然後count,類似DB的groupBy加count,能用先前介紹的哪個函式嗎?
  • 當然是CountByKey啦~
scala> transByCust.countByKey
res53: scala.collection.Map[Int,Long] = Map(69 -> 7, 88 -> 5, 5 -> 11, 10 -> 7, 56 -> 17, 42 -> 7, 24 -> 9, 37 -> 7, 25 -> 12, 52 -> 9, 14 -> 8, 20 -> 8, 46 -> 9, 93 -> 12, 57 -> 8, 78 -> 11, 29 -> 9, 84 -> 9, 61 -> 8, 89 -> 9, 1 -> 9, 74 -> 11, 6 -> 7, 60 -> 4, 85 -> 9, 28 -> 11, 38 -> 9, 70 -> 8, ....
scala> 

似乎可以用,但要如何取得count數最多的那組的呢?
思路:因為一般Map不能sortByKey(RDD的才能,但countByKey後就轉為一般Map了),所以要將Map轉成元素是KV的List,然後透過List的sort進行排序。

先看看用toSeq轉換集合型態的效果

scala> transByCust.countByKey.toSeq
res1: Seq[(Int, Long)] = ArrayBuffer((69,7), (88,5), (5,11), (10,7), (56,17), (42,7), (24,9), (37,7), (25,12), (52,9), (14,8), (20,8), (46,9), (93,12), (57,8), (78,11), (29,9), (84,9), (61,8), (89,9), (1,9), (74,11), (6,7), (60,4), (85,9), (28,11), (38,9), (70,8), (21,13), (33,9), (92,8), ...
scala> 

OK,加上SortBy

scala> transByCust.countByKey.toSeq.sortBy(_._2)
res2: Seq[(Int, Long)] = ArrayBuffer((60,4), (88,5), (48,5), (67,5), (36,5), (30,5), (19,6), (62,6), (69,7), (10,7), (42,7), (37,7), (6,7), (9,7), (73,7), (27,7), (12,7), (54,7), (80,7), (72,7), (14,8), (20,8)......

sortBy(_._2)是炫炮的placeholder寫法,比較清楚的方式是sortBy(pair=>pair._2)
pair._2代表取KV中的V,若要取KV元素的K則寫成(KVvariable._1)的格式。

因為排序是ascending,所以我們取最後一筆

scala> transByCust.countByKey.toSeq.sortBy(pair=>pair._2).last
res4: (Int, Long) = (53,19)

scala> 

OK,那要如何將這個結果導入變數勒?那當然是宣告兩個變數來分別接KV啦:

scala> val (customerID, purchNum) = transByCust.countByKey.toSeq.sortBy(pair=>pair._2).last
customerID: Int = 53
purchNum: Long = 19

得到customerID啦,但這不是很精確的作法,因為最高可能不只一個,用last可能會失真。但當示範還OK

因為贈品最後要更新到交易紀錄中,先把這些小資料暫存到一個一般集合物件(var型別方便修改)中:

scala> var complTrans = Array(Array("2015-03-30", "11:59 PM", "53", "4",
"1", "0.00"))

參照交易的欄位,建立一筆資料,其中:
customerID=53、bear doll ID為4、商品數量為1、金額為0


上一篇
[Spark-Day7](基礎篇) Broadcast與透過Spark-submit遞交工作
下一篇
[Spark-Day9](core API實戰篇) Pair RDD-2
系列文
Spark 2.0 in Scala30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

1 則留言

0
c77s77
iT邦新手 5 級 ‧ 2017-02-04 12:12:47

消費紀錄的檔案放錯了

joechh iT邦新手 5 級 ‧ 2017-02-06 01:07:22 檢舉

已更正,感謝提醒。

我要留言

立即登入留言