Spark源码分析-SortByKey源码

  1. 简单介绍

SortByKey对<key, value>数据进行按照key进行排序,怎么个排法,我么先看一下spark源码中的一些注释:

 

        两段注释的基本意思差不多:通过RDD的key进行排序,每一个分区包括在一个范围内排好序的元素,然后返回一个有序的list集合,或者按照该key以part-x的形式保存在文件系统上。大概的意思就是这样,就是按照partition排好序,然后返回。举个例子:如果我们要对1到1000的数进行排序,然后分成10个分区间,每一百一个分区,每个分区内排好序,那么所有分区不用排序,本身就按照不同的分区将1-1000内的数,放到了各个分区内。但是各个分区内的数据可能数量不平衡,所以为了将数据量平衡,可能1-200一个分区,200-250一个分区,等等,但是怎么实现这个分区呢,就需要RangePartitioner对象。

往下看,我们可以看到下面代码:

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
    : RDD[(K, V)] = self.withScope
{
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}

   其中有个RangePartitiner对象,这个RangePartitioner是排序的关键,其构造函数如下:

def this(partitions: Int, rdd: RDD[_ <: Product2[K, V]], ascending: Boolean) = {
  this(partitions, rdd, ascending, samplePointsPerPartitionHint = 20)
}

里面除了穿进去的三个参数之外,又多了个samplePointsPerPartitionHint参数,默认是20,这个参数是用于取样,然后确定分区范围。所以我们这里最重要的就是需要怎么确定范围,所以看对应的函数代码:

private var rangeBounds: Array[K] = {
  if (partitions <= 1) {
    Array.empty
  } else {
    // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
    // Cast to double to avoid overflowing ints or longs
    val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
    // Assume the input partitions are roughly balanced and over-sample a little bit.
    val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
    val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
    if (numItems == 0L) {
      Array.empty
    } else {
      // If a partition contains much more than the average number of items, we re-sample from it
      // to ensure that enough items are collected from that partition.
      val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
      val candidates = ArrayBuffer.empty[(K, Float)]
      val imbalancedPartitions = mutable.Set.empty[Int]
      sketched.foreach { case (idx, n, sample) =>
        if (fraction * n > sampleSizePerPartition) {
          imbalancedPartitions += idx
        } else {
          // The weight is 1 over the sampling probability.
          val weight = (n.toDouble / sample.length).toFloat
          for (key <- sample) {
            candidates += ((key, weight))
          }
        }
      }
      if (imbalancedPartitions.nonEmpty) {
        // Re-sample imbalanced partitions with the desired sampling probability.
        val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
        val seed = byteswap32(-rdd.id - 1)
        val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
        val weight = (1.0 / fraction).toFloat
        candidates ++= reSampled.map(x => (x, weight))
      }
      RangePartitioner.determineBounds(candidates, math.min(partitions, candidates.size))
    }
  }
}

上面代码是确定分区边界的核心代码。看代码:

val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)

        这个sampleSize可以看成是要进行抽样的样例数据的数量,结合之前的代码数量具体为20*分区数,但是最大为10的6次方个数据。sampleSizePerPartition是前面的sampleSize乘以3然后除以分区数,就是平均到每个分区抽样数扩大了3倍,这个其实是个数据量的判断,如果该分区的抽样的数据量大于这个数,即抽样数据大于平均抽样的数据的3倍,说明这个分区的数据量比较大,需要重新抽样。

        接下来是调用了RangePartitioner的sketch函数,这个是对各个分区的数据进行采样操作,传入的是rdd.map(_._1), sampleSizePerPartition两个参数,第一个是rdd的key,第二个即前面计算出来的sampleSizePerPartition。下面我们简单看下这个sketch函数:

