spark RDD 详解

概念缓存

RDD具备如下一些特色:架构

建立:只能经过转换( transformation,如map/filter/groupBy/join等,区别于动做action)从两种数据源中建立RDD:1)稳定存储中的数据;2)其余RDD。分布式

只读:状态不可变,不能修改函数

分区:支持使RDD中的元素根据那个key来分区( partitioning),保存到多个结点上。还原时只会从新计算丢失分区的数据,而不会影响整个系统。oop

路径:在RDD中叫世族或血统( lineage),即RDD有充足的信息关于它是如何从其余RDD产生而来的。spa

持久化:支持将会·被重用的RDD缓存(如in-memory或溢出到磁盘)设计

延迟计算:像DryadLINQ同样,Spark也会延迟计算RDD,使其可以将转换管道化(pipeline transformation)orm

操做:丰富的动做( action),count/reduce/collect/save等。对象

关于转换(transformation)与动做(action)的区别,前者会生成新的RDD,然后者只是将RDD上某项操做的结果返回给程序,而不会生成新的RDDip

 

RDD底层实现原理

RDD是一个分布式数据集,顾名思义,其数据应该分部存储于多台机器上。事实上,每一个RDD的数据都以Block的形式存储于多台机器上,下图是Spark的RDD存储架构图,其中每一个Executor会启动一个BlockManagerSlave,并管理一部分Block;而Block的元数据由Driver节点的BlockManagerMaster保存。BlockManagerSlave生成Block后向BlockManagerMaster注册该Block,BlockManagerMaster管理RDD与Block的关系,当RDD再也不须要存储的时候,将向BlockManagerSlave发送指令删除相应的Block。

RDD cache的原理

RDD的转换过程当中,并非每一个RDD都会存储,若是某个RDD会被重复使用,或者计算其代价很高,那么能够经过显示调用RDD提供的cache()方法,把该RDD存储下来。那RDD的cache是如何实现的呢?

RDD中提供的cache()方法只是简单的把该RDD放到cache列表中。当RDD的iterator被调用时,经过CacheManager把RDD计算出来,并存储到BlockManager中,下次获取该RDD的数据时即可直接经过CacheManager从BlockManager读出。

RDD的容错机制实现分布式数据集容错方法

数据检查点和记录更新RDD采用记录更新的方式:记录全部更新点的成本很高。因此,RDD只支持粗颗粒变换,即只记录单个块上执行的单个操做,而后建立某个RDD的变换序列(血统)存储下来;变换序列指,每一个RDD都包含了他是如何由其余RDD变换过来的以及如何重建某一块数据的信息。所以RDD的容错机制又称“血统”容错。 要实现这种“血统”容错机制,最大的难题就是如何表达父RDD和子RDD之间的依赖关系。实际上依赖关系能够分两种,窄依赖和宽依赖:窄依赖:子RDD中的每一个数据块只依赖于父RDD中对应的有限个固定的数据块;宽依赖:子RDD中的一个数据块能够依赖于父RDD中的全部数据块。例如:map变换,子RDD中的数据块只依赖于父RDD中对应的一个数据块;groupByKey变换,子RDD中的数据块会依赖于多有父RDD中的数据块,由于一个key可能错在于父RDD的任何一个数据块中 将依赖关系分类的两个特性:第一,窄依赖能够在某个计算节点上直接经过计算父RDD的某块数据计算获得子RDD对应的某块数据;宽依赖则要等到父RDD全部数据都计算完成以后,而且父RDD的计算结果进行hash并传到对应节点上以后才能计算子RDD。第二,数据丢失时,对于窄依赖只须要从新计算丢失的那一块数据来恢复;对于宽依赖则要将祖先RDD中的全部数据块所有从新计算来恢复。因此在长“血统”链特别是有宽依赖的时候,须要在适当的时机设置数据检查点。也是这两个特性要求对于不一样依赖关系要采起不一样的任务调度机制和容错恢复机制。

RDD内部的设计

每一个RDD有5个主要的属性:

1)一组分片(Partition),即数据集的基本组成单位。对于RDD来讲,每一个分片都会被一个计算任务处理,并决定并行计算的粒度。用户能够在建立RDD时指定RDD的分片个数,若是没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。图3-1描述了分区存储的计算模型,每一个分配的存储是由BlockManager实现的。每一个分区都会被逻辑映射成BlockManager的一个Block,而这个Block会被一个Task负责计算。

2)一个计算每一个分区的函数。Spark中RDD的计算是以分片为单位的,每一个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不须要保存每次计算的结果。

3)RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,因此RDD之间就会造成相似于流水线同样的先后依赖关系。在部分分区数据丢失时,Spark能够经过这个依赖关系从新计算丢失的分区数据,而不是对RDD的全部分区进行从新计算。

4)一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另一个是基于范围的RangePartitioner。只有对于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。

Partitioner函数不但决定了RDD自己的分片数量,也决定了parent RDD Shuffle输出时的分片数量。

5)一个列表,存储存取每一个Partition的优先位置(preferred location)。对于一个HDFS文件来讲,这个列表保存的就是每一个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽量地将计算任务分配到其所要处理数据块的存储位置。

以Spark中内建的几个RDD举例来讲:

信息/RDD

HadoopRDD

FilteredRDD

JoinedRDD

Partitions

每一个HDFS块一个分区,组成集合

与父RDD相同

每一个Reduce任务一个分区

PreferredLoc

HDFS块位置

无(或询问父RDD)

Dependencies

无(父RDD)

与父RDD一对一

对每一个RDD进行混排

Iterator

读取对应的块数据

过滤

联接混排的数据

Partitioner

HashPartitioner

工做原理

主要分为三步:建立RDD对象,DAG调度器建立执行计划,Task调度器分配任务并调度Worker开始运行。

如下面一个按A-Z首字母分类,查找相同首字母下不一样姓名总个数的例子来看一下RDD是如何运行起来的。

步骤1:建立RDD。上面的例子除去最后一个collect是个动做,不会建立RDD以外,前面四个转换都会建立出新的RDD。所以第一步就是建立好全部RDD(内部的五项信息)。

步骤2:建立执行计划。Spark会尽量地管道化,并基因而否要从新组织数据来划分 阶段(stage),例如本例中的groupBy()转换就会将整个执行计划划分红两阶段执行。最终会产生一个 DAG(directed acyclic graph,有向无环图)做为逻辑执行计划。

步骤3:调度任务。将各阶段划分红不一样的 任务(task),每一个任务都是数据和计算的合体。在进行下一阶段前,当前阶段的全部任务都要执行完成。由于下一阶段的第一个转换必定是从新组织数据的,因此必须等当前阶段全部结果数据都计算出来了才能继续。

相关文章
相关标签/搜索