插曲:Kafka的生产者原理及重要参数说明

前言

原本插曲系列是应你们要求去更新的,可是好像第一篇的kafka效果还能够因此更插曲就勤快些了(毕竟谁不想看着本身被多多点赞呢hhh🤣),上一篇说了一个案例是为了说明如何去考量一个kafka集群的部署,算是一个参考吧,毕竟你们在不一样的公司工做确定也会有本身的一套实施方案。apache

此次咱们再回到原理性的问题,此次会延续第一篇的风格,带领你们把图一步一步画出来。轻松愉快bootstrap

1、Kafka的Producer原理

首先咱们得先有个集群吧,而后集群中有若干台服务器,每一个服务器咱们管它叫Broker,其实就是一个个Kafka进程数组

若是你们还记得第一篇的内容,就不难猜出来,接下来确定会有一个controller和多个follower,还有个zookeeper集群,一开始咱们的Broker都会注册到咱们的zookeeper集群上面。服务器

而后controller也会监听zookeeper集群的变化,在集群产生变化时更改本身的元数据信息。而且follower也会去它们的老大controller那里去同步元数据信息,因此一个Kafka集群中全部服务器上的元数据信息都是一致的。网络

上述准备完成后,咱们正式开始咱们生产者的内容负载均衡

① 名词1 --- ProducerRecord

生产者须要往集群发送消息前,要先把每一条消息封装成ProducerRecord对象,这是生产者内部完成的。以后会经历一个序列化的过程。以前好几篇专栏也是有提到过了,须要通过网络传输的数据都是二进制的一些字节数据,须要进行序列化才能传输。异步

此时就会有一个问题,咱们须要把消息发送到一个Topic下的一个leader partition中,但是生产者是怎样get到这个topic下哪一个分区才是leader partition呢?socket

可能有些小伙伴忘了,提醒一下,controller能够视做为broker的领导,负责管理集群的元数据,而leader partition是作负载均衡用的,它们会分布式地存储在不一样的服务器上面。集群中生产数据也好,消费数据也好,都是针对leader partition而操做的。分布式

② 名词2 --- partitioner

怎么知道哪一个才是leader partition,只须要获取到元数据不就行了嘛。ide

说来要怎么获取元数据也不难,只要随便找到集群下某一台服务器就能够了(由于集群中的每一台服务器元数据都是同样的)

③ 名词3 --- 缓冲区

此时生产者不着急把消息发送出去,而是先放到一个缓冲区

④ 名词4 --- Sender

把消息放进缓冲区以后,与此同时会有一个独立线程Sender去把消息分批次包装成一个个Batch,不难想到若是Kafka真的是一条消息一条消息地传输,一条消息就是一个网络链接,那性能就会被拉得不好。为了提高吞吐量,因此采起了分批次的作法

整好一个个batch以后,就开始发送给对应的主机上面。此时通过第一篇所提到的Kakfa的网络设计中的模型,而后再写到os cache,再写到磁盘上面。

下图是当时咱们已经说明过的Kafka网络设计模型

⑤ 生产者代码

1.设置参数部分

// 建立配置文件对象
Properties props = new Properties();

// 这个参数目的是为了获取kafka集群的元数据
// 写一台主机也行,多个更加保险
// 这里使用的是主机名,要根据server.properties来决定
// 使用主机名的状况须要配置电脑的hosts文件(重点)
props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");  

// 这个就是负责把发送的key从字符串序列化为字节数组
// 咱们能够给每一个消息设置key,做用以后再阐述
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 这个就是负责把你发送的实际的message从字符串序列化为字节数组
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 如下属于调优,以后再解释
props.put("acks", "-1");
props.put("retries", 3);
props.put("batch.size", 323840);
props.put("linger.ms", 10);
props.put("buffer.memory", 33554432);
props.put("max.block.ms", 3000);
复制代码

2.建立生产者实例

// 建立一个Producer实例:线程资源,跟各个broker创建socket链接资源
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
复制代码

