Spark分区器HashPartitioner

Spark中分区器直接决定了RDD中分区的个数;也决定了RDD中每条数据通过Shuffle过程属于哪一个分区;也决定了Reduce的个数。这三点看起来是不一样的方面的,但其深层的含义是一致的。
php

咱们须要注意的是,只有Key-Value类型的RDD才有分区的,非Key-Value类型的RDD分区的值是None的。
java

注:有的时候,HashPartitioner存在 分区碰撞问题,即不一样的值可能计算出来的分区是同样的 key的hashcode % reduce个数.(例如:java.itcast.cn与php.itcast.cn的hashcode值就是同样的,固然这是小几率事件,可是有的时候还真的是会发生的),因此有的时候须要咱们本身实现分区程序!在spark中实现自定义的分区只须要实现partitioner trait并实现里面的方法!算法


固然:在Spark中,存在两类分区函数:HashPartitioner和RangePartitioner,它们都是继承自Partitioner,主要提供了每一个RDD有几个分区(numPartitions)以及对于给定的值返回一个分区ID(0~numPartitions-1),也就是决定这个值是属于那个分区的。apache




HashPartitioner分区

  HashPartitioner分区的原理很简单,对于给定的key,计算其hashCode,并除于分区的个数取余,若是余数小于0,则用余数+分区的个数,最后返回的值就是这个key所属的分区ID。实现以下:数组

01 /////////////////////////////////////////////////////////////////////
02  User: 过往记忆
03  Date: 2015-11-10
04  Time: 06:59
05  bolg: http://www.iteblog.com
06  本文地址:http://www.iteblog.com/archives/1522
07  过往记忆博客,专一于hadoop、hive、spark、shark、flume的技术博客,大量的干货
08  过往记忆博客微信公共账号:iteblog_hadoop
09 /////////////////////////////////////////////////////////////////////
10  
11 class HashPartitioner(partitions: Int) extends Partitioner {
12   require(partitions >=0, s"Number of partitions ($partitions) cannot be negative.")
13  
14   def numPartitions: Int = partitions
15  
16   def getPartition(key: Any): Int = key match {
17     case null =0
18     case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
19   }
20  
21   override def equals(other: Any): Boolean = other match {
22     case h: HashPartitioner =>
23       h.numPartitions == numPartitions
24     case _ =>
25       false
26   }
27  
28   override def hashCode: Int = numPartitions
29 }

RangePartitioner分区

  从HashPartitioner分区的实现原理咱们能够看出,其结果可能致使每一个分区中数据量的不均匀,极端状况下会致使某些分区拥有RDD的所有数据,这显然不是咱们须要的。而RangePartitioner分区则尽可能保证每一个分区中数据量的均匀,并且分区与分区之间是有序的,也就是说一个分区中的元素确定都是比另外一个分区内的元素小或者大;可是分区内的元素是不能保证顺序的。简单的说就是将必定范围内的数映射到某一个分区内。微信

  前面讨论过,RangePartitioner分区器的主要做用就是将必定范围内的数映射到某一个分区内,因此它的实现中分界的算法尤其重要。这个算法对应的函数是rangeBounds。这个函数主要经历了两个过程:以Spark 1.1版本为界,Spark 1.1版本社区对rangeBounds函数进行了一次重大的重构。less

  由于在Spark 1.1版本以前,RangePartitioner分区对整个数据集进行了2次的扫描:一次是计算RDD中元素的个数;一次是进行采样。具体的代码以下:dom

