RDD做为弹性分布式数据集,它的弹性具体体如今如下七个方面。node
Spark会优先把数据放到内存中,若是内存实在放不下,会放到磁盘里面,不但能计算内存放下的数据,也能计算内存放不下的数据。若是实际数据大于内存,则要考虑数据放置策略和优化算法。当应用程序内存不足时,Spark应用程序将数据自动从内存存储切换到磁盘存储,以保障其高效运行。web
Lineage是基于Spark RDD的依赖关系来完成的(依赖分为窄依赖和宽依赖两种形态),每一个操做只关联其父操做,各个分片的数据之间互不影响,出现错误时只要恢复单个Split的特定部分便可。常规容错有两种方式:一个是数据检查点;另外一个是记录数据的更新。数据检查点的基本工做方式,就是经过数据中心的网络连接不一样的机器,而后每次操做的时候都要复制数据集,就至关于每次都有一个复制,复制是要经过网络传输的,网络带宽就是分布式的瓶颈,对存储资源也是很大的消耗。记录数据更新就是每次数据变化了就记录一下,这种方式不须要从新复制一份数据,可是比较复杂,消耗性能。Spark的RDD经过记录数据更新的方式为什么很高效?由于① RDD是不可变的且Lazy;② RDD的写操做是粗粒度的。可是,RDD读操做既能够是粗粒度的,也能够是细粒度的。算法
默认重试次数为4次。TaskSchedulerImpl的源码以下所示:apache
private[spark] class TaskSchedulerImpl( val sc: SparkContext, val maxTaskFailures: Int, isLocal: Boolean = false) extends TaskScheduler with Logging { def this(sc: SparkContext) = this(sc, sc.conf.getInt("spark.task.maxFailures", 4)) val conf = sc.conf
TaskSchedulerImpl是底层的任务调度接口TaskScheduler的实现,这些Schedulers从每个Stage中的DAGScheduler中获取TaskSet,运行它们,尝试是否有故障。DAGScheduler是高层调度,它计算每一个Job的Stage的DAG,而后提交Stage,用TaskSets的形式启动底层TaskScheduler调度在集群中运行。缓存
这样,Stage对象能够跟踪多个StageInfo(存储SparkListeners监听到的Stage的信息,将Stage信息传递给Listeners或web UI)。默认重试次数为4次,且能够直接运行计算失败的阶段,只计算失败的数据分片,Stage的源码以下所示:网络
private[scheduler] abstract class Stage( val id: Int, val rdd: RDD[_], val numTasks: Int, val parents: List[Stage], val firstJobId: Int, val callSite: CallSite) extends Logging { val numPartitions = rdd.partitions.length /** Set of jobs that this stage belongs to.属于这个工做集的Stage */ val jobIds = new HashSet[Int] val pendingPartitions = new HashSet[Int] /** The ID to use for the next new attempt for this stage.用于此Stage的下一个新attempt的ID */ private var nextAttemptId: Int = 0 val name: String = callSite.shortForm val details: String = callSite.longForm /** * Pointer to the [StageInfo] object for the most recent attempt. This needs to be initialized * here, before any attempts have actually been created, because the DAGScheduler uses this * StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts * have been created). * 最新的[StageInfo] Object指针,须要被初始化,任何attempts都是被创造出来的,由于DAGScheduler使用 * StageInfo告诉SparkListeners工做什么时候开始(即发生前的任何阶段已经建立) */ private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId) /** * Set of stage attempt IDs that have failed with a FetchFailure. We keep track of these * failures in order to avoid endless retries if a stage keeps failing with a FetchFailure. * We keep track of each attempt ID that has failed to avoid recording duplicate failures if * multiple tasks from the same stage attempt fail (SPARK-5945). * 设置Stage attempt IDs当失败时能够读取失败信息,跟踪这些失败,为了不无休止的重复失败 * 跟踪每一次attempt,以便避免记录重复故障,若是从同一stage建立多任务失败(SPARK-5945) */ private val fetchFailedAttemptIds = new HashSet[Int] private[scheduler] def clearFailures() : Unit = { fetchFailedAttemptIds.clear() } /** * Check whether we should abort the failedStage due to multiple consecutive fetch failures. * 检查是否应该停止因为连续屡次读取失败的stage * This method updates the running set of failed stage attempts and returns * true if the number of failures exceeds the allowable number of failures. * 若是失败的次数超过容许的次数,此方法更新失败stage attempts和返回的运行集 */ private[scheduler] def failedOnFetchAndShouldAbort(stageAttemptId: Int): Boolean = { fetchFailedAttemptIds.add(stageAttemptId) fetchFailedAttemptIds.size >= Stage.MAX_CONSECUTIVE_FETCH_FAILURES } /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */ // 在stage中建立一个新的attempt def makeNewStageAttempt( numPartitionsToCompute: Int, taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = { val metrics = new TaskMetrics metrics.register(rdd.sparkContext) _latestInfo = StageInfo.fromStage( this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences) nextAttemptId += 1 } /** Returns the StageInfo for the most recent attempt for this stage. */ // 返回当前stage中最新的stageinfo def latestInfo: StageInfo = _latestInfo override final def hashCode(): Int = id override final def equals(other: Any): Boolean = other match { case stage: Stage => stage != null && stage.id == id case _ => false } /** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */ // 返回须要从新计算的分区标识的序列 def findMissingPartitions(): Seq[Int] } private[scheduler] object Stage { // The number of consecutive failures allowed before a stage is aborted // 容许一个stage停止的连续故障数 val MAX_CONSECUTIVE_FETCH_FAILURES = 4 }
Stage是Spark Job运行时具备相同逻辑功能和并行计算任务的一个基本单元。Stage中全部的任务都依赖一样的Shuffle,每一个DAG任务经过DAGScheduler在Stage的边界处发生Shuffle造成Stage,而后DAGScheduler运行这些阶段的拓扑顺序。每一个Stage均可能是ShuffleMapStage,若是是ShuffleMapStage,则跟踪每一个输出节点(nodes)上的输出文件分区,它的任务结果是输入其余的Stage(s),或者输入一个ResultStage,若输入一个ResultStage,这个ResultStage的任务直接在这个RDD上运行计算这个Spark Action的函数(如count()、 save()等),并生成shuffleDep等字段描述Stage和生成变量,如outputLocs和numAvailableOutputs,为跟踪map输出作准备。每一个Stage会有firstjobid,肯定第一个提交Stage的Job,使用FIFO调度时,会使得其前面的Job先行计算或快速恢复(失败时)。 app
ShuffleMapStage是DAG产生数据进行Shuffle的中间阶段,它发生在每次Shuffle操做以前,可能包含多个Pipelined操做,ResultStage阶段捕获函数在RDD的分区上运行Action算子计算结果,有些Stage不是运行在RDD的全部的分区上,例如,first()、lookup()等。SparkListener是Spark调度器的事件监听接口。注意,这个接口随着Spark版本的不一样会发生变化。less
checkpoint是对RDD进行的标记,会产生一系列的文件,且全部父依赖都会被删除,是整个依赖(Lineage)的终点。checkpoint也是Lazy级别的。persist后RDD工做时每一个工做节点都会把计算的分片结果保存在内存或磁盘中,下一次若是对相同的RDD进行其余的Action计算,就能够重用。dom
由于用户只与Driver Program交互,所以只能用RDD中的cache()方法去cache用户能看到的RDD。所谓能看到,是指通过Transformation算子处理后生成的RDD,而某些在Transformation算子中Spark本身生成的RDD是不能被用户直接cache的。例如,reduceByKey()中会生成的ShuffleRDD、MapPartitionsRDD是不能被用户直接cache的。在Driver Program中设定RDD.cache()后,系统怎样进行cache?首先,在计算RDD的Partition以前就去判断Partition要不要被cache,若是要被cache,先将Partition计算出来,而后cache到内存。cache可以使用memory,若是写到HDFS磁盘的话,就要检查checkpoint。调用RDD.cache()后,RDD就变成persistRDD了,其StorageLevel为MEMORY_ONLY,persistRDD会告知Driver说本身是须要被persist的。此时会调用RDD.iterator()。 RDD.scala的iterator()的源码以下:分布式
/** * Internal method to this RDD; will read from cache if applicable, or otherwise compute it. * This should ''not'' be called by users directly, but is available for implementors of custom * subclasses of RDD. * RDD的内部方法,将从合适的缓存中读取,不然计算它。这不该该被用户直接使用,但可用于实现自定义的子RDD */ final def iterator(split: Partition, context: TaskContext): Iterator[T] = { if (storageLevel != StorageLevel.NONE) { getOrCompute(split, context) } else { computeOrReadCheckpoint(split, context) } }
当RDD.iterator()被调用的时候,也就是要计算该RDD中某个Partition的时候,会先去cacheManager那里获取一个blockId,而后去BlockManager里匹配该Partition是否被checkpoint了,若是是,那就不用计算该Partition了,直接从checkpoint中读取该Partition的全部records放入ArrayBuffer里面。若是没有被checkpoint过,先将Partition计算出来,而后将其全部records放到cache中。整体来讲,当RDD会被重复使用(不能太大)时,RDD须要cache。Spark自动监控每一个节点缓存的使用状况,利用最近最少使用原则删除老旧的数据。若是想手动删除RDD,可使用RDD.unpersist()方法。
此外,能够利用不一样的存储级别存储每个被持久化的RDD。例如,它容许持久化集合到磁盘上,将集合做为序列化的Java对象持久化到内存中、在节点间复制集合或者存储集合到Alluxio中。能够经过传递一个StorageLevel对象给persist()方法设置这些存储级别。cache()方法使用默认的存储级别-StorageLevel.MEMORY_ONLY。RDD根据useDisk、useMemory、 useOffHeap、deserialized、replication 5个参数的组合提供了经常使用的12种基本存储,完整的存储级别介绍以下。StorageLevel.scala的源码以下:
val NONE = new StorageLevel(false, false, false, false) val DISK_ONLY = new StorageLevel(true, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, false, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
StorageLevel是控制存储RDD的标志,每一个StorageLevel记录RDD是否使用memory,或使用ExternalBlockStore存储,若是RDD脱离了memory或ExternalBlockStore,是否扔掉RDD,是否保留数据在内存中的序列化格式,以及是否复制多个节点的RDD分区。另外,org.apache.spark.storage.StorageLevel是单实例(singleton)对象,包含了一些静态常量和经常使用的存储级别,且可用singleton对象工厂方法StorageLevel(...)建立定制化的存储级别。
Spark的多个存储级别意味着在内存利用率和CPU利用率间的不一样权衡。推荐经过下面的过程选择一个合适的存储级别:①若是RDD适合默认的存储级别(MEMORY_ONLY),就选择默认的存储级别。由于这是CPU利用率最高的选项,会使RDD上的操做尽量地快。②若是不适合用默认级别,就选择MEMORY_ONLY_SER。选择一个更快的序列化库提升对象的空间使用率,可是仍可以至关快地访问。③除非算子计算RDD花费较大或者须要过滤大量的数据,不要将RDD存储到磁盘上,不然重复计算一个分区,就会和从磁盘上读取数据同样慢。④若是但愿更快地恢复错误,能够利用replicated存储机制,全部的存储级别均可以经过replicated计算丢失的数据来支持完整的容错。另外,replicated的数据能在RDD上继续运行任务,而不须要重复计算丢失的数据。在拥有大量内存的环境中或者多应用程序的环境中,Off_Heap(将对象从堆中脱离出来序列化,而后存储在一大块内存中,这就像它存储到磁盘上同样,但它仍然在RAM内存中。Off_Heap对象在这种状态下不能直接使用,须进行序列化及反序列化。序列化和反序列化可能会影响性能,Off_Heap堆外内存不须要进行GC)。Off_Heap具备以下优点:Off_Heap运行多个执行者共享的Alluxio中相同的内存池,显著地减小GC。若是单个的Executor崩溃,缓存的数据也不会丢失。
Spark将执行模型抽象为通用的有向无环图计划(DAG),这能够将多Stage的任务串联或并行执行,从而不须要将Stage中间结果输出到HDFS中,当发生节点运行故障时,可有其余可用节点代替该故障节点运行。
Spark进行数据分片时,默认将数据放在内存中,若是内存放不下,一部分会放在磁盘上进行保存。
RDD.scala的coalesce算子代码以下:
/** * Return a new RDD that is reduced into `numPartitions` partitions. * * This results in a narrow dependency, e.g. if you go from 1000 partitions * to 100 partitions, there will not be a shuffle, instead each of the 100 * new partitions will claim 10 of the current partitions. * * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, * this may result in your computation taking place on fewer nodes than * you like (e.g. one node in the case of numPartitions = 1). To avoid this, * you can pass shuffle = true. This will add a shuffle step, but means the * current upstream partitions will be executed in parallel (per whatever * the current partitioning is). * * Note: With shuffle = true, you can actually coalesce to a larger number * of partitions. This is useful if you have a small number of partitions, * say 100, potentially with a few partitions being abnormally large. Calling * coalesce(1000, shuffle = true) will result in 1000 partitions with the * data distributed using a hash partitioner. */ def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty) (implicit ord: Ordering[T] = null) : RDD[T] = withScope { require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.") if (shuffle) { /** Distributes elements evenly across output partitions, starting from a random partition. */ // 从随机分区开始,将元素均匀分布在输出分区上 val distributePartition = (index: Int, items: Iterator[T]) => { var position = (new Random(index)).nextInt(numPartitions) items.map { t => // Note that the hash code of the key will just be the key itself. The HashPartitioner // will mod it with the number of total partitions. // key的哈希码是key自己,HashPartitioner将它与总分区数进行取模运算 position = position + 1 (position, t) } } : Iterator[(Int, T)] // include a shuffle step so that our upstream tasks are still distributed // 包括一个shuffle步骤,使咱们的上游任务仍然是分布式的 new CoalescedRDD( new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition), new HashPartitioner(numPartitions)), numPartitions, partitionCoalescer).values } else { new CoalescedRDD(this, numPartitions, partitionCoalescer) } }
例如,在计算的过程当中,会产生不少的数据碎片,这时产生一个Partition可能会很是小,若是一个Partition很是小,每次都会消耗一个线程去处理,这时可能会下降它的处理效率,须要考虑把许多小的Partition合并成一个较大的Partition去处理,这样会提升效率。另外,有可能内存不是那么多,而每一个Partition的数据Block比较大,这时须要考虑把Partition变成更小的数据分片,这样让Spark处理更多的批次,可是不会出现OOM。