[Spark-Day13] (Core API hands-on) Partition

Today let's talk about Spark partitions, i.e. the DD (Distributed Dataset) part of RDD. Basically, an RDD is made up of a number of partitions, and those partitions are spread across the nodes of the cluster.

Let's play some more with the transByCust we used over the past few days. If you forgot how it was built, here's a cheat sheet~XD

scala> val transFile= sc.textFile("data_transactions.txt") 
transFile: org.apache.spark.rdd.RDD[String] = data_transactions.txt MapPartitionsRDD[76] at textFile at <console>:24

scala> val transData = transFile.map(_.split("#")) 
transData: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[77] at map at <console>:26

scala> var transByCust = transData.map(tran => (tran(2).toInt,tran)) 
transByCust: org.apache.spark.rdd.RDD[(Int, Array[String])] = MapPartitionsRDD[78] at map at <console>:28

OK, so what do we use it for? Suppose I want to use aggregateByKey to build pairs where K is the customerID and V is the list of productIDs. How would we do that?:

scala> val prods = transByCust.aggregateByKey(List[String]())(  
          (prods, tran) => prods ::: List(tran(3)),
          (prods1, prods2) => prods1 ::: prods2
      )
prods: org.apache.spark.rdd.RDD[(Int, List[String])] = ShuffledRDD[4] at aggregateByKey at <console>:30

scala> prods.collect
res0: Array[(Int, List[String])] = Array((34,List(34, 99, 66, 58, 59, 17, 58, 5, 38, 58, 93, 22, 48, 50)), (52,List(37, 69, 82, 11, 19, 58, 67, 51, 93)), (96,List(37, 31, 17...

Both prods ::: List(tran(3)) and prods1 ::: prods2 use ::: to concatenate two Lists:

scala> List(1,2,3):::List(4,5,6)
res1: List[Int] = List(1, 2, 3, 4, 5, 6)

What if we want to add a single element to an immutable list and get another immutable list back?

scala> List(1,2,3):+4
res12: List[Int] = List(1, 2, 3, 4)

scala> 4+:List(1,2,3)
res14: List[Int] = List(4, 1, 2, 3)

There is also a common syntax for prepending an element:

scala> val testList = 4::List(1,2,3)
testList: List[Int] = List(4, 1, 2, 3)

This syntax shows up a lot in case (pattern matching); here 4 is called the head of the List, and List(1,2,3) is its tail:

scala> testList.head
res0: Int = 4

scala> testList.tail
res1: List[Int] = List(1, 2, 3)

scala> testList.head::testList.tail
res2: List[Int] = List(4, 1, 2, 3)

Back to the main topic: the number of partitions determines an RDD's degree of parallelism, because a partition is the smallest unit of execution (the relationship between partitions, Spark executors, and physical cores deserves its own discussion some other time). An RDD's partition count can be inspected as follows:

scala> val list = sc.parallelize(1 to 10)
list: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> list.partitions.size
res3: Int = 8

// or

scala> list.getNumPartitions
res4: Int = 8

Ideally each RDD should have a moderate number of partitions of moderate size:

  • Too few partitions can lead to poor cluster utilization, or to a single partition that is too large to fit into one machine's memory
  • Too many partitions can lead to more cross-node communication and a lot of unnecessary network I/O
    Both the official Spark documentation and Databricks suggest roughly 3~4 times the number of available cores in the cluster as a reference value (a small sketch follows below)
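
Here is a tiny sketch of that rule of thumb (my own illustration, not part of the original walkthrough; it assumes the usual spark-shell sc, and that sc.defaultParallelism roughly reflects the cores Spark can use):

// Rule-of-thumb sketch: aim for roughly 3~4 partitions per available core
val cores  = sc.defaultParallelism
val target = cores * 3                               // or cores * 4
val bigRdd = sc.parallelize(1 to 1000000, target)    // create the RDD with ~3x-cores partitions
bigRdd.getNumPartitions                              // == target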

So how is the number of partitions of an RDD determined? As follows:

  1. Specify the number manually when creating the RDD:
scala> val list = sc.parallelize(1 to 10,5)
list: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> list.getNumPartitions
res6: Int = 5
  2. If a new RDD is produced by applying a transformation to an existing RDD, it inherits the parent RDD's partition count (provided the transformation preserves that information):
scala> val list2 = list.map(_+1)
list2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at <console>:26
scala> list2.getNumPartitions
res7: Int = 5

  3. If there is no parent RDD, the default value is used. It can be configured with spark.default.parallelism in the conf file; if it is not set, the default is the number of cores in the cluster:
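
As a side note, here is one way to set that default explicitly in a standalone app (a minimal sketch of my own; spark-shell already provides sc, so this is only for illustration):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: override spark.default.parallelism instead of relying on the core count
val conf = new SparkConf()
  .setMaster("local[*]")                       // illustrative master setting
  .setAppName("partition-demo")                // hypothetical app name
  .set("spark.default.parallelism", "16")
val sc = new SparkContext(conf)

sc.parallelize(1 to 10).getNumPartitions       // 16, since no explicit count was given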

My current test environment is a single PC; you can check the number of cores with a command:

joechh@joechh:~$ nproc
8
// or lscpu, which gives more information
joechh@joechh:~$ lscpu
Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                8
On-line CPU(s) list:   0-7
Thread(s) per core:    2
Core(s) per socket:    4
Socket(s):             1
..... (omitted)
NUMA node0 CPU(s):     0-7
..... (omitted)
joechh@joechh:~$ 

That is why my RDDs are split into 8 partitions by default. The component Spark uses to assign each RDD element to a partition is called a partitioner, and the default is HashPartitioner. The way HashPartitioner decides which partition an element goes to is quite direct, roughly partitionIndex = hashCode % numberOfPartitions, but that is not something we need to dwell on here.
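
To make that concrete, here is a small sketch using HashPartitioner directly (Int keys hash to themselves, so with 8 partitions the indices in the comments are just key % 8):

import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(8)
partitioner.getPartition(34)   // 34 % 8 = 2
partitioner.getPartition(52)   // 52 % 8 = 4
partitioner.getPartition(96)   // 96 % 8 = 0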


Shuffling

The act of actually moving data around between different partitions is called shuffling. The term is not unique to Spark; Hadoop MapReduce has a similar concept when data moves from the Map phase to the Reduce phase. In a cluster, shuffling may require transferring data across nodes, which is a very expensive operation for Spark (compared with memory speed). This matters especially for pair RDDs: they provide many different ways to aggregate data, and these often end up causing shuffling, so it is important to understand which operations may trigger it. Take the earlier example:

val prods = transByCust.aggregateByKey(List[String]())(  
          (prods, tran) => prods ::: List(tran(3)), ①
          (prods1, prods2) => prods1 ::: prods2 ②
      )

As mentioned two days ago when we covered aggregation functions, ① is the so-called SeqOp, which reduces within a single partition to produce one result per partition. Merging those per-partition results is the job of ②, the combiner. So ① does not cause shuffling; only ② can.

Spark tries to avoid shuffling whenever possible, but the map and flatMap transformations erase the RDD's partitioner information, so many operations chained after them will trigger shuffling. For example:

scala> val data = list.map(x =>(x,x*x)).reduceByKey(_+_).collect

Here the reduceByKey operation triggers shuffling.
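
A small sketch to watch the partitioner being erased (using the same list RDD; mapValues, unlike map, keeps the partitioner because it cannot change the keys):

val pairs = list.map(x => (x, x * x)).reduceByKey(_ + _)
pairs.partitioner                     // Some(HashPartitioner) after the shuffle
pairs.mapValues(_ + 1).partitioner    // still Some(...): keys untouched, partitioner preserved
pairs.map(identity).partitioner       // None: map may change keys, so the info is dropped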

There are quite a few operations that trigger shuffling when they follow 'map/flatMap':

  • The ByKey family: aggregateByKey, foldByKey, reduceByKey, groupByKey
  • The join family (covered later): join, leftOuterJoin, rightOuterJoin, fullOuterJoin
  • sortByKey (always shuffles, no matter what)

Repartitioning

Sometimes we may want to repartition an RDD, for example in the following scenario:

scala> val nums = sc.parallelize(1 to 1000000,100) ①
nums: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24

scala> nums.getNumPartitions
res8: Int = 100
 
scala> val filted = nums.filter(_>999990) ②
filted: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at filter at <console>:26

scala> filted.collect
res9: Array[Int] = Array(999991, 999992, 999993, 999994, 999995, 999996, 999997, 999998, 999999, 1000000)

scala> filted.getNumPartitions ③
res10: Int = 100

① Define a somewhat larger dataset with 100 partitions
② Apply a filter, simulating a scenario where most of the data is filtered out
③ Even though only 10 elements remain, the filted RDD still has 100 partitions

In the example above, filted ends up with a pile of empty partitions. So how do we decrease (or increase) the number of partitions?
Use coalesce, with the signature: coalesce(numPartitions: Int, shuffle: Boolean = false)

scala> val smallfilted=filted.coalesce(2)
smallfilted: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[11] at coalesce at <console>:28

scala> smallfilted.getNumPartitions
res13: Int = 2

Note that coalesce is a transformation, so it returns a new RDD.

So how do we increase the number of partitions?

scala> val bigfilted=smallfilted.coalesce(10)
bigfilted: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[12] at coalesce at <console>:30

scala> bigfilted.getNumPartitions
res14: Int = 2

Huh.. why didn't it increase?

It turns out we have to pass the second parameter, shuffle = true, to allow shuffling (the default is false):

scala> val bigfilted=smallfilted.coalesce(10,true)
bigfilted: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[16] at coalesce at <console>:30

scala> bigfilted.getNumPartitions
res15: Int = 10
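
Incidentally, repartition(n) does the same job: under the hood it simply calls coalesce(n, shuffle = true), so this sketch is equivalent:

val bigfilted2 = smallfilted.repartition(10)   // same as coalesce(10, shuffle = true)
bigfilted2.getNumPartitions                    // 10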

There are also operations that work on each partition of an RDD, such as mapPartitions, mapPartitionsWithIndex, and glom.
Let's play with glom:

scala> val list =List.fill(500)(scala.util.Random.nextInt(100))
list: List[Int] = List(39, 81, 33, 98, 15, 6, 13, 6, 75, 21, 82, 75, 38, 44, 79, 1, 36, 2, 19, 81, 33, 39, 96, 91, 53, 0, 5, 6, 55, 27, 58, 3, 0, 59, 1, 24, 85, 15, 14, 91, 12, 40, 30, 82, 85, 61, 74, 87, 36, 79, 42, 68, 86, ...

scala> val rdd =sc.parallelize(list,30)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:26

scala> rdd.glom.collect
res17: Array[Array[Int]] = Array(Array(39, 81, 33, 98, 15, 6, 13, 6, 75, 21, 82, 75, 38, 44, 79, 1), Array(36, 2, 95, 16, 1, 82, 40, 87, 87, 47, 60, 13, 0, 96, 19, 81, 33), Array(39, 96, 91, 53, 0, 5, 6, 55, 27, 58, 3, 0, 59, 1, 24, 85, 15), Array(14, 91, 12, 40, 30, 82, 85, 61, 74, 87, 36, 79, 42, 68, 86, 17),...

glom collects the elements of each partition into an Array, and returns them all inside an Array of Arrays.
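
And a quick sketch of mapPartitionsWithIndex on the same rdd, tagging every element with the index of the partition it lives in (the exact values depend on the random data):

// f receives (partitionIndex, iterator over that partition's elements)
val tagged = rdd.mapPartitionsWithIndex((idx, iter) => iter.map(x => (idx, x)))
tagged.take(5)   // e.g. Array((0,39), (0,81), ...)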

