Flume-ng的原理和使用

原文发表在:http://blog.javachen.com/2014/07/22/flume-ng.htmlhtml

1. 介绍

Flume NG是Cloudera提供的一个分布式、可靠、可用的系统,它可以将不一样数据源的海量日志数据进行高效收集、聚合、移动,最后存储到一个中心化数据存储系统中。由原来的Flume OG到如今的Flume NG,进行了架构重构,而且如今NG版本彻底不兼容原来的OG版本。通过架构重构后,Flume NG更像是一个轻量的小工具,很是简单,容易适应各类方式日志收集,并支持failover和负载均衡。前端

Flume 使用 java 编写,其须要运行在 Java1.6 或更高版本之上。java

2. 架构

Flume的架构主要有一下几个核心概念:node

  • Event:一个数据单元,带有一个可选的消息头
  • Flow:Event从源点到达目的点的迁移的抽象
  • Client:操做位于源点处的Event,将其发送到Flume Agent
  • Agent:一个独立的Flume进程,包含组件Source、Channel、Sink
  • Source:用来消费传递到该组件的Event
  • Channel:中转Event的一个临时存储,保存有Source组件传递过来的Event
  • Sink:从Channel中读取并移除Event,将Event传递到Flow Pipeline中的下一个Agent(若是有的话)

2.1 数据流

Flume 的核心是把数据从数据源收集过来,再送到目的地。为了保证输送必定成功,在送到目的地以前,会先缓存数据,待数据真正到达目的地后,删除本身缓存的数据。git

Flume 传输的数据的基本单位是 Event,若是是文本文件,一般是一行记录,这也是事务的基本单位。Event 从 Source,流向 Channel,再到 Sink,自己为一个 byte 数组,并可携带 headers 信息。Event 表明着一个数据流的最小完整单元,从外部数据源来,向外部的目的地去。github

Flume 运行的核心是 Agent。它是一个完整的数据收集工具,含有三个核心组件,分别是 source、channel、sink。经过这些组件,Event 能够从一个地方流向另外一个地方,以下图所示。数据库

  • source 能够接收外部源发送过来的数据。不一样的 source,能够接受不一样的数据格式。好比有目录池(spooling directory)数据源,能够监控指定文件夹中的新文件变化,若是目录中有文件产生,就会马上读取其内容。
  • channel 是一个存储地,接收 source 的输出,直到有 sink 消费掉 channel 中的数据。channel 中的数据直到进入到下一个channel中或者进入终端才会被删除。当 sink 写入失败后,能够自动重启,不会形成数据丢失,所以很可靠。
  • sink 会消费 channel 中的数据,而后送给外部源或者其余 source。如数据能够写入到 HDFS 或者 HBase 中。

2.2 核心组件

2.2.1 source

Client端操做消费数据的来源,Flume 支持 Avro,log4j,syslog 和 http post(body为json格式)。可让应用程序同已有的Source直接打交道,如AvroSource,SyslogTcpSource。也能够 写一个 Source,以 IPC 或 RPC 的方式接入本身的应用,Avro和 Thrift 均可以(分别有 NettyAvroRpcClient 和 ThriftRpcClient 实现了 RpcClient接口),其中 Avro 是默认的 RPC 协议。具体代码级别的 Client 端数据接入,能够参考官方手册。apache

对现有程序改动最小的使用方式是使用是直接读取程序原来记录的日志文件,基本能够实现无缝接入,不须要对现有程序进行任何改动。
对于直接读取文件 Source,有两种方式:json

  • ExecSource: 以运行 Linux 命令的方式,持续的输出最新的数据,如 tail -F 文件名 指令,在这种方式下,取的文件名必须是指定的。 ExecSource 能够实现对日志的实时收集,可是存在Flume不运行或者指令执行出错时,将没法收集到日志数据,没法保证日志数据的完整性。
  • SpoolSource: 监测配置的目录下新增的文件,并将文件中的数据读取出来。须要注意两点:拷贝到 spool 目录下的文件不能够再打开编辑;spool 目录下不可包含相应的子目录。

SpoolSource 虽然没法实现实时的收集数据,可是可使用以分钟的方式分割文件,趋近于实时。数组

若是应用没法实现以分钟切割日志文件的话, 能够两种收集方式结合使用。 在实际使用的过程当中,能够结合 log4j 使用,使用 log4j的时候,将 log4j 的文件分割机制设为1分钟一次,将文件拷贝到spool的监控目录。

log4j 有一个 TimeRolling 的插件,能够把 log4j 分割文件到 spool 目录。基本实现了实时的监控。Flume 在传完文件以后,将会修改文件的后缀,变为 .COMPLETED(后缀也能够在配置文件中灵活指定)。

Flume Source 支持的类型:

