最近开发一cdc应用,为了测试极端状况,须要kafka传递100万条数据过去,1个G左右,因为其余环节限制,不便进行拆包(注:测下来,大包走kafka不必定性能更好,甚至可能更低)。html
测试百万以上的变动数据时,报消息超过kafka broker容许的最大值,所以须要修改以下参数,保证包可以正常发送:算法
socket.request.max.bytes=2147483647 # 设置了socket server接收的最大请求大小
log.segment.bytes=2147483647 # kafka数据文件的大小,确保这个数值大于一个消息的长度。通常说来使用默认值便可(通常一个消息很难大于1G,由于这是一个消息系统,而不是文件系统)。
message.max.bytes=2147483647 # 设置了kafka server接收的最大消息大小,应小于等于socket.request.max.bytes
replica.fetch.max.bytes=2147483647 #每一个分区试图获取的消息字节数。要大于等于message.max.bytes,不然broker会接收此消息,但没法将此消息复制出去,从而形成数据丢失。
fetch.message.max.bytes=2147483647 #每一个提取请求中为每一个主题分区提取的消息字节数。要大于等于message.max.bytes,不然broker就会由于消费端没法使用这个消息而挂起。apache
生产者能够以下设定:服务器
kafkaProps.put("max.request.size", 2147483647); # 要小于 message.max.bytes,也能够设置在producer.properties配置文件中
kafkaProps.put("buffer.memory", 2147483647);
kafkaProps.put("timeout.ms", 3000000);
kafkaProps.put("request.timeout.ms", 30000000);
消费者设定以下:session
props.put("request.timeout.ms", 30000000);
props.put("session.timeout.ms", "3000000");
props.put("fetch.max.wait.ms", "3000000");
各参数的含义能够参考kafka官方文档https://kafka.apache.org/documentation/#configuration。app
kafka基础知识体系,请参考LZ学习笔记kafka学习指南(总结版)。socket
注,各参数对内存的影响以下:Brokers会为每一个分区分配replica.fetch.max.bytes参数指定的内存空间,假设replica.fetch.max.bytes=1M,且有1000个分区,则须要差很少1G的内存,确保 分区数*最大的消息不会超过服务器的内存,不然会报OOM错误。一样地,消费端的fetch.message.max.bytes指定了最大消息须要的内存空间,一样,分区数*最大须要内存空间 不能超过服务器的内存。因此,若是你有大的消息要传送,则在内存必定的状况下,只能使用较少的分区数或者使用更大内存的服务器。性能
虽然上面的方法能够奏效,可是并不推荐。Kafka设计的初衷是迅速处理短小的消息,通常10K大小的消息吞吐性能最好(可参见LinkedIn的kafka性能测试)。但有时候,咱们须要处理更大的消息,好比XML文档或JSON内容,一个消息差很少有10-100M,这种状况下,Kakfa应该如何处理?学习
针对这个问题,有如下几个建议:测试
上面这些值太大还会形成一个问题,就是消息没有在指定时间内(max.poll.interval.ms(默认300秒))消费完,致使被rebalance,而kafka自己有个bug(服务器端的rebalance.timeout.ms(默认60秒)不生效),这会致使消费者组的rebalance时间比较长,因此这是须要注意的,参见https://blog.csdn.net/u013200380/article/details/87868696。