RDD(Resilient Distributed Dataset)叫作分布式数据集,是Spark中最基本的数据抽象,它表明一个不可变、可分区、里面的元素可并行计算的集合。在 Spark 中,对数据的全部操做不外乎建立 RDD、转化已有RDD 以及调用 RDD 操做进行求值。每一个 RDD 都被分为多个分区,这些分区运行在集群中的不一样节点上。RDD 能够包含 Python、Java、Scala 中任意类型的对象, 甚至能够包含用户自定义的对象。RDD具备数据流模型的特色:自动容错、位置感知性调度和可伸缩性。RDD容许用户在执行多个查询时显式地将工做集缓存在内存中,后续的查询可以重用工做集,这极大地提高了查询速度。java
RDD支持两种操做:转化操做和行动操做。RDD 的转化操做是返回一个新的 RDD的操做,好比 map()和 filter(),而行动操做则是向驱动器程序返回结果或把结果写入外部系统的操做。好比 count() 和 first()。es6
Spark采用惰性计算模式,RDD只有第一次在一个行动操做中用到时,才会真正计算。Spark能够优化整个计算过程。默认状况下,Spark 的 RDD 会在你每次对它们进行行动操做时从新计算。若是想在多个行动操做中重用同一个 RDD,可使用 RDD.persist() 让 Spark 把这个 RDD 缓存下来。算法
1) 一组分片(Partition),即数据集的基本组成单位。对于RDD来讲,每一个分片都会被一个计算任务处理,并决定并行计算的粒度。用户能够在建立RDD时指定RDD的分片个数,若是没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。shell
2) 一个计算每一个分区的函数。Spark中RDD的计算是以分片为单位的,每一个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不须要保存每次计算的结果。数据库
3) RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,因此RDD之间就会造成相似于流水线同样的先后依赖关系。在部分分区数据丢失时,Spark能够经过这个依赖关系从新计算丢失的分区数据,而不是对RDD的全部分区进行从新计算。apache
4) 一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另一个是基于范围的RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD自己的分片数量,也决定了parent RDD Shuffle输出时的分片数量。编程
5)一个列表,存储存取每一个Partition的优先位置(preferred location)。对于一个HDFS文件来讲,这个列表保存的就是每一个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽量地将计算任务分配到其所要处理数据块的存储位置。数组
RDD是一个应用层面的逻辑概念。一个RDD多个分片。RDD就是一个元数据记录集,记录了RDD内存全部的关系数据。缓存
1) 自动进行内存和磁盘数据存储的切换安全
Spark优先把数据放到内存中,若是内存放不下,就会放到磁盘里面,程序进行自动的存储切换
2) 基于血统的高效容错机制
在RDD进行转换和动做的时候,会造成RDD的Lineage依赖链,当某一个RDD失效的时候,能够经过从新计算上游的RDD来从新生成丢失的RDD数据。
3) Task若是失败会自动进行特定次数的重试
RDD的计算任务若是运行失败,会自动进行任务的从新计算,默认次数是4次。
4) Stage若是失败会自动进行特定次数的重试
若是Job的某个Stage阶段计算失败,框架也会自动进行任务的从新计算,默认次数也是4次。
5) Checkpoint和Persist可主动或被动触发
RDD能够经过Persist持久化将RDD缓存到内存或者磁盘,当再次用到该RDD时直接读取就行。也能够将RDD进行检查点,检查点会将数据存储在HDFS中,该RDD的全部父RDD依赖都会被移除。
6) 数据调度弹性
Spark把这个JOB执行模型抽象为通用的有向无环图DAG,能够将多Stage的任务串联或并行执行,调度引擎自动处理Stage的失败以及Task的失败。
7) 数据分片的高度弹性
能够根据业务的特征,动态调整数据分片的个数,提高总体的应用执行效率。
RDD全称叫作弹性分布式数据集(Resilient Distributed Datasets),它是一种分布式的内存抽象,表示一个只读的记录分区的集合,它只能经过其余RDD转换而建立,为此,RDD支持丰富的转换操做(如map, join, filter, groupBy等),经过这种转换操做,新的RDD则包含了如何从其余RDDs衍生所必需的信息,因此说RDDs之间是有依赖关系的。基于RDDs之间的依赖,RDDs会造成一个有向无环图DAG,该DAG描述了整个流式计算的流程,实际执行的时候,RDD是经过血缘关系(Lineage)一鼓作气的,即便出现数据分区丢失,也能够经过血缘关系重建分区。
RDD表示只读的分区的数据集,对RDD进行改动,只能经过RDD的转换操做,由一个RDD获得一个新的RDD,新的RDD包含了从其余RDD衍生所必需的信息。RDDs之间存在依赖,RDD的执行是按照血缘关系延时计算的。若是血缘关系较长,能够经过持久化RDD来切断血缘关系。
RDD逻辑上是分区的,每一个分区的数据是抽象存在的,计算的时候会经过一个compute函数获得每一个分区的数据。若是RDD是经过已有的文件系统构建,则compute函数是读取指定文件系统中的数据,若是RDD是经过其余RDD转换而来,则compute函数是执行转换逻辑将其余RDD的数据进行转换。
RDD是只读的,要想改变RDD中的数据,只能在现有的RDD基础上建立新的RDD。以下图所示
由一个RDD转换到另外一个RDD,能够经过丰富的操做算子实现,再也不像MapReduce那样只能写map和reduce了,以下图所示。
RDD的操做算子包括两类,一类叫作transformations,它是用来将RDD进行转化,构建RDD的血缘关系;另外一类叫作actions,它是用来触发RDD的计算,获得RDD的相关计算结果或者将RDD保存的文件系统中
RDDs经过操做算子进行转换,转换获得的新RDD包含了从其余RDDs衍生所必需的信息,RDDs之间维护着这种血缘关系,也称之为依赖。以下图所示,依赖包括两种,一种是窄依赖,RDDs之间分区是一一对应的,另外一种是宽依赖,下游RDD的每一个分区与上游RDD(也称之为父RDD)的每一个分区都有关,是多对多的关系。
经过RDDs之间的这种依赖关系,一个任务流能够描述为DAG(有向无环图),以下图所示,在实际执行过程当中宽依赖对应于Shuffle(图中的reduceByKey和join),窄依赖中的全部转换操做能够经过相似于管道的方式一鼓作气执行(图中map和union能够一块儿执行)。
若是在应用程序中屡次使用同一个RDD,能够将该RDD缓存起来,该RDD只有在第一次计算的时候会根据血缘关系获得分区的数据,在后续其余地方用到该RDD的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用。以下图所示,RDD-1通过一系列的转换后获得RDD-n并保存到hdfs,RDD-1在这一过程当中会有个中间结果,若是将其缓存到内存,那么在随后的RDD-1转换到RDD-m这一过程当中,就不会计算其以前的RDD-0了。
虽然RDD的血缘关系自然地能够实现容错,当RDD的某个分区数据失败或丢失,能够经过血缘关系重建。可是对于长时间迭代型应用来讲,随着迭代的进行,RDDs之间的血缘关系会愈来愈长,一旦在后续迭代过程当中出错,则须要经过很是长的血缘关系去重建,势必影响性能。为此,RDD支持checkpoint将数据保存到持久化的存储中,这样就能够切断以前的血缘关系,由于checkpoint后的RDD不须要知道它的父RDDs了,它能够从checkpoint处拿到数据。
给定一个RDD咱们至少能够知道以下几点信息:
一、分区数以及分区方式;
二、由父RDDs衍生而来的相关依赖信息;
三、计算每一个分区的数据,计算步骤为:
1)若是被缓存,则从缓存中取的分区的数据;
2)若是被checkpoint,则从checkpoint处恢复数据;
3)根据血缘关系计算分区的数据。
在Spark中,RDD被表示为对象,经过对象上的方法调用来对RDD进行转换。通过一系列的transformations定义RDD以后,就能够调用actions触发RDD的计算,action能够是向应用程序返回结果(count, collect等),或者是向存储系统保存数据(saveAsTextFile等)。在Spark中,只有遇到action,才会执行RDD的计算(即延迟计算),这样在运行时能够经过管道的方式传输多个转换。
要使用Spark,须要编写一个Driver程序,它被提交到集群以调度运行Worker,以下图所示。Driver中定义了一个或多个RDD,并调用RDD上的action,Worker则执行RDD分区计算任务。
Dirver ,SparkContext ,Executor ,Master ,Worker 关系如图
在Spark中建立RDD的建立方式大概能够分为三种:(1)、从集合中建立RDD;(2)、从外部存储建立RDD;(3)、从其余RDD建立。
1)由一个已经存在的Scala集合建立,集合并行化。
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
而从集合中建立RDD,Spark主要提供了两种函数:parallelize和makeRDD
makeRDD函数有两种实现,第一种实现其实彻底和parallelize一致;而第二种实现能够为数据提供位置信息,而除此以外的实现和parallelize函数也是一致的。
scala> val gh01= sc.parallelize(List(1,2,3)) gh01: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:21 scala> val gh02 = sc.makeRDD(List(1,2,3)) gh022: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at makeRDD at <console>:21 scala> val seq = List((1, List("Hello", "World", "Spark")), | (2, List("At", "zgh"))) seq: List[(Int, List[String])] = List((1,List(Hello, World, Spark)), (2,List(At, zgh))) scala> val gh03 = sc.makeRDD(seq) gh03: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at makeRDD at <console>:23 scala> guigu3.preferredLocations(gh03.partitions(1)) res26: Seq[String] = List(At, zgh) scala> gh03.preferredLocations(gh03.partitions(0)) res27: Seq[String] = List(Hello, World, Spark) scala> gh01.preferredLocations(gh01.partitions(0)) res28: Seq[String] = List()
2)由外部存储系统的数据集建立,包括本地的文件系统,还有全部Hadoop支持的数据集,好比HDFS、Cassandra、HBase等
scala> val atgh = sc.textFile("hdfs://master01:9000/RELEASE") atgh: org.apache.spark.rdd.RDD[String] = hdfs://master01:9000/RELEASE MapPartitionsRDD[4] at textFile at <console>:24
RDD通常分为数值RDD和键值对RDD
RDD中的全部转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动做。只有当发生一个要求返回结果给Driver的动做时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。
经常使用的Transformation:
转换 |
含义 |
|||
map(func) |
返回一个新的RDD,该RDD由每个输入元素通过func函数转换后组成 |
|||
scala> var source = sc.parallelize(1 to 10) source: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24 scala> source.collect() res7: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) scala> val mapadd = source.map(_ * 2) mapadd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at map at <console>:26 scala> mapadd.collect() res8: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20) |
||||
filter(func) |
返回一个新的RDD,该RDD由通过func函数计算后返回值为true的输入元素组成 |
|||
scala> var sourceFilter = sc.parallelize(Array("xiaoming","xiaojiang","xiaohe","dazhi")) sourceFilter: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:24 scala> val filter = sourceFilter.filter(_.contains("xiao")) filter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at filter at <console>:26 scala> sourceFilter.collect() res9: Array[String] = Array(xiaoming, xiaojiang, xiaohe, dazhi) scala> filter.collect() res10: Array[String] = Array(xiaoming, xiaojiang, xiaohe) |
||||
flatMap(func) |
相似于map,可是每个输入元素能够被映射为0或多个输出元素(因此func应该返回一个序列,而不是单一元素) |
|||
scala> val sourceFlat = sc.parallelize(1 to 5) sourceFlat: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:24 scala> sourceFlat.collect() res11: Array[Int] = Array(1, 2, 3, 4, 5) scala> val flatMap = sourceFlat.flatMap(1 to _) flatMap: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at flatMap at <console>:26 scala> flatMap.collect() res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5) |
||||
mapPartitions(func) |
相似于map,但独立地在RDD的每个分片上运行,所以在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]。假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理全部分区 |
|||
scala> val rdd = sc.parallelize(List(("kpop","female"),("zorro","male"),("mobin","male"),("lucy","female"))) rdd: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[16] at parallelize at <console>:24 scala> :paste // Entering paste mode (ctrl-D to finish) def partitionsFun(iter : Iterator[(String,String)]) : Iterator[String] = { var woman = List[String]() while (iter.hasNext){ val next = iter.next() next match { case (_,"female") => woman = next._1 :: woman case _ => } } woman.iterator } // Exiting paste mode, now interpreting. partitionsFun: (iter: Iterator[(String, String)])Iterator[String] scala> val result = rdd.mapPartitions(partitionsFun) result: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[17] at mapPartitions at <console>:28 scala> result.collect() res13: Array[String] = Array(kpop, lucy) |
||||
mapPartitionsWithIndex(func) |
相似于mapPartitions,但func带有一个整数参数表示分片的索引值,所以在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U] |
|||
scala> val rdd = sc.parallelize(List(("kpop","female"),("zorro","male"),("mobin","male"),("lucy","female"))) rdd: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[18] at parallelize at <console>:24 scala> :paste // Entering paste mode (ctrl-D to finish) def partitionsFun(index : Int, iter : Iterator[(String,String)]) : Iterator[String] = { var woman = List[String]() while (iter.hasNext){ val next = iter.next() next match { case (_,"female") => woman = "["+index+"]"+next._1 :: woman case _ => } } woman.iterator } // Exiting paste mode, now interpreting. partitionsFun: (index: Int, iter: Iterator[(String, String)])Iterator[String] scala> val result = rdd.mapPartitionsWithIndex(partitionsFun) result: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[19] at mapPartitionsWithIndex at <console>:28 scala> result.collect() res14: Array[String] = Array([0]kpop, [3]lucy) |
||||
sample(withReplacement, fraction, seed) |
以指定的随机种子随机抽样出数量为fraction的数据,withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样,seed用于指定随机数生成器种子。例子从RDD中随机且有放回的抽出50%的数据,随机种子值为3(便可能以1 2 3的其中一个起始值) |
|||
scala> val rdd = sc.parallelize(1 to 10) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:24 scala> rdd.collect() res15: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) scala> var sample1 = rdd.sample(true,0.4,2) sample1: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[21] at sample at <console>:26 scala> sample1.collect() res16: Array[Int] = Array(1, 2, 2, 7, 7, 8, 9) scala> var sample2 = rdd.sample(false,0.2,3) sample2: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[22] at sample at <console>:26 scala> sample2.collect() res17: Array[Int] = Array(1, 9) |
||||
takeSample
|
和Sample的区别是:takeSample返回的是最终的结果集合。 |
|||
union(otherDataset) |
对源RDD和参数RDD求并集后返回一个新的RDD |
|||
scala> val rdd1 = sc.parallelize(1 to 5) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24 scala> val rdd2 = sc.parallelize(5 to 10) rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24 scala> val rdd3 = rdd1.union(rdd2) rdd3: org.apache.spark.rdd.RDD[Int] = UnionRDD[25] at union at <console>:28 scala> rdd3.collect() res18: Array[Int] = Array(1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10) |
||||
intersection(otherDataset) |
对源RDD和参数RDD求交集后返回一个新的RDD |
|||
scala> val rdd1 = sc.parallelize(1 to 7) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at parallelize at <console>:24 scala> val rdd2 = sc.parallelize(5 to 10) rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at <console>:24 scala> val rdd3 = rdd1.intersection(rdd2) rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[33] at intersection at <console>:28 scala> rdd3.collect() [Stage 15:=============================> (2 + 2) |
||||
distinct([numTasks])) |
对源RDD进行去重后返回一个新的RDD. 默认状况下,只有8个并行任务来操做,可是能够传入一个可选的numTasks参数改变它。 |
|||
scala> val distinctRdd = sc.parallelize(List(1,2,1,5,2,9,6,1)) distinctRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at parallelize at <console>:24 scala> val unionRDD = distinctRdd.distinct() unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[37] at distinct at <console>:26 scala> unionRDD.collect() [Stage 16:> (0 + 4) [Stage 16:=============================> (2 + 2) |
||||
partitionBy
|
对RDD进行分区操做,若是原有的partionRDD和现有的partionRDD是一致的话就不进行分区, |
|||
scala> val rdd = sc.parallelize(Array((1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd")),4) rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[44] at parallelize at <console>:24 scala> rdd.partitions.size res24: Int = 4 scala> var rdd2 = rdd.partitionBy(new org.apache.spark.HashPartitioner(2)) rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[45] at partitionBy at <console>:26 scala> rdd2.partitions.size res25: Int = 2 |
||||
reduceByKey(func, [numTasks]) |
在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一块儿,reduce任务的个数能够经过第二个可选的参数来设置 |
|||
scala> val rdd = sc.parallelize(List(("female",1),("male",5),("female",5),("male",2))) rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[46] at parallelize at <console>:24 scala> val reduce = rdd.reduceByKey((x,y) => x+y) reduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[47] at reduceByKey at <console>:26 scala> reduce.collect() res29: Array[(String, Int)] = Array((female,6), (male,7)) |
||||
groupByKey |
groupByKey也是对每一个key进行操做,但只生成一个sequence。 |
|||
scala> val words = Array("one", "two", "two", "three", "three", "three") words: Array[String] = Array(one, two, two, three, three, three) scala> val wordPairsRDD = sc.parallelize(words).map(word => (word, 1)) wordPairsRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:26 scala> val group = wordPairsRDD.groupByKey() group: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[5] at groupByKey at <console>:28 scala> group.collect() res1: Array[(String, Iterable[Int])] = Array((two,CompactBuffer(1, 1)), (one,CompactBuffer(1)), (three,CompactBuffer(1, 1, 1))) scala> group.map(t => (t._1, t._2.sum)) res2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at map at <console>:31 scala> res2.collect() res3: Array[(String, Int)] = Array((two,2), (one,1), (three,3)) scala> val map = group.map(t => (t._1, t._2.sum)) map: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[7] at map at <console>:30 scala> map.collect() res4: Array[(String, Int)] = Array((two,2), (one,1), (three,3)) |
||||
combineByKey[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
|
对相同K,把V合并成一个集合. createCombiner: combineByKey() 会遍历分区中的全部元素,所以每一个元素的键要么尚未遇到过,要么就 和以前的某个元素的键相同。若是这是一个新的元素,combineByKey() 会使用一个叫做 createCombiner() 的函数来建立 mergeValue: 若是这是一个在处理当前分区以前已经遇到的键,它会使用 mergeValue() 方法将该键的累加器对应的当前值与这个新的值进行合并 mergeCombiners: 因为每一个分区都是独立处理的,所以对于同一个键能够有多个累加器。若是有两个或者更多的分区都有对应同一个键的累加器,就须要使用用户提供的 mergeCombiners() 方法将各个分区的结果进行合并。 |
|||
scala> val scores = Array(("Fred", 88), ("Fred", 95), ("Fred", 91), ("Wilma", 93), ("Wilma", 95), ("Wilma", 98)) scores: Array[(String, Int)] = Array((Fred,88), (Fred,95), (Fred,91), (Wilma,93), (Wilma,95), (Wilma,98)) scala> val input = sc.parallelize(scores) input: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[52] at parallelize at <console>:26 scala> val combine = input.combineByKey( | (v)=>(v,1), | (acc:(Int,Int),v)=>(acc._1+v,acc._2+1), | (acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2)) combine: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[53] at combineByKey at <console>:28 scala> val result = combine.map{ | case (key,value) => (key,value._1/value._2.toDouble)} result: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[54] at map at <console>:30 scala> result.collect() res33: Array[(String, Double)] = Array((Wilma,95.33333333333333), (Fred,91.33333333333333)) |
||||
aggregateByKey(zeroValue:U,[partitioner: Partitioner])(seqOp: (U, V) => U,combOp: (U, U) => U) |
在kv对的RDD中,,按key将value进行分组合并,合并时,将每一个value和初始值做为seq函数的参数,进行计算,返回的结果做为一个新的kv对,而后再将结果按照key进行合并,最后将每一个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果做为一个新的kv对输出。 seqOp函数用于在每个分区中用初始值逐步迭代value,combOp函数用于合并每一个分区中的结果 |
|||
例如:分一个分区,以key为1的分区为例,0先和3比较得3,3在和2比较得3,3在和4比较得4,因此整个key为1的组最终结果为(1,4),同理,key为2的最终结果为(2,3),key为3的为(3,8). 若是分三个分区,前两个是一个分区,中间两个是一个分区,最后两个是一个分区,第一个分区的最终结果为(1,3),第二个分区为(1,4)(2,3),最后一个分区为(3,8),combine后为 (3,8), (1,7), (2,3) scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3) rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[12] at parallelize at <console>:24 scala> val agg = rdd.aggregateByKey(0)(math.max(_,_),_+_) agg: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[13] at aggregateByKey at <console>:26 scala> agg.collect() res7: Array[(Int, Int)] = Array((3,8), (1,7), (2,3)) scala> agg.partitions.size res8: Int = 3 scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),1) rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:24 scala> val agg = rdd.aggregateByKey(0)(math.max(_,_),_+_).collect() agg: Array[(Int, Int)] = Array((1,4), (3,8), (2,3)) |
||||
foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] |
aggregateByKey的简化操做,seqop和combop相同 |
|||
scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3) rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[91] at parallelize at <console>:24 scala> val agg = rdd.foldByKey(0)(_+_) agg: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[92] at foldByKey at <console>:26 scala> agg.collect() res61: Array[(Int, Int)] = Array((3,14), (1,9), (2,3)) |
||||
sortByKey([ascending], [numTasks]) |
在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD |
|||
scala> val rdd = sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd"))) rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[14] at parallelize at <console>:24 scala> rdd.sortByKey(true).collect() res9: Array[(Int, String)] = Array((1,dd), (2,bb), (3,aa), (6,cc)) scala> rdd.sortByKey(false).collect() res10: Array[(Int, String)] = Array((6,cc), (3,aa), (2,bb), (1,dd)) |
||||
sortBy(func,[ascending], [numTasks]) |
与sortByKey相似,可是更灵活,能够用func先对数据进行处理,按照处理后的数据比较结果排序。 |
|||
scala> val rdd = sc.parallelize(List(1,2,3,4)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:24 scala> rdd.sortBy(x => x).collect() res11: Array[Int] = Array(1, 2, 3, 4) scala> rdd.sortBy(x => x%3).collect() res12: Array[Int] = Array(3, 4, 1, 2) |
||||
join(otherDataset, [numTasks]) |
在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的全部元素对在一块儿的(K,(V,W))的RDD |
|||
scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c"))) rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[32] at parallelize at <console>:24 scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6))) rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:24 scala> rdd.join(rdd1).collect() res13: Array[(Int, (String, Int))] = Array((1,(a,4)), (2,(b,5)), (3,(c,6))) |
||||
cogroup(otherDataset, [numTasks]) |
在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD |
|||
scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c"))) rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[37] at parallelize at <console>:24 scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6))) rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[38] at parallelize at <console>:24 scala> rdd.cogroup(rdd1).collect() res14: Array[(Int, (Iterable[String], Iterable[Int]))] = Array((1,(CompactBuffer(a),CompactBuffer(4))), |
||||
cartesian(otherDataset) |
笛卡尔积 |
|||
scala> val rdd1 = sc.parallelize(1 to 3) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[47] at parallelize at <console>:24 scala> val rdd2 = sc.parallelize(2 to 5) rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[48] at parallelize at <console>:24 scala> rdd1.cartesian(rdd2).collect() res17: Array[(Int, Int)] = Array((1,2), (1,3), (1,4), (1,5), (2,2), (2,3), (2,4), (2,5), (3,2), (3,3), (3,4), (3,5)) |
||||
pipe(command, [envVars]) |
对于每一个分区,都执行一个perl或者shell脚本,返回输出的RDD |
|||
scala> val rdd = sc.parallelize(List("hi","Hello","how","are","you"),1) rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[50] at parallelize at <console>:24 scala> rdd.pipe("/home/bigdata/pipe.sh").collect() res18: Array[String] = Array(AA, >>>hi, >>>Hello, >>>how, >>>are, >>>you) scala> val rdd = sc.parallelize(List("hi","Hello","how","are","you"),2) rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[52] at parallelize at <console>:24 scala> rdd.pipe("/home/bigdata/pipe.sh").collect() res19: Array[String] = Array(AA, >>>hi, >>>Hello, AA, >>>how, >>>are, >>>you) pipe.sh: #!/bin/sh echo "AA" while read LINE; do echo ">>>"${LINE} done 注意:shell脚本须要集群中的全部节点都能访问到。 |
||||
coalesce(numPartitions) |
缩减分区数,用于大数据集过滤后,提升小数据集的执行效率。 |
|||
scala> val rdd = sc.parallelize(1 to 16,4) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[54] at parallelize at <console>:24 scala> rdd.partitions.size res20: Int = 4 scala> val coalesceRDD = rdd.coalesce(3) coalesceRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[55] at coalesce at <console>:26 scala> coalesceRDD.partitions.size res21: Int = 3 |
||||
repartition(numPartitions) |
根据分区数,重新经过网络随机洗牌全部数据。 |
|||
scala> val rdd = sc.parallelize(1 to 16,4) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[56] at parallelize at <console>:24 scala> rdd.partitions.size res22: Int = 4 scala> val rerdd = rdd.repartition(2) rerdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[60] at repartition at <console>:26 scala> rerdd.partitions.size res23: Int = 2 scala> val rerdd = rdd.repartition(4) rerdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[64] at repartition at <console>:26 scala> rerdd.partitions.size res24: Int = 4 |
||||
repartitionAndSortWithinPartitions(partitioner) |
repartitionAndSortWithinPartitions函数是repartition函数的变种,与repartition函数不一样的是,repartitionAndSortWithinPartitions在给定的partitioner内部进行排序,性能比repartition要高。 |
|||
|
|
|||
glom |
将每个分区造成一个数组,造成新的RDD类型时RDD[Array[T]] |
|||
scala> val rdd = sc.parallelize(1 to 16,4) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24 scala> rdd.glom().collect() res25: Array[Array[Int]] = Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8), Array(9, 10, 11, 12), Array(13, 14, 15, 16)) |
||||
mapValues |
针对于(K,V)形式的类型只对V进行操做 |
|||
scala> val rdd3 = sc.parallelize(Array((1,"a"),(1,"d"),(2,"b"),(3,"c"))) rdd3: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[67] at parallelize at <console>:24 scala> rdd3.mapValues(_+"|||").collect() res26: Array[(Int, String)] = Array((1,a|||), (1,d|||), (2,b|||), (3,c|||)) |
||||
subtract |
计算差的一种函数去除两个RDD中相同的元素,不一样的RDD将保留下来 |
|||
scala> val rdd = sc.parallelize(3 to 8) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[70] at parallelize at <console>:24 scala> val rdd1 = sc.parallelize(1 to 5) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[71] at parallelize at <console>:24 scala> rdd.subtract(rdd1).collect() res27: Array[Int] = Array(8, 6, 7) |
||||
|
经常使用的Action:
动做 |
含义 |
reduce(func) |
经过func函数汇集RDD中的全部元素,这个功能必须是可交换且可并联的 |
scala> val rdd1 = sc.makeRDD(1 to 10,2) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[85] at makeRDD at <console>:24 scala> rdd1.reduce(_+_) res50: Int = 55 scala> val rdd2 = sc.makeRDD(Array(("a",1),("a",3),("c",3),("d",5))) rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[86] at makeRDD at <console>:24 scala> rdd2.reduce((x,y)=>(x._1 + y._1,x._2 + y._2)) res51: (String, Int) = (adca,12) |
|
collect() |
在驱动程序中,以数组的形式返回数据集的全部元素 |
|
|
count() |
返回RDD的元素个数 |
|
|
first() |
返回RDD的第一个元素(相似于take(1)) |
|
|
take(n) |
返回一个由数据集的前n个元素组成的数组 |
|
|
takeSample(withReplacement,num, [seed]) |
返回一个数组,该数组由从数据集中随机采样的num个元素组成,能够选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子 |
|
|
takeOrdered(n) |
返回前几个的排序 |
|
|
aggregate (zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)
|
aggregate函数将每一个分区里面的元素经过seqOp和初始值进行聚合,而后用combine函数将每一个分区的结果和初始值(zeroValue)进行combine操做。这个函数最终返回的类型不须要和RDD中元素类型一致。 |
scala> var rdd1 = sc.makeRDD(1 to 10,2) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[88] at makeRDD at <console>:24 scala> rdd1.aggregate(1)( | {(x : Int,y : Int) => x + y}, | {(a : Int,b : Int) => a + b} | ) res56: Int = 58 scala> rdd1.aggregate(1)( | {(x : Int,y : Int) => x * y}, | {(a : Int,b : Int) => a + b} | ) res57: Int = 30361 |
|
fold(num)(func) |
折叠操做,aggregate的简化操做,seqop和combop同样。 |
scala> var rdd1 = sc.makeRDD(1 to 4,2) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[90] at makeRDD at <console>:24 scala> rdd1.aggregate(1)( | {(x : Int,y : Int) => x + y}, | {(a : Int,b : Int) => a + b} | ) res59: Int = 13 scala> rdd1.fold(1)(_+_) res60: Int = 13 |
|
saveAsTextFile(path) |
将数据集的元素以textfile的形式保存到HDFS文件系统或者其余支持的文件系统,对于每一个元素,Spark将会调用toString方法,将它装换为文件中的文本 |
|
|
saveAsSequenceFile(path) |
将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可使HDFS或者其余Hadoop支持的文件系统。 |
|
|
saveAsObjectFile(path) |
用于将RDD中的元素序列化成对象,存储到文件中。
|
|
|
countByKey() |
针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每个key对应的元素个数。 |
scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3) rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[95] at parallelize at <console>:24 scala> rdd.countByKey() res63: scala.collection.Map[Int,Long] = Map(3 -> 2, 1 -> 3, 2 -> 1) |
|
foreach(func) |
在数据集的每个元素上,运行函数func进行更新。 |
scala> var rdd = sc.makeRDD(1 to 10,2) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[107] at makeRDD at <console>:24 scala> var sum = sc.accumulator(0) warning: there were two deprecation warnings; re-run with -deprecation for details sum: org.apache.spark.Accumulator[Int] = 0 scala> rdd.foreach(sum+=_) scala> sum.value res68: Int = 55 scala> rdd.collect().foreach(println) 1 2 3 4 5 6 7 8 9 10 |
Spark 对包含数值数据的 RDD 提供了一些描述性的统计操做。 Spark 的数值操做是经过流式算法实现的,容许以每次一个元素的方式构建出模型。这些 统计数据都会在调用 stats() 时经过一次遍历数据计算出来,并以 StatsCounter 对象返回。
scala> var rdd1 = sc.makeRDD(1 to 100) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[42] at makeRDD at <console>:32 scala> rdd1.sum() res34: Double = 5050.0 scala> rdd1.max() res35: Int = 100
Spark 的大部分转化操做和一部分行动操做,都须要依赖用户传递的函数来计算。 在 Scala 中,咱们能够把定义的内联函数、方法的引用或静态方法传递给 Spark,就像 Scala 的其余函数式 API 同样。咱们还要考虑其余一些细节,好比所传递的函数及其引用 的数据须要是可序列化的(实现了 Java 的 Serializable 接口)。 传递一个对象的方法或者字段时,会包含对整个对象的引用。
class SearchFunctions(val query: String) extends java.io.Serializable{ def isMatch(s: String): Boolean = { s.contains(query) }
def getMatchesFunctionReference(rdd: org.apache.spark.rdd.RDD[String]): org.apache.spark.rdd.RDD[String] = { // "isMatch"表示"this.isMatch",所以咱们要传递整个"this" rdd.filter(isMatch) }
def getMatchesFieldReference(rdd: org.apache.spark.rdd.RDD[String]): org.apache.spark.rdd.RDD[String] = { // "query"表示"this.query",所以咱们要传递整个"this" rdd.filter(x => x.contains(query)) }
def getMatchesNoReference(rdd: org.apache.spark.rdd.RDD[String]): org.apache.spark.rdd.RDD[String] = { // 安全:只把咱们须要的字段拿出来放入局部变量中 val query_ = this.query rdd.filter(x => x.contains(query_)) } }
若是在 Scala 中出现了 NotSerializableException,一般问题就在于咱们传递了一个不可序列 化的类中的函数或字段。
有些函数只能用于特定类型的 RDD,好比 mean() 和 variance() 只能用在数值 RDD 上, 而 join() 只能用在键值对 RDD 上。在 Scala 和 Java 中,这些函数都没有定义在标准的 RDD 类中,因此要访问这些附加功能,必需要确保得到了正确的专用 RDD 类。
在 Scala 中,将 RDD 转为有特定函数的 RDD(好比在 RDD[Double] 上进行数值操做)是 由隐式转换来自动处理的。
Spark速度很是快的缘由之一,就是在不一样操做中能够在内存中持久化或缓存个数据集。当持久化某个RDD后,每个节点都将把计算的分片结果保存在内存中,并在对此RDD或衍生出的RDD进行的其余动做中重用。这使得后续的动做变得更加迅速。RDD相关的持久化和缓存,是Spark最重要的特征之一。能够说,缓存是Spark构建迭代式算法和快速交互式查询的关键。若是一个有持久化数据的节点发生故障,Spark 会在须要用到缓存的数据时重算丢失的数据分区。若是 但愿节点故障的状况不会拖累咱们的执行速度,也能够把数据备份到多个节点上。
RDD经过persist方法或cache方法能够将前面的计算结果缓存,默认状况下 persist() 会把数据以序列化的形式缓存在 JVM 的堆空 间中。
可是并非这两个方法被调用时当即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。
cache最终也是调用了persist方法,默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。
scala> val rdd = sc.makeRDD(1 to 10) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at makeRDD at <console>:25 scala> val nocache = rdd.map(_.toString+"["+System.currentTimeMillis+"]") nocache: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[20] at map at <console>:27 scala> val cache = rdd.map(_.toString+"["+System.currentTimeMillis+"]") cache: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[21] at map at <console>:27 scala> cache.cache res24: cache.type = MapPartitionsRDD[21] at map at <console>:27 scala> nocache.collect res25: Array[String] = Array(1[1505479375155], 2[1505479374674], 3[1505479374674], 4[1505479375153], 5[1505479375153], 6[1505479374675], 7[1505479375154], 8[1505479375154], 9[1505479374676], 10[1505479374676]) scala> nocache.collect res26: Array[String] = Array(1[1505479375679], 2[1505479376157], 3[1505479376157], 4[1505479375680], 5[1505479375680], 6[1505479376159], 7[1505479375680], 8[1505479375680], 9[1505479376158], 10[1505479376158]) scala> nocache.collect res27: Array[String] = Array(1[1505479376743], 2[1505479377218], 3[1505479377218], 4[1505479376745], 5[1505479376745], 6[1505479377219], 7[1505479376747], 8[1505479376747], 9[1505479377218], 10[1505479377218]) scala> cache.collect res28: Array[String] = Array(1[1505479382745], 2[1505479382253], 3[1505479382253], 4[1505479382748], 5[1505479382748], 6[1505479382257], 7[1505479382747], 8[1505479382747], 9[1505479382253], 10[1505479382253]) scala> cache.collect res29: Array[String] = Array(1[1505479382745], 2[1505479382253], 3[1505479382253], 4[1505479382748], 5[1505479382748], 6[1505479382257], 7[1505479382747], 8[1505479382747], 9[1505479382253], 10[1505479382253]) scala> cache.collect res30: Array[String] = Array(1[1505479382745], 2[1505479382253], 3[1505479382253], 4[1505479382748], 5[1505479382748], 6[1505479382257], 7[1505479382747], 8[1505479382747], 9[1505479382253], 10[1505479382253]) cache.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)
在存储级别的末尾加上“_2”来把持久化数据存为两份
缓存有可能丢失,或者存储存储于内存的数据因为内存不足而被删除,RDD的缓存容错机制保证了即便缓存丢失也能保证计算的正确执行。经过基于RDD的一系列转换,丢失的数据会被重算,因为RDD的各个Partition是相对独立的,所以只须要计算丢失的部分便可,并不须要重算所有Partition。
注意:使用 Tachyon能够实现堆外缓存
Spark中对于数据的保存除了持久化操做以外,还提供了一种检查点的机制,检查点(本质是经过将RDD写入Disk作检查点)是为了经过lineage作容错的辅助,lineage过长会形成容错成本太高,这样就不如在中间阶段作检查点容错,若是以后有节点出现问题而丢失分区,从作检查点的RDD开始重作Lineage,就会减小开销。检查点经过将数据写入到HDFS文件系统实现了RDD的检查点功能。
cache 和 checkpoint 是有显著区别的, 缓存把 RDD 计算出来而后放在内存中,可是RDD 的依赖链(至关于数据库中的redo 日志), 也不能丢掉, 当某个点某个 executor 宕了,上面cache 的RDD就会丢掉, 须要经过 依赖链重放计算出来, 不一样的是, checkpoint 是把 RDD 保存在 HDFS中, 是多副本可靠存储,因此依赖链就能够丢掉了,就斩断了依赖链, 是经过复制实现的高容错。
若是存在如下场景,则比较适合使用检查点机制:
1) DAG中的Lineage过长,若是重算,则开销太大(如在PageRank中)。
2) 在宽依赖上作Checkpoint得到的收益更大。
为当前RDD设置检查点。该函数将会建立一个二进制的文件,并存储到checkpoint目录中,该目录是用SparkContext.setCheckpointDir()设置的。在checkpoint的过程当中,该RDD的全部依赖于父RDD中的信息将所有被移出。对RDD进行checkpoint操做并不会立刻被执行,必须执行Action操做才能触发。
scala> val data = sc.parallelize(1 to 100 , 5) data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:12 scala> sc.setCheckpointDir("hdfs://master01:9000/checkpoint") scala> data.checkpoint scala> data.count scala> val ch1 = sc.parallelize(1 to 2) ch1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[33] at parallelize at <console>:25 scala> val ch2 = ch1.map(_.toString+"["+System.currentTimeMillis+"]") ch2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[36] at map at <console>:27 scala> val ch3 = ch1.map(_.toString+"["+System.currentTimeMillis+"]") ch3: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[37] at map at <console>:27 scala> ch3.checkpoint scala> ch2.collect res62: Array[String] = Array(1[1505480940726], 2[1505480940243]) scala> ch2.collect res63: Array[String] = Array(1[1505480941957], 2[1505480941480]) scala> ch2.collect res64: Array[String] = Array(1[1505480942736], 2[1505480942257]) scala> ch3.collect res65: Array[String] = Array(1[1505480949080], 2[1505480948603]) scala> ch3.collect res66: Array[String] = Array(1[1505480948683], 2[1505480949161]) scala> ch3.collect res67: Array[String] = Array(1[1505480948683], 2[1505480949161])
RDD checkpoint 过程当中会通过如下几个状态,
[ Initialized → marked for checkpointing → checkpointing in progress → checkpointed ]
转换流程以下
1) data.checkpoint 这个函数调用中, 设置的目录中, 全部依赖的 RDD 都会被删除, 函数必须在 job 运行以前调用执行, 强烈建议 RDD 缓存 在内存中(注意), 不然保存到文件的时候须要从头计算。初始化RDD的 checkpointData 变量为 ReliableRDDCheckpointData。 这时候标记为 Initialized 状态,
2) 在全部 job action 的时候, runJob 方法中都会调用 rdd.doCheckpoint , 这个会向前递归调用全部的依赖的RDD, 看看需不须要 checkpoint。 若是须要 checkpoint, 而后调用 checkpointData.get.checkpoint(), 里面标记 状态为 CheckpointingInProgress, 里面调用具体实现类的 ReliableRDDCheckpointData 的 doCheckpoint 方法,
3) doCheckpoint -> writeRDDToCheckpointDirectory, 注意这里会把 job 再运行一次, 若是已经cache 了,就能够直接使用缓存中的 RDD 了, 就不须要重头计算一遍了(注意), 这时候直接把RDD, 输出到 hdfs, 每一个分区一个文件, 会先写到一个临时文件, 若是所有输出完,进行 rename , 若是输出失败,就回滚delete。
4)标记 状态为 Checkpointed, markCheckpointed方法中清除全部的依赖, 怎么清除依赖的呢, 就是 吧RDD 变量的强引用 设置为 null, 垃圾回收了,会触发 ContextCleaner 里面监听清除实际 BlockManager 缓存中的数据
若是一个RDD 咱们已经 checkpoint了那么是何时用呢, checkpoint 将 RDD 持久化到 HDFS 或本地文件夹,若是不被手动 remove 掉,是一直存在的,也就是说能够被下一个 driver program 使用。 好比 spark streaming 挂掉了, 重启后就可使用以前 checkpoint 的数据进行 recover (下面会讲到) , 固然在同一个 driver program 也可使用。 考虑在同一个 driver program 中是怎么使用 checkpoint 数据的。
若是 一个 RDD 被checkpoint了, 若是这个 RDD 上有 action 操做时候,或者回溯的这个 RDD 的时候,这个 RDD 进行计算的时候,里面判断若是已经 checkpoint 过, 对分区和依赖的处理都是使用的 RDD 内部的 checkpointRDD 变量。
具体细节以下,
若是 一个 RDD 被checkpoint了, 那么这个 RDD 中对分区和依赖的处理都是使用的 RDD 内部的 checkpointRDD 变量, 具体实现是 ReliableCheckpointRDD 类型。 这个是在 checkpoint 写流程中建立的。依赖和获取分区方法中先判断是否已经checkpoint, 若是已经checkpoint了, 就斩断依赖, 使用ReliableCheckpointRDD, 来处理依赖和获取分区。
若是没有,才往前回溯依赖。 依赖就是没有依赖, 由于已经斩断了依赖, 获取分区数据就是读取 checkpoint 到 hdfs目录中不一样分区保存下来的文件。
RDD和它依赖的父RDD(s)的关系有两种不一样的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。
窄依赖指的是每个父RDD的Partition最多被子RDD的一个Partition使用
总结:窄依赖咱们形象的比喻为独生子女
宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition,会引发shuffle
总结:窄依赖咱们形象的比喻为超生
RDD只支持粗粒度转换,即在大量记录上执行的单个操做。将建立RDD的一系列Lineage(即血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它能够根据这些信息来从新运算和恢复丢失的数据分区。
scala> val text = sc.textFile("README.md") text: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24 scala> val words = text.flatMap(_.split) split splitAt scala> val words = text.flatMap(_.split(" ")) words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:26 scala> words.map((_,1)) res0: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:29 scala> res0.reduceByKey reduceByKey reduceByKeyLocally scala> res0.reduceByKey(_+_) res1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:31 scala> res1.dependencies res2: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@6cfe48a4) scala> res0.dependencies res3: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@6c9e24c4)
DAG(Directed Acyclic Graph)叫作有向无环图,原始的RDD经过一系列的转换就就造成了DAG,根据RDD之间的依赖关系的不一样将DAG划分红不一样的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,因为有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,所以宽依赖是划分Stage的依据。
输入可能以多个文件的形式存储在HDFS上,每一个File都包含了不少块,称为Block。
当Spark读取这些文件做为输入时,会根据具体数据格式对应的InputFormat进行解析,通常是将若干个Block合并成一个输入分片,称为InputSplit,注意InputSplit不能跨越文件。
随后将为这些输入分片生成具体的Task。InputSplit与Task是一一对应的关系。
随后这些具体的Task每一个都会被分配到集群上的某个节点的某个Executor去执行。
1) 每一个节点能够起一个或多个Executor。
2) 每一个Executor由若干core组成,每一个Executor的每一个core一次只能执行一个Task。
3) 每一个Task执行的结果就是生成了目标RDD的一个partiton。
注意: 这里的core是虚拟的core而不是机器的物理CPU核,能够理解为就是Executor的一个工做线程。
而 Task被执行的并发度 = Executor数目 * 每一个Executor核数。
至于partition的数目:
1) 对于数据读入阶段,例如sc.textFile,输入文件被划分为多少InputSplit就会须要多少初始Task。
2) 在Map阶段partition数目保持不变。
3) 在Reduce阶段,RDD的聚合会触发shuffle操做,聚合后的RDD的partition数目跟具体操做有关,例如repartition操做会聚合成指定分区数,还有一些算子是可配置的。
RDD在计算的时候,每一个分区都会起一个task,因此rdd的分区数目决定了总的的task数目。
申请的计算节点(Executor)数目和每一个计算节点核数,决定了你同一时刻能够并行执行的task。
好比的RDD有100个分区,那么计算的时候就会生成100个task,你的资源配置为10个计算节点,每一个2个核,同一时刻能够并行的task数目为20,计算这个RDD就须要5个轮次。
若是计算资源不变,你有101个task的话,就须要6个轮次,在最后一轮中,只有一个task在执行,其他核都在空转。
若是资源不变,你的RDD只有2个分区,那么同一时刻只有2个task运行,其他18个核空转,形成资源浪费。这就是在spark调优中,增大RDD分区数目,增大任务并行度的作法。