Kafka生产者保证数据可靠传输

Kafka是众多MQ(Message Queue)中的一种,MQ广泛都会面临消息丢失的问题,形成消息丢失的缘由有不少种,例如:java

  1. 生产者将消息发送,可是不确保消息到达MQ中
  2. MQ接收到消息,可是消息丢失了
  3. ...

本文实验采用的Kafka是kafka_2.11-1.1.1版本node

Kafka发送消息模型

考虑有一个Topic,只有一个分区(num.partitions=1),副本因子是2(replication.factor=2).shell

简单发送消息模型

在Kafka中,生产者生产的消息要分配到不一样的partition中,而该Topic只有一个分区,因此只会生产进这个partition.apache

Note: 在Kafka中,生产者与消费者只与Partition leader 交互.测试

当ack=0时

生产者只管发送数据到Kafka中,不在乎Kafka是否接收到. 这样丢数据的几率是比较高的.spa

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.ACKS_CONFIG, "0");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
final KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
// 发送1000W条数据
for (int i = 0; i < 10000000; i++) {
    kafkaProducer.send(new ProducerRecord<>("MillionMessage", i + ""));
}

经过kafka-console-consumer.sh进行消费,最后消费到9982846条数据(Processed a total of 9982846 messages). 丢失17154条数据,丢失的几率大概在0.017%左右..net

数据丢失的缘由就是生产者发送数据的时候,不会对数据进行确认. 在生产者发送数据的过程当中已经丢失. 数据没有到达Kafka中. 对应到上图中就是 生产者到parition leader的过程当中.日志

当Ack=1时

当ack=1时,只保证数据到达partition leader中,可是不会保证数据必定会所有传输到partition replicas中. 在上面代码的基础上,只须要修改第三行代码为props.put(ProducerConfig.ACKS_CONFIG, "1");.code

测试发送100W条数据,最后也接收到100W条数据. 图片

Ack=1,发送100W条数据

这里测试接收到100W条数据,并非100%每次都能接收到100W条数据,按照第一张图片解释,数据只是100%到达了partition leader中,若是此时partition leader所在的broker挂掉了,会从partition replicas中选举出一个replica变为leader来继续对外提供服务.(Kafka保证每一个副本都处于不一样的broker中). 选举后的partition leader中可能没有最新写入的数据,这样就是形成数据丢失的问题.

当Ack=all时

当Ack=all时,生产者写入的效率会变慢,这里测试1000W条数写入Kafka中. 仍是改动上面第三行代码props.put(ProducerConfig.ACKS_CONFIG, "all");. 写入1000W条数据耗时16147,约等于16s. 而ack=1时,耗时会在12s左右(在本地起的两个实例,同步会比较快). 虽然相差不大,可是在生产环境中,不一样的broker部署在不一样的机器中,数据同步耗时相对比较长.

当发送数据的时候,有broker挂掉了

目前Topic的partition的分布状况

➜  kafka bin/kafka-topics.sh --describe --topic MillionMessage --zookeeper localhost:2181
Topic:MillionMessage    PartitionCount:1    ReplicationFactor:2    Configs:
    Topic: MillionMessage    Partition: 0    Leader: 1    Replicas: 1,0    Isr: 1,0

设置ack=1,挂掉broker=1

当设置ack=1时,只保证partition leader接收到,因此作一下实验. 写入代码仍是按照前面操做逻辑,模式broker宕机 (使用kill命令).

在模拟宕机后,生产者打出日志以下:

WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node 1 (/192.168.0.251:9093) could not be established. Broker may not be available.

说明broker(192.168.0.251:9093)发生宕机, 最后消费数据为9997816. 数据丢失!!!

ack=1,partition leader宕机

设置ack=all,挂掉broker=0

在进行上面这个实验后,重启挂掉的broker,查看主题:

➜  kafka bin/kafka-topics.sh --describe --topic MillionMessage --zookeeper localhost:2181
Topic:MillionMessage    PartitionCount:1    ReplicationFactor:2    Configs:
    Topic: MillionMessage    Partition: 0    Leader: 0    Replicas: 1,0    Isr: 0
➜  kafka bin/kafka-topics.sh --describe --topic MillionMessage --zookeeper localhost:2181
Topic:MillionMessage    PartitionCount:1    ReplicationFactor:2    Configs:
    Topic: MillionMessage    Partition: 0    Leader: 0    Replicas: 1,0    Isr: 0,1

最后broker成功上线,而且已经同步完成,可是该Topic的partition leader由broker=0中的副本担当. 因此此次实验须要挂掉broker=0.

仍是修改第三行代码props.put(ProducerConfig.ACKS_CONFIG, "all");. 而且也是发送1000W条数据.

经过kafka-console-consumer.sh消费到10000867条数据. 说明数据确定发生了重复消费,可是不能保证数据没有丢失. 这里本身写消费程序.

9999997
9999998
9999999
Processed a total of 10000867 messages

消费程序代码以下:

public static void main(String[] args) {
    final Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "NewConsumer");
    final KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    kafkaConsumer.subscribe(Collections.singleton("MillionMessage"));
    final HashSet<String> total = new HashSet<>();
    final HashSet<Long> totalOffset = new HashSet<>();
    while (true) {
        final ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(3));
        for (ConsumerRecord<String, String> record : records) {
            total.add(record.key());
            totalOffset.add(record.offset());
        }
        System.out.println("total ==>" + total.size());
        System.out.println("totalOffset ==>" + total.size());
    }
}

最后经过观察位移和key,均可以看出,确定没有消息丢失,可是根据实验,说明发生了消息重复.那么怎么解决消费重复呢?

消费者输出

相关文章
相关标签/搜索