Source类型 说明
Avro Source 支持Avro协议(其实是Avro RPC),内置支持
Thrift Source 支持Thrift协议,内置支持
Exec Source | 基于Unix的command在标准输出上生产数据
JMS Source 从JMS系统(消息、主题)中读取数据,ActiveMQ已经测试过
Spooling Directory Source 监控指定目录内数据变动
Twitter 1% firehose Source 经过API持续下载Twitter数据,试验性质
Netcat Source 监控某个端口,将流经端口的每个文本行数据做为Event输入
Sequence Generator Source 序列生成器数据源,生产序列数据
Syslog Sources 读取syslog数据,产生Event,支持UDP和TCP两种协议
HTTP Source 基于HTTP POST或GET方式的数据源,支持JSON、BLOB表示形式
Legacy Sources 兼容老的Flume OG中Source(0.9.x版本)

2.2.2 Channel

当前有几个 channel 可供选择,分别是 Memory Channel, JDBC Channel , File Channel,Psuedo Transaction Channel。比较常见的是前三种 channel。

  • MemoryChannel 能够实现高速的吞吐,可是没法保证数据的完整性。
  • MemoryRecoverChannel 在官方文档的建议上已经建义使用FileChannel来替换。
  • FileChannel保证数据的完整性与一致性。在具体配置FileChannel时,建议FileChannel设置的目录和程序日志文件保存的目录设成不一样的磁盘,以便提升效率。

File Channel 是一个持久化的隧道(channel),它持久化全部的事件,并将其存储到磁盘中。所以,即便 Java 虚拟机当掉,或者操做系统崩溃或重启,再或者事件没有在管道中成功地传递到下一个代理(agent),这一切都不会形成数据丢失。Memory Channel 是一个不稳定的隧道,其缘由是因为它在内存中存储全部事件。若是 java 进程死掉,任何存储在内存的事件将会丢失。另外,内存的空间收到 RAM大小的限制,而 File Channel 这方面是它的优点,只要磁盘空间足够,它就能够将全部事件数据存储到磁盘上。

Flume Channel 支持的类型:

Channel类型 说明
Memory Channel Event数据存储在内存中
JDBC Channel Event数据存储在持久化存储中,当前Flume Channel内置支持Derby
File Channel Event数据存储在磁盘文件中
Spillable Memory Channel Event数据存储在内存中和磁盘上,当内存队列满了,会持久化到磁盘文件(当前试验性的,不建议生产环境使用)
Pseudo Transaction Channel 测试用途
Custom Channel 自定义Channel实现

2.2.3 sink

Sink在设置存储数据时,能够向文件系统、数据库、hadoop存数据,在日志数据较少时,能够将数据存储在文件系中,而且设定必定的时间间隔保存数据。在日志数据较多时,能够将相应的日志数据存储到Hadoop中,便于往后进行相应的数据分析。

Flume Sink支持的类型

Sink类型 说明
HDFS Sink 数据写入HDFS
Logger Sink 数据写入日志文件
Avro Sink 数据被转换成Avro Event,而后发送到配置的RPC端口上
Thrift Sink 数据被转换成Thrift Event,而后发送到配置的RPC端口上
IRC Sink 数据在IRC上进行回放
File Roll Sink 存储数据到本地文件系统
Null Sink 丢弃到全部数据
HBase Sink 数据写入HBase数据库
Morphline Solr Sink 数据发送到Solr搜索服务器(集群)
ElasticSearch Sink 数据发送到Elastic Search搜索服务器(集群)
Kite Dataset Sink 写数据到Kite Dataset,试验性质的
Custom Sink 自定义Sink实现

更多sink的内容能够参考官方手册

2.3 可靠性

Flume 的核心是把数据从数据源收集过来,再送到目的地。为了保证输送必定成功,在送到目的地以前,会先缓存数据,待数据真正到达目的地后,删除本身缓存的数据。

Flume 使用事务性的方式保证传送Event整个过程的可靠性。Sink 必须在 Event 被存入 Channel 后,或者,已经被传达到下一站agent里,又或者,已经被存入外部数据目的地以后,才能把 Event 从 Channel 中 remove 掉。这样数据流里的 event 不管是在一个 agent 里仍是多个 agent 之间流转,都能保证可靠,由于以上的事务保证了 event 会被成功存储起来。而 Channel 的多种实如今可恢复性上有不一样的保证。也保证了 event 不一样程度的可靠性。好比 Flume 支持在本地保存一份文件 channel 做为备份,而memory channel 将 event 存在内存 queue 里,速度快,但丢失的话没法恢复。

2.4 可恢复性

3. 使用场景

下面,根据官网文档,咱们展现几种Flow Pipeline,各自适应于什么样的应用场景:

  • 多个 agent 顺序链接:

能够将多个Agent顺序链接起来,将最初的数据源通过收集,存储到最终的存储系统中。这是最简单的状况,通常状况下,应该控制这种顺序链接的Agent的数量,由于数据流经的路径变长了,若是不考虑failover的话,出现故障将影响整个Flow上的Agent收集服务。

  • 多个Agent的数据汇聚到同一个Agent:

这种状况应用的场景比较多,好比要收集Web网站的用户行为日志,Web网站为了可用性使用的负载均衡的集群模式,每一个节点都产生用户行为日志,能够为每一个节点都配置一个Agent来单独收集日志数据,而后多个Agent将数据最终汇聚到一个用来存储数据存储系统,如HDFS上。

  • 多路(Multiplexing)Agent

这种模式,有两种方式,一种是用来复制(Replication),另外一种是用来分流(Multiplexing)。Replication方式,能够将最前端的数据源复制多份,分别传递到多个channel中,每一个channel接收到的数据都是相同的。

配置格式示例以下:

properties# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>

# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>

<Agent>.sources.<Source1>.selector.type = replicating

上面指定了selector的type的值为replication,其余的配置没有指定,使用的Replication方式,Source1会将数据分别存储到Channel1和Channel2,这两个channel里面存储的数据是相同的,而后数据被传递到Sink1和Sink2。

Multiplexing方式,selector能够根据header的值来肯定数据传递到哪个channel,配置格式,以下所示:

properties# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
#...

<Agent>.sources.<Source1>.selector.default = <Channel2>

上面selector的type的值为multiplexing,同时配置selector的header信息,还配置了多个selector的mapping的值,即header的值:若是header的值为Value一、Value2,数据从Source1路由到Channel1;若是header的值为Value二、Value3,数据从Source1路由到Channel2。

  • 实现load balance功能

Load balancing Sink Processor可以实现load balance功能,上图Agent1是一个路由节点,负责将Channel暂存的Event均衡到对应的多个Sink组件上,而每一个Sink组件分别链接到一个独立的Agent上,示例配置,以下所示:

propertiesa1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000
  • 实现failover能

Failover Sink Processor可以实现failover功能,具体流程相似load balance,可是内部处理机制与load balance彻底不一样:Failover Sink Processor维护一个优先级Sink组件列表,只要有一个Sink组件可用,Event就被传递到下一个组件。若是一个Sink可以成功处理Event,则会加入到一个Pool中,不然会被移出Pool并计算失败次数,设置一个惩罚因子,示例配置以下所示:

propertiesa1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 7
a1.sinkgroups.g1.processor.priority.k3 = 6
a1.sinkgroups.g1.processor.maxpenalty = 20000

4. 安装和使用

Flume 的 rpm 安装方式很简单,这里不作说明。

示例1: avro 数据源

安装成功以后,在 /etc/flume/conf 目录建立f1.conf 文件,内容以下:

propertiesagent-1.channels.ch-1.type = memory

agent-1.sources.avro-source1.channels = ch-1
agent-1.sources.avro-source1.type = avro
agent-1.sources.avro-source1.bind = 0.0.0.0
agent-1.sources.avro-source1.port = 41414
agent-1.sources.avro-source1.threads = 5

agent-1.sinks.log-sink1.channel = ch-1
agent-1.sinks.log-sink1.type = logger

agent-1.channels = ch-1
agent-1.sources = avro-source1
agent-1.sinks = log-sink1

关于 avro-source 配置说明,请参考 avro-source

接下来启动 agent:

bash$ flume-ng agent -c /etc/flume-ng/conf -f /etc/flume-ng/conf/f1.conf -Dflume.root.logger=DEBUG,console -n agent-1

参数说明:

  • -n 指定agent名称
  • -c 指定配置文件目录
  • -f 指定配置文件
  • -Dflume.root.logger=DEBUG,console 设置日志等级

下面能够启动一个 avro-client 客户端生产数据:

bash$ flume-ng avro-client -c /etc/flume-ng/conf -H localhost -p 41414 -F /etc/passwd -Dflume.root.logger=DEBUG,console

示例2:spooldir 数据源

在 /etc/flume/conf 目录建立 f2.conf 文件,内容以下:

propertiesagent-1.channels = ch-1
agent-1.sources = src-1

agent-1.channels.ch-1.type = memory

agent-1.sources.src-1.type = spooldir
agent-1.sources.src-1.channels = ch-1
agent-1.sources.src-1.spoolDir = /root/log
agent-1.sources.src-1.fileHeader = true

agent-1.sinks.log-sink1.channel = ch-1
agent-1.sinks.log-sink1.type = logger

agent-1.sinks = log-sink1

关于 Spooling Directory Source 配置说明,请参考 Spooling Directory Source

接下来启动 agent:

bash$ flume-ng agent -c /etc/flume-ng/conf -f /etc/flume-ng/conf/f2.conf -Dflume.root.logger=DEBUG,console -n agent-1

