原文发表在:http://blog.javachen.com/2014/07/22/flume-ng.htmlhtml
Flume NG是Cloudera提供的一个分布式、可靠、可用的系统,它可以将不一样数据源的海量日志数据进行高效收集、聚合、移动,最后存储到一个中心化数据存储系统中。由原来的Flume OG到如今的Flume NG,进行了架构重构,而且如今NG版本彻底不兼容原来的OG版本。通过架构重构后,Flume NG更像是一个轻量的小工具,很是简单,容易适应各类方式日志收集,并支持failover和负载均衡。前端
Flume 使用 java 编写,其须要运行在 Java1.6 或更高版本之上。java
Flume的架构主要有一下几个核心概念:node
Flume 的核心是把数据从数据源收集过来,再送到目的地。为了保证输送必定成功,在送到目的地以前,会先缓存数据,待数据真正到达目的地后,删除本身缓存的数据。git
Flume 传输的数据的基本单位是 Event,若是是文本文件,一般是一行记录,这也是事务的基本单位。Event 从 Source,流向 Channel,再到 Sink,自己为一个 byte 数组,并可携带 headers 信息。Event 表明着一个数据流的最小完整单元,从外部数据源来,向外部的目的地去。github
Flume 运行的核心是 Agent。它是一个完整的数据收集工具,含有三个核心组件,分别是 source、channel、sink。经过这些组件,Event 能够从一个地方流向另外一个地方,以下图所示。数据库
Client端操做消费数据的来源,Flume 支持 Avro,log4j,syslog 和 http post(body为json格式)。可让应用程序同已有的Source直接打交道,如AvroSource,SyslogTcpSource。也能够 写一个 Source,以 IPC 或 RPC 的方式接入本身的应用,Avro和 Thrift 均可以(分别有 NettyAvroRpcClient 和 ThriftRpcClient 实现了 RpcClient接口),其中 Avro 是默认的 RPC 协议。具体代码级别的 Client 端数据接入,能够参考官方手册。apache
对现有程序改动最小的使用方式是使用是直接读取程序原来记录的日志文件,基本能够实现无缝接入,不须要对现有程序进行任何改动。
对于直接读取文件 Source,有两种方式:json
tail -F 文件名
指令,在这种方式下,取的文件名必须是指定的。 ExecSource 能够实现对日志的实时收集,可是存在Flume不运行或者指令执行出错时,将没法收集到日志数据,没法保证日志数据的完整性。SpoolSource 虽然没法实现实时的收集数据,可是可使用以分钟的方式分割文件,趋近于实时。数组
若是应用没法实现以分钟切割日志文件的话, 能够两种收集方式结合使用。 在实际使用的过程当中,能够结合 log4j 使用,使用 log4j的时候,将 log4j 的文件分割机制设为1分钟一次,将文件拷贝到spool的监控目录。
log4j 有一个 TimeRolling 的插件,能够把 log4j 分割文件到 spool 目录。基本实现了实时的监控。Flume 在传完文件以后,将会修改文件的后缀,变为 .COMPLETED(后缀也能够在配置文件中灵活指定)。
Flume Source 支持的类型:
Source类型 | 说明 |
---|---|
Avro Source | 支持Avro协议(其实是Avro RPC),内置支持 |
Thrift Source | 支持Thrift协议,内置支持 |
Exec Source | 基于Unix的command在标准输出上生产数据 | |
JMS Source | 从JMS系统(消息、主题)中读取数据,ActiveMQ已经测试过 |
Spooling Directory Source | 监控指定目录内数据变动 |
Twitter 1% firehose Source | 经过API持续下载Twitter数据,试验性质 |
Netcat Source | 监控某个端口,将流经端口的每个文本行数据做为Event输入 |
Sequence Generator Source | 序列生成器数据源,生产序列数据 |
Syslog Sources | 读取syslog数据,产生Event,支持UDP和TCP两种协议 |
HTTP Source | 基于HTTP POST或GET方式的数据源,支持JSON、BLOB表示形式 |
Legacy Sources | 兼容老的Flume OG中Source(0.9.x版本) |
当前有几个 channel 可供选择,分别是 Memory Channel, JDBC Channel , File Channel,Psuedo Transaction Channel。比较常见的是前三种 channel。
File Channel 是一个持久化的隧道(channel),它持久化全部的事件,并将其存储到磁盘中。所以,即便 Java 虚拟机当掉,或者操做系统崩溃或重启,再或者事件没有在管道中成功地传递到下一个代理(agent),这一切都不会形成数据丢失。Memory Channel 是一个不稳定的隧道,其缘由是因为它在内存中存储全部事件。若是 java 进程死掉,任何存储在内存的事件将会丢失。另外,内存的空间收到 RAM大小的限制,而 File Channel 这方面是它的优点,只要磁盘空间足够,它就能够将全部事件数据存储到磁盘上。
Flume Channel 支持的类型:
Channel类型 | 说明 |
---|---|
Memory Channel | Event数据存储在内存中 |
JDBC Channel | Event数据存储在持久化存储中,当前Flume Channel内置支持Derby |
File Channel | Event数据存储在磁盘文件中 |
Spillable Memory Channel | Event数据存储在内存中和磁盘上,当内存队列满了,会持久化到磁盘文件(当前试验性的,不建议生产环境使用) |
Pseudo Transaction Channel | 测试用途 |
Custom Channel | 自定义Channel实现 |
Sink在设置存储数据时,能够向文件系统、数据库、hadoop存数据,在日志数据较少时,能够将数据存储在文件系中,而且设定必定的时间间隔保存数据。在日志数据较多时,能够将相应的日志数据存储到Hadoop中,便于往后进行相应的数据分析。
Flume Sink支持的类型
Sink类型 | 说明 |
---|---|
HDFS Sink | 数据写入HDFS |
Logger Sink | 数据写入日志文件 |
Avro Sink | 数据被转换成Avro Event,而后发送到配置的RPC端口上 |
Thrift Sink | 数据被转换成Thrift Event,而后发送到配置的RPC端口上 |
IRC Sink | 数据在IRC上进行回放 |
File Roll Sink | 存储数据到本地文件系统 |
Null Sink | 丢弃到全部数据 |
HBase Sink | 数据写入HBase数据库 |
Morphline Solr Sink | 数据发送到Solr搜索服务器(集群) |
ElasticSearch Sink | 数据发送到Elastic Search搜索服务器(集群) |
Kite Dataset Sink | 写数据到Kite Dataset,试验性质的 |
Custom Sink | 自定义Sink实现 |
更多sink的内容能够参考官方手册。
Flume 的核心是把数据从数据源收集过来,再送到目的地。为了保证输送必定成功,在送到目的地以前,会先缓存数据,待数据真正到达目的地后,删除本身缓存的数据。
Flume 使用事务性的方式保证传送Event整个过程的可靠性。Sink 必须在 Event 被存入 Channel 后,或者,已经被传达到下一站agent里,又或者,已经被存入外部数据目的地以后,才能把 Event 从 Channel 中 remove 掉。这样数据流里的 event 不管是在一个 agent 里仍是多个 agent 之间流转,都能保证可靠,由于以上的事务保证了 event 会被成功存储起来。而 Channel 的多种实如今可恢复性上有不一样的保证。也保证了 event 不一样程度的可靠性。好比 Flume 支持在本地保存一份文件 channel 做为备份,而memory channel 将 event 存在内存 queue 里,速度快,但丢失的话没法恢复。
下面,根据官网文档,咱们展现几种Flow Pipeline,各自适应于什么样的应用场景:
能够将多个Agent顺序链接起来,将最初的数据源通过收集,存储到最终的存储系统中。这是最简单的状况,通常状况下,应该控制这种顺序链接的Agent的数量,由于数据流经的路径变长了,若是不考虑failover的话,出现故障将影响整个Flow上的Agent收集服务。
这种状况应用的场景比较多,好比要收集Web网站的用户行为日志,Web网站为了可用性使用的负载均衡的集群模式,每一个节点都产生用户行为日志,能够为每一个节点都配置一个Agent来单独收集日志数据,而后多个Agent将数据最终汇聚到一个用来存储数据存储系统,如HDFS上。
这种模式,有两种方式,一种是用来复制(Replication),另外一种是用来分流(Multiplexing)。Replication方式,能够将最前端的数据源复制多份,分别传递到多个channel中,每一个channel接收到的数据都是相同的。
配置格式示例以下:
properties# List the sources, sinks and channels for the agent <Agent>.sources = <Source1> <Agent>.sinks = <Sink1> <Sink2> <Agent>.channels = <Channel1> <Channel2> # set list of channels for source (separated by space) <Agent>.sources.<Source1>.channels = <Channel1> <Channel2> # set channel for sinks <Agent>.sinks.<Sink1>.channel = <Channel1> <Agent>.sinks.<Sink2>.channel = <Channel2> <Agent>.sources.<Source1>.selector.type = replicating
上面指定了selector的type的值为replication,其余的配置没有指定,使用的Replication方式,Source1会将数据分别存储到Channel1和Channel2,这两个channel里面存储的数据是相同的,而后数据被传递到Sink1和Sink2。
Multiplexing方式,selector能够根据header的值来肯定数据传递到哪个channel,配置格式,以下所示:
properties# Mapping for multiplexing selector <Agent>.sources.<Source1>.selector.type = multiplexing <Agent>.sources.<Source1>.selector.header = <someHeader> <Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1> <Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2> <Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2> #... <Agent>.sources.<Source1>.selector.default = <Channel2>
上面selector的type的值为multiplexing,同时配置selector的header信息,还配置了多个selector的mapping的值,即header的值:若是header的值为Value一、Value2,数据从Source1路由到Channel1;若是header的值为Value二、Value3,数据从Source1路由到Channel2。
Load balancing Sink Processor可以实现load balance功能,上图Agent1是一个路由节点,负责将Channel暂存的Event均衡到对应的多个Sink组件上,而每一个Sink组件分别链接到一个独立的Agent上,示例配置,以下所示:
propertiesa1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 k3 a1.sinkgroups.g1.processor.type = load_balance a1.sinkgroups.g1.processor.backoff = true a1.sinkgroups.g1.processor.selector = round_robin a1.sinkgroups.g1.processor.selector.maxTimeOut=10000
Failover Sink Processor可以实现failover功能,具体流程相似load balance,可是内部处理机制与load balance彻底不一样:Failover Sink Processor维护一个优先级Sink组件列表,只要有一个Sink组件可用,Event就被传递到下一个组件。若是一个Sink可以成功处理Event,则会加入到一个Pool中,不然会被移出Pool并计算失败次数,设置一个惩罚因子,示例配置以下所示:
propertiesa1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 k3 a1.sinkgroups.g1.processor.type = failover a1.sinkgroups.g1.processor.priority.k1 = 5 a1.sinkgroups.g1.processor.priority.k2 = 7 a1.sinkgroups.g1.processor.priority.k3 = 6 a1.sinkgroups.g1.processor.maxpenalty = 20000
Flume 的 rpm 安装方式很简单,这里不作说明。
安装成功以后,在 /etc/flume/conf 目录建立f1.conf 文件,内容以下:
propertiesagent-1.channels.ch-1.type = memory agent-1.sources.avro-source1.channels = ch-1 agent-1.sources.avro-source1.type = avro agent-1.sources.avro-source1.bind = 0.0.0.0 agent-1.sources.avro-source1.port = 41414 agent-1.sources.avro-source1.threads = 5 agent-1.sinks.log-sink1.channel = ch-1 agent-1.sinks.log-sink1.type = logger agent-1.channels = ch-1 agent-1.sources = avro-source1 agent-1.sinks = log-sink1
关于 avro-source 配置说明,请参考 avro-source
接下来启动 agent:
bash$ flume-ng agent -c /etc/flume-ng/conf -f /etc/flume-ng/conf/f1.conf -Dflume.root.logger=DEBUG,console -n agent-1
参数说明:
-n
指定agent名称-c
指定配置文件目录-f
指定配置文件-Dflume.root.logger=DEBUG,console
设置日志等级下面能够启动一个 avro-client 客户端生产数据:
bash$ flume-ng avro-client -c /etc/flume-ng/conf -H localhost -p 41414 -F /etc/passwd -Dflume.root.logger=DEBUG,console
在 /etc/flume/conf 目录建立 f2.conf 文件,内容以下:
propertiesagent-1.channels = ch-1 agent-1.sources = src-1 agent-1.channels.ch-1.type = memory agent-1.sources.src-1.type = spooldir agent-1.sources.src-1.channels = ch-1 agent-1.sources.src-1.spoolDir = /root/log agent-1.sources.src-1.fileHeader = true agent-1.sinks.log-sink1.channel = ch-1 agent-1.sinks.log-sink1.type = logger agent-1.sinks = log-sink1
关于 Spooling Directory Source 配置说明,请参考 Spooling Directory Source
接下来启动 agent:
bash$ flume-ng agent -c /etc/flume-ng/conf -f /etc/flume-ng/conf/f2.conf -Dflume.root.logger=DEBUG,console -n agent-1
而后,手动拷贝一个文件到 /root/log 目录,观察日志输出以及/root/log 目录下的变化。
在 /etc/flume/conf 目录建立 f3.conf 文件,内容以下:
propertiesagent-1.channels.ch-1.type = file agent-1.channels.ch-1.checkpointDir= /root/checkpoint agent-1.channels.ch-1.dataDirs= /root/data agent-1.sources.src-1.type = spooldir agent-1.sources.src-1.channels = ch-1 agent-1.sources.src-1.spoolDir = /root/log agent-1.sources.src-1.deletePolicy= never agent-1.sources.src-1.fileHeader = true agent-1.sources.src-1.interceptors =i1 agent-1.sources.src-1.interceptors.i1.type = timestamp agent-1.sinks.sink_hdfs.channel = ch-1 agent-1.sinks.sink_hdfs.type = hdfs agent-1.sinks.sink_hdfs.hdfs.path = hdfs://cdh1:8020/user/root/events/%Y-%m-%d agent-1.sinks.sink_hdfs.hdfs.filePrefix = logs agent-1.sinks.sink_hdfs.hdfs.inUsePrefix = . agent-1.sinks.sink_hdfs.hdfs.rollInterval = 30 agent-1.sinks.sink_hdfs.hdfs.rollSize = 0 agent-1.sinks.sink_hdfs.hdfs.rollCount = 0 agent-1.sinks.sink_hdfs.hdfs.batchSize = 1000 agent-1.sinks.sink_hdfs.hdfs.writeFormat = text agent-1.sinks.sink_hdfs.hdfs.fileType = DataStream #agent-1.sinks.sink_hdfs.hdfs.fileType = CompressedStream #agent-1.sinks.sink_hdfs.hdfs.codeC = lzop agent-1.channels = ch-1 agent-1.sources = src-1 agent-1.sinks = sink_hdfs
关于 HDFS Sink配置说明,请参考 HDFS Sink
说明:
hdfs.inUsePrefix
,例如设置为 .
时,hdfs 会把该文件当作隐藏文件,以免在 mr 过程当中读到这些临时文件,引发一些错误关于 HBase Sink 配置说明,请参考 HBase Sink
从 github 下载源代码并编译:
bash$ git clone git@github.com:cloudera/flume-ng.git -b cdh4-1.4.0_4.7.0 $ cd flume-ng $ mvn install -DskipTests -Phadoop-2
若是提示找不到 hadoop-test 的 jar 包,则修改 pom.xml 中的版本,如改成 2.0.0-mr1-cdh4.7.0
,具体版本视你使用的分支版本而定,我这里是 cdh4.7.0。
若是提示找不到 uanodeset-parser 的 jarb,则在 pom.xml 中添加下面仓库:
xml<repository> <id>tempo-db</id> <url>http://maven.tempo-db.com/artifactory/list/twitter/ </url> <snapshots> <enabled>false</enabled> </snapshots> </repository>
参考基于Flume的美团日志收集系统(一)架构和设计,列出一些最佳实践:
美团对 flume 的改进代码见 github:https://github.com/javachen/mt-flume。