3.建立消息

ProducerRecord<String, String> record = new ProducerRecord<>(
				"test-topic", "test-value");
复制代码

固然你也能够指定一个key,做用以后会说明

ProducerRecord<String, String> record = new ProducerRecord<>(
				"test-topic", "test-key", "test-value");
复制代码

4.发送消息

带有一个回调函数,若是没有异常就返回消息发送成功

// 这是异步发送的模式
producer.send(record, new Callback() {
	@Override
	public void onCompletion(RecordMetadata metadata, Exception exception) {
		if(exception == null) {
			// 消息发送成功
			System.out.println("消息发送成功");  
		} else {
			// 消息发送失败,须要从新发送
		}
	}
});
Thread.sleep(10 * 1000); 
		
// 这是同步发送的模式(是通常不会使用的,性能不好,测试可使用)
// 你要一直等待人家后续一系列的步骤都作完,发送消息以后
// 有了消息的回应返回给你,你这个方法才会退出来
producer.send(record).get(); 
复制代码

5.关闭链接

producer.close();
复制代码

2、干货时间:调优部分的代码

区分是否是一个勤于思考的打字员的部分其实就是在1那里尚未讲到的那部分调优,一个个拿出来单独解释,就是下面这一大串

props.put("acks", "-1");
props.put("retries", 3);
props.put("batch.size", 323840);
props.put("linger.ms", 100);
props.put("buffer.memory", 33554432);
props.put("max.block.ms", 3000);
复制代码

① acks 消息验证

props.put("acks", "-1");
复制代码
acks 消息发送成功判断
-1 leader & all follower接收
1 leader接收
0 消息发送便可

这个acks参数有3个值,分别是-1,0,1,设置这3个不一样的值会成为kafka判断消息发送是否成功的依据。Kafka里面的分区是有副本的,若是acks为-1.则说明消息在写入一个分区的leader partition后,这些消息还须要被另外全部这个分区的副本同步完成后,才算发送成功(对应代码就是输出System.out.println("消息发送成功")),此时发送数据的性能下降

若是设置acks为1,须要发送的消息只要写入了leader partition,即算发送成功,可是这个方式存在丢失数据的风险,好比在消息恰好发送成功给leader partition以后,这个leader partition马上宕机了,此时剩余的follower不管选举谁成为leader,都不存在刚刚发送的那一条消息。

若是设置acks为0,消息只要是发送出去了,就默认发送成功了。啥都无论了。

② retries 重试次数(重要)

这个参数仍是很是重要的,在生产环境中是必须设置的参数,为设置消息重发的次数

props.put("retries", 3);
复制代码

在kafka中可能会遇到各类各样的异常(能够直接跳到下方的补充异常类型),可是不管是遇到哪一种异常,消息发送此时都出现了问题,特别是网络忽然出现问题,可是集群不可能每次出现异常都抛出,可能在下一秒网络就恢复了呢,因此咱们要设置重试机制。

这里补充一句:设置了retries以后,集群中95%的异常都会本身乘风飞去,我真没开玩笑🤣

代码中我配置了3次,其实设置5~10次都是合理的,补充说明一个,若是咱们须要设置隔多久重试一次,也有参数,没记错的话是retry.backoff.ms,下面我设置了100毫秒重试一次,也就是0.1秒

props.put("retry.backoff.ms",100);
复制代码

③ batch.size 批次大小

批次的大小默认是16K,这里设置了32K,设置大一点能够稍微提升一下吞吐量,设置这个批次的大小还和消息的大小有关,假设一条消息的大小为16K,一个批次也是16K,这样的话批次就失去意义了。因此咱们要事先估算一下集群中消息的大小,正常来讲都会设置几倍的大小。

props.put("batch.size", 323840);
复制代码

④ linger.ms 发送时间限制

