【大数据实践】Kafka生产者编程(4)——ProducerConfig详解(上)

前言

前面的文章对producer流程及其可自定义的配置类作了大致介绍,本文将继续对Kafka生成者编程相关知识点进行讲解。ProducerConfig类存放着producer客户端可配置的项以及其对应的解释文档,在本文中,主要根据其说明文档,分析kafka内部的一些机制和原理。node

备注:apache

  • 本文章中针对的是kafka-clients 1.1.0版本。
  • ProducerConfig 类在包org.apache.kafka.clients.producer中。

ProducerConfig各配置项

bootstrap.servers

重要性:高
类型:List
默认值:Collections.emptyList()编程

引导producer查找Kafka集群全部broker的引导服务地址列表bootstrap

顾名思义,该配置项是引导服务列表,即用于查找Kafka集群中全部broker的host:port列表,producer经过这些host:port与kafka集群创建链接。producer用该列表中的地址只是用于发现kafka集群中全部的服务broker,而在kafka集群中,broker多是动态改变的。另外,Kafka机制中,能够经过某一个broker而查询到全部其余broker,因此在bootstrap.servers中,并不须要配置全部broker的host:port,理想状况下,只须要配置其中的某一个就能够了。但为了提升可用性,避免因该broker挂掉而致使没法查找,那么能够选择配置多个。segmentfault

配置格式为:安全

host1:port1,host2:port2,...

metadata.max.age.ms

重要性:高
类型:Long
默认值:300000毫秒,即5分钟服务器

元数据最大生存时间,每隔metadata.max.age.ms时间,producer客户端会强制刷新一遍元数据metadata,即便没有任何partition leadership主动发现新的broker或者新的partition。网络

元数据

元数据类org.apache.kafka.clients#Metadata中,除了记录一些和自身更新策略有关的信息(metadata的更新策略值得另开一篇文章分析)。还保存了kafka集群的一些信息,参见org.apache.kafka.common#Cluster类:函数

  • 集群中全部结点broker node列表,Node结点中记录告终点的ip、port以及机架信息(rack)。oop

    机架信息(rack):broker的机架信息,相似于Hadoop那样,能够更好地利用局部性原 
    理减小集群中网络开销。若是指定了机架信息(brooker.rack), Kafka在为分区作副 
    本分配时就会考虑这部分信息,尽量地为副本挑选不一样机架的broker。
  • 集群中每个TopicPartition,对应的分区信息PartitionInfo。org.apache.kafka.common#PartitionInfo中主要记录了以下信息:

    • 分区所属的topic。
    • 分区partition编号。
    • 分区的leader所在结点。
    • 分区副本结点列表。
    • 分区副本同步结点队列(ISR)。
    • 离线副本结点队列。
  • 集群中的控制结点信息。

    控制结点broker:负责管理整个集群中分区和副本的状态,好比partition的leader 副本故障,由controller 负责为该partition从新选举新的leader 副本;当检测到ISR列表发生变化,有controller通知集群中全部broker更新其MetadataCache信息;或者增长某个topic分区的时候也会由controller管理分区的从新分配工做
  • 集群中每一个topic对应的全部分区列表,至关于以topic做为索引。
  • 集群中每一个topic对应的可用分区列表。
  • 集群中每一个结点broker node对应的全部分区列表,至关于以broker.id做为索引。
  • 集群中每一个结点ID(broker.id)对应的结点信息。

batch.size

重要性:高
类型:Long
默认值:16384字节,即16K

消息记录batch(批)大小限制。kafka producer在将消息记录record发送到集群时,会尝试将一批要发送到相同partition的消息记录压缩在一块儿,称之为batch(批)。每次request,其实不是发送一个record,而是发送若干个batch,而每一个batch里面可能包含多个record。这样成批成批的发送,减小了网络请求,有助于提高producer客户端和kafka集群服务的性能。

batch.size就是用来设置一个batch的最大字节数byte。当设置为0时,表示彻底禁用batch的功能。若是batch.size设置过大,那么可能形成内存浪费,由于每一个发送到不一样partition的record都须要预先分配一块batch.size大小的内存。

acks

重要性:高
类型:String
默认值:"1"

应答数设置。producer只有接收到来自server的acks指定数量的应答,才会认为发送给server的消息记录已送达。该配置项用于控制已发送消息记录的持久性,有如下几种设置值:

  • acks = 0:表示producer无需等待server端的应答消息,producer将record扔到发送缓冲区,就认为该record已经发送,而后转身走人。这种状况没法保证server端真的成功接收到该消息记录,且此时即便retries配置项也没法生效,由于producer没法知道是否失败。另外,每一个record返回的offset都被设为-1。
  • acks = 1:表示接收该消息记录的分区leader将消息记录写入到本地log文件,就返回Acknowledgement,告知producer本次发送已完成,而无需等待其余follower分区的确认。这种状况下,可能出现消息记录没有备份的状况(follower宕机等)。
  • acks = all:表示消息记录只有获得分区leader以及其余分区副本同步结点队列(ISR)中的分区follower的确认以后,才能回复acknowlegement,告知producer本次发送已完成。这种状况下,只要分区副本同步结点队列(ISR)中某一个follower存活,那么消息记录就不会被丢失。这种方式最安全,但效率也最低。
  • acks = -1:等同于acks = all

