Spark Streaming如何使用checkpoint容错

在互联网场景下,常常会有各类实时的数据处理,这种处理方式也就是流式计算,延迟一般也在毫秒级或者秒级,比较有表明性的几个开源框架,分别是Storm,Spark Streaming和Filnk。 

伦理片 http://www.dotdy.com/ 
曾经在一个项目里面用过阿里改造后的JStrom,总体感觉就是编程略复杂,在不使用Trident Api的时候是不能保证准确一次的数据处理的,可是能保证不丢数据,可是不保证数据重复,咱们在使用期间也出现过几回问题,bolt或者worker重启时候会致使大量数据重复计算,这个问无法解决,若是想解决就得使用Trident来保证,使用比较繁琐。 



最近在作一个实时流计算的项目,采用的是Spark Steaming,主要是对接Spark方便,固然后续有机会也会尝试很是具备潜力的Filnk,大体流程,就是消费kafka的数据,而后中间作业务上的一些计算,中间须要读取redis,计算的结果会落地在Hbase中,Spark2.x的Streaming能保证准确一次的数据处理,经过spark自己维护kafka的偏移量,可是也须要启用checkpoint来支持,由于你无法预料到可能出现的故障,好比断电,系统故障,或者JVM崩溃等等。 

鉴于上面的种种可能,Spark Streaming须要经过checkpoint来容错,以便于在任务失败的时候能够从checkpoint里面恢复。 

在Spark Streaming里面有两种类型的数据须要作checkpoint: 

A :元数据信息checkpoint 主要是驱动程序的恢复 

(1)配置 构建streaming应用程序的配置 

(2)Dstream操做 streaming程序中的一系列Dstream操做 

(3)没有完成的批处理 在运行队列中的批处理可是没有完成 

B:消费数据的checkpoint 

保存生成的RDD到一个可靠的存储系统中,经常使用的HDFS,一般有状态的数据横跨多个batch流的时候,须要作checkpoint 


总结下: 

元数据的checkpoint是用来恢复当驱动程序失败的场景下 
而数据自己或者RDD的checkpoint一般是用来容错有状态的数据处理失败的场景 


大多数场景下没有状态的数据或者不重要的数据是不须要激活checkpoint的,固然这会面临丢失少数数据的风险(一些已经消费了,可是没有处理的数据) 


如何在代码里面激活checkpoint? 
 redis

Java代码  收藏代码编程

  1. // 经过函数来建立或者从已有的checkpoint里面构建StreamingContext  
  2. def functionToCreateContext(): StreamingContext = {  
  3.   val ssc = new StreamingContext(...)   // new context  
  4.   val rdds = ssc.socketTextStream(...) // create DStreams  
  5.   ...  
  6.   ssc.checkpoint("/spark/kmd/checkpoint")   // 设置在HDFS上的checkpoint目录  
  7.   //设置经过间隔时间,定时持久checkpoint到hdfs上  
  8.   rdds.checkpoint(Seconds(batchDuration*5))  
  9.     
  10.   rdds.foreachRDD(rdd=>{  
  11.   //能够针对rdd每次调用checkpoint  
  12.   //注意上面设置了,定时持久checkpoint下面这个地方能够不用写  
  13.   rdd.checkpoint()  
  14.     
  15.   }  
  16.   )  
  17.   //返回ssc  
  18.   ssc  
  19. }  
  20.   
  21. def main(args:Array){  
  22. // 建立context  
  23. val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)  
  24. // 启动流计算  
  25. context.start()  
  26. context.awaitTermination()  
  27. }  



启动项目以后,咱们能在HDFS上看到对应目录下面的checkpoint内容 
 


这里有有两个坑: 


(1)处理的逻辑必须写在functionToCreateContext函数中,你要是直接写在main方法中,在首次启动后,kill关闭,再启动就会报错 

关闭命令 服务器

Java代码  收藏代码app

  1. yarn application -kill application_1482996264071_34284  


再次启动后报错信息 框架

Java代码  收藏代码socket

  1. has not been initialized when recovery from checkpoint  



解决方案:将逻辑写在函数中,不要写main方法中, 

(2)首次编写Spark Streaming程序中,由于处理逻辑没放在函数中,所有放在main函数中,虽然能正常运行,也能记录checkpoint数据,可是再次启动先报(1)的错误,而后你解决了,打包编译从新上传服务器运行,会发现依旧报错,此次的错误和(1)不同: ide

Java代码  收藏代码函数

  1. xxxx classs ClassNotFoundException  


但令你疑惑的是明明打的jar包中包含了,这个类,上一次还能正常运行此次为啥就不能了,问题就出在checkpoint上,由于checkpoint的元数据会记录jar的序列化的二进制文件,由于你改动过代码,而后从新编译,新的序列化jar文件,在checkpoint的记录中并不存在,因此就致使了上述错误,如何解决: 

也很是简单,删除checkpoint开头的的文件便可,不影响数据自己的checkpoint oop

Java代码  收藏代码spa

  1. hadoop fs -rm /spark/kmd/check_point/checkpoint*  

而后再次启动,发现一切ok,能从checkpoint恢复数据,而后kill掉又一次启动  就能正常工做了。  最后注意的是,虽然数据可靠性获得保障了,可是要谨慎的设置刷新间隔,这可能会影响吞吐量,由于每隔固定时间都要向HDFS上写入checkpoint数据,spark streaming官方推荐checkpoint定时持久的刷新间隔通常为批处理间隔的5到10倍是比较好的一个方式。

相关文章
相关标签/搜索