原创声明:做者:Arnold.zhao 博客园地址:https://www.cnblogs.com/zh94html
上一篇文章记录了kafka的副本机制和容错功能的说明,本篇则主要在上一篇文章的基础上,验证多分区Topic的消费者的功能验证;java
目录:redis
新建1副本,2分区的Topic作测试验证shell
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic arnold_consumer_test
查看对应的Topic分区状况bootstrap
[root@dev bin]# ./kafka-topics.sh --describe --zookeeper 10.0.3.17:2181 --topic arnold_consumer_test Topic:arnold_consumer_test PartitionCount:2 ReplicationFactor:1 Configs: Topic: arnold_consumer_test Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Topic: arnold_consumer_test Partition: 1 Leader: 2 Replicas: 2 Isr: 2
建立Topic每一个分区只设置了一个副本及主副本,因此如上可看到,各分区所在的broker节点的状况。api
配置消费者组group.id信息为:test-consumer-group-arnold-1缓存
修改 kafka下 config目录下的consumer.properties,修改内容为: bootstrap.servers=10.0.3.17:9092,10.0.6.39:9092,10.0.3.18:9092 group.id=test-consumer-group-arnold-1
分别在两台kafka服务器上的 kafka 主目录下启动两个消费者,并指定对应的消费者配置为 consumer.properties文件,消费的Topic 为arnold_consumer_test topic服务器
10.0.6.39启动消费者 [gangtise@yinhua-ca000003 kafka_2.11-2.1.0]$ bin/kafka-console-consumer.sh --bootstrap-server 10.0.3.17:9092,10.0.6.39:9092,10.0.3.18:9092 --from-beginning --topic arnold_consumer_test --consumer.config config/consumer.properties 10.0.3.17 启动消费者 [root@dev kafka_2.11-2.1.0]# bin/kafka-console-consumer.sh --bootstrap-server 10.0.3.17:9092,10.0.6.39:9092,10.0.3.18:9092 --from-beginning --topic arnold_consumer_test --consumer.config config/consumer.properties 两个消费者都是使用的相同的consumer.properties文件,即都是在一个消费组里面(为何要在两台服务器上启动两个消费者?不能在一个服务器上启动两个消费者吗?答:均可以,我之因此用两个不一样的服务器启动消费者是由于我当前39服务器启动消费者后,当前的shell进程就已经被占用了,处于等待状态,除非我再开一个39服务器的会话,从新开一个消费者。)
OK,消费者启动之后,观察下消费者和Topic分区的对应状况app
查看当前全部的消费组的列表信息分布式
[root@dev bin]# ./kafka-consumer-groups.sh --bootstrap-server 10.0.3.17:9092 --list test-consumer-group-arnold-1 test-consumer-group-arnold test-consumer-group
如上,能够知道当前kafka服务器上已有的消费组分别是有三个,而咱们如今已经启动了的消费者组是test-consumer-group-arnold-1,因此,详细查看下消费组test-consumer-group-arnold-1的详细信息
[root@dev bin]# ./kafka-consumer-groups.sh --bootstrap-server 10.0.3.17:9092 --describe --group test-consumer-group-arnold-1 TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID arnold_consumer_test 0 19 19 0 consumer-1-2da16674-5b57-4e6c-9660-ab45de6ed5ae /10.0.6.39 consumer-1 arnold_consumer_test 1 19 19 0 consumer-1-917ecb37-3027-45de-b293-fe5125867432 /10.0.3.17 consumer-1
CURRENT-OFFSET: 当前消费组消费到的偏移量
LOG-END-OFFSET: 日志最后的偏移量
CURRENT-OFFSET = LOG-END-OFFSET 说明当前消费组已经所有消费了;
LAG:表示落后未消费的数据量
能够看到当前topic arnold_consumer_test 的Partition 0分区对应的消费者id是
consumer-1-2da16674-5b57-4e6c-9660-ab45de6ed5ae,该消费者对应的host是
10.0.6.39;经过上述内容就能够很清晰的知道,当前所启动的消费组下的两个消费者分别对应消费的是topic的那个分区,OK进行下测试
启动生产者生产数据
[root@dev bin]# ./kafka-console-producer.sh --broker-list 10.0.3.17:9092 --topic arnold_consumer_test >message1 >message2 按照kafka的消息路由策略,此时插入message1和message2两条消息,将会采用轮训的策略分别插入到两个分区中;(不清楚的话能够看下上篇的内容,这块都有作过说明) 此时partition0分区中将会接收到 message1的消息,partition2分区中将会接受到message2的消息,而后又分别由partition0分区所对应的 10.0.6.39的消费者消费到对应的数据,partition1同理 此时查看消费者的情况以下: 10.0.6.39 [root@dev kafka_2.11-2.1.0]# bin/kafka-console-consumer.sh --bootstrap-server 10.0.3.17:9092,10.0.6.39:9092,10.0.3.18:9092 --from-beginning --topic arnold_consumer_test --consumer.config config/consumer.properties message1 10.0.3.17 [gangtise@yinhua-ca000003 kafka_2.11-2.1.0]$ bin/kafka-console-consumer.sh --bootstrap-server 10.0.3.17:9092,10.0.6.39:9092,10.0.3.18:9092 --from-beginning --topic arnold_consumer_test --consumer.config config/consumer.properties message2
验证完毕,内容很简单,可是想要表达记录下来还真的是着实有些麻烦;因此,后续其余的一些规则,此处就直接放总结了,再也不列出来实验过程;
上述的内容,则也是均可以经过使用kafka-consumer-groups.sh命令,查看消费组下的消费者所对应的分区的状况便可得知对应的结果;
此时若是一个消费组已经在消费的状况下,此时又来了新的消费组进行消费,那就按照新的消费组规则来消费便可, 不会影响到其余消费组;举例,此时一个消费组三个消费者,在进行数据的消费;此时新来了一个消费组,只有一个消费者,那么此时这个消费者会消费全部的消费分区,不会和其余的消费组有任何的重叠,原理是,kafka的消费组其实在kafka中也是一个消费者topic分区的概念,分区中记录各个消费组的消费的offset位移信息,以此保证全部的消费者所消费的内容的offset位移互不影响,关于这个概念后续会详细说明一下,其实挺重要的。
另外,上述只作了部分的测试验证,便直接给出了最终的总结内容,对于部分测试内容并无再在本篇列出来(由于不少步骤其实都是重复的);可是,无心中发现了一个老哥的博客,已经对这方面也作了详细的测试,详情还须要看剩下的测试方式的,能够点击这个连接查看;
原创声明:做者:Arnold.zhao 博客园地址:https://www.cnblogs.com/zh94
原本这篇文章在上述的消费者和Partition的关系介绍完之后也就结束了,可是在写完之后,翻了下博客园的首页发现有一个推荐的kafka的帖子就顺手点进去想get点技能,而后结果有点伤心,文章中对于一些kafka数据重复的问题一笔带过。。甚至没有说明为何kakfa会出现数据重复的问题,只是说这是kafka的一种自我保护的机制产生的。。。这,就很伤心,因而本篇内容再对kafka数据重复的问题作一下说明,这些问题迟早也都要记录的。
对于kafka的使用上,其实Java代码的实现是相对简单的,网上的内容也有不少,可是若是对于kafka的一些基本概念就不熟悉的话,在使用过程当中便会出现不少懵逼的事情,因此这篇文章包括前两篇的文章,则都是重点在说kafka的一些机制的问题,固然后续对于kafka java的一些配置和实现,也会作一些记录说明。
回到问题自己,为何kafka有时候会出现消费者的数据重复问题?首先,消费者的数据自己是来自于生产者生产的数据,因此了解生产者所生产数据的可靠性机制,便和当前的问题有这直接的关联了。
生产者的数据可靠性,在配置上是根据kafka 生产者的 Request.required.acks 来配置生产者消息可靠性;
Request.required.acks=-1 (ISR全量同步确认,强可靠性保证) Request.required.acks = 1(leader 确认收到,无需保证其它副本也确认收到, 默认) Request.required.acks = 0 (不确认,可是吞吐量大)
在分布式的系统中,有一个对应的ACP理论,分别是:
可用性(Availability):在集群中一部分节点故障后,集群总体是否还能响应客户端的读写请求。(对数据更新具有高可用性)
一致性(Consistency):在分布式系统中的全部数据备份,在同一时刻是否一样的值。(等同于全部节点访问同一份最新的数据副本)
分区容忍性(Partition tolerance):以实际效果而言,分区至关于对通讯的时限要求。系统若是不能在时限内达成数据一致性,就意味着发生了分区的状况,必须就当前操做在C和A之间作出选择。
在分布式系统的设计中,没有一种设计能够同时知足一致性,可用性,分区容错性 3个特性;因此kafka也不例外;
若是想实现 kafka 配置为 CP(Consistency & Partition tolerance) 系统, 配置须要以下:
request.required.acks=-1 min.insync.replicas = ${N/2 + 1} unclean.leader.election.enable = false
如图所示,在 acks=-1 的状况下,新消息只有被 ISR 中的全部 follower(f1 和 f2, f3) 都从 leader 复制过去才会回 ack, ack 后,不管那种机器故障状况(所有或部分), 写入的 msg4,都不会丢失, 消息状态知足一致性 C 要求。
正常状况下,全部 follower 复制完成后,leader 回 producer ack。
异常状况下,若是当数据发送到 leader 后部分副本(f1 和 f2 同步), leader 挂了?此时任何 follower 都有可能变成新的 leader, producer 端会获得返回异常,此时producer端会从新发送数据,此时数据重复
解决消费重复的方式有不少啊,第一个就是你的业务场景无需在乎数据重复的问题,这个天然也就业务上解决了;第二个则是消费者本身作一层缓存过滤便可,由于生产数据重复毕竟是节点down机才会出现的问题,在down机的这一刹那没有被同步到follower的数据并不会不少,因此,以数据量为缓存,或者以时间为缓存均可以解决这个问题,好比加一个缓存区,只要判断数据重复了则再也不重复消费便可,而后当缓存的数据超过了1M,则清除一次缓存区;或者直接缓存到redis中,使用redis api来去重,定时清理一下redis中的数据也能够;
除了生产者数据重复外,还有一种问题是,生产者数据没有重复,可是消费者消费的数据重复了,这种问题,则是因为消费者offset自动提交的问题,如,消费者offset是1s提交一次,此时0.5s消费了5条数据,可是消费者尚未到1s自动提交的时候,消费者挂掉,此时已经消费的5条数据的偏移量因为没有提交到kafka,因此kafka中是没有记录到当前已经消费到的偏移量的,此时消费者重启,则会从5条数据前从新消费,这个问题通常比较好解决,由于大多数状况下若是使用消费者手动提交的模式,通常不会出现这种问题(手动提交的状况下若是出现异常,没有执行提交代码,那么代码中作好数据消费的回滚操做便可,更加可控);
除了数据重复的状况,另一种问题,则是kafka数据丢失的问题
首先按照上述的kafka的cp系统的配置方式,是绝对不会出现数据丢失的状况的,由于要么各节点不工做,要么各节点数据同步完成后,才会返回ack,此时消息不会丢失且消息状态一致;
除了配置kakfa为cp系统外,还能够配置kafka为AP(Availability & Partition tolerance)系统
request.required.acks=1 min.insync.replicas = 1 unclean.leader.election.enable = false
AP系统下生产者的吞吐量相对更高,可是因为request.required.acks 配置为1,即leader主副本收到消息便直接返回ack,此时若是leader接收到生产者消息后,返回了ack的标识,可是此时副本节点还都没有进行同步,此时leader节点挂掉,从新进行leader选举,新的follower选为leader后,则此时消息丢失;
因此根据合适的业务场景,使用合适的kafka模式则是最佳的选择。
原创声明:做者:Arnold.zhao 博客园地址:https://www.cnblogs.com/zh94
新建Topic ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic arnold_consumer_test 查看Topic详细信息 [root@dev bin]# ./kafka-topics.sh --describe --zookeeper 10.0.3.17:2181 --topic arnold_consumer_test Topic:arnold_consumer_test PartitionCount:2 ReplicationFactor:1 Configs: Topic: arnold_consumer_test Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Topic: arnold_consumer_test Partition: 1 Leader: 2 Replicas: 2 Isr: 2 启动消费者 bin/kafka-console-consumer.sh --bootstrap-server 10.0.3.17:9092,10.0.6.39:9092,10.0.3.18:9092 --from-beginning --topic arnold_consumer_test --consumer.config config/consumer.properties 启动生产者 ./kafka-console-producer.sh --broker-list 10.0.3.17:9092 --topic arnold_consumer_test 查看当前全部的消费组的列表信息 ./kafka-consumer-groups.sh --bootstrap-server 10.0.3.17:9092 --list test-consumer-group-arnold-1 查看消费者组的详细信息 [root@dev bin]# ./kafka-consumer-groups.sh --bootstrap-server 10.0.3.17:9092 --describe --group test-consumer-group-arnold-1 TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID arnold_consumer_test 0 19 19 0 consumer-1-2da16674-5b57-4e6c-9660-ab45de6ed5ae /10.0.6.39 consumer-1 arnold_consumer_test 1 19 19 0 consumer-1-917ecb37-3027-45de-b293-fe5125867432 /10.0.3.17 consumer-1