从源码分析如何优雅的使用 Kafka 生产者

前言

在上文 设计一个百万级的消息推送系统 中提到消息流转采用的是 Kafka 做为中间件。java

其中有朋友咨询在大量消息的状况下 Kakfa 是如何保证消息的高效及一致性呢?算法

正好以这个问题结合 Kakfa 的源码讨论下如何正确、高效的发送消息。apache

内容较多,对源码感兴趣的朋友请系好安全带😏(源码基于 v0.10.0.0 版本分析)。同时最好是有必定的 Kafka 使用经验,知晓基本的用法。

简单的消息发送

在分析以前先看一个简单的消息发送是怎么样的。bootstrap

如下代码基于 SpringBoot 构建。

首先建立一个 org.apache.kafka.clients.producer.Producer 的 bean。缓存

主要关注 bootstrap.servers,它是必填参数。指的是 Kafka 集群中的 broker 地址,例如 127.0.0.1:9094安全

其他几个参数暂时不作讨论,后文会有详细介绍。

接着注入这个 bean 便可调用它的发送函数发送消息。网络

这里我给某一个 Topic 发送了 10W 条数据,运行程序消息正常发送。并发

但这仅仅只是作到了消息发送,对消息是否成功送达彻底没管,等因而纯异步的方式。app

同步

那么我想知道消息到底发送成功没有该怎么办呢?异步

其实 ProducerAPI 已经帮咱们考虑到了,发送以后只须要调用它的 get() 方法便可同步获取发送结果。

发送结果:

这样的发送效率实际上是比较低下的,由于每次都须要同步等待消息发送的结果。

异步

为此咱们应当采起异步的方式发送,其实 send() 方法默认则是异步的,只要不手动调用 get() 方法。

但这样就无法获知发送结果。

因此查看 send() 的 API 能够发现还有一个参数。

Future<RecordMetadata> send(ProducerRecord<K, V> producer, Callback callback);

Callback 是一个回调接口,在消息发送完成以后能够回调咱们自定义的实现。

执行以后的结果:

一样的也能获取结果,同时发现回调的线程并非上文同步时的主线程,这样也能证实是异步回调的。

同时回调的时候会传递两个参数:

  • RecordMetadata 和上文一致的消息发送成功后的元数据。
  • Exception 消息发送过程当中的异常信息。

可是这两个参数并不会同时都有数据,只有发送失败才会有异常信息,同时发送元数据为空。

因此正确的写法应当是:

至于为何会只有参数一个有值,在下文的源码分析中会一一解释。

源码分析

如今只掌握了基本的消息发送,想要深入的理解发送中的一些参数配置仍是得源码说了算。

首先仍是来谈谈消息发送时的整个流程是怎么样的,Kafka 并非简单的把消息经过网络发送到了 broker 中,在 Java 内部仍是通过了许多优化和设计。

发送流程

为了直观的了解发送的流程,简单的画了几个在发送过程当中关键的步骤。

从上至下依次是:

  • 初始化以及真正发送消息的 kafka-producer-network-thread IO 线程。
  • 将消息序列化。
  • 获得须要发送的分区。
  • 写入内部的一个缓存区中。
  • 初始化的 IO 线程不断的消费这个缓存来发送消息。

步骤解析

接下来详解每一个步骤。

初始化

调用该构造方法进行初始化时,不止是简单的将基本参数写入 KafkaProducer。比较麻烦的是初始化 Sender 线程进行缓冲区消费。

初始化 IO 线程处:

能够看到 Sender 线程有须要成员变量,好比:

acks,retries,requestTimeout

等,这些参数会在后文分析。

序列化消息

在调用 send() 函数后其实第一步就是序列化,毕竟咱们的消息须要经过网络才能发送到 Kafka。

其中的 valueSerializer.serialize(record.topic(), record.value()); 是一个接口,咱们须要在初始化时候指定序列化实现类。

咱们也能够本身实现序列化,只须要实现 org.apache.kafka.common.serialization.Serializer 接口便可。

路由分区

接下来就是路由分区,一般咱们使用的 Topic 为了实现扩展性以及高性能都会建立多个分区。

若是是一个分区好说,全部消息都往里面写入便可。

但多个分区就不可避免须要知道写入哪一个分区。

一般有三种方式。

指定分区

能够在构建 ProducerRecord 为每条消息指定分区。

这样在路由时会判断是否有指定,有就直接使用该分区。

这种通常在特殊场景下会使用。

自定义路由策略

