前面的文章对producer流程及其可自定义的配置类作了大致介绍,本文将继续对Kafka生成者编程相关知识点进行讲解。ProducerConfig
类存放着producer客户端可配置的项以及其对应的解释文档,在本文中,主要根据其说明文档,分析kafka内部的一些机制和原理。node
备注:apache
kafka-clients 1.1.0
版本。org.apache.kafka.clients.producer
中。重要性:高
类型:List
默认值:Collections.emptyList()编程
引导producer查找Kafka集群全部broker的引导服务地址列表。bootstrap
顾名思义,该配置项是引导服务列表,即用于查找Kafka集群中全部broker的host:port
列表,producer经过这些host:port
与kafka集群创建链接。producer用该列表中的地址只是用于发现kafka集群中全部的服务broker,而在kafka集群中,broker多是动态改变的。另外,Kafka机制中,能够经过某一个broker而查询到全部其余broker,因此在bootstrap.servers
中,并不须要配置全部broker的host:port
,理想状况下,只须要配置其中的某一个就能够了。但为了提升可用性,避免因该broker挂掉而致使没法查找,那么能够选择配置多个。segmentfault
配置格式为:安全
host1:port1,host2:port2,...
重要性:高
类型:Long
默认值:300000毫秒,即5分钟服务器
元数据最大生存时间,每隔metadata.max.age.ms
时间,producer客户端会强制刷新一遍元数据metadata,即便没有任何partition leadership主动发现新的broker或者新的partition。网络
元数据类org.apache.kafka.clients#Metadata
中,除了记录一些和自身更新策略有关的信息(metadata的更新策略值得另开一篇文章分析)。还保存了kafka集群的一些信息,参见org.apache.kafka.common#Cluster
类:函数
集群中全部结点broker node列表,Node结点中记录告终点的ip、port以及机架信息(rack)。oop
机架信息(rack):broker的机架信息,相似于Hadoop那样,能够更好地利用局部性原 理减小集群中网络开销。若是指定了机架信息(brooker.rack), Kafka在为分区作副 本分配时就会考虑这部分信息,尽量地为副本挑选不一样机架的broker。
集群中每个TopicPartition,对应的分区信息PartitionInfo。org.apache.kafka.common#PartitionInfo
中主要记录了以下信息:
集群中的控制结点信息。
控制结点broker:负责管理整个集群中分区和副本的状态,好比partition的leader 副本故障,由controller 负责为该partition从新选举新的leader 副本;当检测到ISR列表发生变化,有controller通知集群中全部broker更新其MetadataCache信息;或者增长某个topic分区的时候也会由controller管理分区的从新分配工做
重要性:高
类型:Long
默认值:16384字节,即16K
消息记录batch(批)大小限制。kafka producer在将消息记录record发送到集群时,会尝试将一批要发送到相同partition的消息记录压缩在一块儿,称之为batch(批)。每次request,其实不是发送一个record,而是发送若干个batch,而每一个batch里面可能包含多个record。这样成批成批的发送,减小了网络请求,有助于提高producer客户端和kafka集群服务的性能。
batch.size
就是用来设置一个batch的最大字节数byte。当设置为0时,表示彻底禁用batch的功能。若是batch.size
设置过大,那么可能形成内存浪费,由于每一个发送到不一样partition的record都须要预先分配一块batch.size大小的内存。
重要性:高
类型:String
默认值:"1"
应答数设置。producer只有接收到来自server的acks指定数量的应答,才会认为发送给server的消息记录已送达。该配置项用于控制已发送消息记录的持久性,有如下几种设置值:
acks = 0
:表示producer无需等待server端的应答消息,producer将record扔到发送缓冲区,就认为该record已经发送,而后转身走人。这种状况没法保证server端真的成功接收到该消息记录,且此时即便retries
配置项也没法生效,由于producer没法知道是否失败。另外,每一个record返回的offset都被设为-1。acks = 1
:表示接收该消息记录的分区leader将消息记录写入到本地log文件,就返回Acknowledgement,告知producer本次发送已完成,而无需等待其余follower分区的确认。这种状况下,可能出现消息记录没有备份的状况(follower宕机等)。acks = all
:表示消息记录只有获得分区leader以及其余分区副本同步结点队列(ISR)中的分区follower的确认以后,才能回复acknowlegement,告知producer本次发送已完成。这种状况下,只要分区副本同步结点队列(ISR)中某一个follower存活,那么消息记录就不会被丢失。这种方式最安全,但效率也最低。acks = -1
:等同于acks = all
。重要性:中
类型:Long
默认值:0毫秒,表示无延时,当即发送。
延迟发送消息记录的时间,上面及前面文章中也已经提到过,producer在发送消息记录record的时候,会将发送到同一个partition的records压缩在batch中。但一般这只发生在records到达速度快于records发送速度的状况下,很容易理解:若是发送速度大于record到达速度,则每来一个record都会被当即发送出去,根本不存在将多个records压缩为一个的可能。
但不少时候,即使是发送速度大于到达速度,咱们也不但愿每一个record就发送一次,仍是但愿分批次发送,以减小发送次数,提高producer客户端和服务器端的性能。为此,咱们须要人为地加一个发送延迟限制,即每次发送之间,存在必定的时间间隔linger.ms
,在这段时间内,可能有多个records到达,此时就能够对他们分组压缩,成批次发送。这相似于TCP的拥塞控制方法。
注意:
linger.ms
设置了发送延迟的最高时间上限,另外一个配置项batch.size
也同时控制着发送的时机。若是为某个partition压缩的batch字节数已经达到了batch.size
设置的字节数,那么该batch将被当即发送到指定的partition,即便此时延迟时间还没达到linger.ms
的设置。linger.ms
的设置,那么即便压缩累积的batch没有达到batch.size
设置的字节数,也会被发送到指定的partition。linger.ms
是针对每个发送到partition的request。即不一样partition的request并非同时发送的。linger.ms
值。重要程度:中
类型:String
默认值:""
producer 客户端ID,在建立request时,会传送到kafka服务。其目的是为了跟踪记录请求的来源,虽然服务端能够经过ip/port来追踪请求的来源,但ip/port没法表达业务语义,因此,能够经过client.id
来设置一个富有业务逻辑语义的名字(如PDK游戏),有助于后续的分析和记录。
重要程度:中
类型:int
默认值:131072字节,即128K。
TCP发送缓冲区(SO_SNDBUF)的大小,若send.buffer.bytes
设为-1,则使用操做系统的默认值。
重要程度:中
类型:int
默认值:32768字节,即32K。
TCP接收缓冲区(SO_RCVBUF)大小,当receive.buffer.bytes
设置为-1,则使用操做系统默认的大小。
重要程度:中
类型:String
默认值:1048576字节,即1M。
一个请求request中最大字节数,用于限制producer发送的单个请求request中,record batches的最大数量,以免单个请求数据过于巨大。
重要性:低
类型:Long
默认值:50毫秒。
重连间隔时间,避免producer客户端过于紧密循环地重连kafka服务broker。该值针对的是全部client到broker的链接。
重要性:低
类型:Long
默认值:1000毫秒
producer客户端链接一个kafka服务(broker)失败重连的总时间,每次链接失败,重连时间都会指数级增长,每次增长的时间会存在20%的随机抖动,以免链接风暴
。
应用启动的时候,常常可能发生各应用服务器的链接数异常飙升的状况。假设链接数的设置为:min值3,max值10,正常的业务使用链接数在5个左右,当重启应用时,各应用链接数可能会飙升到10个,瞬间甚至还有可能部分应用会报取不到链接。启动完成后接下来的时间内,链接开始慢慢返回到业务的正常值。这就是所谓的链接风暴。
重要性:低
类型:Long
默认值:1000毫秒
该配置值控制着KafkaProducer.send()
函数以及KafkaProducer.partitionsFor()
函数将阻塞的最大时间。另外当发送缓冲区满或者metadata不可用时,这两个方法也会被阻塞。若是阻塞发生在用户提供的自定义序列化类serializers或者是自定义的分区类partitioner,那么这些阻塞的时间不会被计算在该配置值之类。
上面总结了ProducerConfig
类中部分配置项,限于篇幅已经较长,剩余部分的配置项将在后面另起一篇再作介绍。另外,在这篇文章中,本身有一个疑惑:
producer发送消息记录到broker的时机,究竟是个什么机制?从上述配置项的介绍中,batch.size
,max.request.size
,linger.ms
这几个配置项都会影响其发送时机。
先在此记录,后续搞明白了再更新吧。若是有大牛可以帮我回答这个问题,能够在评论中帮我解答。