本系列文章源自JerryLead的SparkInternals,本文只是在做者的原文基础上加入本身的理解,批注,和部分源码,做为学习之用
注:原文是基于Spark 1.0.2 , 而本篇笔记是基于spark 2.2.0, 对比后发现核心部分变化不大,依旧值得参考html
典型的 Job 逻辑执行图如上所示,通过下面四个步骤能够获得最终执行结果:java
RDD 能够被 cache 到内存或者 checkpoint 到磁盘上。RDD 中的 partition 个数不固定,一般由用户设定。RDD 和 RDD 之间 partition 的依赖关系能够不是 1 对 1,如上图既有 1 对 1 关系,也有多对多的关系。git
了解了 Job 的逻辑执行图后,写程序时候会在脑中造成相似上面的数据依赖图。然而,实际生成的 RDD 个数每每比咱们想一想的个数多。github
要解决逻辑执行图生成问题,实际须要解决:算法
解决这个问题的初步想法是让每个 transformation() 方法返回(new)一个 RDD。事实也基本如此,只是某些 transformation() 比较复杂,会包含多个子 transformation(),于是会生成多个 RDD。这就是实际 RDD 个数比咱们想象的多一些 的缘由。apache
如何计算每一个 RDD 中的数据?逻辑执行图其实是 computing chain,那么 transformation() 的计算逻辑在哪里被 perform?每一个 RDD 里有 compute() 方法,负责接收来自上一个 RDD 或者数据源的 input records,perform transformation() 的计算逻辑,而后输出 records。数组
/** * RDD.scala * :: DeveloperApi :: * Implemented by subclasses to compute a given partition. */
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]
//MapPartitionsRDD.scala
override def compute(split: Partition, context: TaskContext): Iterator[U] =
f(context, split.index, firstParent[T].iterator(split, context))
复制代码
产生哪些 RDD 与 transformation() 的计算逻辑有关,下面讨论一些典型的 transformation() 及其建立的 RDD。官网上已经解释了每一个 transformation 的含义。iterator(split) 的意思是 foreach record in the partition。这里空了不少,是由于那些 transformation() 较为复杂,会产生多个 RDD,具体会在下一节图示出来。数据结构
Transformation | Generated RDDs | Compute() |
---|---|---|
map(func) | MappedRDD | iterator(split).map(f) |
filter(func) | FilteredRDD | iterator(split).filter(f) |
flatMap(func) | FlatMappedRDD | iterator(split).flatMap(f) |
mapPartitions(func) | MapPartitionsRDD | f(iterator(split)) |
mapPartitionsWithIndex(func) | MapPartitionsRDD | f(split.index, iterator(split)) |
sample(withReplacement, fraction, seed) | PartitionwiseSampledRDD | PoissonSampler.sample(iterator(split)) BernoulliSampler.sample(iterator(split)) |
pipe(command, [envVars]) | PipedRDD | |
union(otherDataset) | ||
intersection(otherDataset) | ||
distinct([numTasks])) | ||
groupByKey([numTasks]) | ||
reduceByKey(func, [numTasks]) | ||
sortByKey([ascending], [numTasks]) | ||
join(otherDataset, [numTasks]) | ||
cogroup(otherDataset, [numTasks]) | ||
cartesian(otherDataset) | ||
coalesce(numPartitions) | ||
repartition(numPartitions) |
spark 2.x 没找到MappedRDD,FilteredRDD,FlatMappedRDD,都统一改成了MapPartitionsRDD,在执行逻辑上没有改变闭包
RDD 之间的数据依赖问题实际包括三部分:app
第一个问题能够很天然的解决,好比x = rdda.transformation(rddb)
(e.g., x = a.join(b)) 就表示 RDD x 同时依赖于 RDD a 和 RDD b。
第二个问题中的 partition 个数通常由用户指定,不指定的话通常取max(numPartitions[parent RDD 1], .., numPartitions[parent RDD n])
。
object Partitioner {
/** * Choose a partitioner to use for a cogroup-like operation between a number of RDDs. * * If any of the RDDs already has a partitioner, choose that one. * * Otherwise, we use a default HashPartitioner. For the number of partitions, if * spark.default.parallelism is set, then we'll use the value from SparkContext * defaultParallelism, otherwise we'll use the max number of upstream partitions. * * Unless spark.default.parallelism is set, the number of partitions will be the * same as the number of partitions in the largest upstream RDD, as this should * be least likely to cause out-of-memory errors. * feng:取最大的RDD partitions * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD. */
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
val rdds = (Seq(rdd) ++ others)
//feng:先过滤出有partition的rdd
val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
if (hasPartitioner.nonEmpty) {
//直接取rdd中partitions数量最大的rdd的partitioner
hasPartitioner.maxBy(_.partitions.length).partitioner.get
} else {
//使用默认的HashPartitioner,并行度自定义优先,不然就取rdd中最大的partition数量,其实就是rdd中最大的并行度
if (rdd.context.conf.contains("spark.default.parallelism")) {
new HashPartitioner(rdd.context.defaultParallelism)
} else {
new HashPartitioner(rdds.map(_.partitions.length).max)
}
}
}
}
复制代码
第三个问题比较复杂。须要考虑这个 transformation() 的语义,不一样的 transformation() 的依赖关系不一样。好比 map() 是 1:1,而 groupByKey() 逻辑执行图中的 ShuffledRDD 中的每一个 partition 依赖于 parent RDD 中全部的 partition,还有更复杂的状况。
再次考虑第三个问题,RDD x 中每一个 partition 能够依赖于 parent RDD 中一个或者多个 partition。并且这个依赖能够是彻底依赖或者部分依赖。部分依赖指的是 parent RDD 中某 partition 中一部分数据与 RDD x 中的一个 partition 相关,另外一部分数据与 RDD x 中的另外一个 partition 相关。下图展现了彻底依赖和部分依赖。
前三个是彻底依赖,RDD x 中的 partition 与 parent RDD 中的 partition/partitions 彻底相关。最后一个是部分依赖,RDD x 中的 partition 只与 parent RDD 中的 partition 一部分数据相关,另外一部分数据与 RDD x 中的其余 partition 相关。
在 Spark 中,彻底依赖被称为 NarrowDependency,部分依赖被称为 ShuffleDependency。其实 ShuffleDependency 跟 MapReduce 中 shuffle 的数据依赖相同(mapper 将其 output 进行 partition,而后每一个 reducer 会将全部 mapper 输出中属于本身的 partition 经过 HTTP fetch 获得)。
对于 NarrowDependency,具体 RDD x 中的 partitoin i 依赖 parrent RDD 中一个 partition 仍是多个 partitions,是由 NarrowDependency中的 getParents(partition i)
决定(下图中某些例子会详细介绍)。还有一种 RangeDependency 的彻底依赖,不过该依赖目前只在 UnionRDD 中使用,下面会介绍。
//map, filter等就是OneToOneDependency
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = {
//判断partitionId的合理性,必须在child RDD的合理partition范围内
if (partitionId >= outStart && partitionId < outStart + length) {
//算出parent RDD中对应的partition id
List(partitionId - outStart + inStart)
} else {
Nil
}
}
}
复制代码
因此,总结下来 partition 之间的依赖关系以下:
对于NarrowDependency (N:1)的依赖关系,repartition和coalesce有用到,但NarrowDependency (N:N)具体体如今哪里?
之因此要划分 NarrowDependency 和 ShuffleDependency 是为了生成物理执行图,下一章会具体介绍。
须要注意的是第三种 NarrowDependency (N:N) 不多在两个 RDD 之间出现。由于若是 parent RDD 中的 partition 同时被 child RDD 中多个 partitions 依赖,那么最后生成的依赖图每每与 ShuffleDependency 同样。只是对于 parent RDD 中的 partition 来讲一个是彻底依赖,一个是部分依赖,而箭头数没有少。因此 Spark 定义的 NarrowDependency 实际上是 “each partition of the parent RDD is used by at most one partition of the child RDD“,也就是只有 OneToOneDependency (1:1) 和 NarrowDependency (N:1) 两种状况。可是,本身设计的奇葩 RDD 确实能够呈现出 NarrowDependency (N:N) 的状况。这里描述的比较乱,其实看懂下面的几个典型的 RDD 依赖便可。
在spark 2.1.1 中NarrowDependency的定义是:
Base class for dependencies where each partition of the child RDD depends on a small number of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
只说明每一个child RDD partition依赖于一小组parent RDD的partition,并无写明parent RDD中的每一个partition最多被child RDD中的一个partition使用,先略过NarrowDependency (N:N) 的状况吧
如何计算获得 RDD x 中的数据(records)?下图展现了 OneToOneDependency 的数据依赖,虽然 partition 和 partition 之间是 1:1,但不表明计算 records 的时候也是读一个 record 计算一个 record。 下图右边上下两个 pattern 之间的差异相似于下面两个程序的差异:
code1 of iter.f()
int[] array = {1, 2, 3, 4, 5}
for(int i = 0; i < array.length; i++)
f(array[i])
复制代码
code2 of f(iter)
int[] array = {1, 2, 3, 4, 5}
f(array)
复制代码
1) union(otherRDD)
/** Build the union of a list of RDDs passed as variable-length arguments. */
def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] = withScope {
union(Seq(first) ++ rest)
}
/** Build the union of a list of RDDs. */
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = withScope {
val partitioners = rdds.flatMap(_.partitioner).toSet //合并同类
//feng:若是要进行合并的两个RDD都包含有partitioner,同时这两个RDD引用的是相同的partitioner时
if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) {
//能够当作是OneToOneDependency,假设RDD a,b,各有p各partitions,用同一个partitioner合并后仍是p个partitions
new PartitionerAwareUnionRDD(this, rdds)
} else {
//有一个RDD不包含partitioner,或者两个RDD的partitioner的算子不相同时
new UnionRDD(this, rdds) //RangeDependency
}
}
复制代码
union() 将两个 RDD 简单合并在一块儿,不改变 partition 里面的数据。RangeDependency 实际上也是 1:1,只是为了访问 union() 后的 RDD 中的 partition 方便,保留了原始 RDD 的 range 边界。
从代码上看,union也能够是OneToOneDependency的
2) groupByKey(numPartitions)
def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
groupByKey(defaultPartitioner(self)) //默认使用HashPartitioner
}
/** * @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any * key in memory. If a key has too many values, it can result in an `OutOfMemoryError`. */
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
// groupByKey shouldn't use map side combine because map side combine does not
// reduce the amount of data shuffled and requires all map side data be inserted
// into a hash table, leading to more objects in the old gen.
// feng:map side combine没有用,还可能浪费多一倍空间
//CompactBuffer相似ArrayBuffer,但比ArrayBuffer更加高效,只是取出前两个元素的值单独存起来,这种优化对短数组有用
val createCombiner = (v: V) => CompactBuffer(v)
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
/** * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list) * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list) * - `mergeCombiners`, to combine two C's into a single one. */
@Experimental
def combineByKeyWithClassTag[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
if (keyClass.isArray) {
if (mapSideCombine) {
throw new SparkException("Cannot use map-side combining with array keys.")
}
if (partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("HashPartitioner cannot partition array keys.")
}
}
//闭包处理
val aggregator = new Aggregator[K, V, C](
self.context.clean(createCombiner),
self.context.clean(mergeValue),
self.context.clean(mergeCombiners))
//若是父子分区函数相同
if (self.partitioner == Some(partitioner)) {
//这里调用的是mapPartitions.生成MapPartitionsRDD
self.mapPartitions(iter => {
val context = TaskContext.get()
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else { //partitioner不一样才shuffle,生成ShuffledRDD
new ShuffledRDD[K, V, C](self, partitioner)
.setSerializer(serializer)
.setAggregator(aggregator)
.setMapSideCombine(mapSideCombine)
}
}
复制代码
上一章已经介绍了 groupByKey 的数据依赖,这里算是温故而知新 吧。
groupByKey() 只须要将 Key 相同的 records 聚合在一块儿,一个简单的 shuffle 过程就能够完成。ShuffledRDD 中的 compute() 只负责将属于每一个 partition 的数据 fetch 过来,以后使用 mapPartitions() 操做(前面的 OneToOneDependency 展现过)进行 aggregate,生成 MapPartitionsRDD,到这里 groupByKey() 已经结束。最后为了统一返回值接口,将 value 中的 ArrayBuffer[] 数据结构抽象化成 Iterable[]。
groupByKey() 没有在 map 端进行 combine,由于 map 端 combine 只会省掉 partition 里面重复 key 占用的空间,当重复 key 特别多时,能够考虑开启 combine。
这里的 ArrayBuffer 实际上应该是 CompactBuffer,An append-only buffer similar to ArrayBuffer, but more memory-efficient for small buffers.
在spark 2.1.1中groupByKey并无能够开启mapSideCombine的设置,设置死就是mapSideCombine = false
另外,在哪里体现出最后的结果是MapPartitionsRDD?
ParallelCollectionRDD 是最基础的 RDD,直接从 local 数据结构 create 出的 RDD 属于这个类型,好比
val pairs = sc.parallelize(List(1, 2, 3, 4, 5), 3)
复制代码
生成的 pairs 就是 ParallelCollectionRDD。
/** SparkContext.scala * @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call * to parallelize and before the first action on the RDD, the resultant RDD will reflect the * modified collection. Pass a copy of the argument to avoid this. * 懒执行,seq最好是不可变的,不然若在执行前改变了seq,那么parallelize的值也会改变 */
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
复制代码
2) reduceByKey(func, numPartitions)
/** * Merge the values for each key using an associative and commutative reduce function. This will * also perform the merging locally on each mapper before sending results to a reducer, similarly * to a "combiner" in MapReduce. */
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
//默认是mapSideCombine: Boolean = true
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
复制代码
reduceByKey() 至关于传统的 MapReduce,整个数据流也与 Hadoop 中的数据流基本同样。reduceByKey() 默认在 map 端开启 combine(),所以在 shuffle 以前先经过 mapPartitions 操做进行 combine,获得 MapPartitionsRDD,而后 shuffle 获得 ShuffledRDD,而后再进行 reduce(经过 aggregate + mapPartitions() 操做来实现)获得 MapPartitionsRDD。
和groupByKey同样,partitioner相同是能够生成MapPartitionsRDD,不必定是ShuffledRDD
3) distinct(numPartitions)
def distinct(): RDD[T] = withScope {
distinct(partitions.length)
}
/** * Return a new RDD containing the distinct elements in this RDD. * feng:distinct直接使用了reduceByKey */
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
map(x => (x, null)).reduceByKey((x, y) => x, ).map(_._1)
}
复制代码
distinct() 功能是 deduplicate RDD 中的全部的重复数据。因为重复数据可能分散在不一样的 partition 里面,所以须要 shuffle 来进行 aggregate 后再去重。然而,shuffle 要求数据类型是 <K, V>
。若是原始数据只有 Key(好比例子中 record 只有一个整数),那么须要补充成 <K, null>
。这个补充过程由 map() 操做完成,生成 MappedRDD。而后调用上面的 reduceByKey() 来进行 shuffle,在 map 端进行 combine,而后 reduce 进一步去重,生成 MapPartitionsRDD。最后,将 <K, null>
还原成 K,仍然由 map() 完成,生成 MappedRDD。蓝色的部分就是调用的 reduceByKey()。
4) cogroup(otherRDD, numPartitions)
def cogroup[W](
other: RDD[(K, W)],
numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
//默认HashPartitioner
cogroup(other, new HashPartitioner(numPartitions))
}
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
throw new SparkException("HashPartitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
//MapPartitionsRDD,将cg的value转化为Iterable
cg.mapValues { case Array(vs, w1s) =>
(vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
}
}
复制代码
与 groupByKey() 不一样,cogroup() 要 aggregate 两个或两个以上的 RDD。那么 CoGroupedRDD 与 RDD a 和 RDD b 的关系都必须是 ShuffleDependency 么?是否存在 OneToOneDependency?
首先要明确的是 CoGroupedRDD 存在几个 partition 能够由用户直接设定,与 RDD a 和 RDD b 无关。然而,若是 CoGroupedRDD 中 partition 个数与 RDD a/b 中的 partition 个数不同,那么不可能存在 1:1 的关系。
再次,cogroup() 的计算结果放在 CoGroupedRDD 中哪一个 partition 是由用户设置的 partitioner 肯定的(默认是 HashPartitioner)。那么能够推出:即便 RDD a/b 中的 partition 个数与 CoGroupedRDD 中的同样,若是 RDD a/b 中的 partitioner 与 CoGroupedRDD 中的不同,也不可能存在 1:1 的关系。好比,在上图的 example 里面,RDD a 是 RangePartitioner,b 是 HashPartitioner,CoGroupedRDD 也是 RangePartitioner 且 partition 个数与 a 的相同。那么很天然地,a 中的每一个 partition 中 records 能够直接送到 CoGroupedRDD 中对应的 partition。RDD b 中的 records 必须再次进行划分与 shuffle 后才能进入对应的 partition。
最后,通过上面分析,对于两个或两个以上的 RDD 聚合,当且仅当聚合后的 RDD 中 partitioner 类别及 partition 个数与前面的 RDD 都相同,才会与前面的 RDD 构成 1:1 的关系。不然,只能是 ShuffleDependency.这个算法对应的代码能够在CoGroupedRDD.getDependencies()
中找到,虽然比较难理解。
/*CoGroupedRDD.scala*/
override def getDependencies: Seq[Dependency[_]] = {
rdds.map { rdd: RDD[_] =>
//scala中,若是比较的对象是null那么是判断引用否相等,不然调用的是equals
//当前rdd的partitioner和聚合后的partitioner对比,若是是hash partitioner,equals方法要对比类型和分区数
if (rdd.partitioner == Some(part)) {
logDebug("Adding one-to-one dependency with " + rdd)
new OneToOneDependency(rdd)
} else {
logDebug("Adding shuffle dependency with " + rdd)
new ShuffleDependency[K, Any, CoGroupCombiner](
rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)
}
}
}
//负责给出 RDD 中有多少个 partition,以及每一个 partition 如何序列化
override def getPartitions: Array[Partition] = {
val array = new Array[Partition](part.numPartitions)
for (i <- 0 until array.length) {
// Each CoGroupPartition will have a dependency per contributing RDD
array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) =>
// Assume each RDD contributed a single dependency, and get it
dependencies(j) match {
case s: ShuffleDependency[_, _, _] =>
None
case _ =>
Some(new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)))
}
}.toArray)
}
array
}
复制代码
Spark 代码中如何表示 CoGroupedRDD 中的 partition 依赖于多个 parent RDDs 中的 partitions?
首先,将 CoGroupedRDD 依赖的全部 RDD 放进数组 rdds[RDD] 中。再次,foreach i,若是 CoGroupedRDD 和 rdds(i) 对应的 RDD 是 OneToOneDependency 关系,那么 Dependecy[i] = new OneToOneDependency(rdd),不然 = new ShuffleDependency(rdd)。最后,返回与每一个 parent RDD 的依赖关系数组 deps[Dependency]。
Dependency 类中的 getParents(partition id) 负责给出某个 partition 按照该 dependency 所依赖的 parent RDD 中的 partitions: List[Int]。
getPartitions() 负责给出 RDD 中有多少个 partition,以及每一个 partition 如何序列化。
5) intersection(otherRDD)
def intersection(other: RDD[T]): RDD[T] = withScope {
//MapPartitionsRDD => CoGroupedRDD
//两个RDD分别先map
this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
// => MapPartitionsRDD
.filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
.keys // => MapPartitionsRDD
}
复制代码
intersection() 功能是抽取出 RDD a 和 RDD b 中的公共数据。先使用 map() 将 RDD[T] 转变成 RDD[(T, null)],这里的 T 只要不是 Array 等集合类型便可。接着,进行 a.cogroup(b),蓝色部分与前面的 cogroup() 同样。以后再使用 filter() 过滤掉 [iter(groupA()), iter(groupB())] 中 groupA 或 groupB 为空的 records,获得 FilteredRDD。最后,使用 keys() 只保留 key 便可,获得 MappedRDD。
/** * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD. */
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
//flatMapValues,不是flatMap
this.cogroup(other, partitioner).flatMapValues( pair =>
for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
//遍历逻辑同下
// while(pair._1.iterator.hasNext){
// v <- pair._1.iterator.next
// while(pair._2.iterator.hasNext){
// w <- pair._2.iterator.next
// yield (v, w)
// }
// }
)
}
复制代码
join() 将两个 RDD[(K, V)] 按照 SQL 中的 join 方式聚合在一块儿。与 intersection() 相似,首先进行 cogroup(),获得<K, (Iterable[V1], Iterable[V2])>
类型的 MappedValuesRDD,而后对 Iterable[V1] 和 Iterable[V2] 作笛卡尔集,并将集合 flat() 化。
这里给出了两个 example,第一个 example 的 RDD 1 和 RDD 2 使用 RangePartitioner 划分,而 CoGroupedRDD 使用 HashPartitioner,与 RDD 1/2 都不同,所以是 ShuffleDependency。第二个 example 中, RDD 1 事先使用 HashPartitioner 对其 key 进行划分,获得三个 partition,与 CoGroupedRDD 使用的 HashPartitioner(3) 一致,所以数据依赖是 1:1。若是 RDD 2 事先也使用 HashPartitioner 对其 key 进行划分,获得三个 partition,那么 join() 就不存在 ShuffleDependency 了,这个 join() 也就变成了 hashjoin()。
7) sortByKey(ascending, numPartitions)
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)] = self.withScope{
val part = new RangePartitioner(numPartitions, self, ascending)
new ShuffledRDD[K, V, V](self, part)
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
复制代码
sortByKey() 将 RDD[(K, V)] 中的 records 按 key 排序,ascending = true 表示升序,false 表示降序。目前 sortByKey() 的数据依赖很简单,先使用 shuffle 将 records 汇集在一块儿(放到对应的 partition 里面),而后将 partition 内的全部 records 按 key 排序,最后获得的 MapPartitionsRDD 中的 records 就有序了。
目前 sortByKey() 先使用 Array 来保存 partition 中全部的 records,再排序。
8) cartesian(otherRDD)
/*RDD.scala*/
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
new CartesianRDD(sc, this, other)
}
复制代码
Cartesian 对两个 RDD 作笛卡尔集,生成的 CartesianRDD 中 partition 个数 = partitionNum(RDD a) * partitionNum(RDD b)。
这里的依赖关系与前面的不太同样,CartesianRDD 中每一个partition 依赖两个 parent RDD,并且其中每一个 partition 彻底依赖 RDD a 中一个 partition,同时又彻底依赖 RDD b 中另外一个 partition。这里没有红色箭头,由于全部依赖都是 NarrowDependency。
CartesianRDD.getDependencies() 返回 rdds[RDD a, RDD b]。CartesianRDD 中的 partiton i 依赖于 (RDD a).List(i / numPartitionsInRDDb) 和 (RDD b).List(i % numPartitionsInRDDb)。
override def getPartitions: Array[Partition] = {
// create the cross product split
//feng:Cartesian 对两个 RDD 作笛卡尔集
// 生成的 CartesianRDD 中 partition 个数 = partitionNum(RDD a) * partitionNum(RDD b)
val array = new Array[Partition](rdd1.partitions.length * rdd2.partitions.length)
for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
val idx = s1.index * numPartitionsInRdd2 + s2.index
array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
}
array
}
override def getDependencies: Seq[Dependency[_]] = List(
new NarrowDependency(rdd1) {
def getParents(id: Int): Seq[Int] = List(id / numPartitionsInRdd2)
},
new NarrowDependency(rdd2) {
def getParents(id: Int): Seq[Int] = List(id % numPartitionsInRdd2)
}
)
复制代码
9) coalesce(numPartitions, shuffle = false)
/** RDD.scala *当shuffle: Boolean = false,扩增partition数量不会生效 */
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
val distributePartition = (index: Int, items: Iterator[T]) => {
//在每一个 partition 中,第一个元素的key
var position = (new Random(index)).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// will mod it with the number of total partitions.
position = position + 1 //key递增
(position, t)
}
} : Iterator[(Int, T)]
// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
new HashPartitioner(numPartitions)),
numPartitions,
partitionCoalescer).values
} else {
//不shuffle就生成CoalescedRDD
new CoalescedRDD(this, numPartitions, partitionCoalescer)
}
}
复制代码
coalesce() 能够将 parent RDD 的 partition 个数进行调整,好比从 5 个减小到 3 个,或者从 5 个增长到 10 个。须要注意的是当 shuffle = false 的时候,是不能增长 partition 个数的(不能从 5 个变为 10 个)。
coalesce() 的核心问题是如何确立 CoalescedRDD 中 partition 和其 parent RDD 中 partition 的关系。
Example: a.coalesce(3, shuffle = false)
展现了 N:1 的 NarrowDependency。var position = (new Random(index)).nextInt(numPartitions);position = position + 1
计算获得,index 是该 partition 的索引,numPartitions 是 CoalescedRDD 中的 partition 个数。接下来元素的 key 是递增的,而后 shuffle 后的 ShuffledRDD 能够获得均分的 records,而后通过复杂算法来创建 ShuffledRDD 和 CoalescedRDD 之间的数据联系,最后过滤掉 key,获得 coalesce 后的结果 MappedRDD。10) repartition(numPartitions)
等价于 coalesce(numPartitions, shuffle = true)
/* * If you are decreasing the number of partitions in this RDD, consider using `coalesce`, * which can avoid performing a shuffle. * feng:减低partition数量可用coalesce代替,避免shuffle */
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
复制代码
combineByKey()
分析了这么多 RDD 的逻辑执行图,它们之间有没有共同之处?若是有,是怎么被设计和实现的?
仔细分析 RDD 的逻辑执行图会发现,ShuffleDependency 左边的 RDD 中的 record 要求是 <key, value> 型的,通过 ShuffleDependency 后,包含相同 key 的 records 会被 aggregate 到一块儿,而后在 aggregated 的 records 上执行不一样的计算逻辑。实际执行时(后面的章节会具体谈到)不少 transformation() 如 groupByKey(),reduceByKey() 是边 aggregate 数据边执行计算逻辑的,所以共同之处就是 aggregate 同时 compute()。Spark 使用 combineByKey() 来实现这个 aggregate + compute() 的基础操做。
combineByKey() 的定义以下:
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}
复制代码
其中主要有三个参数 createCombiner,mergeValue 和 mergeCombiners。简单解释下这三个函数及 combineByKey() 的意义,注意它们的类型:
假设一组具备相同 K 的 <K, V> records 正在一个个流向 combineByKey(),createCombiner 将第一个 record 的 value 初始化为 c (好比,c = value),而后从第二个 record 开始,来一个 record 就使用 mergeValue(c, record.value) 来更新 c,好比想要对这些 records 的全部 values 作 sum,那么使用 c = c + record.value。等到 records 所有被 mergeValue(),获得结果 c。假设还有一组 records(key 与前面那组的 key 均相同)一个个到来,combineByKey() 使用前面的方法不断计算获得 c'。如今若是要求这两组 records 总的 combineByKey() 后的结果,那么可使用 final c = mergeCombiners(c, c') 来计算。
至此,咱们讨论了如何生成 job 的逻辑执行图,这些图也是 Spark 看似简单的 API 背后的复杂计算逻辑及数据依赖关系。
整个 job 会产生哪些 RDD 由 transformation() 语义决定。一些 transformation(), 好比 cogroup() 会被不少其余操做用到。
RDD 自己的依赖关系由 transformation() 生成的每个 RDD 自己语义决定。如 CoGroupedRDD 依赖于全部参加 cogroup() 的 RDDs。
RDD 中 partition 依赖关系分为 NarrowDependency 和 ShuffleDependency。前者是彻底依赖,后者是部分依赖。NarrowDependency 里面又包含多种状况,只有先后两个 RDD 的 partition 个数以及 partitioner 都同样,才会出现 NarrowDependency。
从数据处理逻辑的角度来看,MapReduce 至关于 Spark 中的 map() + reduceByKey(),但严格来说 MapReduce 中的 reduce() 要比 reduceByKey() 的功能强大些,详细差异会在 Shuffle details 一章中继续讨论。
@DeveloperApi
abstract class Dependency[T] extends Serializable {
def rdd: RDD[T]
}
/** * :: DeveloperApi :: * Base class for dependencies where each partition of the child RDD depends on a small number * of partitions of the parent RDD. Narrow dependencies allow for pipelined execution. * * 这里只说明每一个child RDD partition依赖于一小组parent RDD的partition * 并无写明parent RDD中的每一个partition最多被child RDD中的一个partition使用,须要NarrowDependency的更准肯定义 * NarrowDependency不须要shuffle */
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
/** * Get the parent partitions for a child partition. * @param partitionId a partition of the child RDD * @return the partitions of the parent RDD that the child partition depends upon */
def getParents(partitionId: Int): Seq[Int]
override def rdd: RDD[T] = _rdd
}
/** * :: DeveloperApi :: * Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle, * the RDD is transient since we don't need it on the executor side. * * @param _rdd the parent RDD * @param partitioner partitioner used to partition the shuffle output * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If not set * explicitly then the default serializer, as specified by `spark.serializer` * config option, will be used. * @param keyOrdering key ordering for RDD's shuffles * @param aggregator map/reduce-side aggregator for RDD's shuffle * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine) */
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( @transient private val _rdd: RDD[_ <: Product2[K, V]], val partitioner: Partitioner, val serializer: Serializer = SparkEnv.get.serializer, val keyOrdering: Option[Ordering[K]] = None, val aggregator: Option[Aggregator[K, V, C]] = None, val mapSideCombine: Boolean = false)
extends Dependency[Product2[K, V]] {
override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
// Note: It's possible that the combiner class tag is null, if the combineByKey
// methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
private[spark] val combinerClassName: Option[String] =
Option(reflect.classTag[C]).map(_.runtimeClass.getName)
val shuffleId: Int = _rdd.context.newShuffleId()
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.length, this)
_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
/** * :: DeveloperApi :: * Represents a one-to-one dependency between partitions of the parent and child RDDs. * feng:parent和child里面的partitions是一一对应 eg:map, filter * partitionId就是partition在RDD中的序号, 因此若是是一一对应, 那么parent和child中的partition的序号应该是同样的 */
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
/** * :: DeveloperApi :: * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs. * @param rdd the parent RDD * @param inStart the start of the range in the parent RDD * @param outStart the start of the range in the child RDD * @param length the length of the range * parent RDD中的某个区间的partitions对应到child RDD中的某个区间的partitions * 因为是range, 因此直接记录起点和length就能够了, 没有必要加入每一个中间rdd * eg: 目前只在union中使用,多个parent RDD合并到一个child RDD, 故每一个parent RDD都对应到child RDD中的一个区间 */
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = {
//判断partitionId的合理性,必须在child RDD的合理partition范围内
if (partitionId >= outStart && partitionId < outStart + length) {
//算出parent RDD中对应的partition id
List(partitionId - outStart + inStart)
} else {
Nil
}
}
}
复制代码