一次flume exec source采集日志到kafka由于单条日志数据很是大同步失败的踩坑带来的思考

本次遇到的问题描述,日志采集同步时,当单条日志(日志文件中一行日志)超过2M大小,数据没法采集同步到kafka,分析后,共踩到以下几个坑。
一、flume采集时,经过shell+EXEC(tail -F xxx.log 的方式) source来获取日志时,当单条日志过大超过1M时,source端没法从日志中获取到Event。
二、日志超过1M后,flume的kafka sink 做为生产者发送给日志给kafka失败,kafka没法收到消息。
如下针对踩的这两个坑作分析,flume 我使用的是1.9.0版本。 kafka使用的是2.11-2.0.0版本html

问题1、flume采集时,经过shell+EXEC(tail -F  xxx.log 的方式) source来获取日志时,当单条日志过大超过1M时,source端没法从日志中获取到Event。flume的配置以下:java

 ......
 agent.sources = seqGenSrc
 ......
 # For each one of the sources, the type is defined
agent.sources.seqGenSrc.type = exec
#agent.sources.seqGenSrc.command = tail -F /opt/logs/test.log|grep businessCollection|awk -F '- {' '{print "{"$2}'
agent.sources.seqGenSrc.command = tail -F /opt/logs/test.log|grep businessCollection
agent.sources.seqGenSrc.shell = /bin/bash -c
agent.sources.seqGenSrc.batchSize = 1
agent.sources.seqGenSrc.batchTimeout = 90000
......

  缘由:采用shell+EXEC方式的时候,flume的源码中使用的是以下的方式来获取日志shell

    private Process process = null;
	//使用这种方式来执行命令。
process = Runtime.getRuntime().exec(commandArgs);
//读取日志
 reader = new BufferedReader(  new InputStreamReader(process.getInputStream(), charset));

  

在一行日志超过1M后,这个代码就假死了,一直宕住,致使没法获取到数据。

针对这个问题处理方式:
方式一:修改源码的实现方式。(1.9.0的源码 对应的是源码中的flume-ng-core 项目中的org.apache.flume.source.ExecSource.java 这个类)apache

//process的采用以下方式获和执行命令,就改一行代码。增长.redirectErrorStream(true)后,输入流就均可以获取到,哪怕超过1M
process = new ProcessBuilder(commandArgs).redirectErrorStream(true).start();

  

  

 

修改完成后,从新打包编译,而后将生成的jar包替换原来老的jar包。json

  方式二:放弃EXECSource,使用TAILDIR Source。 使用这个source时,对应的配置以下:bash

 ......
 agent.sources = seqGenSrc
 ......
 # For each one of the sources, the type is defined
agent.sources.seqGenSrc.type = TAILDIR
agent.sources.seqGenSrc.positionFile = ./taildir_position.json
agent.sources.seqGenSrc.filegroups = seqGenSrc
agent.sources.seqGenSrc.filegroups.seqGenSrc = /opt/logs/test.log
agent.sources.seqGenSrc.fileHeader = false
agent.sources.seqGenSrc.batchSize = 1
......

  建议采用TAILDIR Source 比较好,这个能够对多个日志进行监控和采集,并且日志采集时会记录日志采集位置到positionFile 中,这样日志采集不会重复。EXEC SOURCE在重启采集时数据会重复采集,还须要其余的方式去避免重复采集socket

问题2、日志超过1M后,flume的kafka sink 做为生产者发送给日志给kafka失败,kafka没法收到消息
缘由:kafka 在默认状况下,只能接收1M大小之内的消息,在没有作自定义设置时。因此单条消息大于1M后是没法处理的。
处理方式以下:fetch

1)、修改kafka 服务端server.properties文件,作以下设置(修改大小限制)ui

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=502400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=502400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
message.max.bytes=5242880
replica.fetch.max.bytes=6291456

2)、修改producer.properties,作以下设置(修改大小限制)spa

# the maximum size of a request in bytes
max.request.size= 9242880

3)、java代码中在初始化kafka 生产者时,也须要指定max.request.size= 9242880

 

        Properties properties = new Properties();
		...
		      properties.put("max.request.size", 5242880);
			  ...
			KafkaProducer<Object,Object>  kafkaProducer = new KafkaProducer<Object,Object>(properties);

  4)、消费者在消费kafka数据时,也须要注意设置消费消息的大小限制

            Properties properties = new Properties();
			...
            properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 6291456);		
				...
				 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);  

对于flume不了的同窗,能够看flume 1.9中文版用户指南:https://www.h3399.cn/201906/700076.html  

相关文章
相关标签/搜索