Spark的checkpoint源码讲解

1、Checkpoint相关源码分为四个部分安全

一、Checkpoint的基本使用:spark_core   &   spark_streamingspa

二、初始化的源码线程

三、Checkpoint的job生成及执行的过程3d

四、读Checkpoint的过程rest

2、Checkpoint的基本使用blog

Checkpoint能够是还原药水。辅助Spark应用从故障中恢复。SparkStreaming宕机恢复,适合调度器有自动重试功能的。对于 SparkCore 则适合那些计算链条超级长或者计算耗时的
关键点进行 Checkpoint, 便于故障恢复 。
kafka


Checkpoint和persist从根本上是不同的:
源码

  一、Cache or persist:it

    Cache or persist保存了RDD的血统关系,假若有部分cache的数据丢失能够根据血缘关系从新生成。spark

  二、Checkpoint

    会将RDD数据写到hdfs这种安全的文件系统里面,而且抛弃了RDD血缘关系的记录。即便persist存储到了磁盘里面,在driver停掉以后会被删除,而checkpoint能够被下次启动使用。

 

Checkpoint基本使用

  对于spark_streaming的checkpoint:

    spark streaming有一个单独的线程CheckpointWriteHandler,每generate一个batch interval的RDD数据都会触发checkpoint操做。对于kafka的DirectKafkaInputDStreamCheckpointData,实质是重写DStreamCheckpointData的update和restore方法,这样checkpoint的数据就是topic,partition,fromOffset和untilOffset。更多请参考源码例子RecoverableNetworkWordCount

  对于spark_core的checkpoint: 

  docheckpoint: 

     

       recover:

  

2、Checkpoint的初始化源码

一、设置Checkpoint目录

二、调用Checkpoint方法,构建checkpointData

 

3、DoCheckpoint源码

在SparkContext的runjob方法中

进入以后

RDDCheckpointData中真正作Checkpoint返回一个新的RDD并清除掉依赖关系

ReliableRDDCheckpointData中真正进行Checkpoint操做

在该方法中

一、获取sc

二、建立输出目录

三、以Job的方式进行Checkpoint操做

四、将分区策略写入Checkpoint目录

 

4、读取Checkpoint数据

三个方法:

一、同一个Spark任务,共有了Checkpoint的RDD,在该RDD的iterator方法中

进入 computeOrReadCheckpoint

若是进行了 Checkpoint, 条件为真firstParent[T].iterator(split, context)其中, firstParent

/** Returns the first parent RDD */

接着是获取依赖

假如进行了Checkpoint,那么CheckpointRDD就是存在

在初始化Checkpoint的时候,咱们已经初始化了CheckpointData了。

二、RDD的计算链条失败,主动去读Checkpoint文件的过程

这个要求咱们的入口类在下面这个包

三、SparkStreaming的故障恢复

首先,看一下SteamingContext的须要

而后去读取Checkpoint

分两个步骤:

A、获取最新的Checkpoint目录

B、迭代找到最新的Checkpoint就返回

最后就是使用获取的Checkpoint去构建ssc

主要是作了一下动做

相关文章
相关标签/搜索