如何管理Spark Streaming消费Kafka的偏移量(一)

最近工做有点忙,因此更新文章频率低了点,在这里给你们说声抱歉,前面已经写过在spark streaming中管理offset,但当时只知道怎么用,并非很了解为什么要那样用,最近一段时间又抽空看了一个github开源程序本身管理offset的源码,基本已经理解透彻了,固然这里面还包含了因为理解不透彻致使升级失败的一个案例,这个在下篇文章会分享出来。本篇咱们先从理论的角度聊聊在Spark Streaming集成Kafka时的offset状态如何管理。git

spark streaming 版本 2.1github

kafka 版本0.9.0.0微信

在这以前,先重述下spark streaming里面管理偏移量的策略,默认的spark streaming它自带管理的offset的方式是经过checkpoint来记录每一个批次的状态持久化到HDFS中,若是机器发生故障,或者程序故障中止,下次启动时候,仍然能够从checkpoint的目录中读取故障时候rdd的状态,便能接着上次处理的数据继续处理,但checkpoint方式最大的弊端是若是代码升级,新版本的jar不能复用旧版本的序列化状态,致使两个版本不能平滑过渡,结果就是要么丢数据,要么数据重复,因此官网搞的这个东西,几乎没有人敢在生产环境运行很是重要的流式项目。spa

因此比较通用的解决办法就是本身写代码管理spark streaming集成kafka时的offset,本身写代码管理offset,其实就是把每批次offset存储到一个外部的存储系统里面包括(Hbase,HDFS,Zookeeper,Kafka,DB等等),不用的什么存储系统, 都须要考虑到三种时刻的offset的状态,不然offset的状态不完整,就可能致使一些bug出现。图片

场景一:kafka

当一个新的spark streaming+kafka的流式项目第一次启动的时候,这个时候发现外部的存储系统并无记录任何有关这个topic全部分区的偏移量,因此就从 KafkaUtils.createDirectStream直接建立InputStream流,默认是从最新的偏移量消费,若是是第一次其实最新和最旧的偏移量时相等的都是0,而后在之后的每一个批次中都会把最新的offset给存储到外部存储系统中,不断的作更新。源码

场景二:it

当流式项目中止后再次启动,会首先从外部存储系统读取是否记录的有偏移量,若是有的话,就读取这个偏移量,而后把偏移量集合传入到KafkaUtils.createDirectStream中进行构建InputSteam,这样的话就能够接着上次中止后的偏移量继续处理,而后每一个批次中仍然的不断更新外部存储系统的偏移量,这样以来就可以无缝衔接了,不管是故障中止仍是升级应用,都是透明的处理。spark

场景三:后台

对正在运行的一个spark streaming+kafka的流式项目,咱们在程序运行期间增长了kafka的分区个数,请注意:这个时候新增的分区是不能被正在运行的流式项目感应到的,若是想要程序可以识别新增的分区,那么spark streaming应用程序必须得重启,同时若是你还使用的是本身写代码管理的offset就千万要注意,对已经存储的分区偏移量,也要把新增的分区插入进去,不然你运行的程序仍然读取的是原来的分区偏移量,这样就会丢失一部分数据。

总结:

若是本身管理kafka的偏移量,必定要注意上面的三个场景,若是考虑不全,就有可能出现诡异的问题。

有什么问题能够扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。 技术债不能欠,健康债更不能欠, 求道之路,与君同行。

输入图片说明

相关文章
相关标签/搜索