详解 Kafka 生产者

上一篇文章咱们主要介绍了什么是 Kafka,Kafka 的基本概念是什么,Kafka 单机和集群版的搭建,以及对基本的配置文件进行了大体的介绍,还对 Kafka 的几个主要角色进行了描述,咱们知道,无论是把 Kafka 用做消息队列、消息总线仍是数据存储平台来使用,最终是绕不过消息这个词的,这也是 Kafka 最最核心的内容,Kafka 的消息从哪里来?到哪里去?都干什么了?别着急,一步一步来,先说说 Kafka 的消息从哪来。html

生产者概述

在 Kafka 中,咱们把产生消息的那一方称为生产者,好比咱们常常回去淘宝购物,你打开淘宝的那一刻,你的登录信息,登录次数都会做为消息传输到 Kafka 后台,当你浏览购物的时候,你的浏览信息,你的搜索指数,你的购物爱好都会做为一个个消息传递给 Kafka 后台,而后淘宝会根据你的爱好作智能推荐,导致你的钱包历来都禁不住诱惑,那么这些生产者产生的消息是怎么传到 Kafka 应用程序的呢?发送过程是怎么样的呢?java

尽管消息的产生很是简单,可是消息的发送过程仍是比较复杂的,如图git

咱们从建立一个ProducerRecord 对象开始,ProducerRecord 是 Kafka 中的一个核心类,它表明了一组 Kafka 须要发送的 key/value 键值对,它由记录要发送到的主题名称(Topic Name),可选的分区号(Partition Number)以及可选的键值对构成。程序员

在发送 ProducerRecord 时,咱们须要将键值对对象由序列化器转换为字节数组,这样它们才可以在网络上传输。而后消息到达了分区器。github

若是发送过程当中指定了有效的分区号,那么在发送记录时将使用该分区。若是发送过程当中未指定分区,则将使用key 的 hash 函数映射指定一个分区。若是发送的过程当中既没有分区号也没有,则将以循环的方式分配一个分区。选好分区后,生产者就知道向哪一个主题和分区发送数据了。算法

ProducerRecord 还有关联的时间戳,若是用户没有提供时间戳,那么生产者将会在记录中使用当前的时间做为时间戳。Kafka 最终使用的时间戳取决于 topic 主题配置的时间戳类型。apache

  • 若是将主题配置为使用 CreateTime,则生产者记录中的时间戳将由 broker 使用。
  • 若是将主题配置为使用LogAppendTime,则生产者记录中的时间戳在将消息添加到其日志中时,将由 broker 重写。

而后,这条消息被存放在一个记录批次里,这个批次里的全部消息会被发送到相同的主题和分区上。由一个独立的线程负责把它们发到 Kafka Broker 上。bootstrap

Kafka Broker 在收到消息时会返回一个响应,若是写入成功,会返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量,上面两种的时间戳类型也会返回给用户。若是写入失败,会返回一个错误。生产者在收到错误以后会尝试从新发送消息,几回以后若是仍是失败的话,就返回错误消息。数组

建立 Kafka 生产者

要往 Kafka 写入消息,首先须要建立一个生产者对象,并设置一些属性。Kafka 生产者有3个必选的属性服务器

  • bootstrap.servers

该属性指定 broker 的地址清单,地址的格式为 host:port。清单里不须要包含全部的 broker 地址,生产者会从给定的 broker 里查找到其余的 broker 信息。不过建议至少要提供两个 broker 信息,一旦其中一个宕机,生产者仍然可以链接到集群上。

  • key.serializer

broker 须要接收到序列化以后的 key/value值,因此生产者发送的消息须要通过序列化以后才传递给 Kafka Broker。生产者须要知道采用何种方式把 Java 对象转换为字节数组。key.serializer 必须被设置为一个实现了org.apache.kafka.common.serialization.Serializer 接口的类,生产者会使用这个类把键对象序列化为字节数组。这里拓展一下 Serializer 类

Serializer 是一个接口,它表示类将会采用何种方式序列化,它的做用是把对象转换为字节,实现了 Serializer 接口的类主要有 ByteArraySerializerStringSerializerIntegerSerializer ,其中 ByteArraySerialize 是 Kafka 默认使用的序列化器,其余的序列化器还有不少,你能够经过 这里 查看其余序列化器。要注意的一点:key.serializer 是必需要设置的,即便你打算只发送值的内容

  • value.serializer

与 key.serializer 同样,value.serializer 指定的类会将值序列化。

下面代码演示了如何建立一个 Kafka 生产者,这里只指定了必要的属性,其余使用默认的配置