若是没有指定分区,则会调用 partitioner.partition 接口执行自定义分区策略。

而咱们也只须要自定义一个类实现 org.apache.kafka.clients.producer.Partitioner 接口,同时在建立 KafkaProducer 实例时配置 partitioner.class 参数。

一般须要自定义分区通常是在想尽可能的保证消息的顺序性。

或者是写入某些特有的分区,由特别的消费者来进行处理等。

默认策略

最后一种则是默认的路由策略,若是咱们啥都没作就会执行该策略。

该策略也会使得消息分配的比较均匀。

来看看它的实现:

简单的来讲分为如下几步:

  • 获取 Topic 分区数。
  • 将内部维护的一个线程安全计数器 +1。
  • 与分区数取模获得分区编号。

其实这就是很典型的轮询算法,因此只要分区数不频繁变更这种方式也会比较均匀。

写入内部缓存

send() 方法拿到分区后会调用一个 append() 函数:

该函数中会调用一个 getOrCreateDeque() 写入到一个内部缓存中 batches

消费缓存

在最开始初始化的 IO 线程实际上是一个守护线程,它会一直消费这些数据。

经过图中的几个函数会获取到以前写入的数据。这块内容能够没必要深究,但其中有个 completeBatch 方法却很是关键。

调用该方法时候确定已是消息发送完毕了,因此会调用 batch.done() 来完成以前咱们在 send() 方法中定义的回调接口。

从这里也能够看出为何以前说发送完成后元数据和异常信息只会出现一个。

Producer 参数解析

发送流程讲完了再来看看 Producer 中比较重要的几个参数。

acks

acks 是一个影响消息吞吐量的一个关键参数。

主要有 [all、-1, 0, 1] 这几个选项,默认为 1。

因为 Kafka 不是采起的主备模式,而是采用相似于 Zookeeper 的主备模式。

前提是 Topic 配置副本数量 replica > 1

acks = all/-1 时:

意味着会确保全部的 follower 副本都完成数据的写入才会返回。

这样能够保证消息不会丢失!

但同时性能和吞吐量倒是最低的。

acks = 0 时:

producer 不会等待副本的任何响应,这样最容易丢失消息但同时性能倒是最好的!

acks = 1 时:

这是一种折中的方案,它会等待副本 Leader 响应,但不会等到 follower 的响应。

一旦 Leader 挂掉消息就会丢失。但性能和消息安全性都获得了必定的保证。

batch.size

这个参数看名称就知道是内部缓存区的大小限制,对他适当的调大能够提升吞吐量。

但也不能极端,调太大会浪费内存。小了也发挥不了做用,也是一个典型的时间和空间的权衡。

上图是几个使用的体现。

retries

retries 该参数主要是来作重试使用,当发生一些网络抖动都会形成重试。

这个参数也就是限制重试次数。

但也有一些其余问题。

  • 由于是重发因此消息顺序可能不会一致,这也是上文提到就算是一个分区消息也不会是彻底顺序的状况。
  • 仍是因为网络问题,原本消息已经成功写入了可是没有成功响应给 producer,进行重试时就可能会出现消息重复。这种只能是消费者进行幂等处理。

高效的发送方式

若是消息量真的很是大,同时又须要尽快的将消息发送到 Kafka。一个 producer 始终会收到缓存大小等影响。

那是否能够建立多个 producer 来进行发送呢?

  • 配置一个最大 producer 个数。
  • 发送消息时首先获取一个 producer,获取的同时判断是否达到最大上限,没有就新建一个同时保存到内部的 List 中,保存时作好同步处理防止并发问题。
  • 获取发送者时能够按照默认的分区策略使用轮询的方式获取(保证使用均匀)。

这样在大量、频繁的消息发送场景中能够提升发送效率减轻单个 producer 的压力。

关闭 Producer

最后则是 Producer 的关闭,Producer 在使用过程当中消耗了很多资源(线程、内存、网络等)所以须要显式的关闭从而回收这些资源。

默认的 close() 方法和带有超时时间的方法都是在必定的时间后强制关闭。

但在过时以前都会处理完剩余的任务。

因此使用哪个得视状况而定。

总结

本文内容较多,从实例和源码的角度分析了 Kafka 生产者。

但愿看完的朋友能有收获,同时也欢迎留言讨论。

不出意外下期会讨论 Kafka 消费者。

若是对你有帮助还请分享让更多的人看到。

欢迎关注公众号一块儿交流:

相关文章
相关标签/搜索