因为是第一次进行源码编译与开发,步骤有点复杂,后续再进行简化html
Flume是Cloudera提供的一个高可用、高可靠、分布式的海量日志采集、聚合和传输的系统。Flume支持在日志系统中定制各种数据发送方用于收集数据,同时Flume提供对数据的简单处理,并将数据处理结果写入各类数据接收方的能力。java
编译所需环境:git
maven 3.xapache
java sdk 1.6 以上api
git服务器
下载源码,本文选用的是apache-flume-1.6.0-src.tar.gzmaven
解压至工做路径中,在git中执行:分布式
mvn clean mvn package -DskipTests
结果以下图所示即编译成功。 工具
而后在以下图的所示的flume-ng-dist文件夹下的target下出现apche-flume-1.6.0-bin.tar.gz与apche-flume-1.6.0-src.tar.gz,这样就能够任意的修改/开发各类本身须要的功能了。oop
Flume中的拦截器(interceptor),用户Source读取events发送到Sink的时候,在events header中加入一些有用的信息,或者对events的内容进行过滤,完成初步的数据清洗。这在实际业务场景中很是有用,Flume-ng 1.6中目前提供了如下拦截器:
在flume源码编译成功的前提下,就能够对源码作为所欲为的修改了,在本文主要对拦截器进行开发, 在org.apache.flume.interceptor下进行开发的:
具体开发原理等见博文flume拦截器分析
参考:http://blog.csdn.net/high2011/article/details/53282128
tier1.sources = source1 tier1.channels = channel1 tier1.sinks = sink1 tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource tier1.sources.source1.zookeeperConnect = ip :2181,ip2:2181,ip3:2181 tier1.sources.source1.topic = kafka_topicconfig_test_1 tier1.sources.source1.groupId = kafka_topicconfig_default_group tier1.sources.source1.channels = channel1 tier1.sources.source1.interceptors = i1 tier1.sources.source1.interceptors.i1.type = timestamp tier1.sources.source1.kafka.consumer.timeout.ms = 1000 tier1.channels.channel1.type = memory tier1.channels.channel1.capacity = 10000 tier1.channels.channel1.transactionCapacity = 1000 tier1.sinks.sink1.type = hdfs tier1.sinks.sink1.hdfs.path = hdfs://ip:9000/flume/%{topic}/%y-%m-%d tier1.sinks.sink1.hdfs.rollInterval = 5 tier1.sinks.sink1.hdfs.rollSize = 0 tier1.sinks.sink1.hdfs.rollCount = 0 tier1.sinks.sink1.hdfs.fileType = DataStream tier1.sinks.sink1.channel = channel1
注意(此处有待研究):
. 调整注释: Kafka源覆盖两个Kafka使用者参数:
暂尚未实验,先将连接记下:
1.http://blog.csdn.net/high2011/article/details/53282128 2.http://blog.csdn.net/huguoping830623/article/details/48138319
#pass
准备步骤:
测试场景:
在tps及硬件环境一致的前提下,场景一此时服务器cpu的平均使用率为:3.18%;而此时服务器cpu的平均使用率为:16.34%,.
在测试环境下,场景2的cpu使用率是场景1的5倍多,全部在生产环境下是否使用或增长该功能,还须要继续讨论。