private Properties properties = new Properties();
properties.put("bootstrap.servers","broker1:9092,broker2:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties = new KafkaProducer<String,String>(properties);
复制代码

来解释一下这段代码

  • 首先建立了一个 Properties 对象
  • 使用 StringSerializer 序列化器序列化 key / value 键值对
  • 在这里咱们建立了一个新的生产者对象,并为键值设置了恰当的类型,而后把 Properties 对象传递给他。

实例化生产者对象后,接下来就能够开始发送消息了,发送消息主要由下面几种方式

直接发送,不考虑结果

使用这种发送方式,不会关心消息是否到达,会丢失一些消息,由于 Kafka 是高可用的,生产者会自动尝试重发,这种发送方式和 UDP 运输层协议很类似。

同步发送

同步发送仍然使用 send() 方法发送消息,它会返回一个 Future 对象,调用 get() 方法进行等待,就能够知道消息时候否发送成功。

异步发送

异步发送指的是咱们调用 send() 方法,并制定一个回调函数,服务器在返回响应时调用该函数。

下一节咱们会从新讨论这三种实现。

向 Kafka 发送消息

简单消息发送

Kafka 最简单的消息发送以下:

ProducerRecord<String,String> record =
                new ProducerRecord<String, String>("CustomerCountry","West","France");

producer.send(record);
复制代码

代码中生产者(producer)的 send() 方法须要把 ProducerRecord 的对象做为参数进行发送,ProducerRecord 有不少构造函数,这个咱们下面讨论,这里调用的是

public ProducerRecord(String topic, K key, V value) {}
复制代码

这个构造函数,须要传递的是 topic主题,key 和 value。

把对应的参数传递完成后,生产者调用 send() 方法发送消息(ProducerRecord对象)。咱们能够从生产者的架构图中看出,消息是先被写入分区中的缓冲区中,而后分批次发送给 Kafka Broker。

发送成功后,send() 方法会返回一个 Future(java.util.concurrent) 对象,Future 对象的类型是 RecordMetadata 类型,咱们上面这段代码没有考虑返回值,因此没有生成对应的 Future 对象,因此没有办法知道消息是否发送成功。若是不是很重要的信息或者对结果不会产生影响的信息,可使用这种方式进行发送。

咱们能够忽略发送消息时可能发生的错误或者在服务器端可能发生的错误,但在消息发送以前,生产者还可能发生其余的异常。这些异常有多是 SerializationException(序列化失败)BufferedExhaustedException 或 TimeoutException(说明缓冲区已满),又或是 InterruptedException(说明发送线程被中断)

同步发送消息

第二种消息发送机制以下所示

ProducerRecord<String,String> record =
                new ProducerRecord<String, String>("CustomerCountry","West","France");

try{
  RecordMetadata recordMetadata = producer.send(record).get();
}catch(Exception e){
  e.printStackTrace();
}

复制代码

这种发送消息的方式较上面的发送方式有了改进,首先调用 send() 方法,而后再调用 get() 方法等待 Kafka 响应。若是服务器返回错误,get() 方法会抛出异常,若是没有发生错误,咱们会获得 RecordMetadata 对象,能够用它来查看消息记录。

生产者(KafkaProducer)在发送的过程当中会出现两类错误:其中一类是重试错误,这类错误能够经过重发消息来解决。好比链接的错误,能够经过再次创建链接来解决;无错误则能够经过从新为分区选举首领来解决。KafkaProducer 被配置为自动重试,若是屡次重试后仍没法解决问题,则会抛出重试异常。另外一类错误是没法经过重试来解决的,好比消息过大对于这类错误,KafkaProducer 不会进行重试,直接抛出异常。

异步发送消息

同步发送消息都有个问题,那就是同一时间只能有一个消息在发送,这会形成许多消息没法直接发送,形成消息滞后,没法发挥效益最大化。

好比消息在应用程序和 Kafka 集群之间一个来回须要 10ms。若是发送完每一个消息后都等待响应的话,那么发送100个消息须要 1 秒,可是若是是异步方式的话,发送 100 条消息所须要的时间就会少不少不少。大多数时候,虽然Kafka 会返回 RecordMetadata 消息,可是咱们并不须要等待响应。

为了在异步发送消息的同时可以对异常状况进行处理,生产者提供了回掉支持。下面是回调的一个例子

ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("CustomerCountry", "Huston", "America");
        producer.send(producerRecord,new DemoProducerCallBack());


class DemoProducerCallBack implements Callback {

  public void onCompletion(RecordMetadata metadata, Exception exception) {
    if(exception != null){
      exception.printStackTrace();;
    }
  }
}
复制代码

首先实现回调须要定义一个实现了org.apache.kafka.clients.producer.Callback的类,这个接口只有一个 onCompletion方法。若是 kafka 返回一个错误,onCompletion 方法会抛出一个非空(non null)异常,这里咱们只是简单的把它打印出来,若是是生产环境须要更详细的处理,而后在 send() 方法发送的时候传递一个 Callback 回调的对象。

生产者分区机制

Kafka 对于数据的读写是以分区为粒度的,分区能够分布在多个主机(Broker)中,这样每一个节点可以实现独立的数据写入和读取,而且可以经过增长新的节点来增长 Kafka 集群的吞吐量,经过分区部署在多个 Broker 来实现负载均衡的效果。

上面咱们介绍了生产者的发送方式有三种:无论结果如何直接发送发送并返回结果发送并回调。因为消息是存在主题(topic)的分区(partition)中的,因此当 Producer 生产者发送产生一条消息发给 topic 的时候,你如何判断这条消息会存在哪一个分区中呢?

这其实就设计到 Kafka 的分区机制了。

分区策略

Kafka 的分区策略指的就是将生产者发送到哪一个分区的算法。Kafka 为咱们提供了默认的分区策略,同时它也支持你自定义分区策略。

若是要自定义分区策略的话,你须要显示配置生产者端的参数 Partitioner.class,咱们能够看一下这个类它位于 org.apache.kafka.clients.producer 包下

public interface Partitioner extends Configurable, Closeable {
  
  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

  public void close();
  
  default public void onNewBatch(String topic, Cluster cluster, int prevPartition) {}
}
复制代码

Partitioner 类有三个方法,分别来解释一下

  • partition(): 这个类有几个参数: topic,表示须要传递的主题;key 表示消息中的键值;keyBytes表示分区中序列化事后的key,byte数组的形式传递;value 表示消息的 value 值;valueBytes 表示分区中序列化后的值数组;cluster表示当前集群的原数据。Kafka 给你这么多信息,就是但愿让你可以充分地利用这些信息对消息进行分区,计算出它要被发送到哪一个分区中。
  • close() : 继承了 Closeable 接口可以实现 close() 方法,在分区关闭时调用。
  • onNewBatch(): 表示通知分区程序用来建立新的批次

其中与分区策略息息相关的就是 partition() 方法了,分区策略有下面这几种

顺序轮训

顺序分配,消息是均匀的分配给每一个 partition,即每一个分区存储一次消息。就像下面这样

上图表示的就是轮训策略,轮训策略是 Kafka Producer 提供的默认策略,若是你不使用指定的轮训策略的话,Kafka 默认会使用顺序轮训策略的方式。

随机轮训

随机轮训简而言之就是随机的向 partition 中保存消息,以下图所示

实现随机分配的代码只须要两行,以下

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());
复制代码

