Checkpoint是Spark中比较高级的功能,适合整个处理链特别长,转换算子特别多的场景下。特别是对于那些运算时间长、须要较大运算量才能获得的RDD数据,以及计算链过长或大量依赖其余RDD的RDD数据。由于对于这类复杂应用,处理过程当中可能会反复调用某些RDD数据,一旦节点发生故障致使持久化数据丢失,则须要从新计算数据,致使整个计算耗时的上升,所以该机制可以有效提高Spark应用的容错性。缓存
Checkpoint本质上是对于复杂RDD计算链中,相对核心的,可能因为节点故障致使持久化数据丢失的数据,经过高可靠文件系统(例如HDFS)进行存储来实现容错与高可用性。ide
启用Checkpoint能够经过调用SparkContext的setCheckpointDir()方法,设置容错文件系统目录;而后经过RDD对象调用checkpoint()方法。当RDD对应的Job运行结束后,会启动单独Job将标记为checkpoint的RDD数据写入设置的文件系统中,从而实现高可用与持久化容错。当发生节点故障致使数据丢失,此时仍然能够从Checkpoint对应的文件系统中直接读取数据,而再也不须要从新计算数据。性能
与上一章介绍的Cache缓存机制不一样,当应用Cache缓存时,每计算一个须要缓存的分区数据会直接将其写入内存中。而如上所述,Checkpoint会在Job结束以后再启动新的Job执行持久化操做;也就意味着,若是此时数据没有缓存过,则会从新再计算一次用于Checkpoint持久化,无疑从性能开销上是不划算的。所以,建议在须要Checkpoint处理RDD上经过rdd.cache()或rdd.persist(StorageLevel.DISK_ONLY)方法事先执行缓存,避免二次计算的性能开销。spa
Checkpoint的实现过程包括4个阶段:初始化、标记数据、Checkpoint进行中以及Checkpoint完成。对象
初始化阶段:应用程序调用rdd.checkpoint()标记哪些RDD须要Checkpoint,标记后则代表该RDD受RDDCheckpointData管理。此外还须要设定Checkpoint存储路径(如HDFS)。blog
标记数据阶段:初始化以后RDDCheckpointData会将RDD数据逐一标记为MarkedForCheckpoint。内存
持久化处理阶段:每一个Job运行结束后会调用finalRdd.doCheckpoint()方法,该方法会沿着计算链回溯扫描直到发现须要Checkpoint的RDD数据为止,将其标记为CheckpointingInProgress并将持久化须要的配置广播到其余工做节点上的BlockManager。最后启动单独Job来完成Checkpoint操做写入磁盘文件系统。it
Checkpointed完成阶段:Job执行完Checkpoint操做后,会将对应RDD的血缘关系所有清除,并将RDD状态设置为checkpointed。而后将该RDD的父RDD设置为CheckpointRDD。io
图1:Checkpoint执行过程class
如何读取Checkpoint的数据?
读取RDD数据(rdd.partitions()方法)会经过RDDCheckpointData检查checkpointed的RDD数据,若是获取到则直接返回Array[Partition]。此外,当调用rdd.iterator()方法计算RDD的分区数据时,也会调用computeOrReadCheckpoint(split: Partition)查看该RDD作过Checkpoint;有则调用CheckpointRDD.iterator()方法直接读取数据。
Cache机制与Checkpoint机制二者最大的差别在于血缘关系(lineages)。Cache机制缓存数据到内存(大几率)或磁盘(不多)中,但不会改变RDD数据的血缘关系,一旦节点故障致使持久化失败,则会经过血缘关系从新计算RDD;而Checkpoint机制通常缓存数据到高可用的文件系统,如HDFS,并抹除RDD的血缘,将CheckpointRDD设置为其父RDD。