iT邦幫忙

2017 iT 邦幫忙鐵人賽
DAY 2
3
Big Data

Spark 2.0 in Scala系列 第 2

[Spark-Day2](基礎篇) RDD概念與map操作

  • 分享至 

  • xImage
  •  

前一天我們建立了一些變數,如lines、bsdLines等,有沒有感覺他們跟一般變數沒兩樣呢?在Spark中這些變數被稱為RDD(Resilient Distributed Datasets)。其實RDD就是我們常見的集合概念,比較特別的是實際資料集可以為橫跨數個結點所組成

RDD有三個特性:

  1. 不可更動(Immutable):每個RDD都是不能被改變的(可以像Java的String一樣),想要更新的?從既有中再建立另一個吧。這樣的作法乍看下可能感覺怪怪的,但仔細想想,要讓資料容易用於分散式系統,Immutable是關鍵的一環,因為每個RDD都保證不會被更動。
  2. 彈性(Resilient):分散式環境中忽然有節點失效是正常的(而且要平常心XD),那上面Spark正在使用或建立的RDD怎麼辦?沒關係,Spark會想辦法幫你重建。這與之後會提到的RDD lineage概念有關。
  3. 分散式(Distributed):資料集可跨多個節點,並儲存在每個節點的記憶體內,恩...所以Spark是記憶體怪獸XD,優點當然就是執行速度較快,但要小心網路資料交換(shuffling)這類昂貴操作。

RDD lineage

以先前的lines、bsdLines為例,因為每個RDD都是immutable,也就是說,只要紀錄了操作與建立行為(有點類似DB的commit log),bsdLines RDD就可以從lines RDD取得,因此可以串出RDD間的關係,例如: line -> badLines -> OtherRDD1 -> OtherRDD2 -> ...。我們將其稱為RDD lineage(族譜)。所以假設存放badLines RDD的節點損毀了(一或多台),但只要儲存line RDD的節點還在的話,是不是就能還原badLines了呢!當然底層通常會搭配一個分散式並且有副本(replication)特性的儲存系統,例如常見的Hadoop HDFS或S3等。


RDD操作

RDD的操作依性質主要分為兩類:

  1. transformation(轉換類操作):操作一個或多個RDD,並產生出新的RDD
  2. action(行動類操作):將操作結果回傳給Driver(可以先想成Spark的進入點程式段),或是對RDD元素執行一些操作,但不會產生新的RDD

之前用的filter就是一個transformation操作,而foreach則是action操作。初學者剛開始可能會分不清楚哪些操作是transformation,哪些又是action?這時候只要記住action類不會產生新的RDD,產生新的RDD就是transformation即可,會比較容易分辨~

開始來玩一些操作吧,先看看常見的map、flatMap、count、distinct


[Snippet.3] RDD map操作
map的宣告格式類似以下

def map(f:(T) => U ):RDD[U]

map會帶入一個函式,從現在的RDD型別(T),透過操作,產生出一個新RDD(型別為U)。輸入輸出的型別可以不同。例如我們可以將整數集合RDD內的每個元素取平方:

scala> val numbers=sc.parallelize(List(1,2,3,4,5)) ①
numbers: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> val numberSquared=numbers.map(num=>num*num) ②
numberSquared: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at map at <console>:26

scala> numberSquared.foreach(num=>print(num+" ")) ③
1 4 9 16 25

① 取得RDD有幾種方式,除了之前那種從外部資源(例如檔案)中取得外,其他常見的方式還有從一般集合中轉換,例如範例中透過sc.parallelize函式將List集合物件轉換成RDD物件。parallelize可以接收Scala中任何實作Seq trait(trait類似Java介面)的集合物件,這邊不用了解那麼多,只要知道可以將集合轉成集合概念的RDD即可~
② 對numbers RDD執行map操作,賦予一個num=>num*num函式,每個numbers的元素都會被平方,而且map是轉換操作,所以會回傳一個numberSquared RDD。
③ 最後透過foreach(行動類操作)印出結果檢查


另外再玩點RDD集合的操作吧:

scala> numberSquared.first
res9: Int = 1

scala> numberSquared.top(2)
res8: Array[Int] = Array(25, 16)

看圖就知道哩,first取出集合中的第一個元素,而top(N)對整數來說就是取出最大的N個

如果重複輸出幾次numberSquared會發現:

scala> numberSquared.foreach(num=>print(num+" "))
1 4 9 16 25

scala> numberSquared.foreach(num=>print(num+" "))
1 16 25 9 4

疑,順序不同了耶?沒有保證List順序嗎?List不是有序的嗎?ListRDD應該也是吧?
的確,RDD內部還是有保證元素順序,但因為foreach這個函式操作是平行印出才導致順序會變動。

可以使用一個collect()的action操作,他會返還一個新的普通集合物件*,常接於一串transformation操作的後面回傳最終的結果,例如這樣的格式:
val normalCollection = KindOfRDD.map(XXX)
.flatMap(YYY)
.filter(ZZZ)
.OtherTransformatioin1
.OtherTransformatioin2
.OtherTransformatioin3.
...
...
.collect()

scala> numberSquared.collect()
res15: Array[Int] = Array(1, 4, 9, 16, 25)
scala> numberSquared.collect()
res15: Array[Int] = Array(1, 4, 9, 16, 25)
scala> numberSquared.collect()
res15: Array[Int] = Array(1, 4, 9, 16, 25)

你看,不管執行幾次順序還是一樣的吧~

collect函式請慎用,雖然它用來Demo一些結果很方便。但因為它是返還非RDD的普通集合物件單一節點(driver執行所在節點),請想像龐大的分散式RDD在沒有經過謹慎處理的情況下,全部傳到單一節點上......痛過才懂XD~


Casting也是寫程式常見的功能,尤其是數字與字串間的轉換,這在RDD中也沒有問題,
最後講一個轉成字串,每個字串頭尾對調,並回傳給另外一個RDD的綜合操作吧:

scala> val castToReverseString=numberSquared.map(_.toString.reverse).collect
castToReverseString: Array[String] = Array(1, 4, 9, 61, 52)

numberSquared.map(\_.toString.reverse).collect這個表達式做了幾件事情:

  1. map函式這我們之前看過了,但有沒有看過_這個陌生的符號?這個是Scala中的placeholder,可以當作"我不care要傳給我的變數叫啥名字,我只要知道有他就可以了",在案例中就是依序傳入1,4,9,16,25啦
  2. 每個傳進來的整數變數(在此用_表示)用.toString轉成字串,再接reverse函式。就會得到上述的結果
  3. 最後用collect action啟動整串執行,並將結果回傳給castToReverseString變數

上一篇
[Spark-Day1](基礎篇)不囉唆,直接上Spark-shell
下一篇
[Spark-Day3](基礎篇) RDD概念與flatMap操作 by Use Case
系列文
Spark 2.0 in Scala30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言