先计算出该主题总的分区数,而后随机地返回一个小于它的正整数。

本质上看随机策略也是力求将数据均匀地打散到各个分区,但从实际表现来看,它要逊于轮询策略,因此若是追求数据的均匀分布,仍是使用轮询策略比较好。事实上,随机策略是老版本生产者使用的分区策略,在新版本中已经改成轮询了。

按照 key 进行消息保存

这个策略也叫作 key-ordering 策略,Kafka 中每条消息都会有本身的key,一旦消息被定义了 Key,那么你就能够保证同一个 Key 的全部消息都进入到相同的分区里面,因为每一个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略,以下图所示

实现这个策略的 partition 方法一样简单,只须要下面两行代码便可:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();
复制代码

上面这几种分区策略都是比较基础的策略,除此以外,你还能够自定义分区策略。

生产者压缩机制

压缩一词简单来说就是一种互换思想,它是一种经典的用 CPU 时间去换磁盘空间或者 I/O 传输量的思想,但愿以较小的 CPU 开销带来更少的磁盘占用或更少的网络 I/O 传输。若是你还不了解的话我但愿你先读完这篇文章 程序员须要了解的硬核知识之压缩算法,而后你就明白压缩是怎么回事了。

Kafka 压缩是什么

Kafka 的消息分为两层:消息集合 和 消息。一个消息集合中包含若干条日志项,而日志项才是真正封装消息的地方。Kafka 底层的消息日志由一系列消息集合日志项组成。Kafka 一般不会直接操做具体的一条条消息,它老是在消息集合这个层面上进行写入操做。

在 Kafka 中,压缩会发生在两个地方:Kafka Producer 和 Kafka Consumer,为何启用压缩?说白了就是消息太大,须要变小一点 来使消息发的更快一些。

Kafka Producer 中使用 compression.type 来开启压缩

