Kafka是众多MQ(Message Queue)中的一种,MQ广泛都会面临消息丢失的问题,形成消息丢失的缘由有不少种,例如:java
本文实验采用的Kafka是kafka_2.11-1.1.1版本node
考虑有一个Topic,只有一个分区(num.partitions=1),副本因子是2(replication.factor=2).shell
在Kafka中,生产者生产的消息要分配到不一样的partition中,而该Topic只有一个分区,因此只会生产进这个partition.apache
Note: 在Kafka中,生产者与消费者只与Partition leader 交互.测试
生产者只管发送数据到Kafka中,不在乎Kafka是否接收到. 这样丢数据的几率是比较高的.spa
Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.ACKS_CONFIG, "0"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); final KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props); // 发送1000W条数据 for (int i = 0; i < 10000000; i++) { kafkaProducer.send(new ProducerRecord<>("MillionMessage", i + "")); }
经过kafka-console-consumer.sh
进行消费,最后消费到9982846条数据(Processed a total of 9982846 messages). 丢失17154条数据,丢失的几率大概在0.017%左右..net
数据丢失的缘由就是生产者发送数据的时候,不会对数据进行确认. 在生产者发送数据的过程当中已经丢失. 数据没有到达Kafka中. 对应到上图中就是 生产者到parition leader的过程当中.日志
当ack=1时,只保证数据到达partition leader中,可是不会保证数据必定会所有传输到partition replicas中. 在上面代码的基础上,只须要修改第三行代码为props.put(ProducerConfig.ACKS_CONFIG, "1");
.code
测试发送100W条数据,最后也接收到100W条数据. 图片
这里测试接收到100W条数据,并非100%每次都能接收到100W条数据,按照第一张图片解释,数据只是100%到达了partition leader中,若是此时partition leader所在的broker挂掉了,会从partition replicas中选举出一个replica变为leader来继续对外提供服务.(Kafka保证每一个副本都处于不一样的broker中). 选举后的partition leader中可能没有最新写入的数据,这样就是形成数据丢失的问题.
当Ack=all时,生产者写入的效率会变慢,这里测试1000W条数写入Kafka中. 仍是改动上面第三行代码props.put(ProducerConfig.ACKS_CONFIG, "all");
. 写入1000W条数据耗时16147,约等于16s. 而ack=1时,耗时会在12s左右(在本地起的两个实例,同步会比较快). 虽然相差不大,可是在生产环境中,不一样的broker部署在不一样的机器中,数据同步耗时相对比较长.
目前Topic的partition的分布状况
➜ kafka bin/kafka-topics.sh --describe --topic MillionMessage --zookeeper localhost:2181 Topic:MillionMessage PartitionCount:1 ReplicationFactor:2 Configs: Topic: MillionMessage Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0
当设置ack=1时,只保证partition leader接收到,因此作一下实验. 写入代码仍是按照前面操做逻辑,模式broker宕机 (使用kill命令).
在模拟宕机后,生产者打出日志以下:
WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node 1 (/192.168.0.251:9093) could not be established. Broker may not be available.
说明broker(192.168.0.251:9093)发生宕机, 最后消费数据为9997816. 数据丢失!!!
在进行上面这个实验后,重启挂掉的broker,查看主题:
➜ kafka bin/kafka-topics.sh --describe --topic MillionMessage --zookeeper localhost:2181 Topic:MillionMessage PartitionCount:1 ReplicationFactor:2 Configs: Topic: MillionMessage Partition: 0 Leader: 0 Replicas: 1,0 Isr: 0 ➜ kafka bin/kafka-topics.sh --describe --topic MillionMessage --zookeeper localhost:2181 Topic:MillionMessage PartitionCount:1 ReplicationFactor:2 Configs: Topic: MillionMessage Partition: 0 Leader: 0 Replicas: 1,0 Isr: 0,1
最后broker成功上线,而且已经同步完成,可是该Topic的partition leader由broker=0中的副本担当. 因此此次实验须要挂掉broker=0.
仍是修改第三行代码props.put(ProducerConfig.ACKS_CONFIG, "all");
. 而且也是发送1000W条数据.
经过kafka-console-consumer.sh
消费到10000867条数据. 说明数据确定发生了重复消费,可是不能保证数据没有丢失. 这里本身写消费程序.
9999997 9999998 9999999 Processed a total of 10000867 messages
消费程序代码以下:
public static void main(String[] args) { final Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "NewConsumer"); final KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); kafkaConsumer.subscribe(Collections.singleton("MillionMessage")); final HashSet<String> total = new HashSet<>(); final HashSet<Long> totalOffset = new HashSet<>(); while (true) { final ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(3)); for (ConsumerRecord<String, String> record : records) { total.add(record.key()); totalOffset.add(record.offset()); } System.out.println("total ==>" + total.size()); System.out.println("totalOffset ==>" + total.size()); } }
最后经过观察位移和key,均可以看出,确定没有消息丢失,可是根据实验,说明发生了消息重复.那么怎么解决消费重复呢?