SortByKey对<key, value>数据进行按照key进行排序,怎么个排法,我么先看一下spark源码中的一些注释:
两段注释的基本意思差不多:通过RDD的key进行排序,每一个分区包括在一个范围内排好序的元素,然后返回一个有序的list集合,或者按照该key以part-x的形式保存在文件系统上。大概的意思就是这样,就是按照partition排好序,然后返回。举个例子:如果我们要对1到1000的数进行排序,然后分成10个分区间,每一百一个分区,每个分区内排好序,那么所有分区不用排序,本身就按照不同的分区将1-1000内的数,放到了各个分区内。但是各个分区内的数据可能数量不平衡,所以为了将数据量平衡,可能1-200一个分区,200-250一个分区,等等,但是怎么实现这个分区呢,就需要RangePartitioner对象。
往下看,我们可以看到下面代码:
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)] = self.withScope
{
val part = new RangePartitioner(numPartitions, self, ascending)
new ShuffledRDD[K, V, V](self, part)
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
其中有个RangePartitiner对象,这个RangePartitioner是排序的关键,其构造函数如下:
def this(partitions: Int, rdd: RDD[_ <: Product2[K, V]], ascending: Boolean) = {
this(partitions, rdd, ascending, samplePointsPerPartitionHint = 20)
}
里面除了穿进去的三个参数之外,又多了个samplePointsPerPartitionHint参数,默认是20,这个参数是用于取样,然后确定分区范围。所以我们这里最重要的就是需要怎么确定范围,所以看对应的函数代码:
private var rangeBounds: Array[K] = {
if (partitions <= 1) {
Array.empty
} else {
// This is the sample size we need to have roughly balanced output partitions, capped at 1M.
// Cast to double to avoid overflowing ints or longs
val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
// Assume the input partitions are roughly balanced and over-sample a little bit.
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
if (numItems == 0L) {
Array.empty
} else {
// If a partition contains much more than the average number of items, we re-sample from it
// to ensure that enough items are collected from that partition.
val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
val candidates = ArrayBuffer.empty[(K, Float)]
val imbalancedPartitions = mutable.Set.empty[Int]
sketched.foreach { case (idx, n, sample) =>
if (fraction * n > sampleSizePerPartition) {
imbalancedPartitions += idx
} else {
// The weight is 1 over the sampling probability.
val weight = (n.toDouble / sample.length).toFloat
for (key <- sample) {
candidates += ((key, weight))
}
}
}
if (imbalancedPartitions.nonEmpty) {
// Re-sample imbalanced partitions with the desired sampling probability.
val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
val seed = byteswap32(-rdd.id - 1)
val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
val weight = (1.0 / fraction).toFloat
candidates ++= reSampled.map(x => (x, weight))
}
RangePartitioner.determineBounds(candidates, math.min(partitions, candidates.size))
}
}
}
上面代码是确定分区边界的核心代码。看代码:
val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
这个sampleSize可以看成是要进行抽样的样例数据的数量,结合之前的代码数量具体为20*分区数,但是最大为10的6次方个数据。sampleSizePerPartition是前面的sampleSize乘以3然后除以分区数,就是平均到每个分区抽样数扩大了3倍,这个其实是个数据量的判断,如果该分区的抽样的数据量大于这个数,即抽样数据大于平均抽样的数据的3倍,说明这个分区的数据量比较大,需要重新抽样。
接下来是调用了RangePartitioner的sketch函数,这个是对各个分区的数据进行采样操作,传入的是rdd.map(_._1), sampleSizePerPartition两个参数,第一个是rdd的key,第二个即前面计算出来的sampleSizePerPartition。下面我们简单看下这个sketch函数:
def sketch[K : ClassTag](
rdd: RDD[K],
sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
val shift = rdd.id
// val classTagK = classTag[K] // to avoid serializing the entire partitioner object
val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
val seed = byteswap32(idx ^ (shift << 16))
val (sample, n) = SamplingUtils.reservoirSampleAndCount(
iter, sampleSizePerPartition, seed)
Iterator((idx, n, sample))
}.collect()
val numItems = sketched.map(_._2).sum
(numItems, sketched)
}
这个地方调用了rdd的mapPartitionWithIndex算子,然后里面就是对各个分区进行抽样操作,三个参数分别为(要进行抽样的数据,抽样的数量,随机种子)。具体的抽样方法就是使用的是水塘抽样算法,这里就不展开讲述了,有兴趣可以继续看SamplingUtils.reservoirSampleAndCount(iter, sampleSizePerPartition, seed)这个方法。返回的数据(sample,n)就是抽出的数据(sample)和该该分区的数据量(n),然后val numItems = sketched.map(_._2).sum就是将所有分区的数据量加起来,得出总的数据量numItems,最后返回数据(numItems, sketched)具体的含义如下(数据总量,(分区id,分区数据量,该分区的抽样数据))。接下来我们返回到上一级rangeBounds中,(numItems, sketched)里面就保存了抽样出来的一些数据和信息。
接下来看变量fraction,接下来是用预抽样的数据量除以总的数据量,这个是个数据比重(常规是小于1),先声明一个长度为K的数组,具体的作用就是存放分区的边界。
下面就调用sketched的foreach函数遍历每个分区的采样数据:首先要理解if (fraction * n > sampleSizePerPartition)这个是说,该分区的数据量乘以平均比例,大于平均每个分区预采样数据的3倍之多,那么说明这个分区的数据量比较大,所有就需要对其进行重新的采样,这样可以保证采样的数据更加有代表性,不然不同数据量的分区权重一样的话,就不太合理了。后面把所有的数据量多的分区id放到imbalancedPartitions中,到后面重新采样。然后再3倍以下的分区用分区数据量除以采样的数据长度,作为该分区所有采样的key的权重,如果这个分区1000条数据,采样的数据为100,那么他的比重就是10,如果其他的分区2000条数据,那么他的比重就很有可能是200。然后变量candidates里面就是所有的key对应的权重。
对于数据量比较大的数据,我们对其分区重新取样比例为fraction的数据,然后将取样出来的所有key的比重都设置为1/fraction。放入到candidates中。到此为止,我们对所有的数据进行了采样,并且对所有的分区的所有的key赋了一个权重,然后我们就要通过这个数据进行分区边界的划分。如果我们对1-1000的数据进行分区,那么这个candidate就可能为((100,20),(300,10),(100,10),(500,30),(400,30),(700,10),(500,10),(800,50),(700,10))类似的,key是抽样的key,value是这个key的权重。
def determineBounds[K : Ordering : ClassTag](
candidates: ArrayBuffer[(K, Float)],
partitions: Int): Array[K] = {
val ordering = implicitly[Ordering[K]]
val ordered = candidates.sortBy(_._1)
val numCandidates = ordered.size
val sumWeights = ordered.map(_._2.toDouble).sum
val step = sumWeights / partitions
var cumWeight = 0.0
var target = step
val bounds = ArrayBuffer.empty[K]
var i = 0
var j = 0
var previousBound = Option.empty[K]
while ((i < numCandidates) && (j < partitions - 1)) {
val (key, weight) = ordered(i)
cumWeight += weight
if (cumWeight >= target) {
// Skip duplicate values.
if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
bounds += key
target += step
j += 1
previousBound = Some(key)
}
}
i += 1
}
bounds.toArray
}
这段代码其实也挺简单的,就是变量多了一些。首先我们需要对candidates中的数据按照key进行排序,有兴趣的可以自己看,底层调用的额是java的Arrarys.sort()进行排序,因为我们采样的数据一般情况下不是很多,所以这个排序也并不时。然后我们将所有的key的权重加起来起来除以分区数,就是平均权重的步长,具体解释为,如果要将所有的key的权重平均分配到所有分区中,每个分区的权重平均为多少。
假设我们有聚合并且排好序的((1,10),(10,5),(20,5),(40,15),(60,10),(80,15)),那么权重之和为60,我们需要将这个数据分到4个分区中,那么平均每个分区的权重为15。然后继续向下看代码。当cumWeight >= target时,我们需要将对应的额key放到bounds边界上,所以对应上面例子,我们要放的第一个key就是10,然后此时的target变成了30,那么下一个需要放进去的就是40,最后放进去的就是80。所以此时返回的边界数据就是[10,40,80],就可以用这个边界对所有的数据进行分区,并且分区的数量还是相对平衡的。
分析到这里后面的思路基本上就明白了,先用bound数组最数据进行重新分区,然后各个分区的数据顺序是确定的,然后点对分区内的数据进行排序,最后得到所有的分区都是已经排好序的,分区之间也是排好序的。但是往后面的代码没有继续往下看。不过分区内排序应该也是用的java的自带的Arrays.sort()排序,排序思想为timsort。