本节介绍RDD的Transformations函数的原理和做用。还会介绍transformations函数的分类,和不一样类型的转换产生的效果。shell
在RDD中定义了两类操做函数:action和transformations。transformations经过在一些RDD中执行一些数据操做来产生一个或多个新的RDD。这些transformations函数包括:map,filter,join,reduceByKey,cogroup,randomSplit等。apache
也就是说,transformations是一系列函数,它们的输入是一个RDD,输出是一个或多个RDD。但这些函数并不会改变输入RDD的值(这是RDD不可改变的特性),但经过transformations函数的计算会产生一个或多个新的RDD。dom
RDD的transformations函数是懒评估(evaluated lazily)的。所谓懒评估是指:在调用transformations函数时不会当即执行,直到action函数被调用。也就是说,transformations函数的执行是由action函数的调用来触发的。ide
经过使用transformations转换函数,您可使用最终RDD的全部父RDD逐步构建RDD血缘(RDD Lineage)。函数
一个RDD经过transformations转换函数处理后获得的新的结果RDD一般与父RDD的值不一样,该结果RDD的数据集可能变得更大(例如:flatMap,union等),也可能变得更小(例如:filter,distinct,count等),或则大小相同(例如:map等)。oop
注意:有些转换函数也可能发起计算,例如:例如sortBy,zipWithIndex等lua
安装好spark(能够是单机的),并启动spark-shell,在spark-shell中输入一下命令:spa
# 1. 加载一个文件,文件内容不重要
scala> val file = sc.textFile("derby.log")
file: org.apache.spark.rdd.RDD[String] = derby.logMapPartitionsRDD[1] at textFile at <console>:24
# 2. 查看file的结果
scala> file.toDebugString
res14: String =
(2) derby.log MapPartitionsRDD[1] at textFile at <console>:24 []
| derby.log HadoopRDD[0] at textFile at <console>:24 []
# 3. 把文件中的内容按\s+进行分割,也能够执行其余文本操做
scala> val allWords = file.flatMap(_.split("\\s+"))
allWords: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2]at flatMap at <console>:26
# 4. 查看一下allWords这个RDD的linage或逻辑执行计划
scala> allWords.toDebugString
res0: String =
(2) MapPartitionsRDD[2] at flatMap at <console>:26 []
| derby.log MapPartitionsRDD[1] at textFile at <console>:24 []
| derby.log HadoopRDD[0] at textFile at <console>:24 []
从以上第2步的代码和输出能够看到,执行sc.textFile函数后,会产生两种RDD,一种是:HadoopRDD,一种是:MapPartitionsRDD,其中HadoopRDD是中间状态的RDD,最后获得的是MapPartitionsRDD。这个结果从第1步的系统输出能够看到。scala
而后再对文件中的内容进行处理,会产生新的RDD,产生的新的RDD也是MapPartitionsRDD类型的。从第4步的输出能够看到。orm
为了更好的理解这些输出,咱们能够看一下这些函数的源代码:
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] =withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat],classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}
能够看到textFile会调用hadoopFile函数建立一个HadoopRDD,而后再执行map操做,这样就获得了一个MapPartitionsRDD,而后再对该RDD设置一个名称,该RDD的名称被设置为参数path的值。
scala> file.name
res18: String = derby.log
窄转换是基于窄依赖(narrow dependencies)进行的RDD转换。
所谓窄依赖是指:父RDD的每一个分区最多被儿子RDD的一个分区使用。
产生窄转换的函数有:map,filter,distinct,union,基于分区的jion等。
Spark能够将窄转换进行分组,造成一个pipeline,以便提升计算效率。
下图是2个或3个RDD进行各类操做时的依赖关系,他们都是窄依赖。由于,全部的父RDD(前面的RDD)的每一个分区都最多被儿子RDD(后面的RDD)一个分区使用。
基于窄依赖的窄转换操做是高效的,由于窄转换一般能够在同一个节点中完成,不须要有集群中各个计算节点之间的数据传输。也就是,窄依赖不会产生shuffle。
那么,让咱们再进一步思考一下,为何窄依赖不会产生shuffle?
从《RDD的分区原理》一章,咱们能够知晓,RDD的数据是以分区的形式保存在spark的各个worker节点上。这样,由于窄依赖,因此输出的结果RDD(儿子RDD)的分区数据,都是基于父RDD的分区获得的,计算结果是父RDD分区数据的子集,或者和父RDD的分区数相等,这就意味着:计算儿子RDD的分区和须要的父RDD的分区会在同一个节点上,也就是说:转换的过程能够在同一个节点中完成,不会产生worker节点之间的数据传输,因此窄转换不会产生shuffle。
另外,当须要从新计算子RDD某个分区时,因为父RDD的分区只服务于子RDD的某一个分区,因此,复用率是100%,计算过程不会有任何浪费。
宽转换是基于宽依赖(wide dependencies)进行的RDD转换。
所谓宽依赖是指:父RDD的每一个分区均可能被子RDD的多个分区使用。
也就是说,计算单个分区中的记录所需的数据可能存在父RDD的多个分区中。因此,宽转换会发生shuffle过程,有时候把宽转换也称为:shuffle transformations。
具备相同key的全部元组必须最终位于同一分区中,由同一任务处理。为了知足这些操做,Spark必须执行RDD shuffle动做,它在集群之间传输数据,使用一组新的分区建立一个新阶段。以下图所示:
致使宽转换的函数有:groupByKey,reduceByKey等。
进一步思考一下,当须要进行子RDD重算时,因为须要从新计算父RDD的分区数据,但因为父RDD的分区数据被多个子RDD分区依赖,而所有从新计算父RDD的某个分区,其实会形成计算资源的浪费。由于,计算出来的数据不会100%被子RDD所使用。
本节分析了spark RDD的Transformations性质和原理,着重讲解了,窄转换和款转换的特性。窄转换和宽转换是spark的核心概念,要着重进行理解。