linger.ms

重要性:中
类型:Long
默认值:0毫秒,表示无延时,当即发送。

延迟发送消息记录的时间,上面及前面文章中也已经提到过,producer在发送消息记录record的时候,会将发送到同一个partition的records压缩在batch中。但一般这只发生在records到达速度快于records发送速度的状况下,很容易理解:若是发送速度大于record到达速度,则每来一个record都会被当即发送出去,根本不存在将多个records压缩为一个的可能。

但不少时候,即使是发送速度大于到达速度,咱们也不但愿每一个record就发送一次,仍是但愿分批次发送,以减小发送次数,提高producer客户端和服务器端的性能。为此,咱们须要人为地加一个发送延迟限制,即每次发送之间,存在必定的时间间隔linger.ms,在这段时间内,可能有多个records到达,此时就能够对他们分组压缩,成批次发送。这相似于TCP的拥塞控制方法。

注意:

  • linger.ms设置了发送延迟的最高时间上限,另外一个配置项batch.size也同时控制着发送的时机。若是为某个partition压缩的batch字节数已经达到了batch.size设置的字节数,那么该batch将被当即发送到指定的partition,即便此时延迟时间还没达到linger.ms的设置。
  • 一样的,若是延迟的时间已经达到了linger.ms的设置,那么即便压缩累积的batch没有达到batch.size设置的字节数,也会被发送到指定的partition。
  • linger.ms是针对每个发送到partition的request。即不一样partition的request并非同时发送的。
  • 延迟觉得这性能下降,须要在延迟和性能之间进行平衡,找到一个合适的linger.ms值。

client.id

重要程度:中
类型:String
默认值:""

producer 客户端ID,在建立request时,会传送到kafka服务。其目的是为了跟踪记录请求的来源,虽然服务端能够经过ip/port来追踪请求的来源,但ip/port没法表达业务语义,因此,能够经过client.id来设置一个富有业务逻辑语义的名字(如PDK游戏),有助于后续的分析和记录。

send.buffer.bytes

重要程度:中
类型:int
默认值:131072字节,即128K。

TCP发送缓冲区(SO_SNDBUF)的大小,若send.buffer.bytes设为-1,则使用操做系统的默认值。

receive.buffer.bytes

重要程度:中
类型:int
默认值:32768字节,即32K。

TCP接收缓冲区(SO_RCVBUF)大小,当receive.buffer.bytes设置为-1,则使用操做系统默认的大小。

max.request.size

重要程度:中
类型:String
默认值:1048576字节,即1M。

一个请求request中最大字节数,用于限制producer发送的单个请求request中,record batches的最大数量,以免单个请求数据过于巨大。

max.request.size & batch.size

  • 一个请求request中,可能包含多个record batch。
  • max.request.size可能影响record batch的大小上限,即当batch.size 大于 max.request.size时,batch的上限就变成了max.request.size设置的大小。

reconnect.backoff.ms

重要性:低
类型:Long
默认值:50毫秒。

重连间隔时间,避免producer客户端过于紧密循环地重连kafka服务broker。该值针对的是全部client到broker的链接。

reconnect.backoff.max.ms

重要性:低
类型:Long
默认值:1000毫秒

producer客户端链接一个kafka服务(broker)失败重连的总时间,每次链接失败,重连时间都会指数级增长,每次增长的时间会存在20%的随机抖动,以免链接风暴

链接风暴

应用启动的时候,常常可能发生各应用服务器的链接数异常飙升的状况。假设链接数的设置为:min值3,max值10,正常的业务使用链接数在5个左右,当重启应用时,各应用链接数可能会飙升到10个,瞬间甚至还有可能部分应用会报取不到链接。启动完成后接下来的时间内,链接开始慢慢返回到业务的正常值。这就是所谓的链接风暴。

max.block.ms

重要性:低
类型:Long
默认值:1000毫秒

该配置值控制着KafkaProducer.send()函数以及KafkaProducer.partitionsFor()函数将阻塞的最大时间。另外当发送缓冲区满或者metadata不可用时,这两个方法也会被阻塞。若是阻塞发生在用户提供的自定义序列化类serializers或者是自定义的分区类partitioner,那么这些阻塞的时间不会被计算在该配置值之类。

小结

上面总结了ProducerConfig类中部分配置项,限于篇幅已经较长,剩余部分的配置项将在后面另起一篇再作介绍。另外,在这篇文章中,本身有一个疑惑:

producer发送消息记录到broker的时机,究竟是个什么机制?从上述配置项的介绍中, batch.sizemax.request.sizelinger.ms这几个配置项都会影响其发送时机。

先在此记录,后续搞明白了再更新吧。若是有大牛可以帮我回答这个问题,能够在评论中帮我解答。

相关文章
相关标签/搜索