Kafka consumer处理大消息数据问题

案例分析

处理kafka consumer的程序的时候,发现以下错误:java

ERROR [2017-01-12 07:16:02,466] com.flow.kafka.consumer.main.KafkaConsumer: Unexpected Error Occurred
! kafka.common.MessageSizeTooLargeException: Found a message larger than the maximum fetch size of this consumer on topic codeTopic partition 3 at fetch offset 94. Increase the fetch size, or decrease the maximum message size the broker will allow.
! at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:91) ~[pip-kafka-consumer.jar:na]
! at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33) ~[pip-kafka-consumer.jar:na]
! at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66) ~[pip-kafka-consumer.jar:na]
! at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58) ~[pip-kafka-consumer.jar:na]
! at com.flow.kafka.consumer.main.KafkaConsumer$KafkaRiverFetcher.run(KafkaConsumer.java:291) ~[original-pip-kafka-consumer.jar:na]
! at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_51]
! at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_51]
! at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]

如上log能够看出,问题就是有一个较大的消息数据在codeTopic的partition 3上,而后consumer未能消费,提示我能够减少broker容许进入的消息数据的大小,或者增大consumer程序消费数据的大小。算法

从log上来看一目了然,若是要解决当前问题的话,shell

  1. 减少broker消息体大小(设置message.max.bytes参数);
  2. 增大consumer获取数据信息大小(设置fetch.message.max.bytes参数)。默认broker消息体大小为1000000字节即为1M大小。

消费者方面:fetch.message.max.bytes——>这将决定消费者能够获取的数据大小。
broker方面:replica.fetch.max.bytes——>这将容许broker的副本发送消息在集群并确保消息被正确地复制。若是这是过小,则消息不会被复制,所以,消费者永远不会看到的消息,由于消息永远不会承诺(彻底复制)。
broker方面:message.max.bytes——>能够接受数据生产者最大消息数据大小。
服务器

由个人场景来看较大的消息体已经进入到了kafka,我这里要解决这个问题,只须要增长consumer的fetch.message.max.bytes数值就好。我单独把那条数据消费出来,写到一个文件中发现那条消息大小为1.5M左右,为了不再次发生这种问题我把consumer程序的fetch.message.max.bytes参数调节为了3072000即为3M,重启consumer程序,查看log一切正常,解决这个消费错误到此结束,下面介绍一下kafka针对大数据处理的思考。session

kafka的设计初衷

Kafka设计的初衷是迅速处理小量的消息,通常10K大小的消息吞吐性能最好(可参见LinkedIn的kafka性能测试)。但有时候,咱们须要处理更大的消息,好比XML文档或JSON内容,一个消息差很少有10-100M,这种状况下,Kakfa应该如何处理?app

针对这个问题,能够参考以下建议:

  • 最好的方法是不直接传送这些大的数据。若是有共享存储,如NAS, HDFS, S3等,能够把这些大的文件存放到共享存储,而后使用Kafka来传送文件的位置信息。性能

  • 第二个方法是,将大的消息数据切片或切块,在生产端将数据切片为10K大小,使用分区主键确保一个大消息的全部部分会被发送到同一个kafka分区(这样每一部分的拆分顺序得以保留),如此以来,当消费端使用时会将这些部分从新还原为原始的消息。测试

  • 第三,Kafka的生产端能够压缩消息,若是原始消息是XML,当经过压缩以后,消息可能会变得不那么大。在生产端的配置参数中使用compression.codeccommpressed.topics能够开启压缩功能,压缩算法可使用GZipSnappyfetch

不过若是上述方法都不是你须要的,而你最终仍是但愿传送大的消息,那么,则能够在kafka中设置下面一些参数:大数据

broker 配置

message.max.bytes (默认:1000000) – broker能接收消息的最大字节数,这个值应该比消费端的fetch.message.max.bytes更小才对,不然broker就会由于消费端没法使用这个消息而挂起。
log.segment.bytes (默认: 1GB) – kafka数据文件的大小,确保这个数值大于一个消息的长度。通常说来使用默认值便可(通常一个消息很难大于1G,由于这是一个消息系统,而不是文件系统)。
replica.fetch.max.bytes (默认: 1MB) – broker可复制的消息的最大字节数。这个值应该比message.max.bytes大,不然broker会接收此消息,但没法将此消息复制出去,从而形成数据丢失。

Consumer 配置

fetch.message.max.bytes (默认 1MB) – 消费者能读取的最大消息。这个值应该大于或等于message.max.bytes。因此,若是你必定要选择kafka来传送大的消息,还有些事项须要考虑。要传送大的消息,不是当出现问题以后再来考虑如何解决,而是在一开始设计的时候,就要考虑到大消息对集群和主题的影响。

性能: 根据前面提到的性能测试,kafka在消息为10K时吞吐量达到最大,更大的消息会下降吞吐量,在设计集群的容量时,尤为要考虑这点。
可用的内存和分区数:Brokers会为每一个分区分配replica.fetch.max.bytes参数指定的内存空间,假设replica.fetch.max.bytes=1M,且有1000个分区,则须要差很少1G的内存,确保 分区数最大的消息不会超过服务器的内存,不然会报OOM错误。一样地,消费端的fetch.message.max.bytes指定了最大消息须要的内存空间,一样,分区数最大须要内存空间 不能超过服务器的内存。因此,若是你有大的消息要传送,则在内存必定的状况下,只能使用较少的分区数或者使用更大内存的服务器。
垃圾回收:到如今为止,我在kafka的使用中还没发现过此问题,但这应该是一个须要考虑的潜在问题。更大的消息会让GC的时间更长(由于broker须要分配更大的块),随时关注GC的日志和服务器的日志信息。若是长时间的GC致使kafka丢失了zookeeper的会话,则须要配置zookeeper.session.timeout.ms参数为更大的超时时间。

相关文章
相关标签/搜索