以前在學程式的時候有沒有感覺老師一直講API很無聊啊,所以我們還是要套個實際範例啊!!後續的內容會有許多XXX概念 by Use Case系列,學了總是要用才有感覺啊~開始吧!
假設公司有個客戶購買紀錄系統,結構如下:
userID
會被紀錄起來,並以csv格式存成client-ids.log範例如下:
---client-ids.log---
15,16,17,20,20
77,80,94
30,52,13,36
20,31,15
client-ids.log為例,請算出消費人次以及沒有重複的消費者數量
OK,首先當然先將檔案讀入啦(一起玩的直接copy上面那個小檔案然後取名即可)
scala> val lines = sc.textFile("/home/joechh/client-ids.log")
lines: org.apache.spark.rdd.RDD[String] = /home/joechh/client-ids.log MapPartitionsRDD[8] at textFile at <console>:24
有沒有看到熟悉的sc.textFile
!接著,按一般作法解析每行的字串,將字串切開放入lines
中進行下一步操作:
scala> val idsStr=lines.map(line=>line.split(","))
idsStr: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[9] at map at <console>:26
透過map操作導入函式(也就是描述對linesRDD中每行字串的處理邏輯),用split(",")
函式把逗點分隔的資料切開,並回傳給idsStr。這樣應該沒有問題了吧?可以得到id集合了吧?用collect()
觀察一下好了:
scala> idsStr.collect
res17: Array[Array[String]] = Array(Array(15, 16, 17, 20, 20), Array(77, 80, 94), Array(30, 52, 13, 36), Array(20, 31, 15))
疑疑,我怎麼拿到Array of Array了?Ohh,原來,每個line.split(",")
都會回傳一個單獨的陣列(就像Java的split會回傳String[]一樣),將這些集合收集起來,當然就變成兩層Array啦~那該怎麼辦勒?沒問題,攤平他
要降低巢(潮)狀集合的層數,就靠flatmap
啦
def flatMap[U](f:(T) => TraversableOnecep[U]):RDD[U]
先忽略很奇怪的TraversableOnecep[U],把他當作某種集合就好了。基本上flapMap與map很像,會將導入的函式作用於RDD中的每個元素之上,並且會額外的將集合內的集合攤平
(精確來說應該是攤平一層),直接看例子吧:
scala> val ids= lines.flatMap(_.split(","))
ids: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[10] at flatMap at <console>:26
確認一下:
scala> ids.collect
res18: Array[String] = Array(15, 16, 17, 20, 20, 77, 80, 94, 30, 52, 13, 36, 20, 31, 15)
Got it!,得到簡單的一維陣列可以拿來加工囉
終於可以解第一個問題:算出消費人次,RDD上有沒有類似Java的length或是size可用勒,有的,那就是count
啦:
scala> ids.distinct.count
res18: Long = 12
再來解第二個問題:沒有重複的消費者數量,RDD上有沒有方便消去重複元數的函式勒?有的,distinct
:
scala> val distIds = ids.distinct
distIds: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at distinct at <console>:35
確認一下:
scala> distIds.collect
res0: Array[String] = Array(80, 20, 15, 31, 17, 13, 77, 36, 16, 52, 30, 94)
scala> distIds.count
res2: Long = 12
第一次的Use Case就先到此啦