好比我如今设置了批次大小为32K,而一条消息是2K,此时已经有了3条消息发送过来,总大小为6K,而生产者这边就没有消息过来了,那在没够32K的状况下就不发送过去集群了吗?显然不是,linger.ms就是设置了固定多长时间,就算没塞满Batch,也会发送,下面我设置了100毫秒,因此就算个人Batch迟迟没有满32K,100毫秒事后都会向集群发送Batch。

props.put("linger.ms", 100);
复制代码

⑤ buffer.memory 缓冲区大小

当咱们的Sender线程处理很是缓慢,而生产数据的速度很快时,咱们中间的缓冲区若是容量不够,生产者就没法再继续生产数据了,因此咱们有必要把缓冲区的内存调大一点,缓冲区默认大小为32M,其实基本也是合理的。

props.put("buffer.memory", 33554432);
复制代码

那应该如何去验证咱们这时候应该调整缓冲区的大小了呢,咱们能够用通常Java计算结束时间减去开始时间的方式测试,当结束时间减去开始时间大于100ms,咱们认为此时Sender线程处理速度慢,须要调大缓冲区大小。

固然通常状况下咱们是不须要去设置这个参数的,32M在广泛状况下已经足以应付了。

Long startTime=System.currentTime();
producer.send(record, new Callback() {
	@Override
	public void onCompletion(RecordMetadata metadata, Exception exception) {
		if(exception == null) {
			// 消息发送成功
			System.out.println("消息发送成功");  
		} else {
			// 消息发送失败,须要从新发送
		}
	}
});
Long endTime=System.currentTime();
If(endTime - startTime > 100){//说明内存被压满了
 说明有问题
复制代码

}

⑦ compression.type 压缩方式

compression.type,默认是none,不压缩,可是也可使用lz4压缩,效率仍是不错的,压缩以后能够减少数据量,提高吞吐量,可是会加大producer端的cpu开销

props.put("compression.type", lz4);
复制代码

⑧ max.block.ms

留到源码时候说明,是设置某几个方法的阻塞时间

props.put("max.block.ms", 3000);
复制代码

⑨ max.request.size 最大消息大小

max.request.size:这个参数用来控制发送出去的消息的大小,默认是1048576字节,也就1M,这个通常过小了,不少消息可能都会超过1mb的大小,因此须要本身优化调整,把它设置更大一些(企业通常设置成10M),否则程序跑的好好的忽然来了一条2M的消息,系统就报错了,那就得不偿失

props.put("max.request.size", 1048576);    
复制代码

⑩ request.timeout.ms 请求超时

request.timeout.ms:这个就是说发送一个请求出去以后,他有一个超时的时间限制,默认是30秒,若是30秒都收不到响应(也就是上面的回调函数没有返回),那么就会认为异常,会抛出一个TimeoutException来让咱们进行处理。若是公司网络很差,要适当调整此参数

props.put("request.timeout.ms", 30000); 
复制代码

补充:kafka中的异常

无论是异步仍是同步,均可能让你处理异常,常见的异常以下:

1)LeaderNotAvailableException:这个就是若是某台机器挂了,此时leader副本不可用,会致使你写入失败,要等待其余follower副本切换为leader副本以后,才能继续写入,此时能够重试发送便可。若是说你平时重启kafka的broker进程,确定会致使leader切换,必定会致使你写入报错,是LeaderNotAvailableException

2)NotControllerException:这个也是同理,若是说Controller所在Broker挂了,那么此时会有问题,须要等待Controller从新选举,此时也是同样就是重试便可

3)NetworkException:网络异常,重试便可 咱们以前配置了一个参数,retries,他会自动重试的,可是若是重试几回以后仍是不行,就会提供Exception给咱们来处理了。 参数:retries 默认值是3 参数:retry.backoff.ms 两次重试之间的时间间隔

finally

上面从生产者生产消息到发送这一个流程分析下来,从而引出下面的各类各样关于整个过程的参数的设置,若是真的能清晰地理解好这些基础知识,相信对你一定是有所帮助。以后会再带一个生产者的案例和消费者进来。感兴趣的朋友能够关注一下,谢谢。

相关文章
相关标签/搜索