深刻理解Kafka必知必会(上)

Kafka的用途有哪些?使用场景如何?

  • 消息系统: Kafka 和传统的消息系统(也称做消息中间件)都具有系统解耦、冗余存储、流量削峰、缓冲、异步通讯、扩展性、可恢复性等功能。与此同时,Kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能。java

  • 存储系统: Kafka 把消息持久化到磁盘,相比于其余基于内存存储的系统而言,有效地下降了数据丢失的风险。也正是得益于 Kafka 的消息持久化功能和多副本机制,咱们能够把 Kafka 做为长期的数据存储系统来使用,只须要把对应的数据保留策略设置为“永久”或启用主题的日志压缩功能便可。shell

  • 流式处理平台: Kafka 不只为每一个流行的流式处理框架提供了可靠的数据来源,还提供了一个完整的流式处理类库,好比窗口、链接、变换和聚合等各种操做。bootstrap

Kafka中的ISR、AR又表明什么?ISR的伸缩又指什么

分区中的全部副本统称为 AR(Assigned Replicas)。全部与 leader 副本保持必定程度同步的副本(包括 leader 副本在内)组成ISR(In-Sync Replicas),ISR 集合是 AR 集合中的一个子集。数组

ISR的伸缩: leader 副本负责维护和跟踪 ISR 集合中全部 follower 副本的滞后状态,当 follower 副本落后太多或失效时,leader 副本会把它从 ISR 集合中剔除。若是 OSR 集合中有 follower 副本“追上”了 leader 副本,那么 leader 副本会把它从 OSR 集合转移至 ISR 集合。默认状况下,当 leader 副本发生故障时,只有在 ISR 集合中的副本才有资格被选举为新的 leader,而在 OSR 集合中的副本则没有任何机会(不过这个原则也能够经过修改相应的参数配置来改变)。缓存

replica.lag.time.max.ms : 这个参数的含义是 Follower 副本可以落后 Leader 副本的最长时间间隔,当前默认值是 10 秒。安全

unclean.leader.election.enable:是否容许 Unclean 领导者选举。开启 Unclean 领导者选举可能会形成数据丢失,但好处是,它使得分区 Leader 副本一直存在,不至于中止对外提供服务,所以提高了高可用性。网络

Kafka中的HW、LEO、LSO、LW等分别表明什么?

HW 是 High Watermark 的缩写,俗称高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个 offset 以前的消息。多线程

