如何管理Spark Streaming消费Kafka的偏移量(二)

上篇文章,讨论了在spark streaming中管理消费kafka的偏移量的方式,本篇就接着聊聊上次说升级失败的案例。并发

事情发生一个月前,因为当时咱们想提升spark streaming程序的并行处理性能,因而须要增长kafka分区个数,,这里须要说下,在新版本spark streaming和kafka的集成中,按照官网的建议 spark streaming的executors的数量要和kafka的partition的个数保持相等,这样每个executor处理一个kafka partition的数据,效率是最高的。若是executors的数量大于kafka的分区个数,其实多余的executors至关因而不会处理任何数据,这部分的进程实际上是白白浪费性能。运维

若是executor的个数小于kafka partition的个数,那么其实有一些executors进程是须要处理多个partition分区的数据的,因此官网建议spark executors的进程数和kafka partition的个数要保持一致。性能

那么问题来了,若是想要提升spark streaming的并行处理性能,只能增长kafka的分区了,给kafka增长分区比较容易,直接执行一个命令便可,不过这里须要注意,kafka的分区只能增长不能减小,因此添加分区要考虑到底多少个才合适。测试

接下来咱们便增长了kafka分区的数量,同时修改了spark streaming的executors的个数和kafka的分区个数一一对应,而后就启动了流程序,结果出现了比较诡异的问题,表现以下:spa

造几条测试数据打入kafka中,发现程序老是只能处理其中的一部分数据,而每次总有一些数据丢失。按理说代码没有任何改动,只是增长kafka的分区和spark streaming的executors的个数,应该不会出现问题才对,因而又从新测了原来的旧分区和程序,发现没有问题,通过对比发现问题只会出如今kafka新增分区后,而后出现这种丢数据的状况。而后和运维同窗一块儿看了新增的kafka的分区的磁盘目录是否有数据落入,经查询发现新的分区确实已经有数据进入了,这就很奇怪了丢的数据究竟是怎么丢的?进程

最后我又检查了咱们本身保存的kafka的offset,发现里面的偏移量居然没有新增kafka的分区的偏移量,至此,终于找到问题所在,也就是说,若是没有新增分区的偏移量,那么程序运行时是不会处理新增分区的数据,而咱们新增的分区确确实实有数据落入了,这就是为啥前面说的诡异的丢失数据的缘由,实际上是由于新增kafka的分区的数据程序并无处理过而这个缘由正是咱们的本身保存offset中没有记录新增分区的偏移量。kafka

问题找到了,那么如何修复线上丢失的数据呢?源码

当时想了一个比较笨的方法,由于咱们的kafka线上默认是保留7天的数据,旧分区的数据已经处理过,就是新增的分区数据没有处理,因此咱们删除了已经处理过的旧的分区的数据,而后在业务流量底峰时期,从新启了流程序,让其从最先的数据开始消费处理,这样以来由于旧的分区被删除,只有新分区有数据,因此至关因而把丢失的那部分数据给修复了。修复完成后,又把程序中止,而后配置从最新的偏移量开始处理,这样偏移量里面就能识别到新增的分区,而后就继续正常处理便可。it

注意这里面的删除kafka旧分区的数据,是一个比较危险的操做,它要求kafka的节点须要所有重启才能生效,因此除非特殊状况,不要使用这么危险的方式。spark

后来,仔细分析了咱们使用的一个开源程序管理offset的源码,发现这个程序有一点bug,没有考虑到kafka新增分区的状况,也就是说若是你的kafka分区增长了,你的程序在重启后是识别不到新增的分区的,因此若是新增的分区还有数据进入,那么你的程序必定会丢数据,由于扩展kafka分区这个操做,并不常见,因此这个bug比较难易触发。

知道缘由后,解决起来比较容易了,就是每次启动流程序前,对比一下当前咱们本身保存的kafka的分区的个数和从zookeeper里面的存的topic的分区个数是否一致,若是不一致,就把新增的分区给添加到咱们本身保存的信息中,并发偏移量初始化成0,这样以来在程序启动后,就会自动识别新增分区的数据。

因此,回过头来看上面的那个问题,最简单优雅的解决方法就是,直接手动修改咱们本身的保存的kafka的分区偏移量信息,把新增的分区给加入进去,而后重启流程序便可。

这个案例也就是我上篇文章所说的第三个场景的case,若是是本身手动管理kafka的offset必定要注意兼容新增分区后的这种状况,不然程序可能会出现丢失数据的问题。

相关文章
相关标签/搜索