Just for fun, I wrote a small demo:
    val rdd = sc.parallelize(Seq((1, "a"), (2, "c"), (3, "b"), (2, "c")))
    val sorted = rdd.sortByKey()
    sorted.foreach(println)
    val c = sorted.count()
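Opening the Spark UI (see screenshot), there is a job triggered by sortByKey, which is a transformation. Why would a transformation trigger a job?
Let's look at the source code: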
    def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
        : RDD[(K, V)] = self.withScope {
      val part = new RangePartitioner(numPartitions, self, ascending)
      new ShuffledRDD[K, V, V](self, part)
        .setKeyOrdering(if (ascending) ordering else ordering.reverse)
    }
It creates a RangePartitioner. Stepping into it:
    class RangePartitioner[K : Ordering : ClassTag, V](
        partitions: Int,
        rdd: RDD[_ <: Product2[K, V]],
        private var ascending: Boolean = true)
      extends Partitioner {

      // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
      require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")

      private var ordering = implicitly[Ordering[K]]

      // An array of upper bounds for the first (partitions - 1) partitions
      private var rangeBounds: Array[K] = {
        if (partitions <= 1) {
          Array.empty
        } else {
          // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
          val sampleSize = math.min(20.0 * partitions, 1e6)
          // Assume the input partitions are roughly balanced and over-sample a little bit.
          val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
          val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
It calls a sketch method. Stepping into that:
    def sketch[K : ClassTag](
        rdd: RDD[K],
        sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
      val shift = rdd.id
      // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
      val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
        val seed = byteswap32(idx ^ (shift << 16))
        val (sample, n) = SamplingUtils.reservoirSampleAndCount(
          iter, sampleSizePerPartition, seed)
        Iterator((idx, n, sample))
      }.collect()
      val numItems = sketched.map(_._2).sum
      (numItems, sketched)
    }
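There is a collect here, and collect is an RDD action, so it triggers a job. sortByKey itself is still a transformation, though: the sorted data is only computed when a later action runs. Opening the job triggered by foreach (see screenshot) shows that it goes through sortByKey.
What is this code in RangePartitioner for? It samples the keys to work out the boundaries of each partition, which decide where each record goes when the subsequent shuffle repartitions the data.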
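To make the boundary idea concrete, here is a minimal pure-Scala sketch (no Spark needed). It is not Spark's implementation: the names RangeBoundsSketch, reservoirSample, determineBounds and getPartition are made up for illustration, and the bounds are picked from an unweighted sample at evenly spaced positions, whereas Spark weights samples by partition size. It only shows the three steps: sample keys per input partition, derive (numPartitions - 1) upper bounds, then route each key by comparing it with the bounds.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

// Illustrative only: a simplified, unweighted stand-in for what RangePartitioner does.
object RangeBoundsSketch {

  // Reservoir sampling: keep at most k items from the iterator and count all items seen.
  def reservoirSample[T](iter: Iterator[T], k: Int, seed: Long): (Vector[T], Long) = {
    val rng = new Random(seed)
    val reservoir = ArrayBuffer.empty[T]
    var n = 0L
    iter.foreach { item =>
      n += 1
      if (reservoir.size < k) {
        reservoir += item
      } else {
        // Replace a random slot with probability k / n.
        val j = (rng.nextDouble() * n).toLong
        if (j < k) reservoir(j.toInt) = item
      }
    }
    (reservoir.toVector, n)
  }

  // Pick up to (numPartitions - 1) upper bounds at evenly spaced positions of the sorted sample.
  def determineBounds[K: Ordering](sample: Seq[K], numPartitions: Int): Vector[K] = {
    val sorted = sample.sorted.toVector
    if (numPartitions <= 1 || sorted.isEmpty) {
      Vector.empty[K]
    } else {
      (1 until numPartitions)
        .map(i => sorted(math.max(0, i * sorted.size / numPartitions - 1)))
        .distinct
        .toVector
    }
  }

  // A key goes to the first partition whose upper bound is >= the key,
  // or to the last partition if it is greater than every bound.
  def getPartition[K: Ordering](key: K, bounds: Vector[K]): Int = {
    val ord = implicitly[Ordering[K]]
    var p = 0
    while (p < bounds.length && ord.gt(key, bounds(p))) p += 1
    p
  }

  def main(args: Array[String]): Unit = {
    // Keys of the demo RDD, split into two hypothetical input partitions.
    val inputPartitions = Seq(Seq(1, 2), Seq(3, 2))
    val sampled = inputPartitions.zipWithIndex.flatMap { case (part, idx) =>
      reservoirSample(part.iterator, 10, idx.toLong)._1
    }
    val bounds = determineBounds(sampled, 2)
    println(s"bounds = $bounds")
    inputPartitions.flatten.foreach { k =>
      println(s"key $k -> partition ${getPartition(k, bounds)}")
    }
  }
}
```

The sampling step is the part that has to look at real data, which is why the real sketch method ends in a collect and therefore runs a job before any sorting has been requested.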
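Opening the job triggered by count shows that stage 3 was skipped. The code does no caching at all, yet a whole stage can be skipped.
This is because sortByKey is a wide-dependency operator, so it causes a shuffle. During a shuffle, the upstream stage writes the RDD's data out to temporary files, and the downstream stage then reads them. These temporary files (intermediate results) stay around for the lifetime of the SparkContext, so when the next job is triggered, Spark follows the RDD's dependencies, finds the files, and reuses them. In effect they act as a "cache".
So I added cache after sortByKey. The UI graph did not change (not shown here; it is discussed below), which suggests that sortByKey was executed again. Is cache not working, or is this just how the UI displays things?
To check, I changed the code: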
    val rddA = sc.parallelize(Seq((1, "a"), (2, "c"), (3, "b"), (2, "c")))
      .filter(x => x._1 > 1)
      .aggregateByKey(0)((x, y) => {
        println("agg: " + y)
        0
      }, (x1, x2) => 0)
      .cache() // cache the RDD produced by the aggregation
    val c = rddA.count()
    println("total: " + c)
    rddA.foreach(println)
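Looking at the UI (see screenshot), it seems that aggregateByKey was executed a second time! But my code prints a message inside aggregateByKey.
As the output shows, the messages were printed only once, which means aggregateByKey ran only once. The UI simply can only show a whole stage as either gray (skipped) or blue (executed).
Moreover, this stage 3 does not read the temporary files produced by the shuffle; it reads the ShuffledRDD directly from the cache. The screenshot confirms it: Shuffle Read shows no data.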
PS: the fonts here really do look better than on Windows! Ayuthaya or Monaco both look nicer than Console~