在spark JOB中消费kafka队列数据时,经过zookeeper记录了kafka的偏移量,有时数据量较大,JOB处理不过来,这事须要kafka修改偏移量offset,如:spa
开始尝试调用kafka内置的类kafka.tools.UpdateOffsetsInZK,修改offset,以下:调试
[bsauser@bsa222 kafka]$ bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK latest config/consumer.properties tam_format_alarm
updating partition 0 with new offset: 6776033
updating partition 1 with new offset: 6782580
updating partition 2 with new offset: 6778624
updating partition 3 with new offset: 6786418
updating partition 4 with new offset: 6780299
updated the offset for 5 partitionsorm
可是重启spark JOB以后,发现并不成功。忽然想到应该跟新zookeeper中该消费group id的偏移量:blog
操做以前先查看下topic offset的最大值和最小值,进入kafka目录:队列
查看最小值:kafka
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list bsa222:9092,bsa221:9092,bsa220:9092 -topic tam_format_alarm --time -2it
结果:spark
查看最大值:io
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list bsa222:9092,bsa221:9092,bsa220:9092 -topic tam_format_alarm --time -1form
结果:
根据最大值最小值区间,设置kafka的offset。
先进入zookeeper安装目录,进入bin目录,执行./zkCli.sh命令,进入终端:
经过下面命令设置consumer group:bsatam.enhance_alarm topic:tam_format_alarm partition:1 offset 为 6776033:
set /consumers/enhance_alarm/offsets/tam_format_alarm/0 6776033
一样,设置其他的partition,partition 1-4 设置命令同样,须要修改partiton修改下最后面两个参数的值:
如partition 4的最大值是6780299,如今须要将offset 调为最大,即命令为:
set /consumers/enhance_alarm/offsets/tam_format_alarm/4 6780299
调试完5个partition后,重启JOB,运行正常: