iT邦幫忙

2017 iT 邦幫忙鐵人賽
DAY 3
0
Big Data

Spark 2.0 in Scala系列 第 3

[Spark-Day3](基礎篇) RDD概念與flatMap操作 by Use Case

以前在學程式的時候有沒有感覺老師一直講API很無聊啊,所以我們還是要套個實際範例啊!!後續的內容會有許多XXX概念 by Use Case系列,學了總是要用才有感覺啊~開始吧!

假設公司有個客戶購買紀錄系統,結構如下:

  1. 每次消費,顧客的userID會被紀錄起來,並以csv格式存成client-ids.log
  2. 每日結束會換行(也就是同一日的所有消費客戶會放在同一筆)。

範例如下:

---client-ids.log---
15,16,17,20,20
77,80,94
30,52,13,36
20,31,15

任務需求

client-ids.log為例,請算出消費人次以及沒有重複的消費者數量

OK,首先當然先將檔案讀入啦(一起玩的直接copy上面那個小檔案然後取名即可)

scala> val lines = sc.textFile("/home/joechh/client-ids.log")
lines: org.apache.spark.rdd.RDD[String] = /home/joechh/client-ids.log MapPartitionsRDD[8] at textFile at <console>:24

有沒有看到熟悉的sc.textFile!接著,按一般作法解析每行的字串,將字串切開放入lines中進行下一步操作:

scala> val idsStr=lines.map(line=>line.split(","))
idsStr: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[9] at map at <console>:26

透過map操作導入函式(也就是描述對linesRDD中每行字串的處理邏輯),用split(",")函式把逗點分隔的資料切開,並回傳給idsStr。這樣應該沒有問題了吧?可以得到id集合了吧?用collect()觀察一下好了:

scala> idsStr.collect
res17: Array[Array[String]] = Array(Array(15, 16, 17, 20, 20), Array(77, 80, 94), Array(30, 52, 13, 36), Array(20, 31, 15))

疑疑,我怎麼拿到Array of Array了?Ohh,原來,每個line.split(",")都會回傳一個單獨的陣列(就像Java的split會回傳String[]一樣),將這些集合收集起來,當然就變成兩層Array啦~那該怎麼辦勒?沒問題,攤平他

要降低巢()狀集合的層數,就靠flatmap

def flatMap[U](f:(T) => TraversableOnecep[U]):RDD[U]

先忽略很奇怪的TraversableOnecep[U],把他當作某種集合就好了。基本上flapMap與map很像,會將導入的函式作用於RDD中的每個元素之上,並且會額外的將集合內的集合攤平(精確來說應該是攤平一層),直接看例子吧:

scala> val ids= lines.flatMap(_.split(","))
ids: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[10] at flatMap at <console>:26

確認一下:

scala> ids.collect
res18: Array[String] = Array(15, 16, 17, 20, 20, 77, 80, 94, 30, 52, 13, 36, 20, 31, 15)

Got it!,得到簡單的一維陣列可以拿來加工囉

終於可以解第一個問題:算出消費人次,RDD上有沒有類似Java的length或是size可用勒,有的,那就是count啦:

scala> ids.distinct.count
res18: Long = 12

再來解第二個問題:沒有重複的消費者數量,RDD上有沒有方便消去重複元數的函式勒?有的,distinct:

scala> val distIds = ids.distinct
distIds: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at distinct at <console>:35

確認一下:

scala> distIds.collect
res0: Array[String] = Array(80, 20, 15, 31, 17, 13, 77, 36, 16, 52, 30, 94)

scala> distIds.count
res2: Long = 12

第一次的Use Case就先到此啦


上一篇
[Spark-Day2](基礎篇) RDD概念與map操作
下一篇
[Spark-Day4](基礎篇) Scala & RDD中的Implicit Conversion
系列文
Spark 2.0 in Scala30

尚未有邦友留言

立即登入留言