Today let's talk about Spark partitions, i.e., the DD in RDD (Distributed Dataset). An RDD is basically made up of some number of partitions, and those partitions are spread across the nodes of the cluster.
Let's play with the transByCust from the past few days again; if you forgot how to build it, here's the cheat sheet~ XD
scala> val transFile= sc.textFile("data_transactions.txt")
transFile: org.apache.spark.rdd.RDD[String] = data_transactions.txt MapPartitionsRDD[76] at textFile at <console>:24
scala> val transData = transFile.map(_.split("#"))
transData: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[77] at map at <console>:26
scala> var transByCust = transData.map(tran => (tran(2).toInt,tran))
transByCust: org.apache.spark.rdd.RDD[(Int, Array[String])] = MapPartitionsRDD[78] at map at <console>:28
OK, so what do we do with it? Suppose I want to use aggregateByKey to build pairs whose K is the customerID and whose V is the list of productIDs; how would that look?:
scala> val prods = transByCust.aggregateByKey(List[String]())(
(prods, tran) => prods ::: List(tran(3)),
(prods1, prods2) => prods1 ::: prods2
)
prods: org.apache.spark.rdd.RDD[(Int, List[String])] = ShuffledRDD[4] at aggregateByKey at <console>:30
scala> prods.collect
res0: Array[(Int, List[String])] = Array((34,List(34, 99, 66, 58, 59, 17, 58, 5, 38, 58, 93, 22, 48, 50)), (52,List(37, 69, 82, 11, 19, 58, 67, 51, 93)), (96,List(37, 31, 17...
Both prods ::: List(tran(3)) and prods1 ::: prods2 use the ::: operator, which concatenates two Lists:
scala> List(1,2,3):::List(4,5,6)
res1: List[Int] = List(1, 2, 3, 4, 5, 6)
What if we want to add a single element to an immutable list and get back another immutable list?
scala> List(1,2,3):+4
res12: List[Int] = List(1, 2, 3, 4)
scala> 4+:List(1,2,3)
res14: List[Int] = List(4, 1, 2, 3)
There is another common syntax for prepending an element:
scala> val testList = 4::List(1,2,3)
testList: List[Int] = List(4, 1, 2, 3)
This syntax shows up a lot in case (pattern matching); here 4 is called the head of the List, and List(1,2,3) is its tail:
scala> testList.head
res0: Int = 4
scala> testList.tail
res1: List[Int] = List(1, 2, 3)
scala> testList.head::testList.tail
res2: List[Int] = List(4, 1, 2, 3)
Back to the main topic: the number of partitions determines an RDD's degree of parallelism, because a partition is the smallest unit of execution (the relationship between partitions, Spark executors, and physical cores deserves its own post someday). An RDD's partition count can be inspected like this:
scala> val list = sc.parallelize(1 to 10)
list: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> list.partitions.size
res3: Int = 8
// or
scala> list.getNumPartitions
res4: Int = 8
Ideally each RDD has a moderate number of moderately sized partitions; 3~4 times the number of available cores is a common rule of thumb. So how is an RDD's partition count decided? Roughly as follows:
1. If it is specified explicitly when the RDD is created (for example, the second argument to parallelize), that value is used:
scala> val list = sc.parallelize(1 to 10,5)
list: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> list.getNumPartitions
res6: Int = 5
2. Otherwise the RDD inherits the partition count of its parent RDD:
scala> val list2 = list.map(_+1)
list2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at <console>:26
scala> list2.getNumPartitions
res7: Int = 5
3. If there is no parent RDD, the default is used. The default can be configured with spark.default.parallelism; if it is not set, it falls back to the number of cores in the cluster.
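As a side note, a minimal sketch of overriding that default in a standalone app (the app name and the value 16 are just placeholders for illustration; in spark-shell the same setting can be passed with --conf spark.default.parallelism=16):
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[*]")                     // local mode, just for this sketch
  .setAppName("parallelism-demo")            // hypothetical app name
  .set("spark.default.parallelism", "16")    // override the core-count default
val sc = new SparkContext(conf)

sc.parallelize(1 to 10).getNumPartitions     // 16 instead of the number of cores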
My current test environment is a single PC, so the core count can be checked from the command line:
joechh@joechh:~$ nproc
8
// or lscpu, which gives more information
joechh@joechh:~$ lscpu
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
CPU(s): 8
On-line CPU(s) list: 0-7
Thread(s) per core: 2
Core(s) per socket: 4
Socket(s): 1
..... (omitted)
NUMA node0 CPU(s): 0-7
..... (omitted)
joechh@joechh:~$
That is why my RDDs are split into 8 partitions by default. The component Spark uses to decide which partition each element goes to is called a partitioner, and the default is HashPartitioner. Its implementation is fairly straightforward, roughly partitionIndex = hashCode % numberOfPartitions, but that is not something we normally need to worry about.
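Just to make it concrete, a quick sketch of asking Spark's HashPartitioner where a few keys would land (the real implementation uses a non-negative modulo of the hash code, but the idea is the same):
import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(4)   // 4 partitions
partitioner.getPartition(34)               // partition index for key 34
partitioner.getPartition("some-key")       // works for any key type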
The physical movement of data between partitions is called shuffling. The term is not unique to Spark; Hadoop MapReduce has a similar concept when data moves from the Map phase to the Reduce phase. In a cluster, shuffling may require transferring data across nodes, which is an expensive operation for Spark (compared to memory speed). This matters especially for pair RDDs, since they offer many different aggregation operations and those operations can easily lead to shuffling, so it is important to understand which operations may trigger it. Take our earlier example:
val prods = transByCust.aggregateByKey(List[String]())(
(prods, tran) => prods ::: List(tran(3)), ①
(prods1, prods2) => prods1 ::: prods2 ②
)
As mentioned two days ago when we covered aggregation functions, ① is the so-called seqOp, which reduces the elements within a single partition down to one result. Merging the per-partition results from multiple partitions is the job of ②, the combiner. So ① never causes shuffling; only ② can.
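To see the same seqOp/combOp split outside the pair-RDD world, here is a tiny sketch using the plain aggregate action on a toy dataset of my own:
val nums = sc.parallelize(1 to 8, 2)    // 2 partitions, roughly (1..4) and (5..8)
nums.aggregate(0)(
  (acc, n)     => acc + n,              // ① seqOp: runs inside each partition
  (acc1, acc2) => acc1 + acc2           // ② combOp: merges the per-partition results
)                                       // res: Int = 36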
Spark tries to avoid shuffling whenever it can, but the map and flatMap transformations erase the RDD's partitioner information, so many operations chained after them will trigger a shuffle. For example:
scala> val data = list.map(x =>(x,x*x)).reduceByKey(_+_).collect
Here the reduceByKey operation triggers a shuffle.
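If you are curious, one way to confirm this is to inspect the RDD lineage with toDebugString (defining the pair RDD without the final collect) and spot the ShuffledRDD stage:
val pairs = list.map(x => (x, x * x)).reduceByKey(_ + _)
pairs.toDebugString    // the lineage contains a ShuffledRDD, i.e. a shuffle boundary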
There are quite a few operations that trigger a shuffle when they follow 'map/flatMap':
aggregateByKey, foldByKey, reduceByKey, groupByKey, etc.
join, leftOuterJoin, rightOuterJoin, fullOuterJoin, etc.
Sometimes we may want to repartition an RDD, for example in the following scenario:
scala> val nums = sc.parallelize(1 to 1000000,100) ①
nums: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24
scala> nums.getNumPartitions
res8: Int = 100
scala> val filted = nums.filter(_>999990) ②
filted: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at filter at <console>:26
scala> filted.collect
res9: Array[Int] = Array(999991, 999992, 999993, 999994, 999995, 999996, 999997, 999998, 999999, 1000000)
scala> filted.getNumPartitions ③
res10: Int = 100
① Define a somewhat larger dataset with 100 partitions
② Filter it, simulating a scenario where most of the data is filtered out
③ Even though only 10 elements remain, the filted RDD still has 100 partitions
At the end of the example above, filted is left with a pile of empty partitions. So how do we decrease (or increase) the number of partitions?
Use coalesce, whose signature is coalesce(numPartitions: Int, shuffle: Boolean = false):
scala> val smallfilted=filted.coalesce(2)
smallfilted: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[11] at coalesce at <console>:28
scala> smallfilted.getNumPartitions
res13: Int = 2
Note that coalesce is a transformation, so it returns a new RDD.
Then how do we increase the partition count?
scala> val bigfilted=smallfilted.coalesce(10)
bigfilted: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[12] at coalesce at <console>:30
scala> bigfilted.getNumPartitions
res14: Int = 2
Turns out we have to pass the second parameter shuffle=true to allow shuffling (it defaults to false):
scala> val bigfilted=smallfilted.coalesce(10,true)
bigfilted: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[16] at coalesce at <console>:30
scala> bigfilted.getNumPartitions
res15: Int = 10
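Worth noting: repartition(n) is just shorthand for coalesce(n, shuffle = true), so the last step could also be written as:
val bigfilted2 = smallfilted.repartition(10)   // same as coalesce(10, true)
bigfilted2.getNumPartitions                    // 10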
There are also operations that work on each partition of an RDD, such as mapPartitions, mapPartitionsWithIndex, and glom (a mapPartitionsWithIndex sketch follows at the end).
Let's play with glom first:
scala> val list =List.fill(500)(scala.util.Random.nextInt(100))
list: List[Int] = List(39, 81, 33, 98, 15, 6, 13, 6, 75, 21, 82, 75, 38, 44, 79, 1, 36, 2, 19, 81, 33, 39, 96, 91, 53, 0, 5, 6, 55, 27, 58, 3, 0, 59, 1, 24, 85, 15, 14, 91, 12, 40, 30, 82, 85, 61, 74, 87, 36, 79, 42, 68, 86, ...
scala> val rdd =sc.parallelize(list,30)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:26
scala> rdd.glom.collect
res17: Array[Array[Int]] = Array(Array(39, 81, 33, 98, 15, 6, 13, 6, 75, 21, 82, 75, 38, 44, 79, 1), Array(36, 2, 95, 16, 1, 82, 40, 87, 87, 47, 60, 13, 0, 96, 19, 81, 33), Array(39, 96, 91, 53, 0, 5, 6, 55, 27, 58, 3, 0, 59, 1, 24, 85, 15), Array(14, 91, 12, 40, 30, 82, 85, 61, 74, 87, 36, 79, 42, 68, 86, 17),...
glom collects the elements of each partition into an Array and returns them all as an Array of Arrays.
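Since mapPartitionsWithIndex was mentioned above but not shown, here is a small sketch that counts how many elements ended up in each partition of the rdd we just built (the variable name countsPerPartition is mine):
val countsPerPartition = rdd.mapPartitionsWithIndex { (idx, iter) =>
  Iterator((idx, iter.size))     // one (partitionIndex, elementCount) pair per partition
}
countsPerPartition.collect       // e.g. Array((0,16), (1,17), ...) for 500 elements in 30 partitions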