【大数据实践】Kafka生产者编程(5)——ProducerConfig详解(下)

前言

上一篇文章【大数据实践】Kafka生产者编程(4)——ProducerConfig详解(上)中,对kafka producer的相关配置项进行了部分介绍,在本文章中,将继续完成剩下配置项的介绍。apache

ProducerConfig类

buffer.memory

重要性:高
类型:Long
默认值:33554432字节,即32M编程

producer能够用于缓存等待发送到服务端的消息记录的缓冲区大小,当消息记录发送到缓冲区的速度大于传输到server的速度,那么等待发送的消息记录将会放在缓冲区,缓冲区若是满了,那么producer会阻塞max.block.ms指定的毫秒数,超过该毫秒数时,将抛出异常。segmentfault

注意:该缓冲区的大小设置与整个producer须要使用到的producer大致一致,可是要注意并非全部的缓冲区都是用来存放待发送的records的,好比还有一部分用于压缩数据(当压缩数据的选项开启),还有一部分用于维护in-flight(正在发送)的请求列表。缓存

retry.backoff.ms

重要性:低
类型:Long
默认值:100毫秒session

当一个producer到指定的partition的请求request失败时,在重连以前,须要等待的毫秒数。这是为了不在某些失败的场景下,过于密集地重复发送请求。app

compression.type

重要性:高
类型:String
默认值:"none"ide

producer对数据使用的压缩类型,包括:大数据

  • none:无压缩类型
  • gzip
  • snappy
  • lz4

producer在压缩数据时,是对全部batches的数据一块儿进行压缩,而不是一个batch一个batch压缩,因此,一次压缩的batches越多,压缩率越高,压缩效果越好。.net

metrics.sample.window.ms

重要性:低
类型:Long
默认值:30000毫秒,即30秒code

计算度量样本的时间窗口,度量用于kafka监控。

metrics.num.samples

重要性:低
类型:int
默认值:2

维护用于计算度量metrics的样本数量。

metrics.recording.level

重要性:低
类型:String
默认值:info

用于metrics的最高纪录等级。

metric.reporters

重要性:低
类型:List
默认值:Collections.emptyList()

被做为metrics reporter的类的列表,这些类都实现了接口org.apache.kafka.common.metrics.MetricsReporter,所以当有新的metric生成时,这些reporters类均可以接收到通知。一般都会包含JmxReporter类,以用于注册JMX统计。

JMX(Java Management Extensions)

kafka使用jmx调取kafka broker的内部数据,来监控一些敏感的数据。

JMX相关资料:

从零开始玩转JMX(一)——简介和Standard MBean
从零开始玩转JMX(二)——Condition
从零开始玩转JMX(三)——Model MBean
从零开始玩转JMX(四)——Apache Commons Modeler & Dynamic MBean

max.in.flight.requests.per.connection

重要性:低
类型:int
默认值:5

在单个链接中,producer客户端在阻塞以前,能够容许未被确认的最大请求数,即当一个链接中未被确认的请求数超过了该设置,那么该producer客户端将会阻塞。注意:若是该值设置得比1大,当出现发送失败的状况,且retries配置项又开启时,那么存在消息被从新排序的风险。

retries

重要性:低
类型:int
默认值:0,表示不重试

当该值被设置成大于0时,客户端会从新发送消息,而且记录发送失败的错误。注意,该重试配置项和客户端因收到错误而重发是同样的。当retries配置项大于0,且max.in.flight.requests.per.connection配置项的值大于1时,存在将重试记录从新排序的风险,也就是说,消息记录的顺序可能会被打乱。缘由是:当两个batch被发送到同一个partition时,若是第一个失败,而第二个成功,那么第一个会被重试,此时第二个batch就排在前面了。

key.serializer

重要性:高
类型:Class
默认值:无

消息记录key的序列化类。

value.serializer

重要性:高
类型:Class
默认值:无

消息记录中value的序列化类。

connections.max.idle.ms

重要性:中
类型:Long
默认值:540000毫秒,即9分钟

若是一个链接空闲时间超过该配置值,那么该链接将会被关闭。

partitioner.class

重要性:中
类型:Class
默认值:无

计算消息记录要分配到哪一个partitioner的类。在前面的文章【大数据实践】Kafka生产者编程(3)——Interceptor & Partitioner中,对partitioner有详细讲解。

interceptor.classes

重要性:低
类型:List
默认值: Collections.emptyList()

拦截链,在前面的文章【大数据实践】Kafka生产者编程(3)——Interceptor & Partitioner有详细讲解。

enable.idempotence

重要性:低
类型:Boolean
默认值:false

是否使用幂等性。若是设置为true,表示producer将确保每一条消息都刚好有一份备份;若是设置为false,则表示producer因发送数据到broker失败重试使,可能往数据流中写入多分重试的消息。

注意:若是使用idempotence,即enable.idempotence为true,那么要求配置项max.in.flight.requests.per.connection的值必须小于或等于5;配置项retries的值必须大于0;acks配置项必须设置为all。若是这些值没有被用户明确地设置,那么系统将自动选择合适的值。若是设置的值不合适,那么会抛出ConfigException异常。

transaction.timeout.ms

重要性:低
类型:Int
默认值:60000毫秒,即60秒

在主动停止(aborting)一个事务transaction以前,事务协调员(transaction coordinator)最多等待的最长时间——为了让producer更新事务状态。若是该值大于kafka broker中设置的transaction.max.timeout.ms配置项的值,那么producer 的请求将由于InvalidTransactionTimeout错误而失败。

transactional.id

重要性:低
类型:String
默认值:null,表示transactions不能被使用

配置TransactionalId,用于事务的递交(delivery)。该配置项能够为跨多个producer的session提供可靠性语义,由于它能够保证在开始一个新的事务以前,使用相同事务ID(TransactionalId)的事务必定会完成。

注意:若是配置了transactional.id的值,那么必须配置enable.idempotence为true。在发布环境中,事务要求一个kafka集群必须有最少3个brokers(推荐设置)。在开发环境中,能够经过调整broker的配置项transaction.state.log.replication.factor来进行调整,以方便开发。

小结

终于将producer的各个配置项过了一遍,一般咱们须要关注的大概都是重要性为高的配置项。从上面的配置项中,咱们也了解到一些新的概念,好比度量(metrics)事务transactions等。这些概念可能会在接下来的文章中进行讲解。

相关文章
相关标签/搜索