flume源码编译/拦截器分析(一)

flume介绍


因为是第一次进行源码编译与开发,步骤有点复杂,后续再进行简化html

Flume是Cloudera提供的一个高可用、高可靠、分布式的海量日志采集、聚合和传输的系统。Flume支持在日志系统中定制各种数据发送方用于收集数据,同时Flume提供对数据的简单处理,并将数据处理结果写入各类数据接收方的能力。java


flume源码编译

  1. 编译所需环境:git

    maven 3.xapache

    java sdk 1.6 以上api

    git服务器

  2. 下载源码,本文选用的是apache-flume-1.6.0-src.tar.gzmaven

解压至工做路径中,在git中执行:分布式

mvn clean
mvn package -DskipTests

结果以下图所示即编译成功。 图1工具

而后在以下图的所示的flume-ng-dist文件夹下的target下出现apche-flume-1.6.0-bin.tar.gz与apche-flume-1.6.0-src.tar.gz,这样就能够任意的修改/开发各类本身须要的功能了。oop

输入图片说明

  • 注意,在下载的源码中有hadoop与hbase版本须要修改,不一样版本修改不一样,上网搜一下看看。

flume拦截器

Flume中的拦截器(interceptor),用户Source读取events发送到Sink的时候,在events header中加入一些有用的信息,或者对events的内容进行过滤,完成初步的数据清洗。这在实际业务场景中很是有用,Flume-ng 1.6中目前提供了如下拦截器:

在flume源码编译成功的前提下,就能够对源码作为所欲为的修改了,在本文主要对拦截器进行开发, 在org.apache.flume.interceptor下进行开发的:

具体开发原理等见博文flume拦截器分析

kafka向flume发送数据

参考:http://blog.csdn.net/high2011/article/details/53282128

  1. 首先写生产者
  2. 搭建好flume环境并配置好配置文件 flume配置文件
tier1.sources  = source1
tier1.channels = channel1
tier1.sinks = sink1

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.zookeeperConnect = ip
:2181,ip2:2181,ip3:2181
tier1.sources.source1.topic = kafka_topicconfig_test_1
tier1.sources.source1.groupId = kafka_topicconfig_default_group
tier1.sources.source1.channels = channel1
tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = timestamp
tier1.sources.source1.kafka.consumer.timeout.ms = 1000

tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000

tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = hdfs://ip:9000/flume/%{topic}/%y-%m-%d
tier1.sinks.sink1.hdfs.rollInterval = 5
tier1.sinks.sink1.hdfs.rollSize = 0
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.channel = channel1
  1. 搭建好hadoop环境 具体见hadoop基础环境搭建,在此教程中hadoop为伪分布式,分布式搭建见博客hadoop分布式搭建
  • 注意(此处有待研究):

    1. 为了得到更高的吞吐量,请配置多个Kafka源以从同一主题读取。
    2. 若是配置具备相同groupID的全部源,而且主题包含多个分区,则每一个源从不一样的分区集中读取数据,从而提升吞吐率。
  • . 调整注释: Kafka源覆盖两个Kafka使用者参数:

    1. auto.commit.enable由源设置为false,而且每一个批处理都提交。 为了提升性能,请使用kafka.auto.commit.enable设置将其设置为true。 若是源在提交前失败,则可能致使数据丢失。
    2. consumer.timeout.ms设置为10,所以当Flume轮询Kafka的新数据时,它等待不超过10毫秒的数据可用。 将此设置为更高的值能够下降CPU利用率,由于轮询频率较低,但在向通道写入批处理时引入了延迟。

flume->kafka

暂尚未实验,先将连接记下:

1.http://blog.csdn.net/high2011/article/details/53282128 2.http://blog.csdn.net/huguoping830623/article/details/48138319

kafka批量发送数据

脚本形式

#pass

api形式

见博文向kafka批量发送已存在的txt文件

压测报告

测试目的:测试flume的过滤器功能是否能够移植到自研软件上使用

测试过程:

准备步骤:

  • 根据业务需求,在flume中开发过滤器KafkaInterceptor.java;
  • 从生产环境下取出大小为4k的2290135条数据;

测试场景:

  1. 在不利用过滤器的条件下,利用kafka将数据发送给flume,而后存到hdfs中;
  2. 在利用过滤器的前提下,利用kafka将数据发送给flume,而后存到hdfs中;

测试结果(测试工具为nomn及nomn analyser):

在tps及硬件环境一致的前提下,场景一此时服务器cpu的平均使用率为:3.18%;而此时服务器cpu的平均使用率为:16.34%,.

测试结论:

在测试环境下,场景2的cpu使用率是场景1的5倍多,全部在生产环境下是否使用或增长该功能,还须要继续讨论。


若是有flume问题,请加QQ群:140467035 ,共同窗习进步

相关文章
相关标签/搜索