RDD是分布式内存的一个抽象概念,是一种高度受限的共享内存模型,即RDD是只读的记录分区的集合,能横跨集群全部节点并行计算,是一种基于工做集的应用抽象。apache
RDD底层存储原理:其数据分布存储于多台机器上,事实上,每一个RDD的数据都以Block的形式存储于多台机器上,每一个Executor会启动一个BlockManagerSlave,并管理一部分Block;而Block的元数据由Driver节点上的BlockManagerMaster保存,BlockManagerSlave生成Block后向BlockManagerMaster注册该Block,BlockManagerMaster管理RDD与Block的关系,当RDD再也不须要存储的时候,将向BlockManagerSlave发送指令删除相应的Block。数组
BlockManager管理RDD的物理分区,每一个Block就是节点上对应的一个数据块,能够存储在内存或者磁盘上。而RDD中的Partition是一个逻辑数据块,对应相应的物理块Block。本质上,一个RDD在代码中至关于数据的一个元数据结构,存储着数据分区及其逻辑结构映射关系,存储着RDD以前的依赖转换关系。缓存
BlockManager在每一个节点上运行管理Block(Driver和Executors),它提供一个接口检索本地和远程的存储变量,如memory、disk、off-heap。使用BlockManager前必须先初始化。BlockManager.scala的部分源码以下所示:安全
private[spark] class BlockManager( executorId: String, rpcEnv: RpcEnv, val master: BlockManagerMaster, serializerManager: SerializerManager, val conf: SparkConf, memoryManager: MemoryManager, mapOutputTracker: MapOutputTracker, shuffleManager: ShuffleManager, val blockTransferService: BlockTransferService, securityManager: SecurityManager, numUsableCores: Int) extends BlockDataManager with BlockEvictionHandler with Logging {
BlockManagerMaster会持有整个Application的Block的位置、Block所占用的存储空间等元数据信息,在Spark的Driver的DAGScheduler中,就是经过这些信息来确认数据运行的本地性的。Spark支持重分区,数据经过Spark默认的或者用户自定义的分区器决定数据块分布在哪些节点。RDD的物理分区是由Block-Manager管理的,每一个Block就是节点上对应的一个数据块,能够存储在内存或者磁盘。而RDD中的partition是一个逻辑数据块,对应相应的物理块Block。本质上,一个RDD在代码中至关于数据的一个元数据结构(一个RDD就是一组分区),存储着数据分区及Block、Node等的映射关系,以及其余元数据信息,存储着RDD以前的依赖转换关系。分区是一个逻辑概念,Transformation先后的新旧分区在物理上多是同一块内存存储。 数据结构
Spark经过读取外部数据建立RDD,或经过其余RDD执行肯定的转换Transformation操做(如map、union和groubByKey)而建立,从而构成了线性依赖关系,或者说血统关系(Lineage),在数据分片丢失时能够从依赖关系中恢复本身独立的数据分片,对其余数据分片或计算机没有影响,基本没有检查点开销,使得实现容错的开销很低,失效时只须要从新计算RDD分区,就能够在不一样节点上并行执行,而不须要回滚(Roll Back)整个程序。落后任务(即运行很慢的节点)是经过任务备份,从新调用执行进行处理的。分布式
由于RDD自己支持基于工做集的运用,因此可使Spark的RDD持久化(persist)到内存中,在并行计算中高效重用。多个查询时,咱们就能够显性地将工做集中的数据缓存到内存中,为后续查询提供复用,这极大地提高了查询的速度。在Spark中,一个RDD就是一个分布式对象集合,每一个RDD可分为多个片(Partitions),而分片能够在集群环境的不一样节点上计算。ide
RDD做为泛型的抽象的数据结构,支持两种计算操做算子:Transformation(变换)与Action(行动)。且RDD的写操做是粗粒度的,读操做既能够是粗粒度的,也能够是细粒度的。RDD.scala的源码以下: 函数
/** * Internally, each RDD is characterized by five main properties: * 每一个RDD都有5个主要特性 * - A list of partitions 分区列表 * - A function for computing each split 每一个分区都有一个计算函数 * - A list of dependencies on other RDDs 依赖于其余RDD的列表 * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) 数据类型(key-value)的RDD分区器 * - Optionally, a list of preferred locations to compute each split on (e.g. block locations for 每一个分区都有一个分区位置列表 */ abstract class RDD[T: ClassTag]( @transient private var _sc: SparkContext, @transient private var deps: Seq[Dependency[_]] ) extends Serializable with Logging {
其中,SparkContext是Spark功能的主要入口点,一个SparkContext表明一个集群链接,能够用其在集群中建立RDD、累加变量、广播变量等,在每个可用的JVM中只有一个SparkContext,在建立一个新的SparkContext以前,必须先中止该JVM中可用的SparkContext,这种限制可能最终会被修改。SparkContext被实例化时须要一个SparkConf对象去描述应用的配置信息,在这个配置对象中设置的信息,会覆盖系统默认的配置。大数据
RDD五大特性:this
(1)分区列表(a list of partitions)。Spark RDD是被分区的,每个分区都会被一个计算任务(Task)处理,分区数决定并行计算数量,RDD的并行度默认从父RDD传给子RDD。默认状况下,一个HDFS上的数据分片就是一个Partition,RDD分片数决定了并行计算的力度,能够在建立RDD时指定RDD分片个数,若是不指定分区数量,当RDD从集合建立时,则默认分区数量为该程序所分配到的资源的CPU核数(每一个Core能够承载2~4个Partition),若是是从HDFS文件建立,默认为文件的Block数。
(2)每个分区都有一个计算函数(a function for computing each split)。每一个分区都会有计算函数,Spark的RDD的计算函数是以分片为基本单位的,每一个RDD都会实现compute函数,对具体的分片进行计算,RDD中的分片是并行的,因此是分布式并行计算。有一点很是重要,就是因为RDD有先后依赖关系,遇到宽依赖关系,例如,遇到reduceBykey等宽依赖操做的算子,Spark将根据宽依赖划分Stage,Stage内部经过Pipeline操做,经过Block Manager获取相关的数据,由于具体的split要从外界读数据,也要把具体的计算结果写入外界,因此用了一个管理器,具体的split都会映射成BlockManager的Block,而具体split会被函数处理,函数处理的具体形式是以任务的形式进行的。
(3)依赖于其余RDD的列表(a list of dependencies on other RDDs)。RDD的依赖关系,因为RDD每次转换都会生成新的RDD,因此RDD会造成相似流水线的先后依赖关系,固然,宽依赖就不相似于流水线了,宽依赖后面的RDD具体的数据分片会依赖前面全部的RDD的全部的数据分片,这时数据分片就不进行内存中的Pipeline,这时通常是跨机器的。由于有先后的依赖关系,因此当有分区数据丢失的时候,Spark会经过依赖关系从新计算,算出丢失的数据,而不是对RDD全部的分区进行从新计算。RDD之间的依赖有两种:窄依赖(Narrow Dependency)、宽依赖(Wide Dependency)。RDD是Spark的核心数据结构,经过RDD的依赖关系造成调度关系。经过对RDD的操做造成整个Spark程序。
RDD有Narrow Dependency和Wide Dependency两种不一样类型的依赖,其中的Narrow Dependency指的是每个parent RDD的Partition最多被child RDD的一个Partition所使用,而Wide Dependency指的是多个child RDD的Partition会依赖于同一个parent RDD的Partition。能够从两个方面来理解RDD之间的依赖关系:一方面是该RDD的parent RDD是什么;另外一方面是依赖于parent RDD的哪些Partitions;根据依赖于parent RDD的Partitions的不一样状况,Spark将Dependency分为宽依赖和窄依赖两种。Spark中宽依赖指的是生成的RDD的每个partition都依赖于父RDD的全部partition,宽依赖典型的操做有groupByKey、sortByKey等,宽依赖意味着shuffle操做,这是Spark划分Stage边界的依据,Spark中宽依赖支持两种Shuffle Manager,即HashShuffleManager和SortShuffleManager,前者是基于Hash的Shuffle机制,后者是基于排序的Shuffle机制。Spark 2.2如今的版本中已经没有Hash Shuffle的方式。
(4)key-value数据类型的RDD分区器(-Optionally,a Partitioner for key-value RDDS),控制分区策略和分区数。每一个key-value形式的RDD都有Partitioner属性,它决定了RDD如何分区。固然,Partition的个数还决定每一个Stage的Task个数。RDD的分片函数,想控制RDD的分片函数的时候能够分区(Partitioner)传入相关的参数,如HashPartitioner、RangePartitioner,它自己针对key-value的形式,若是不是key-value的形式,它就不会有具体的Partitioner。Partitioner自己决定了下一步会产生多少并行的分片,同时,它自己也决定了当前并行(parallelize)Shuffle输出的并行数据,从而使Spark具备可以控制数据在不一样节点上分区的特性,用户能够自定义分区策略,如Hash分区等。Spark提供了“partitionBy”运算符,能经过集群对RDD进行数据再分配来建立一个新的RDD。
(5)每一个分区都有一个优先位置列表(-Optionally,a list of preferred locations to compute each split on)。它会存储每一个Partition的优先位置,对于一个HDFS文件来讲,就是每一个Partition块的位置。观察运行spark集群的控制台会发现Spark的具体计算,具体分片前,它已经清楚地知道任务发生在什么节点上,也就是说,任务自己是计算层面的、代码层面的,代码发生运算以前已经知道它要运算的数据在什么地方,有具体节点的信息。这就符合大数据中数据不动代码动的特色。数据不动代码动的最高境界是数据就在当前节点的内存中。这时有多是memory级别或Alluxio级别的,Spark自己在进行任务调度时候,会尽量将任务分配处处理数据的数据块所在的具体位置。据Spark的RDD.Scala源码函数getPreferredLocations可知,每次计算都符合完美的数据本地性。
RDD类源码文件中的4个方法和一个属性对应上述阐述的RDD的5大特性。RDD.scala的源码以下:
/** * :: DeveloperApi :: * Implemented by subclasses to compute a given partition. 经过子类实现给定分区的计算 */ @DeveloperApi def compute(split: Partition, context: TaskContext): Iterator[T] /** * Implemented by subclasses to return the set of partitions in this RDD. This method will only * be called once, so it is safe to implement a time-consuming computation in it. * 经过子类实现,返回一个RDD分区列表,这个方法只被调用一次,它是安全的执行一次耗时计算 * * 数组中的分区必须符合如下属性设置 * The partitions in this array must satisfy the following property: * `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }` */ protected def getPartitions: Array[Partition] /** * 返回对父RDD的依赖列表,这个方法仅只被调用一次,它是安全的执行一次耗时计算 * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only * be called once, so it is safe to implement a time-consuming computation in it. */ protected def getDependencies: Seq[Dependency[_]] = deps /** * 可选的,指定优先位置,输入参数是spilt分片,输出结果是一组优先的节点位置 * Optionally overridden by subclasses to specify placement preferences. */ protected def getPreferredLocations(split: Partition): Seq[String] = Nil /** * Optionally overridden by subclasses to specify how they are partitioned. * 可选的,经过子类实现,指定如何分区 */ @transient val partitioner: Option[Partitioner] = None
其中,TaskContext是读取或改变执行任务的环境,用org.apache.spark.TaskContext.get()可返回当前可用的TaskContext,能够调用内部的函数访问正在运行任务的环境信息。Partitioner是一个对象,定义了如何在key-Value类型的RDD元素中用Key分区,从0到numPartitions-1区间内映射每个Key到Partition ID。Partition是一个RDD的分区标识符。Partition.scala的源码以下。
/** * An identifier for a partition in an RDD. */ trait Partition extends Serializable { /** * Get the partition's index within its parent RDD */ def index: Int // A better default implementation of HashCode override def hashCode(): Int = index override def equals(other: Any): Boolean = super.equals(other) }