而后,手动拷贝一个文件到 /root/log 目录,观察日志输出以及/root/log 目录下的变化。

示例3:spooldir 数据源,写入 hdfs

在 /etc/flume/conf 目录建立 f3.conf 文件,内容以下:

propertiesagent-1.channels.ch-1.type = file
agent-1.channels.ch-1.checkpointDir= /root/checkpoint
agent-1.channels.ch-1.dataDirs= /root/data

agent-1.sources.src-1.type = spooldir
agent-1.sources.src-1.channels = ch-1
agent-1.sources.src-1.spoolDir = /root/log
agent-1.sources.src-1.deletePolicy= never
agent-1.sources.src-1.fileHeader = true

agent-1.sources.src-1.interceptors =i1
agent-1.sources.src-1.interceptors.i1.type = timestamp

agent-1.sinks.sink_hdfs.channel = ch-1
agent-1.sinks.sink_hdfs.type = hdfs
agent-1.sinks.sink_hdfs.hdfs.path = hdfs://cdh1:8020/user/root/events/%Y-%m-%d
agent-1.sinks.sink_hdfs.hdfs.filePrefix = logs
agent-1.sinks.sink_hdfs.hdfs.inUsePrefix = .
agent-1.sinks.sink_hdfs.hdfs.rollInterval = 30
agent-1.sinks.sink_hdfs.hdfs.rollSize = 0
agent-1.sinks.sink_hdfs.hdfs.rollCount = 0
agent-1.sinks.sink_hdfs.hdfs.batchSize = 1000
agent-1.sinks.sink_hdfs.hdfs.writeFormat = text
agent-1.sinks.sink_hdfs.hdfs.fileType = DataStream
#agent-1.sinks.sink_hdfs.hdfs.fileType = CompressedStream
#agent-1.sinks.sink_hdfs.hdfs.codeC = lzop

agent-1.channels = ch-1
agent-1.sources = src-1
agent-1.sinks = sink_hdfs

关于 HDFS Sink配置说明,请参考 HDFS Sink

说明:

  1. 经过 interceptors 往 header 里添加 timestamp,这样作,能够在 hdfs.path 引用系统内部的时间变量或者主机的 hostname。
  2. 经过设置 hdfs.inUsePrefix,例如设置为 .时,hdfs 会把该文件当作隐藏文件,以免在 mr 过程当中读到这些临时文件,引发一些错误
  3. 若是使用 lzo 压缩,则须要手动建立 lzo 索引,能够经过修改 HdfsSink 的代码,经过代码建立索引
  4. FileChannel 的目录最好是和 spooldir 的数据目录处于不一样磁盘。

示例4:spooldir 数据源,写入 HBase

关于 HBase Sink 配置说明,请参考 HBase Sink

5. 开发相关

5.1 编译源代码

从 github 下载源代码并编译:

bash$ git clone git@github.com:cloudera/flume-ng.git -b cdh4-1.4.0_4.7.0
$ cd flume-ng
$ mvn install -DskipTests -Phadoop-2

若是提示找不到 hadoop-test 的 jar 包,则修改 pom.xml 中的版本,如改成 2.0.0-mr1-cdh4.7.0,具体版本视你使用的分支版本而定,我这里是 cdh4.7.0。

若是提示找不到 uanodeset-parser 的 jarb,则在 pom.xml 中添加下面仓库:

xml<repository>
  <id>tempo-db</id>
  <url>http://maven.tempo-db.com/artifactory/list/twitter/
  </url>
  <snapshots>
    <enabled>false</enabled>
  </snapshots>
</repository>

6. 最佳实践

参考基于Flume的美团日志收集系统(一)架构和设计,列出一些最佳实践:

  • 模块命名规则:全部的 Source 以 src 开头,全部的 Channel 以 ch 开头,全部的 Sink 以 sink 开头;
  • 模块之间内部通讯统一使用 Avro 接口;
  • 将日志采集系统系统分为三层:Agent 层,Collector 层和 Store 层,其中 Agent 层每一个机器部署一个进程,负责对单机的日志收集工做;Collector 层部署在中心服务器上,负责接收Agent层发送的日志,而且将日志根据路由规则写到相应的 Store 层中;Store 层负责提供永久或者临时的日志存储服务,或者将日志流导向其它服务器。
  • 扩展 MemoryChannel 和 FileChannel ,提供 DualChannel 的实现,以提供高吞吐和大缓存
  • 监控 collector HdfsSink写数据到 hdfs 的速度、FileChannel 中拥堵的 events 数量,以及写 hdfs 状态(查看是否有 .tmp 文件生成)

美团对 flume 的改进代码见 github:https://github.com/javachen/mt-flume

7. 参考文章

相关文章
相关标签/搜索