def sketch[K : ClassTag](
    rdd: RDD[K],
    sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
  val shift = rdd.id
  // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
  val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
    val seed = byteswap32(idx ^ (shift << 16))
    val (sample, n) = SamplingUtils.reservoirSampleAndCount(
      iter, sampleSizePerPartition, seed)
    Iterator((idx, n, sample))
  }.collect()
  val numItems = sketched.map(_._2).sum
  (numItems, sketched)
}

        这个地方调用了rdd的mapPartitionWithIndex算子,然后里面就是对各个分区进行抽样操作,三个参数分别为(要进行抽样的数据,抽样的数量,随机种子)。具体的抽样方法就是使用的是水塘抽样算法,这里就不展开讲述了,有兴趣可以继续看SamplingUtils.reservoirSampleAndCount(iter, sampleSizePerPartition, seed)这个方法。返回的数据(sample,n)就是抽出的数据(sample)和该该分区的数据量(n),然后val numItems = sketched.map(_._2).sum就是将所有分区的数据量加起来,得出总的数据量numItems,最后返回数据(numItems, sketched)具体的含义如下(数据总量,(分区id,分区数据量,该分区的抽样数据))。接下来我们返回到上一级rangeBounds中,(numItems, sketched)里面就保存了抽样出来的一些数据和信息。

        接下来看变量fraction,接下来是用预抽样的数据量除以总的数据量,这个是个数据比重(常规是小于1),先声明一个长度为K的数组,具体的作用就是存放分区的边界。

        下面就调用sketched的foreach函数遍历每个分区的采样数据:首先要理解if (fraction * n > sampleSizePerPartition)这个是说,该分区的数据量乘以平均比例,大于平均每个分区预采样数据的3倍之多,那么说明这个分区的数据量比较大,所有就需要对其进行重新的采样,这样可以保证采样的数据更加有代表性,不然不同数据量的分区权重一样的话,就不太合理了。后面把所有的数据量多的分区id放到imbalancedPartitions中,到后面重新采样。然后再3倍以下的分区用分区数据量除以采样的数据长度,作为该分区所有采样的key的权重,如果这个分区1000条数据,采样的数据为100,那么他的比重就是10,如果其他的分区2000条数据,那么他的比重就很有可能是200。然后变量candidates里面就是所有的key对应的权重。

        对于数据量比较大的数据,我们对其分区重新取样比例为fraction的数据,然后将取样出来的所有key的比重都设置为1/fraction。放入到candidates中。到此为止,我们对所有的数据进行了采样,并且对所有的分区的所有的key赋了一个权重,然后我们就要通过这个数据进行分区边界的划分。如果我们对1-1000的数据进行分区,那么这个candidate就可能为((100,20),(300,10),(100,10),(500,30),(400,30),(700,10),(500,10),(800,50),(700,10))类似的,key是抽样的key,value是这个key的权重。

def determineBounds[K : Ordering : ClassTag](
    candidates: ArrayBuffer[(K, Float)],
    partitions: Int): Array[K] = {
  val ordering = implicitly[Ordering[K]]
  val ordered = candidates.sortBy(_._1)
  val numCandidates = ordered.size
  val sumWeights = ordered.map(_._2.toDouble).sum
  val step = sumWeights / partitions
  var cumWeight = 0.0
  var target = step
  val bounds = ArrayBuffer.empty[K]
  var i = 0
  var j = 0
  var previousBound = Option.empty[K]
  while ((i < numCandidates) && (j < partitions - 1)) {
    val (key, weight) = ordered(i)
    cumWeight += weight
    if (cumWeight >= target) {
      // Skip duplicate values.
      if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
        bounds += key
        target += step
        j += 1
        previousBound = Some(key)
      }
    }
    i += 1
  }
  bounds.toArray
}

        这段代码其实也挺简单的,就是变量多了一些。首先我们需要对candidates中的数据按照key进行排序,有兴趣的可以自己看,底层调用的额是java的Arrarys.sort()进行排序,因为我们采样的数据一般情况下不是很多,所以这个排序也并不时。然后我们将所有的key的权重加起来起来除以分区数,就是平均权重的步长,具体解释为,如果要将所有的key的权重平均分配到所有分区中,每个分区的权重平均为多少。

        假设我们有聚合并且排好序的((1,10),(10,5),(20,5),(40,15),(60,10),(80,15)),那么权重之和为60,我们需要将这个数据分到4个分区中,那么平均每个分区的权重为15。然后继续向下看代码。当cumWeight >= target时,我们需要将对应的额key放到bounds边界上,所以对应上面例子,我们要放的第一个key就是10,然后此时的target变成了30,那么下一个需要放进去的就是40,最后放进去的就是80。所以此时返回的边界数据就是[10,40,80],就可以用这个边界对所有的数据进行分区,并且分区的数量还是相对平衡的。

        分析到这里后面的思路基本上就明白了,先用bound数组最数据进行重新分区,然后各个分区的数据顺序是确定的,然后点对分区内的数据进行排序,最后得到所有的分区都是已经排好序的,分区之间也是排好序的。但是往后面的代码没有继续往下看。不过分区内排序应该也是用的java的自带的Arrays.sort()排序,排序思想为timsort。