RDD

1.概念:数据库

RDD是spark整个体系中最基础核心的概念,RDD(Resilient Distributed DataSet)即弹性分布式数据集数组

弹性:数据结构

RDD支持横向多分区,纵向操做内存不足写入磁盘,hdfs等,实现数据在内存和外存的灵活切换。架构

RDD能够在存储在内存和磁盘之间,而且自动或者手动切换分布式

RDD具备良好的容错性(即RDD能够经过血统转化为其余RDD)ide

Task若是失败,会进行特定次数的重试(default 4)函数

Stage若是失败会就行特定次数的重试oop

RDD能够存储任意类型的数据性能

RDD的分区数目能够自行设定this

分布式:

RDD能够存储在多台主机的内存或者磁盘之上。每一个RDD能够分为多个分区,每一个分区就是一个数据集片断,而且一个RDD的不一样分区能够被保存到集群中不一样的节点上,从而在集群中进行分布式并行计算。

数据集:

RDD是数据集合的抽象,从外部看RDD就是封装以后的可容错的数据集

RDD至关因而一个代理,对RDD进行操做其实就是对分区进行操做,就是对每一台机器上的迭代器进行操做,由于迭代器引用着咱们要操做的数据。
RDD存储的是逻辑数据结构,不存储真实数据,像关系数据库中的view 视图,只是表结构。

 

2.RDD的五个特征:

A list of partitioner     一系列分区
A function for computing each split     会有一个函数做用在每一个切片上
A list of depedencies on other RDDs    即RDD具备血统,RDD和RDD之间存在依赖关系
Optionally, a Partitioner for key-value RDDs (可选)若是是RDD中装的是KV类型的,那么Shuffle时会有一个分区器。默认是HashPartitioner。目前只有HashPartitioner 和RangeRartitioner
Optionally, a list of preferred locations to compute each split on (可选)若是只从HDFS中读取数据,会感知数据则位置,将Executor启动在数据所在的机器上

 

3.生成RDD的方式:

执行Transform操做(变换操做),根据已有的RDD计算获得

读取外部存储系统的数据集,如HDFS,HBase,或任何与Hadoop有关的数据源。

将Driver的Scala集合经过并行化的方式变成RDD(试验、测验)
 

4.RDD的两种操做:

针对RDD的操做,分两种,一种是Transformation(变换),一种是Actions(执行)。

Transformation(变换)操做属于懒操做(算子),不会真正触发RDD的处理计算。

Actions(执行)操做才会真正触发。前者用于执行计算并指定输出的形式,后者指定RDD之间的相互依赖关系。两类操做的主要区别是,Transformation转换操做(好比map、filter、join等)接受RDD并返回RDD,而Actions行动操做(好比count、collect等)接受RDD可是返回非RDD(即输出一个值或结果)。

RDD采用了惰性调用,即在RDD的执行过程当中,真正的计算发生在RDD的“行动”操做,对于“行动”以前的全部“转换”操做,Spark只是记录下“转换”操做应用的一些基础数据集以及RDD生成的轨迹,即相互之间的依赖关系,而不会触发真正的计算。

<img  data-cke-saved-src='1.jpg' src='1.jpg'>


转换操做:对于RDD而言,每一次转换操做都会产生不一样的RDD,供给下一个“转换”使用。转换获得的RDD是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操做时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操做。
下面列出一些常见的转换操做(Transformation API):

filter(func):筛选出知足函数func的元素,并返回一个新的数据集
map(func):将每一个元素传递到函数func中,并将结果返回为一个新的数据集
flatMap(func):与map()类似,但每一个输入元素均可以映射到0或多个输出结果
reduceByKey(func):应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每一个值是将每一个key传递到函数func中进行聚合

行动操做:行动操做是真正触发计算的地方。Spark程序执行到行动操做时,才会执行真正的计算,从文件中加载数据,完成一次又一次转换操做,最终,完成行动操做获得结果。 
下面列出一些常见的行动操做(Action API):

count() 返回数据集中的元素个数
collect() 以数组的形式返回数据集中的全部元素
first() 返回数据集中的第一个元素
take(n) 以数组的形式返回数据集中的前n个元素
reduce(func) 经过函数func(输入两个参数并返回一个值)聚合数据集中的元素
foreach(func) 将数据集中的每一个元素传递到函数func中运行
 

5.RDD的依赖关系:

RDD的依赖关系是spark计算优于hadoop的重要缘由之一。

