1.概念:数据库
RDD是spark整个体系中最基础核心的概念,RDD(Resilient Distributed DataSet)即弹性分布式数据集数组
弹性:数据结构
RDD支持横向多分区,纵向操做内存不足写入磁盘,hdfs等,实现数据在内存和外存的灵活切换。架构
RDD能够在存储在内存和磁盘之间,而且自动或者手动切换分布式
RDD具备良好的容错性(即RDD能够经过血统转化为其余RDD)ide
Task若是失败,会进行特定次数的重试(default 4)函数
Stage若是失败会就行特定次数的重试oop
RDD能够存储任意类型的数据性能
RDD的分区数目能够自行设定this
分布式:
RDD能够存储在多台主机的内存或者磁盘之上。每一个RDD能够分为多个分区,每一个分区就是一个数据集片断,而且一个RDD的不一样分区能够被保存到集群中不一样的节点上,从而在集群中进行分布式并行计算。
数据集:
RDD是数据集合的抽象,从外部看RDD就是封装以后的可容错的数据集
RDD至关因而一个代理,对RDD进行操做其实就是对分区进行操做,就是对每一台机器上的迭代器进行操做,由于迭代器引用着咱们要操做的数据。
RDD存储的是逻辑数据结构,不存储真实数据,像关系数据库中的view 视图,只是表结构。
2.RDD的五个特征:
A list of partitioner 一系列分区
A function for computing each split 会有一个函数做用在每一个切片上
A list of depedencies on other RDDs 即RDD具备血统,RDD和RDD之间存在依赖关系
Optionally, a Partitioner for key-value RDDs (可选)若是是RDD中装的是KV类型的,那么Shuffle时会有一个分区器。默认是HashPartitioner。目前只有HashPartitioner 和RangeRartitioner
Optionally, a list of preferred locations to compute each split on (可选)若是只从HDFS中读取数据,会感知数据则位置,将Executor启动在数据所在的机器上
3.生成RDD的方式:
执行Transform操做(变换操做),根据已有的RDD计算获得
读取外部存储系统的数据集,如HDFS,HBase,或任何与Hadoop有关的数据源。
将Driver的Scala集合经过并行化的方式变成RDD(试验、测验)
4.RDD的两种操做:
针对RDD的操做,分两种,一种是Transformation(变换),一种是Actions(执行)。
Transformation(变换)操做属于懒操做(算子),不会真正触发RDD的处理计算。
Actions(执行)操做才会真正触发。前者用于执行计算并指定输出的形式,后者指定RDD之间的相互依赖关系。两类操做的主要区别是,Transformation转换操做(好比map、filter、join等)接受RDD并返回RDD,而Actions行动操做(好比count、collect等)接受RDD可是返回非RDD(即输出一个值或结果)。
RDD采用了惰性调用,即在RDD的执行过程当中,真正的计算发生在RDD的“行动”操做,对于“行动”以前的全部“转换”操做,Spark只是记录下“转换”操做应用的一些基础数据集以及RDD生成的轨迹,即相互之间的依赖关系,而不会触发真正的计算。
转换操做:对于RDD而言,每一次转换操做都会产生不一样的RDD,供给下一个“转换”使用。转换获得的RDD是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操做时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操做。
下面列出一些常见的转换操做(Transformation API):
filter(func):筛选出知足函数func的元素,并返回一个新的数据集
map(func):将每一个元素传递到函数func中,并将结果返回为一个新的数据集
flatMap(func):与map()类似,但每一个输入元素均可以映射到0或多个输出结果
reduceByKey(func):应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每一个值是将每一个key传递到函数func中进行聚合
行动操做:行动操做是真正触发计算的地方。Spark程序执行到行动操做时,才会执行真正的计算,从文件中加载数据,完成一次又一次转换操做,最终,完成行动操做获得结果。
下面列出一些常见的行动操做(Action API):
count() 返回数据集中的元素个数
collect() 以数组的形式返回数据集中的全部元素
first() 返回数据集中的第一个元素
take(n) 以数组的形式返回数据集中的前n个元素
reduce(func) 经过函数func(输入两个参数并返回一个值)聚合数据集中的元素
foreach(func) 将数据集中的每一个元素传递到函数func中运行
5.RDD的依赖关系:
RDD的依赖关系是spark计算优于hadoop的重要缘由之一。
RDD中不一样的操做会使得不一样RDD中的分区会产生不一样的依赖。
RDD中的依赖关系分为窄依赖(Narrow Dependency)与宽依赖(Wide Dependency)
窄依赖:对于窄依赖操做,它们只是将Partition的数据根据转换的规则进行转化,并不涉及其余的处理,能够简单地认为只是将数据从一个形式转换到另外一个形式。 如 map filter union等
窄依赖表现为一个父RDD的分区对应于一个子RDD的分区,或多个父RDD的分区对应于一个子RDD的分区;
宽依赖:表现为存在一个父RDD的一个分区对应一个子RDD的多个分区。宽依赖典型的操做包括groupByKey、sortByKey等
窄依赖源码:
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
//返回子RDD的partitionId依赖的全部的parent RDD的Partition(s)
def getParents(partitionId: Int): Seq[Int]
override def rdd: RDD[T] = _rdd
}
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int) = List(partitionId)
}
宽依赖源码:
class ShuffleDependency[K, V, C](
@transient _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Option[Serializer] = None,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false)
extends Dependency[Product2[K, V]] {
override def rdd = _rdd.asInstanceOf[RDD[Product2[K, V]]]
//获取新的shuffleId
val shuffleId: Int = _rdd.context.newShuffleId()
//向ShuffleManager注册Shuffle的信息
val shuffleHandle: ShuffleHandle =
_rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.size, this)
_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
spark中一旦遇到宽依赖就须要进行shuffle的操做,所谓的shuffle的操做的本质就是将数据汇总后从新分发的过程。
这个过程数据要汇总到一块儿,数据量可能很大因此不可避免的须要进行数据落磁盘的操做,会下降程序的性能,因此spark并非彻底内存不读写磁盘,只能说它尽力避免这样的过程来提升效率 。
spark中的shuffle,在早期的版本中,会产生多个临时文件,可是这种多临时文件的策略形成大量文件的同时的读写,磁盘的性能被分摊给多个文件,每一个文件读写效率都不高,影响spark的执行效率。因此在后续的spark中(1.2.0以后的版本)的shuffle中,只会产生一个文件,而且数据会通过排序再附加索引信息,减小了文件的数量并经过排序索引的方式提高了性能。
6.RDD的运行流程
1)Driver端 建立RDD对象 SparkContext根据用户提交的程序计算RDD之间的依赖关系,构建DAG
2)Driver端 DAGScheduler将DAG 切分Stage(切分的依据是遇到宽依赖shuffle),将stage中生成的Task以TaskSet的形式给TaskScheduler
3)Driver端 TaskScheduler调度Task(根据资源状况将Task调度到对应的Executor中)
4)Executor接收Task,而后用实现了Runnable接口的包装类将Task包装起来丢入到线程池中执行。
七、RDD底层实现原理
RDD是一个分布式数据集,顾名思义,其数据应该分部存储于多台机器上。事实上,每一个RDD的数据都以Block的形式存储于多台机器上,下图是Spark的RDD存储架构图,其中每一个Executor会启动一个BlockManagerSlave,并管理一部分Block;而Block的元数据由Driver节点的BlockManagerMaster保存。BlockManagerSlave生成Block后向BlockManagerMaster注册该Block,BlockManagerMaster管理RDD与Block的关系,当RDD再也不须要存储的时候,将向BlockManagerSlave发送指令删除相应的Block。
八、RDD cache的原理
RDD的转换过程当中,并非每一个RDD都会存储,若是某个RDD会被重复使用,或者计算其代价很高,那么能够经过显示调用RDD提供的cache()方法,把该RDD存储下来。那RDD的cache是如何实现的呢?
RDD中提供的cache()方法只是简单的把该RDD放到cache列表中。当RDD的iterator被调用时,经过CacheManager把RDD计算出来,并存储到BlockManager中,下次获取该RDD的数据时即可直接经过CacheManager从BlockManager读出。