Apache Flume是一个分布式、可靠、高可用的日志收集系统,支持各类各样的数据来源,如http,log文件,jms,监听端口数据等等,能将这些数据源的海量日志数据进行高效收集、聚合、移动,最后存储到指定存储系统中,如kafka、分布式文件系统、Solr搜索服务器等;html
Apache Flume主要有如下几大模块组成:算法
模块组成图以下所示:数据库
下面将对各个模块作个简单的介绍,在这以前,有必要先了解一下什么是事件?apache
在Flume中,所谓的事件指的是Flume数据流中的数据单位,包含header和body,用于存储日志数据,其中header是一个map结构,咱们能够往header存放一些信息,如时间戳,appid等,以便后续对事件进行处理,body存放的是收集的日志内容字节流,结构以下图所示:json
先看下source模块在流程图中所处的位置,这里以最简单的架构图来做为示例,以下图所示:缓存
Flume source主要功能是消费传递给它的事件;服务器
Flume内置了各类类型的Source,用于处理各类类型的事件,以下所示,理论上Flume支持全部类型的事件,由于Flume支持自定义Source:架构
这里列举几个比较经常使用的source,app
如Exec Source,经过它咱们能够监听一个日志文件的变化,以下配置,负载均衡
a1.sources = r1 a1.channels = c1 a1.sources.r1.type = exec a1.sources.r1.command = tail -F /var/log/secure a1.sources.r1.channels = c1
Avro Source,经过它,咱们能够将两个Flume Agent关联起来(由于agent的source和sink都支持Avro),正是这个特性,大大提升了flume的灵活性,可用性...
a1.sources = r1 a1.channels = c1 a1.sources.r1.type = avro a1.sources.r1.channels = c1 a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 4141
HTTP Source,经过它,能够接收http请求上报的数据,以下是配置示例,监听5140端口的http请求,这里的handle是能够自定义的,也就是说咱们能够接收任何类型的上报数据,如json格式、xml等等。
a1.sources = r1 a1.channels = c1 a1.sources.r1.type = http a1.sources.r1.port = 5140 a1.sources.r1.channels = c1 a1.sources.r1.handler = org.example.rest.RestHandler a1.sources.r1.handler.nickname = random props
先看下interceptor模块在流程图中所处的位置,以下图所示:
拦截器主要的功能是对事件进行过滤,修改;
Flume内置支持的拦截器以下(主要两类:过滤和修改):
固然,flume是支持自定义拦截器的,以下是一个简单的配置示例:
#拦截器 a1.sources.r1.interceptors = i1 #a1.sources.r1.interceptors.i1.type = org.apache.flume.sw.interceptor.SignCheckInterceptor$Builder a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.RegexFilteringInterceptor$Builder a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d) a1.sources.r1.interceptors.i1.serializers = s1 s2 s3 a1.sources.r1.interceptors.i1.serializers.s1.name = one a1.sources.r1.interceptors.i1.serializers.s2.name = two a1.sources.r1.interceptors.i1.serializers.s3.name = three
先看下interceptor模块在流程图中所处的位置,以下图所示:
通道选择器的主要功能是对事件流进行复制和分流;
Flume内置了两种类型的通道选择器:
以下是一个分流的配置示例:
a1.sources = r1 a1.channels = c1 c2 c3 c4 a1.sources.r1.selector.type = multiplexing a1.sources.r1.selector.header = state a1.sources.r1.selector.mapping.CZ = c1 a1.sources.r1.selector.mapping.US = c2 c3 a1.sources.r1.selector.default = c4
固然,通道选择器是支持自定义的,咱们能够本身实现通道选择器,并作以下配置:
a1.sources = r1 a1.channels = c1 a1.sources.r1.selector.type = org.example.MyChannelSelector
先看下channel模块在流程图中所处的位置,以下图所示:
通道Channel的主要功能是缓存日志事件;
Flume内置的Channel以下:
一样,Flume支持自定义通道;
以下是一个内存通道的配置示例:
a1.channels = c1 a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 10000 a1.channels.c1.byteCapacityBufferPercentage = 20 a1.channels.c1.byteCapacity = 800000
先看下Sink处理器在流程图中所处的位置,以下图所示:
Sink处理器的主要功能是让一组sink groups支持负载均衡和灾难转移功能,我以为跟通道选择器有点相似经过自定义的方式,我以为是能够实现通道选择器的功能的;
Flume内置的sink处理器以下:
一样的,也支持自定义sink处理器;
以下是一个负载均衡的例子,使用随机选择算法:
a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = load_balance a1.sinkgroups.g1.processor.backoff = true a1.sinkgroups.g1.processor.selector = random
先看下Sink模块在流程图中所处的位置,以下图所示:
Sink的主要功能是将事件输出到下一个agent的source或其它存储系统如,分布式文件系统、kafka、本地文件系统、日志等;
Flume内置的sink以下:
固然,flume也是支持自定义的;
咱们举个本地文件系统的例子,配置以下便可:
a1.channels = c1 a1.sinks = k1 a1.sinks.k1.type = file_roll a1.sinks.k1.channel = c1 a1.sinks.k1.sink.directory = /var/log/flume
序列化在流程图中所处的位置与Sink同样,这里就不画了,简单地说,Sink负责将事件输出到外部,那么以何种形式输出(直接文本形式仍是其它形式),须要包含哪些东西(body仍是header仍是其它内容...),就是由事件序列化来完成的;
Flume内置的事件序列化以下:
Flume一样支持自定义事件序列化,须要实现EventSerializer接口;
下面举个Body Text Serializer的配置示例:
a1.sinks = k1 a1.sinks.k1.type = file_roll a1.sinks.k1.channel = c1 a1.sinks.k1.sink.directory = /var/log/flume a1.sinks.k1.sink.serializer = text a1.sinks.k1.sink.serializer.appendNewline = false
上面对flume各个模块,或者说组件,作了一个简短的介绍,基本知道了Flume是个怎么回事,接下来将对各个组件作个介绍,并开发各个组件的自定义实现。