RDD中不一样的操做会使得不一样RDD中的分区会产生不一样的依赖。

RDD中的依赖关系分为窄依赖(Narrow Dependency)与宽依赖(Wide Dependency)

窄依赖:对于窄依赖操做,它们只是将Partition的数据根据转换的规则进行转化,并不涉及其余的处理,能够简单地认为只是将数据从一个形式转换到另外一个形式。  如 map  filter  union等 

窄依赖表现为一个父RDD的分区对应于一个子RDD的分区,或多个父RDD的分区对应于一个子RDD的分区

宽依赖:表现为存在一个父RDD的一个分区对应一个子RDD的多个分区。宽依赖典型的操做包括groupByKey、sortByKey等

窄依赖源码:

abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {

    //返回子RDD的partitionId依赖的全部的parent RDD的Partition(s)

    def getParents(partitionId: Int): Seq[Int]

    override def rdd: RDD[T] = _rdd

}

class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {

    override def getParents(partitionId: Int) = List(partitionId)

}

宽依赖源码:

class ShuffleDependency[K, V, C](

    @transient _rdd: RDD[_ <: Product2[K, V]],

    val partitioner: Partitioner,

    val serializer: Option[Serializer] = None,

    val keyOrdering: Option[Ordering[K]] = None,

    val aggregator: Option[Aggregator[K, V, C]] = None,

    val mapSideCombine: Boolean = false)

extends Dependency[Product2[K, V]] {

 

override def rdd = _rdd.asInstanceOf[RDD[Product2[K, V]]]

//获取新的shuffleId

val shuffleId: Int = _rdd.context.newShuffleId()

//向ShuffleManager注册Shuffle的信息

val shuffleHandle: ShuffleHandle =

_rdd.context.env.shuffleManager.registerShuffle(

    shuffleId, _rdd.partitions.size, this)

    _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))

}

spark中一旦遇到宽依赖就须要进行shuffle的操做,所谓的shuffle的操做的本质就是将数据汇总后从新分发的过程。

这个过程数据要汇总到一块儿,数据量可能很大因此不可避免的须要进行数据落磁盘的操做,会下降程序的性能,因此spark并非彻底内存不读写磁盘,只能说它尽力避免这样的过程来提升效率 。

spark中的shuffle,在早期的版本中,会产生多个临时文件,可是这种多临时文件的策略形成大量文件的同时的读写,磁盘的性能被分摊给多个文件,每一个文件读写效率都不高,影响spark的执行效率。因此在后续的spark中(1.2.0以后的版本)的shuffle中,只会产生一个文件,而且数据会通过排序再附加索引信息,减小了文件的数量并经过排序索引的方式提高了性能。

 

6.RDD的运行流程

<img  data-cke-saved-src='5.jpg' src='5.jpg'>

1)Driver端 建立RDD对象 SparkContext根据用户提交的程序计算RDD之间的依赖关系,构建DAG

2)Driver端 DAGScheduler将DAG 切分Stage(切分的依据是遇到宽依赖shuffle),将stage中生成的Task以TaskSet的形式给TaskScheduler

3)Driver端 TaskScheduler调度Task(根据资源状况将Task调度到对应的Executor中)

 4)Executor接收Task,而后用实现了Runnable接口的包装类将Task包装起来丢入到线程池中执行。

七、RDD底层实现原理
RDD是一个分布式数据集,顾名思义,其数据应该分部存储于多台机器上。事实上,每一个RDD的数据都以Block的形式存储于多台机器上,下图是Spark的RDD存储架构图,其中每一个Executor会启动一个BlockManagerSlave,并管理一部分Block;而Block的元数据由Driver节点的BlockManagerMaster保存。BlockManagerSlave生成Block后向BlockManagerMaster注册该Block,BlockManagerMaster管理RDD与Block的关系,当RDD再也不须要存储的时候,将向BlockManagerSlave发送指令删除相应的Block。

八、RDD cache的原理
RDD的转换过程当中,并非每一个RDD都会存储,若是某个RDD会被重复使用,或者计算其代价很高,那么能够经过显示调用RDD提供的cache()方法,把该RDD存储下来。那RDD的cache是如何实现的呢?

RDD中提供的cache()方法只是简单的把该RDD放到cache列表中。当RDD的iterator被调用时,经过CacheManager把RDD计算出来,并存储到BlockManager中,下次获取该RDD的数据时即可直接经过CacheManager从BlockManager读出。  

相关文章
相关标签/搜索