private Properties properties = new Properties();
properties.put("bootstrap.servers","192.168.1.9:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("compression.type", "gzip");

Producer<String,String> producer = new KafkaProducer<String, String>(properties);

ProducerRecord<String,String> record =
  new ProducerRecord<String, String>("CustomerCountry","Precision Products","France");
复制代码

上面代码代表该 Producer 的压缩算法使用的是 GZIP

有压缩必有解压缩,Producer 使用压缩算法压缩消息后并发送给服务器后,由 Consumer 消费者进行解压缩,由于采用的何种压缩算法是随着 key、value 一块儿发送过去的,因此消费者知道采用何种压缩算法。

Kafka 重要参数配置

在上一篇文章 带你涨姿式的认识一下kafka中,咱们主要介绍了一下 kafka 集群搭建的参数,本篇文章咱们来介绍一下 Kafka 生产者重要的配置,生产者有不少可配置的参数,在文档里(kafka.apache.org/documentati…

key.serializer

用于 key 键的序列化,它实现了 org.apache.kafka.common.serialization.Serializer 接口

value.serializer

用于 value 值的序列化,实现了 org.apache.kafka.common.serialization.Serializer 接口

acks

acks 参数指定了要有多少个分区副本接收消息,生产者才认为消息是写入成功的。此参数对消息丢失的影响较大

  • 若是 acks = 0,就表示生产者也不知道本身产生的消息是否被服务器接收了,它才知道它写成功了。若是发送的途中产生了错误,生产者也不知道,它也比较懵逼,由于没有返回任何消息。这就相似于 UDP 的运输层协议,只管发,服务器接受不接受它也不关心。
  • 若是 acks = 1,只要集群的 Leader 接收到消息,就会给生产者返回一条消息,告诉它写入成功。若是发送途中形成了网络异常或者 Leader 还没选举出来等其余状况致使消息写入失败,生产者会受到错误消息,这时候生产者每每会再次重发数据。由于消息的发送也分为 同步异步,Kafka 为了保证消息的高效传输会决定是同步发送仍是异步发送。若是让客户端等待服务器的响应(经过调用 Future 中的 get() 方法),显然会增长延迟,若是客户端使用回调,就会解决这个问题。
  • 若是 acks = all,这种状况下是只有当全部参与复制的节点都收到消息时,生产者才会接收到一个来自服务器的消息。不过,它的延迟比 acks =1 时更高,由于咱们要等待不仅一个服务器节点接收消息。

buffer.memory

此参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。若是应用程序发送消息的速度超过发送到服务器的速度,会致使生产者空间不足。这个时候,send() 方法调用要么被阻塞,要么抛出异常,具体取决于 block.on.buffer.null 参数的设置。

compression.type

此参数来表示生产者启用何种压缩算法,默认状况下,消息发送时不会被压缩。该参数能够设置为 snappy、gzip 和 lz4,它指定了消息发送给 broker 以前使用哪种压缩算法进行压缩。下面是各压缩算法的对比

retries

生产者从服务器收到的错误有多是临时性的错误(好比分区找不到首领),在这种状况下,reteis 参数的值决定了生产者能够重发的消息次数,若是达到这个次数,生产者会放弃重试并返回错误。默认状况下,生产者在每次重试之间等待 100ms,这个等待参数能够经过 retry.backoff.ms 进行修改。

batch.size

当有多个消息须要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可使用的内存大小,按照字节数计算。当批次被填满,批次里的全部消息会被发送出去。不过生产者井不必定都会等到批次被填满才发送,任意条数的消息均可能被发送。

client.id

此参数能够是任意的字符串,服务器会用它来识别消息的来源,通常配置在日志里

max.in.flight.requests.per.connection

此参数指定了生产者在收到服务器响应以前能够发送多少消息,它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为1 能够保证消息是按照发送的顺序写入服务器。

timeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms

request.timeout.ms 指定了生产者在发送数据时等待服务器返回的响应时间,metadata.fetch.timeout.ms 指定了生产者在获取元数据(好比目标分区的首领是谁)时等待服务器返回响应的时间。若是等待时间超时,生产者要么重试发送数据,要么返回一个错误。timeout.ms 指定了 broker 等待同步副本返回消息确认的时间,与 asks 的配置相匹配----若是在指定时间内没有收到同步副本的确认,那么 broker 就会返回一个错误。

max.block.ms

此参数指定了在调用 send() 方法或使用 partitionFor() 方法获取元数据时生产者的阻塞时间当生产者的发送缓冲区已捕,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到 max.block.ms 时,生产者会抛出超时异常。

max.request.size

该参数用于控制生产者发送的请求大小。它能够指能发送的单个消息的最大值,也能够指单个请求里全部消息的总大小。

receive.buffer.bytes 和 send.buffer.bytes

Kafka 是基于 TCP 实现的,为了保证可靠的消息传输,这两个参数分别指定了 TCP Socket 接收和发送数据包的缓冲区的大小。若是它们被设置为 -1,就使用操做系统的默认值。若是生产者或消费者与 broker 处于不一样的数据中心,那么能够适当增大这些值。

文章参考:

《Kafka 权威指南》

极客时间 -《Kafka 核心技术与实战》

kafka.apache.org/documentati…

Kafka 源码

blog.cloudflare.com/squeezing-t…

github.com/facebook/zs…

关注公众号获取更多优质电子书,关注一下你就知道资源是有多好了

相关文章
相关标签/搜索