今天给你们聊一个颇有意思的话题,你们知道不少公司都会基于Kafka做为MQ来开发一些复杂的大型系统。面试
而在使用Kafka的客户端编写代码与服务器交互的时候,是须要对客户端设置不少的参数的。apache
因此我就见过不少年轻的同窗,可能刚刚加入团队,对Kafka这个技术其实并非很了解。bootstrap
此时就会致使他们看团队里的一些资深同事写的一些代码,会看不懂是怎么回事,不了解背后的含义,这里面尤为是一些Kafka参数的设置。服务器
因此这篇文章,咱们仍是采用老规矩画图的形式,来聊聊Kafka生产端一些常见参数的设置,让你们下次看到一些Kafka客户端设置的参数时,不会再感到发怵。测试
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("buffer.memory", 67108864); props.put("batch.size", 131072); props.put("linger.ms", 100); props.put("max.request.size", 10485760); props.put("acks", "1"); props.put("retries", 10); props.put("retry.backoff.ms", 500); KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
首先咱们看看“buffer.memory”这个参数是什么意思?spa
Kafka的客户端发送数据到服务器,通常都是要通过缓冲的,也就是说,你经过KafkaProducer发送出去的消息都是先进入到客户端本地的内存缓冲里,而后把不少消息收集成一个一个的Batch,再发送到Broker上去的。线程
因此这个“buffer.memory”的本质就是用来约束KafkaProducer可以使用的内存缓冲的大小的,他的默认值是32MB。3d
那么既然了解了这个含义,你们想一下,在生产项目里,这个参数应该怎么来设置呢?调试
你能够先想一下,若是这个内存缓冲设置的太小的话,可能会致使一个什么问题?server
首先要明确一点,那就是在内存缓冲里大量的消息会缓冲在里面,造成一个一个的Batch,每一个Batch里包含多条消息。
而后KafkaProducer有一个Sender线程会把多个Batch打包成一个Request发送到Kafka服务器上去。
那么若是要是内存设置的过小,可能致使一个问题:消息快速的写入内存缓冲里面,可是Sender线程来不及把Request发送到Kafka服务器。
这样是否是会形成内存缓冲很快就被写满?一旦被写满,就会阻塞用户线程,不让继续往Kafka写消息了。
因此对于“buffer.memory”这个参数应该结合本身的实际状况来进行压测,你须要测算一下在生产环境,你的用户线程会以每秒多少消息的频率来写入内存缓冲。
好比说每秒300条消息,那么你就须要压测一下,假设内存缓冲就32MB,每秒写300条消息到内存缓冲,是否会常常把内存缓冲写满?通过这样的压测,你能够调试出来一个合理的内存大小。
接着你须要思考第二个问题,就是你的“batch.size”应该如何设置?这个东西是决定了你的每一个Batch要存放多少数据就能够发送出去了。
好比说你要是给一个Batch设置成是16KB的大小,那么里面凑够16KB的数据就能够发送了。
这个参数的默认值是16KB,通常能够尝试把这个参数调节大一些,而后利用本身的生产环境发消息的负载来测试一下。
好比说发送消息的频率就是每秒300条,那么若是好比“batch.size”调节到了32KB,或者64KB,是否能够提高发送消息的总体吞吐量。
由于理论上来讲,提高batch的大小,能够容许更多的数据缓冲在里面,那么一次Request发送出去的数据量就更多了,这样吞吐量可能会有所提高。
可是这个东西也不能无限的大,过于大了以后,要是数据总是缓冲在Batch里迟迟不发送出去,那么岂不是你发送消息的延迟就会很高。
好比说,一条消息进入了Batch,可是要等待5秒钟Batch才凑满了64KB,才能发送出去。那这条消息的延迟就是5秒钟。
因此须要在这里按照生产环境的发消息的速率,调节不一样的Batch大小本身测试一下最终出去的吞吐量以及消息的 延迟,设置一个最合理的参数。
要是一个Batch迟迟没法凑满,此时就须要引入另一个参数了,“linger.ms”
他的含义就是说一个Batch被建立以后,最多过多久,无论这个Batch有没有写满,都必须发送出去了。
给你们举个例子,好比说batch.size是16kb,可是如今某个低峰时间段,发送消息很慢。
这就致使可能Batch被建立以后,陆陆续续有消息进来,可是迟迟没法凑够16KB,难道此时就一直等着吗?
固然不是,假设你如今设置“linger.ms”是50ms,那么只要这个Batch从建立开始到如今已通过了50ms了,哪怕他还没满16KB,也要发送他出去了。
因此“linger.ms”决定了你的消息一旦写入一个Batch,最多等待这么多时间,他必定会跟着Batch一块儿发送出去。
避免一个Batch迟迟凑不满,致使消息一直积压在内存里发送不出去的状况。这是一个很关键的参数。
这个参数通常要很是慎重的来设置,要配合batch.size一块儿来设置。
举个例子,首先假设你的Batch是32KB,那么你得估算一下,正常状况下,通常多久会凑够一个Batch,好比正常来讲可能20ms就会凑够一个Batch。
那么你的linger.ms就能够设置为25ms,也就是说,正常来讲,大部分的Batch在20ms内都会凑满,可是你的linger.ms能够保证,哪怕遇到低峰时期,20ms凑不满一个Batch,仍是会在25ms以后强制Batch发送出去。
若是要是你把linger.ms设置的过小了,好比说默认就是0ms,或者你设置个5ms,那可能致使你的Batch虽然设置了32KB,可是常常是还没凑够32KB的数据,5ms以后就直接强制Batch发送出去,这样也不太好其实,会致使你的Batch形同虚设,一直凑不满数据。
“max.request.size”这个参数决定了每次发送给Kafka服务器请求的最大大小,同时也会限制你一条消息的最大大小也不能超过这个参数设置的值,这个其实能够根据你本身的消息的大小来灵活的调整。
给你们举个例子,大家公司发送的消息都是那种大的报文消息,每条消息都是不少的数据,一条消息可能都要20KB。
此时你的batch.size是否是就须要调节大一些?好比设置个512KB?而后你的buffer.memory是否是要给的大一些?好比设置个128MB?
只有这样,才能让你在大消息的场景下,还能使用Batch打包多条消息的机制。可是此时“max.request.size”是否是也得同步增长?
由于可能你的一个请求是很大的,默认他是1MB,你是否是能够适当调大一些,好比调节到5MB?
“retries”和“retries.backoff.ms”决定了重试机制,也就是若是一个请求失败了能够重试几回,每次重试的间隔是多少毫秒。
这个你们适当设置几回重试的机会,给必定的重试间隔便可,好比给100ms的重试间隔。
“acks”参数决定了发送出去的消息要采用什么样的持久化策略,这个涉及到了不少其余的概念,你们能够参考以前专门为“acks”写过的一篇文章: