修改kafka topic offset 的方法

 

在spark JOB中消费kafka队列数据时,经过zookeeper记录了kafka的偏移量,有时数据量较大,JOB处理不过来,这事须要kafka修改偏移量offset,如:spa

 

 

开始尝试调用kafka内置的类kafka.tools.UpdateOffsetsInZK,修改offset,以下:调试

[bsauser@bsa222 kafka]$ bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK latest config/consumer.properties tam_format_alarm
updating partition 0 with new offset: 6776033
updating partition 1 with new offset: 6782580
updating partition 2 with new offset: 6778624
updating partition 3 with new offset: 6786418
updating partition 4 with new offset: 6780299
updated the offset for 5 partitionsorm

可是重启spark JOB以后,发现并不成功。忽然想到应该跟新zookeeper中该消费group id的偏移量:blog

操做以前先查看下topic offset的最大值和最小值,进入kafka目录:队列

查看最小值:kafka

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list bsa222:9092,bsa221:9092,bsa220:9092 -topic tam_format_alarm --time -2it

结果:spark

 

查看最大值:io

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list bsa222:9092,bsa221:9092,bsa220:9092 -topic tam_format_alarm --time -1form

结果:

 

根据最大值最小值区间,设置kafka的offset。

先进入zookeeper安装目录,进入bin目录,执行./zkCli.sh命令,进入终端:

经过下面命令设置consumer group:bsatam.enhance_alarm topic:tam_format_alarm partition:1  offset 为 6776033:

set /consumers/enhance_alarm/offsets/tam_format_alarm/0 6776033

 

 

一样,设置其他的partition,partition 1-4 设置命令同样,须要修改partiton修改下最后面两个参数的值:

如partition 4的最大值是6780299,如今须要将offset 调为最大,即命令为:

set /consumers/enhance_alarm/offsets/tam_format_alarm/4 6780299

 

 调试完5个partition后,重启JOB,运行正常:

相关文章
相关标签/搜索