首先kafka监听不获得数据,检查以下mysql
@Component 犯了最不该该出差错的问题
若是出现监听不到数据的问题,那么就试试更改方法一二,若是不能够在去试试方法三,以前出现这个问题也是查过 通常查到都会说 “低版本的服务器接收不到高版本的生产者发送的消息”,可是净由测试使用 用1.0.5RELEASE 和 2.6.3反复测试,并无任何的问题。redis
若是按照版本一致,那么根本就不现实,由于可能不一样的项目,springboot版本不一致的话,可能有的springboot版本低,那么你还得要求本身维护项目版本升级?若是出现第四种状况就无话可说了。spring
重复数据的发送问题以下sql
目前我是使用的Redis进行的排重法,用的是Redis中的set,保证里面不存在重复,保证Redis里面不会存入太多的脏数据。并按期清理apache
粘贴一下个人排重(Redis排重法)bootstrap
//kafka prefix String cache = "kafka_cache"; //kafka suffix Calendar c = Calendar.getInstance(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //0点,目前是为了设置为这一天的固定时间。这个彻底能够去写个工具类本身弄,为了看的更清楚,麻烦了一点的写入 SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd 00:00:00"); String gtimeStart = sdf2.format(c.getTime()); long time = sdf.parse(gtimeStart).getTime(); //此位置为了设置是不是新的一天,新的一天须要设置定时时间,保证redis中不会存储太多无用数据 Boolean flag = false; //数据接收 Set<String> range = new HashSet<>(); //判断是否存在 if (redisTemplate.hasKey(cache + time)) { //存在则取出这个set range = redisTemplate.opsForSet().members(cache + time); }else { //不存在,则为下面过时时间的设置铺垫 flag = true; } //判断监听到的数据是不是重复 if (range.contains("测试须要")) { //重复则排出,根据逻辑本身修改 continue; } else { //添加进去 redisTemplate.opsForSet().add(cache + time, i+""); if (flag){ //设置为24小时,保证新一天使用,以前使用的存储会消失掉 redisTemplate.expire(cache + time,24,TimeUnit.HOURS); //不会在进入这个里面,若是屡次的存入过时时间,那么这个key的过时时间就永远是24小时,一直就不会过时 flag = false; } }
缘由是由于在不一样group-id之下,kafka接收到之后,会给监听他的每个组发送一个他所收到的消息,可是两个消费端监听同一个group-id,那么就只有一个消费端能够消费到。springboot
# 指定kafka 代理地址,能够多个,用逗号间隔
spring.kafka.bootstrap-servers= localhost:9092
# 指定默认消费者group id
spring.kafka.consumer.group-id= test
# 是否自动提交
spring.kafka.consumer.enable-auto-commit= true
# 提交间隔的毫秒
spring.kafka.consumer.auto-commit-interval.ms=60000
# 最大轮询的次数
spring.kafka.consumer.max-poll-records=1
# 将偏移量重置为最新偏移量
spring.kafka.consumer.auto-offset-reset=earliest
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
若有什么地方错误或者不明白请下方评论指出,谢谢。讨论解决使咱们共同进步服务器