Spark之RDD容错原理及四大核心要点

1、Spark RDD容错原理

  RDD不一样的依赖关系致使Spark对不一样的依赖关系有不一样的处理方式。缓存

  对于宽依赖而言,因为宽依赖实质是指父RDD的一个分区会对应一个子RDD的多个分区,在此状况下出现部分计算结果丢失,单一计算丢失的数据没法达到效果,便采用从新计算该步骤中的全部数据,从而会致使计算数据重复;对于窄依赖而言,因为窄依赖实质是指父RDD的分区最多被一个子RDD使用,在此状况下出现部分计算的错误,因为计算结果的数据只与依赖的父RDD的相关数据有关,因此不须要从新计算全部数据,只从新计算出错部分的数据便可。框架

2、RDD容错的四大核心要点

  Spark框架层面的容错机制,主要分为三大层面(调度层、RDD血统层、Checkpoint层),在这三大层面中包括Spark RDD容错四大核心要点。分布式

  (1)Stage输出失败,上层调度器DAGScheduler重试。
  (2)Spark计算中,Task内部任务失败,底层调度器重试。
  (3)RDD Lineage血统中窄依赖、宽依赖计算。
  (4)Checkpoint缓存。oop

1.调度层(包含DAG生成和Task重算两大核心)

  从调度层面讲,错误主要出如今两个方面,分别是在Stage输出时出错和在计算时出错。性能

  1)DAG生成层

  Stage输出失败,上层调度器DAGScheduler会进行重试,DAGScheduler.scala的resubmitFailedStages的源码以下。fetch

  /**
   * Resubmit any failed stages. Ordinarily called after a small amount of time has passed since
   * the last fetch failure.
   */
  private[scheduler] def resubmitFailedStages() {
    // 判断是否存在失败的Stages
    if (failedStages.size > 0) {
      // Failed stages may be removed by job cancellation, so failed might be empty even if
      // the ResubmitFailedStages event has been scheduled.
      // 失败的阶段能够经过做业取消删除,若是ResubmitFailedStages事件已调度,失败将是空值
      logInfo("Resubmitting failed stages")
      clearCacheLocs()
      // 获取全部失败Stage的列表
      val failedStagesCopy = failedStages.toArray
      // 清空failedStages
      failedStages.clear()
      // 对以前获取全部失败的Stage,根据jobId排序后逐一重试
      for (stage <- failedStagesCopy.sortBy(_.firstJobId)) {
        submitStage(stage)
      }
    }
    submitWaitingStages()
  }

  2)Task计算层

  Spark计算过程当中,计算内部某个Task任务出现失败,底层调度器会对此Task进行若干次重试(默认4次)。TaskSetManager.scala的handleFailedTask的源码以下。spa

/**
   * Marks the task as failed, re-adds it to the list of pending tasks, and notifies the
   * DAG Scheduler.
   */
  def handleFailedTask(tid: Long, state: TaskState, reason: TaskEndReason) {

    ......

    if (!isZombie && state != TaskState.KILLED
        && reason.isInstanceOf[TaskFailedReason]
        && reason.asInstanceOf[TaskFailedReason].countTowardsTaskFailures) {
      assert (null != failureReason)
      // 对失败的Task的numFailures进行计数加1
      numFailures(index) += 1
      // 判断失败的Task计数是否大于设定的最大失败次数,若是大于,则输出日志,并再也不重试
      if (numFailures(index) >= maxTaskFailures) {
        logError("Task %d in stage %s failed %d times; aborting job".format(
          index, taskSet.id, maxTaskFailures))
        abort("Task %d in stage %s failed %d times, most recent failure: %s\nDriver stacktrace:"
          .format(index, taskSet.id, maxTaskFailures, failureReason), failureException)
        return
      }
    }
    // 若是运行的Task为0时,则完成Task步骤
    maybeFinishTaskSet()
  }

2.RDD Lineage血统层容错

  Spark中RDD采用高度受限的分布式共享内存,且新的RDD的产生只可以经过其余RDD上的批量操做来建立,依赖于以RDD的Lineage为核心的容错处理,在迭代计算方面比Hadoop快20多倍,同时还能够在5~7s内交互式地查询TB级别的数据集。scala

  Spark RDD实现基于Lineage的容错机制,基于RDD的各项transformation构成了compute chain,在部分计算结果丢失的时候能够根据Lineage从新恢复计算。日志

  (1)在窄依赖中,在子RDD的分区丢失,要重算父RDD分区时,父RDD相应分区的全部数据都是子RDD分区的数据,并不存在冗余计算。
  (2)在宽依赖状况下,丢失一个子RDD分区,重算的每一个父RDD的每一个分区的全部数据并非都给丢失的子RDD分区用的,会有一部分数据至关于对应的是未丢失的子RDD分区中须要的数据,这样就会产生冗余计算开销和巨大的性能浪费。orm

3.checkpoint层容错

  Spark checkpoint经过将RDD写入Disk做检查点,是Spark lineage容错的辅助,lineage过长会形成容错成本太高,这时在中间阶段作检查点容错,若是以后有节点出现问题而丢失分区,从作检查点的RDD开始重作Lineage,就会减小开销。

  checkpoint主要适用于如下两种状况:

  (1)DAG中的Lineage过长,若是重算,开销太大,如PageRank、ALS等。
  (2)尤为适合在宽依赖上做checkpoint,这个时候就能够避免为Lineage从新计算而带来的冗余计算。

相关文章
相关标签/搜索