01 // An array of upper bounds for the first (partitions - 1) partitions
02 private val rangeBounds: Array[K] = {
03     if (partitions == 1) {
04       Array()
05     else {
06       val rddSize = rdd.count()
07       val maxSampleSize = partitions * 20.0
08       val frac =math.min(maxSampleSize / math.max(rddSize, 1), 1.0)
09       val rddSample =rdd.sample(false, frac, 1).map(_._1).collect().sorted
10       if (rddSample.length == 0) {
11         Array()
12       else {
13         val bounds = new Array[K](partitions - 1)
14         for (i <- 0 until partitions - 1) {
15           val index = (rddSample.length - 1) * (i + 1) / partitions
16           bounds(i) = rddSample(index)
17         }
18         bounds
19       }
20     }
21 }

  注意看里面的rddSize的计算和rdd.sample的计算。因此若是你进行一次sortByKey操做就会对RDD扫描三次!而咱们都知道,分区函数性能对整个Spark做业的性能是有直接的影响,并且影响很大,直接影响做业运行的总时间,因此社区不得不对RangePartitioner中的rangeBounds算法进行重构。ide

  在阅读新版本的RangePartitioner以前,建议先去了解一下Reservoir sampling(水塘抽样),由于其中的实现用到了Reservoir sampling算法进行采样。
采样总数函数

  在新的rangeBounds算法总,采样总数作了一个限制,也就是最大只采样1e6的样本(也就是1000000):

1 val sampleSize = math.min(20.0 * partitions, 1e6)

因此若是你的分区个数为5,则采样样本数量为100.0

父RDD中每一个分区采样样本数

  按照咱们的思路,正常状况下,父RDD每一个分区须要采样的数据量应该是sampleSize/rdd.partitions.size,可是咱们看代码的时候发现父RDD每一个分区须要采样的数据量是正常数的3倍。

1 val sampleSizePerPartition = math.ceil(3.0* sampleSize / rdd.partitions.size).toInt

这是由于父RDD各分区中的数据量可能会出现倾斜的状况,乘于3的目的就是保证数据量小的分区可以采样到足够的数据,而对于数据量大的分区会进行第二次采样。

采样算法

  这个地方就是RangePartitioner分区的核心了,其内部使用的就是水塘抽样,而这个抽样特别适合那种总数很大并且未知,并没有法将全部的数据所有存放到主内存中的状况。也就是咱们不须要事先知道RDD中元素的个数(不须要调用rdd.count()了!)。其主要实现以下:

01 /////////////////////////////////////////////////////////////////////
02  User: 过往记忆
03  Date: 2015-11-10
04  Time: 06:59
05  bolg: http://www.iteblog.com
06  本文地址:http://www.iteblog.com/archives/1522
07  过往记忆博客,专一于hadoop、hive、spark、shark、flume的技术博客,大量的干货
08  过往记忆博客微信公共账号:iteblog_hadoop
09 /////////////////////////////////////////////////////////////////////
10  
11 val (numItems, sketched) =RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
12  
13 def sketch[K : ClassTag](
14       rdd: RDD[K],
15       sampleSizePerPartition: Int):(Long, Array[(Int, Int, Array[K])]) = {
16     val shift = rdd.id
17     // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
18     val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
19       val seed = byteswap32(idx ^ (shift << 16))
20       val (sample, n) = SamplingUtils.reservoirSampleAndCount(
21         iter, sampleSizePerPartition, seed)
22       Iterator((idx, n, sample))
23     }.collect()
24     val numItems = sketched.map(_._2.toLong).sum
25     (numItems, sketched)
26 }
27  
28 def reservoirSampleAndCount[T: ClassTag](
29       input: Iterator[T],
30       k: Int,
31       seed: Long = Random.nextLong())
32     : (Array[T], Int) = {
33     val reservoir = new Array[T](k)
34     // Put the first k elements in the reservoir.
35     var = 0
36     while (i < k && input.hasNext) {
37       val item = input.next()
38       reservoir(i) = item
39       i += 1
40     }
41  
42     // If we have consumed all the elements, return them. Otherwise do the replacement.
43     if (i < k) {
44       // If input size < k, trim the array to return only an array of input size.
45       val trimReservoir = new Array[T](i)
46       System.arraycopy(reservoir, 0, trimReservoir, 0, i)
47       (trimReservoir, i)
48     else {
49       // If input size > k, continue the sampling process.
50       val rand = new XORShiftRandom(seed)
51       while (input.hasNext) {
52         val item = input.next()
53         val replacementIndex = rand.nextInt(i)
54         if (replacementIndex < k) {
55           reservoir(replacementIndex) = item
56         }
57         i += 1
58       }
59       (reservoir, i)
60 }
61 }

  RangePartitioner.sketch的第一个参数是rdd.map(_._1),也就是把父RDD的key传进来,由于分区只须要对Key进行操做便可。该函数返回值是val (numItems, sketched) ,其中numItems至关于记录rdd元素的总数;而sketched的类型是Array[(Int, Int, Array[K])],记录的是分区的编号、该分区中总元素的个数以及从父RDD中每一个分区采样的数据。

  sketch函数对父RDD中的每一个分区进行采样,并记录下分区的ID和分区中数据总和。

  reservoirSampleAndCount函数就是典型的水塘抽样实现,惟一不一样的是该算法还记录下i的值,这个就是该分区中元素的总和。

  咱们以前讨论过,父RDD各分区中的数据量可能不均匀,在极端状况下,有些分区内的数据量会占有整个RDD的绝大多数的数据,若是按照水塘抽样进行采样,会致使该分区所采样的数据量不足,因此咱们须要对该分区再一次进行采样,而此次采样使用的就是rdd的sample函数。实现以下:

01 val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
02 val candidates = ArrayBuffer.empty[(K, Float)]
03 val imbalancedPartitions = mutable.Set.empty[Int]
04 sketched.foreach { case (idx, n, sample) =>
05   if (fraction * n > sampleSizePerPartition) {
06     imbalancedPartitions += idx
07   else {
08     // The weight is 1 over the sampling probability.
09     val weight = (n.toDouble / sample.size).toFloat
10     for (key <- sample) {
11       candidates += ((key, weight))
12     }
13   }
14 }
15 if (imbalancedPartitions.nonEmpty) {
16   // Re-sample imbalanced partitions with the desired sampling probability.
17   val imbalanced = newPartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
18   val seed = byteswap32(-rdd.id - 1)
19   val reSampled = imbalanced.sample(withReplacement =false, fraction, seed).collect()
20   val weight = (1.0 / fraction).toFloat
21   candidates ++= reSampled.map(x => (x, weight))
22 }

咱们能够看到,从新采样的采样因子和Spark 1.1以前的采样因子一致。对于知足于fraction * n > sampleSizePerPartition条件的分区,咱们对其再一次采样。全部采样完的数据所有存放在candidates 中。

确认边界

从上面的采样算法能够看出,对于不一样的分区weight的值是不同的,这个值对应的就是每一个分区的采样间隔。

01 def determineBounds[K : Ordering : ClassTag](
02     candidates: ArrayBuffer[(K, Float)],
03     partitions: Int): Array[K] = {
04   val ordering = implicitly[Ordering[K]]
05   val ordered = candidates.sortBy(_._1)
06   val numCandidates = ordered.size
07   val sumWeights = ordered.map(_._2.toDouble).sum
08   val step = sumWeights / partitions
09   var cumWeight = 0.0
10   var target = step
11   val bounds = ArrayBuffer.empty[K]
12   var = 0
13   var = 0
14   var previousBound = Option.empty[K]
15   while ((i < numCandidates) && (j < partitions - 1)) {
16     val (key, weight) = ordered(i)
17     cumWeight += weight
18     if (cumWeight > target) {
19       // Skip duplicate values.
20       if(previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
21         bounds += key
22         target += step
23         j += 1
24         previousBound = Some(key)
25       }
26     }
27     i += 1
28
相关文章
相关标签/搜索