LSO是LogStartOffset,通常状况下,日志文件的起始偏移量 logStartOffset 等于第一个日志分段的 baseOffset,但这并非绝对的,logStartOffset 的值能够经过 DeleteRecordsRequest 请求(好比使用 KafkaAdminClient 的 deleteRecords()方法、使用 kafka-delete-records.sh 脚本、日志的清理和截断等操做进行修改。 架构

如上图所示,它表明一个日志文件,这个日志文件中有9条消息,第一条消息的 offset(LogStartOffset)为0,最后一条消息的 offset 为8,offset 为9的消息用虚线框表示,表明下一条待写入的消息。日志文件的 HW 为6,表示消费者只能拉取到 offset 在0至5之间的消息,而 offset 为6的消息对消费者而言是不可见的。框架

LEO 是 Log End Offset 的缩写,它标识当前日志文件中下一条待写入消息的 offset,上图中 offset 为9的位置即为当前日志文件的 LEO,LEO 的大小至关于当前日志分区中最后一条消息的 offset 值加1。分区 ISR 集合中的每一个副本都会维护自身的 LEO,而 ISR 集合中最小的 LEO 即为分区的 HW,对消费者而言只能消费 HW 以前的消息。

LW 是 Low Watermark 的缩写,俗称“低水位”,表明 AR 集合中最小的 logStartOffset 值。副本的拉取请求(FetchRequest,它有可能触发新建日志分段而旧的被清理,进而致使 logStartOffset 的增长)和删除消息请求(DeleteRecordRequest)都有可能促使 LW 的增加。

Kafka中是怎么体现消息顺序性的?

能够经过分区策略体现消息顺序性。 分区策略有轮询策略、随机策略、按消息键保序策略。

按消息键保序策略:一旦消息被定义了 Key,那么你就能够保证同一个 Key 的全部消息都进入到相同的分区里面,因为每一个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();
复制代码

Kafka中的分区器、序列化器、拦截器是否了解?它们之间的处理顺序是什么?

  • 序列化器:生产者须要用序列化器(Serializer)把对象转换成字节数组才能经过网络发送给 Kafka。而在对侧,消费者须要用反序列化器(Deserializer)把从 Kafka 中收到的字节数组转换成相应的对象。
  • 分区器:分区器的做用就是为消息分配分区。若是消息 ProducerRecord 中没有指定 partition 字段,那么就须要依赖分区器,根据 key 这个字段来计算 partition 的值。
  • Kafka 一共有两种拦截器:生产者拦截器和消费者拦截器。
    • 生产者拦截器既能够用来在消息发送前作一些准备工做,好比按照某个规则过滤不符合要求的消息、修改消息的内容等,也能够用来在发送回调逻辑前作一些定制化的需求,好比统计类工做。
    • 消费者拦截器主要在消费到消息或在提交消费位移时进行一些定制化的操做。

消息在经过 send() 方法发往 broker 的过程当中,有可能须要通过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列做用以后才能被真正地发往 broker。拦截器(下一章会详细介绍)通常不是必需的,而序列化器是必需的。消息通过序列化以后就须要肯定它发往的分区,若是消息 ProducerRecord 中指定了 partition 字段,那么就不须要分区器的做用,由于 partition 表明的就是所要发往的分区号。

处理顺序 :拦截器->序列化器->分区器

KafkaProducer 在将消息序列化和计算分区以前会调用生产者拦截器的 onSend() 方法来对消息进行相应的定制化操做。 而后生产者须要用序列化器(Serializer)把对象转换成字节数组才能经过网络发送给 Kafka。 最后可能会被发往分区器为消息分配分区。

Kafka生产者客户端的总体结构是什么样子的?

整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程(发送线程)。 在主线程中由 KafkaProducer 建立消息,而后经过可能的拦截器、序列化器和分区器的做用以后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。 Sender 线程负责从 RecordAccumulator 中获取消息并将其发送到 Kafka 中。 RecordAccumulator 主要用来缓存消息以便 Sender 线程能够批量发送,进而减小网络传输的资源消耗以提高性能。

Kafka生产者客户端中使用了几个线程来处理?分别是什么?

整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程(发送线程)。在主线程中由 KafkaProducer 建立消息,而后经过可能的拦截器、序列化器和分区器的做用以后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender 线程负责从 RecordAccumulator 中获取消息并将其发送到 Kafka 中。

Kafka的旧版Scala的消费者客户端的设计有什么缺陷?

老版本的 Consumer Group 把位移保存在 ZooKeeper 中。Apache ZooKeeper 是一个分布式的协调服务框架,Kafka 重度依赖它实现各类各样的协调管理。将位移保存在 ZooKeeper 外部系统的作法,最显而易见的好处就是减小了 Kafka Broker 端的状态保存开销。

ZooKeeper 这类元框架其实并不适合进行频繁的写更新,而 Consumer Group 的位移更新倒是一个很是频繁的操做。这种大吞吐量的写操做会极大地拖慢 ZooKeeper 集群的性能

“消费组中的消费者个数若是超过topic的分区,那么就会有消费者消费不到数据”这句话是否正确?若是正确,那么有没有什么hack的手段?

通常来讲若是消费者过多,出现了消费者的个数大于分区个数的状况,就会有消费者分配不到任何分区。

开发者能够继承AbstractPartitionAssignor实现自定义消费策略,从而实现同一消费组内的任意消费者均可以消费订阅主题的全部分区:

public class BroadcastAssignor extends AbstractPartitionAssignor{
    @Override
    public String name() {
        return "broadcast";
    }

    private Map<String, List<String>> consumersPerTopic(
            Map<String, Subscription> consumerMetadata) {
        (具体实现请参考RandomAssignor中的consumersPerTopic()方法)
    }

    @Override
    public Map<String, List<TopicPartition>> assign(
            Map<String, Integer> partitionsPerTopic,
            Map<String, Subscription> subscriptions) {
        Map<String, List<String>> consumersPerTopic =
                consumersPerTopic(subscriptions);
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
		   //Java8
        subscriptions.keySet().forEach(memberId ->
                assignment.put(memberId, new ArrayList<>()));
		   //针对每个主题,为每个订阅的消费者分配全部的分区
        consumersPerTopic.entrySet().forEach(topicEntry->{
            String topic = topicEntry.getKey();
            List<String> members = topicEntry.getValue();

            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
            if (numPartitionsForTopic == null || members.isEmpty())
                return;
            List<TopicPartition> partitions = AbstractPartitionAssignor
                    .partitions(topic, numPartitionsForTopic);
            if (!partitions.isEmpty()) {
                members.forEach(memberId ->
                        assignment.get(memberId).addAll(partitions));
            }
        });
        return assignment;
    }
}
复制代码

注意组内广播的这种实现方式会有一个严重的问题—默认的消费位移的提交会失效。

消费者提交消费位移时提交的是当前消费到的最新消息的offset仍是offset+1?

在旧消费者客户端中,消费位移是存储在 ZooKeeper 中的。而在新消费者客户端中,消费位移存储在 Kafka 内部的主题__consumer_offsets 中。 当前消费者须要提交的消费位移是offset+1

有哪些情形会形成重复消费?

  1. Rebalance 一个consumer正在消费一个分区的一条消息,尚未消费完,发生了rebalance(加入了一个consumer),从而致使这条消息没有消费成功,rebalance后,另外一个consumer又把这条消息消费一遍。
  2. 消费者端手动提交 若是先消费消息,再更新offset位置,致使消息重复消费。
  3. 消费者端自动提交 设置offset为自动提交,关闭kafka时,若是在close以前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费。
  4. 生产者端 生产者由于业务问题致使的宕机,在重启以后可能数据会重发

那些情景下会形成消息漏消费?

  1. 自动提交 设置offset为自动定时提交,当offset被自动定时提交时,数据还在内存中未处理,此时恰好把线程kill掉,那么offset已经提交,可是数据未处理,致使这部份内存中的数据丢失。
  2. 生产者发送消息 发送消息设置的是fire-and-forget(发后即忘),它只管往 Kafka 中发送消息而并不关心消息是否正确到达。不过在某些时候(好比发生不可重试异常时)会形成消息的丢失。这种发送方式的性能最高,可靠性也最差。
  3. 消费者端 先提交位移,可是消息还没消费完就宕机了,形成了消息没有被消费。自动位移提交同理
  4. acks没有设置为all 若是在broker还没把消息同步到其余broker的时候宕机了,那么消息将会丢失

KafkaConsumer是非线程安全的,那么怎么样实现多线程消费?

  1. 线程封闭,即为每一个线程实例化一个 KafkaConsumer 对象

一个线程对应一个 KafkaConsumer 实例,咱们能够称之为消费线程。一个消费线程能够消费一个或多个分区中的消息,全部的消费线程都隶属于同一个消费组。

  1. 消费者程序使用单或多线程获取消息,同时建立多个消费线程执行消息处理逻辑。 获取消息的线程能够是一个,也能够是多个,每一个线程维护专属的 KafkaConsumer 实例,处理消息则交由特定的线程池来作,从而实现消息获取与消息处理的真正解耦。具体架构以下图所示:

两个方案对比:

简述消费者与消费组之间的关系

  1. Consumer Group 下能够有一个或多个 Consumer 实例。这里的实例能够是一个单独的进程,也能够是同一进程下的线程。在实际场景中,使用进程更为常见一些。
  2. Group ID 是一个字符串,在一个 Kafka 集群中,它标识惟一的一个 Consumer Group。
  3. Consumer Group 下全部实例订阅的主题的单个分区,只能分配给组内的某个 Consumer 实例消费。这个分区固然也能够被其余的 Group 消费。

当你使用kafka-topics.sh建立(删除)了一个topic以后,Kafka背后会执行什么逻辑?

在执行完脚本以后,Kafka 会在 log.dir 或 log.dirs 参数所配置的目录下建立相应的主题分区,默认状况下这个目录为/tmp/kafka-logs/。

在 ZooKeeper 的/brokers/topics/目录下建立一个同名的实节点,该节点中记录了该主题的分区副本分配方案。示例以下:

[zk: localhost:2181/kafka(CONNECTED) 2] get /brokers/topics/topic-create
{"version":1,"partitions":{"2":[1,2],"1":[0,1],"3":[2,1],"0":[2,0]}}
复制代码

topic的分区数可不能够增长?若是能够怎么增长?若是不能够,那又是为何?

能够增长,使用 kafka-topics 脚本,结合 --alter 参数来增长某个主题的分区数,命令以下:

bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic <topic_name> --partitions <新分区数>
复制代码

当分区数增长时,就会触发订阅该主题的全部 Group 开启 Rebalance。 首先,Rebalance 过程对 Consumer Group 消费过程有极大的影响。在 Rebalance 过程当中,全部 Consumer 实例都会中止消费,等待 Rebalance 完成。这是 Rebalance 为人诟病的一个方面。 其次,目前 Rebalance 的设计是全部 Consumer 实例共同参与,所有从新分配全部分区。其实更高效的作法是尽可能减小分配方案的变更。 最后,Rebalance 实在是太慢了。

topic的分区数可不能够减小?若是能够怎么减小?若是不能够,那又是为何?

不支持,由于删除的分区中的消息很差处理。若是直接存储到现有分区的尾部,消息的时间戳就不会递增,如此对于 Spark、Flink 这类须要消息时间戳(事件时间)的组件将会受到影响;若是分散插入现有的分区,那么在消息量很大的时候,内部的数据复制会占用很大的资源,并且在复制期间,此主题的可用性又如何获得保障?与此同时,顺序性问题、事务性问题,以及分区和副本的状态机切换问题都是不得不面对的。

建立topic时如何选择合适的分区数?

在 Kafka 中,性能与分区数有着必然的关系,在设定分区数时通常也须要考虑性能的因素。对不一样的硬件而言,其对应的性能也会不太同样。 可使用Kafka 自己提供的用于生产者性能测试的 kafka-producer- perf-test.sh 和用于消费者性能测试的 kafka-consumer-perf-test.sh来进行测试。 增长合适的分区数能够在必定程度上提高总体吞吐量,但超过对应的阈值以后吞吐量不升反降。若是应用对吞吐量有必定程度上的要求,则建议在投入生产环境以前对同款硬件资源作一个完备的吞吐量相关的测试,以找到合适的分区数阈值区间。 分区数的多少还会影响系统的可用性。若是分区数很是多,若是集群中的某个 broker 节点宕机,那么就会有大量的分区须要同时进行 leader 角色切换,这个切换的过程会耗费一笔可观的时间,而且在这个时间窗口内这些分区也会变得不可用。 分区数越多也会让 Kafka 的正常启动和关闭的耗时变得越长,与此同时,主题的分区数越多不只会增长日志清理的耗时,并且在被删除时也会耗费更多的时间。

#java学习笔记/Kafka

相关文章
相关标签/搜索