关于SparkStreaming的checkpoint的弊端

框架版本html

spark2.1.0mysql

kafka0.9.0.0git

当使用sparkstreaming处理流式数据的时候,它的数据源搭档大部分都是Kafka,尤为是在互联网公司颇为常见。 当他们集成的时候咱们须要重点考虑就是若是程序发生故障,或者升级重启,或者集群宕机,它究竟可否作到数据不丢不重呢?github

也就是一般咱们所说的高可靠和稳定性,一般框架里面都带有不一样层次的消息保证机制,通常来讲有三种就是:redis

at most once 最多一次
at least once 最少一次
exactly once  准确一次

在storm里面是经过ack和Trident,在sparkstreaming里面,若是是1.3版本以前是经过Receiver方式读取kafka数据,1.3以后经过Direct Approach方式直接读取kafka的数据,直接分配每一个Batch及RDD最新的Topic partition offset,任务运行后使用kafka的Simple Consumer API去获取那一段的offset的数据,这样的好处是避免了原来Receiver接受数据宕机带来的数据可靠性风险,至关于原来的数据是在内存中而如今的数据是在kafka的磁盘中,经过偏移量可随时再次消费数据,从而实现了数据的Exactly Once处理,此外还有个不一样之处在于1.3以后,使用的checkpoint保存当前消费的kafka的offset,而以前用zk保存的,这就是今天这篇文章重点吐槽的地方。sql

在sparkstreaming如何作到数据不丢失呢?apache

(1)使用checkpoint (2)本身维护kafka偏移量微信

checkpoint配合kafka可以在特定环境下保证不丢不重,注意为何要加上特定环境呢,这里有一些坑,checkpoint是对sparkstreaming运行过程当中的元数据和 每次rdds的数据状态保存到一个持久化系统中,固然这里面也包含了offset,通常是HDFS,S3,若是程序挂了,或者集群挂了,下次启动仍然可以从checkpoint中恢复,从而作到生产环境的7*24高可用。app

可是checkpoint的最大的弊端在于,一旦你的流式程序代码或配置改变了,或者更新迭代新功能了,这个时候,你先停旧的sparkstreaming程序,而后新的程序打包编译后执行运行,会发现两种状况: (1)启动报错,反序列化异常 (2)启动正常,可是运行的代码仍然是上一次的程序的代码。框架

为何会出现上面的两种状况,这是由于checkpoint第一次持久化的时候会把整个相关的jar给序列化成一个二进制文件,每次重启都会从里面恢复,可是当你新的 程序打包以后序列化加载的仍然是旧的序列化文件,这就会致使报错或者依旧执行旧代码。有的同窗可能会说,既然如此,直接把上次的checkpoint删除了,不就能启动了吗? 确实是能启动,可是一旦你删除了旧的checkpoint,新启动的程序,只能从kafka的smallest或者largest的偏移量消费,默认是从最新的,若是是最新的,而不是上一次程序中止的那个偏移量 就会致使有数据丢失,若是是老的,那么就会致使数据重复。无论怎么样搞,都有问题。 https://spark.apache.org/docs/2.1.0/streaming-programming-guide.html#upgrading-application-code

针对这种问题,spark官网给出了2种解决办法:

(1)旧的不停机,新的程序继续启动,两个程序并存一段时间消费。 评价:仍然有丢重复消费的可能 (2)停机的时候,记录下最后一次的偏移量,而后新恢复的程序读取这个偏移量继续工做,从而达到不丢消息。 评价:官网没有给出具体怎么操做,只是给了个思路

第二种思路是正确的,但还须要本身维护一个offset状态,这样以来checkpoint这个功能只能在程序写好以后不容许再次变更,但能够重启的状况保证高可靠。

但实际状况是大多数公司的代码都会频繁迭代和升级,与checkpoint恰好相悖,这样以来checkpoint的做用便显的有点没用了,既然仍是须要本身维护offset状态, 那么不用checkpoint也罢,彻底本身维护offset状态到zk中便可。因此果断弃用checkpoint,采用本身维护offset。其原理以下:

首次启动,先从zk中找是否有上次存储的偏移量,若是没有就从最新的消费,而后保存偏移量至zk中

若是从zk中找到了偏移量,那么就从指定的偏移量处开始消费处理,每一个批处理处理完毕后,都会更新新的offset到zk中, 这样以来不管是程序故障,仍是宕机,再次启动后都会从上次的消费的偏移量处继续开始消费,并且程序的升级或功能改动新版本的发布都能正常运行 并作到了消息不丢。

须要注意的是,虽然上游可以作到准确一次的消费,可是下游的落地存储输出,好比写入hbase,redis,mysql,es等等若是失败了,整条消息依旧会失败,这个彻底要靠本身的设计了,要么记录log,针对特定数据记录,若是失败按期 从新打入kafka走程序恢复或者手动恢复。

或者设计存储的时候,有复合主键,把偏移量提早,就算重复消费,但主键同样,最终只会有一条数据落地,这个要分场景和具体业务结合使用了。

回到主题,本身维护kafka的offset状态,如何作? github上已经有大神贡献了,咱们只须要拿过来稍加改动便可,使用本身维护的offset以后,就没有必要再使用 checkpoint,github链接以下,有兴趣的朋友能够了解下:

https://github.com/cpbaranwal/Spark-Streaming-DirectKafka-Examples/blob/master/src/main/scala/CustomDirectKafkaExample.scala

使用zk维护offset也是比较不错的选择,若是将checkpoint存储在HDFS上,每隔几秒都会向HDFS上进行一次写入操做并且大部分都是小文件,且不说写入性能怎么样,就小文件过多,对整个Hadoop集群都不太友好。由于只记录偏移量信息,因此数据量很是小,zk做为一个分布式高可靠的的内存文件系统,很是适合这种场景。

全部参考连接:

http://aseigneurin.github.io/

http://aseigneurin.github.io/2016/05/07/spark-kafka-achieving-zero-data-loss.html

http://why-not-learn-something.blogspot.jp/2016/08/upgrading-running-spark-streaming.html

http://www.binwang.me/2015-11-03-the-proper-way-to-use-spark-checkpoint.html

https://github.com/cpbaranwal/Spark-Streaming-DirectKafka-Examples/blob/master/src/main/scala/CustomDirectKafkaExample.scala

https://github.com/ippontech/spark-kafka-source

有什么问题能够扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。 技术债不能欠,健康债更不能欠, 求道之路,与君同行。

输入图片说明

相关文章
相关标签/搜索