Flume 1.2.0 用户向导

概览
     Apache的Flume是一个分布式的,质量可靠,可有效地收集,汇总和来自许多不一样来源的大量日志数据到集中的数据存储系统。java

    Apache的Flume是在Apache软件基金会的顶级项目。目前有两个版本的代码行,0.9.x版本和1.x的版本本文件适用于1.x的代码行。请点击这里Flume 0.9.x版本的用户指南。node

系统环境要求web

    JDK1.6以上版本shell

数据流数据库

Flume事件被定义为一个单位的数据流量有一个字节的有效载荷和一个可选字符串属性。Flume代理(JVM)的过程当中,承载组件,经过这些事件流从外部源的下一个目的地(跳)。缓存

flume

一个web服务器的产生的事件由 Flume源消耗。外部源发送事件发送到Flume中,会带着一个识别的格式。例如: 例如:一个Avro Flume源能够用来接收从Avro clients 其余flume代理Avro link发送事件。当一个Flume 源接收一个事件,他会存储到一个活多个channels中,这些channel会一直保存着event,直到被Flume sink消费处理掉,例如JDBC Channel做为一个例子-它使用一个文件系统支持嵌入式数据库,sink从channel中移除事件,同时放入到一个外部的仓库,好比HDFS,或者流转到下一个Flume source 源,source和sink在agent中是以异步运行方式运行事件。服务器

复杂数据流app

Flume到达最终目的地以前,容许用户创建多跳流活动,经过多个代理。它还容许fan-infan-out flows,内容路由和备份路由失败(故障转移)。dom

配置一个代理(agent)异步

Flume代理配置存储在本地配置文件。这是一个文本文件格式以下Java属性文件格式。在相同的配置文件,能够指定一个或多个代理的配置。配置文件包括每一个源,接收器和代理渠道的性质和它们链接在一块儿,造成数据流。

配置单个组件

流中每一个组件(源,接收器或通道)的名称,类型,和一组特定的类型和实例的属性。例如 Avro源须要一个主机名(或IP地址)和接收数据的端口号。一个内存通道能够有最大队列大小(“能力”),HDFS的散热器须要知道文件系统的URI,路径建立文件,文件的旋转频率(“hdfs.rollInterval”)等,全部这些组件的属性须要设置在托管 Flume 代理的属性文件

 组合组件(Wiring the pieces together)

代理须要知道什么加载各个组件以及它们是如何链接,以构成的流动。这是经过列出的源,汇和代理渠道的名称,而后指定每一个接收器和源的链接通道。例如,代理到HDFS flume HDFS cluster1中经过JDBC JDBC通道通道流动称为avroWeb Avro 源的事件。该配置文件将包含这些组件和JDBC通道为avroWeb源和HDFS cluster1中汇做为共享信道的名称。

启动代理(starting an agent)

代理人是开始使用shell脚本称为flume-NG是位于flume分布在bin目录。你须要在命令行上指定的代理的名称,config目录,配置文件:

$ bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template

如今,代理将开始运行的源和汇的配置在给定的属性文件。

A simple example

在这里,咱们举一个例子,配置文件,描述一个单节点的Flume部署。这种配置可让用户生成的事件和随后输出到控制台。

# example.conf: A single-node Flume configuration

# Name the components on this agent
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure source1
agent1.sources.source1.type = netcat
agent1.sources.source1.bind = localhost
agent1.sources.source1.port = 44444

# Describe sink1
agent1.sinks.sink1.type = logger

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapactiy = 100

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

这个配置定义了一个单一的代理,称为agent1。 agent1监听44444端口,通道缓存在内存中事件数据,事件数据记录到控制台和一个接收器上的数据源。配置文件名的各个组成部分,而后介绍了他们的类型和配置参数。一个给定的配置文件可能会定义多个命名的代理人;一个给定的Flume进程启动时传递一个标志,告诉它的具名代理体现。

结合此配置文件,咱们启动Flume按以下参数:

$ bin/flume-ng agent --conf-file example.conf --name agent1 -Dflume.root.logger=INFO,console

请注意,在完整部署,咱们一般会包括一个选项: - CONF=<conf-dir>。 <conf-dir>目录将包括一个shell脚本flume-env.sh和内置的Log4j属性文件。在这个例子中,咱们使用一个Java选项强制flume登陆到控制台

咱们能够从一个单独的终端,而后telnet端口44444和发送flume事件:

$ telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.localdomain (127.0.0.1).
Escape character is '^]'.
Hello world! <ENTER>
OK

他原来的flume终端输出日志信息的事件。

12/06/19 15:32:19 INFO source.NetcatSource: Source starting
12/06/19 15:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
12/06/19 15:32:34 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D          Hello world!. }

恭喜你 - 你已经成功地配置和部署了一个flume代理!随后的章节涵盖更详细的代理配置。

数据获取

flume支持从从外部数据源获取数据的机制。

 RPC

在flume中 ,Avro客户端使用AVRO RPC机制能够发送一个给定的文件 Avro 源:

$ bin/flume-ng avro-client -H localhost -p 41414 -F /usr/logs/log.10

上面的命令将发送的/ usr/logs/log.10的内容到  flume源监听端

Executing commands

还有一个exec执行一个给定的命令得到输出的源。一个单一的输出,即“line”。回车('\ R')或换行符('\ N'),或二者一块儿的文本。

注:Flume不支持tail作为一个源,不过能够经过exec tail。

Network streams

Flume支持如下的机制,从流行的日志流类型读取数据

  1. Avro
  2. Syslog
  3. Netcat

Flume部署种类

    设置多代理流程


    合并

在日志收集的一个很是广泛的状况是大量生产客户日志的数据发送到一些消费者代理链接到存储子系统。举例来讲,从数以百计的Web服务器收集的日志发送到十几代理写入HDFS集群


This can be achieved in Flume by configuring a number of first tier agents with an avro sink, all pointing to an avro source of single agent. This source on the second tier agent consolidates the received events into a single channel which is consumed by a sink to its final destination.

    多路复用流


  定义流

   在一个单一的代理定义的流,你须要经过一个通道的来源和接收器连接。你须要列出源,接收器和通道,为给定的代理,而后指向源和接收器及通道。一个源的实例能够指定多个通道,但只能指定一个接收器实例通道。格式以下:

# list the sources, sinks and channels for the agent
<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2>

# set channel for source
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...

# set channel for sink
<Agent>.sinks.<Sink>.channel = <Channel1>

例如一个代理名为weblog-agent,外部经过avro客户端,而且发送数据经过内存通道给hdfs。在配置文件weblog.config的可能看起来像这样:

# list the sources, sinks and channels for the agent
agent_foo.sources = avro-appserver-src-1
agent_foo.sinks = hdfs-sink-1
agent_foo.channels = mem-channel-1

# set channel for source
agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1

# set channel for sink
agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1

这将使事件流从avro-AppSrv-source到hdfs-Cluster1-sink经过内存通道mem-channel-1。当代理开始weblog.config做为其配置文件,它会实例化流。

 

  配置单个组件

    定义流以后,你须要设置每一个源,接收器和通道的属性。能够分别设定组件的属性值。

# properties for sources
<Agent>.sources.<Source>.<someProperty> = <someValue>

# properties for channels
<Agent>.channel.<Channel>.<someProperty> = <someValue>

# properties for sinks
<Agent>.sources.<Sink>.<someProperty> = <someValue>

“type”属性必须为每一个组件设置,以了解它须要什么样的对象。每一个源,接收器和通道类型有其本身的一套,它所需的性能,以实现预期的功能。全部这些,必须根据须要设置。在前面的例子中,咱们拿到从hdfs-Cluster1-sink中的流到HDFS,经过内存通道mem-channel-1的avro-AppSrv-source源。下面是一个例子,显示了这些组件的配置。

agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = hdfs-Cluster1-sink
agent_foo.channels = mem-channel-1

# set channel for sources, sinks

# properties of avro-AppSrv-source
agent_foo.sources.avro-AppSrv-source.type = avro
agent_foo.sources.avro-AppSrv-source.bind = localhost
agent_foo.sources.avro-AppSrv-source.port = 10000

# properties of mem-channel-1
agent_foo.channels.mem-channel-1.type = memory
agent_foo.channels.mem-channel-1.capacity = 1000
agent_foo.channels.mem-channel-1.transactionCapacity = 100

# properties of hdfs-Cluster1-sink
agent_foo.sinks.hdfs-Cluster1-sink.type = hdfs
agent_foo.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://namenode/flume/webdata

在一个代理中添加多个流

单个Flume代理能够包含几个独立的流。你能够在一个配置文件中列出多个源,接收器和通道。这些组件能够链接造成多个流。

# list the sources, sinks and channels for the agent
<Agent>.sources = <Source1> <Source2>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

那么你就能够链接源和接收器到其相应的通道,设置两个不一样的流。例如,若是您须要设置一个weblog代理两个流,一个从外部Avro客户端到HDFS,另一个是tail的输出到Avro接收器,而后在这里是作一个配置:

# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1 exec-tail-source2
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 jdbc-channel-2

# flow #1 configuration
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1

# flow #2 configuration
agent_foo.sources.exec-tail-source2.channels = jdbc-channel-2
agent_foo.sinks.avro-forward-sink2.channel = jdbc-channel-2

配置多代理流程

    设置一个多层的流,你须要有一个指向下一跳avro源的第一跳的avro 接收器。这将致使第一Flume代理转发事件到下一个Flume代理。例如,若是您按期发送的文件,每一个事件(1文件)AVRO客户端使用本地Flume代理,那么这个当地的代理能够转发到另外一个有存储的代理。

# list sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = avro-forward-sink
agent_foo.channels = jdbc-channel

# define the flow
agent_foo.sources.avro-AppSrv-source.channels = jdbc-channel
agent_foo.sinks.avro-forward-sink.channel = jdbc-channel

# avro sink properties
agent_foo.sources.avro-forward-sink.type = avro
agent_foo.sources.avro-forward-sink.hostname = 10.1.1.100
agent_foo.sources.avro-forward-sink.port = 10000

# configure other pieces

HDFS agent config:

list sources, sinks and channels in the agent
agent_foo.sources = avro-collection-source
agent_foo.sinks = hdfs-sink
agent_foo.channels = mem-channel

# define the flow
agent_foo.sources.avro-collection-source.channels = mem-channel
agent_foo.sinks.hdfs-sink.channel = mem-channel

# avro sink properties
agent_foo.sources.avro-collection-source.type = avro
agent_foo.sources.avro-collection-source.bind = 10.1.1.100
agent_foo.sources.avro-collection-source.port = 10000

# configure other pieces
#...

这里咱们链接从weblog-agent的avro-forward-sink 到hdfs-agent的avro-collection-source收集源。最终结果从外部源的appserver最终存储在HDFS的事件。

 

 

Fan out flow

Flume支持Fan out流从一个源到多个通道。有两种模式的Fan out,分别是复制和复用。在复制的状况下,流的事件被发送到全部的配置通道。在复用的状况下,事件被发送到可用的渠道中的一个子集。Fan out流须要指定源和Fan out通道的规则。这是经过添加一个通道“选择”,能够复制或复。再进一步指定选择的规则,若是它是一个多路。若是你不指定一个选择,则默认状况下它复制

# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>

# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>

<Agent>.sources.<Source1>.selector.type = replicating

复用的选择集的属性进一步分叉。这须要指定一个事件属性映射到一组通道。选择配置属性中的每一个事件头检查。若是指定的值相匹配,那么该事件被发送到全部的通道映射到该值。若是没有匹配,那么该事件被发送到设置为默认配置的通道。

# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
#...

<Agent>.sources.<Source1>.selector.default = <Channel2>

映射容许每一个值通道能够重叠。默认值能够包含任意数量的通道。下面的示例中有一个单一的流复用两条路径。代理有一个单一的avro源和链接道两个接收器的两个通道。

 

# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 jdbc-channel-2

# set channels for source
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 jdbc-channel-2

# set channel for sinks
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
agent_foo.sinks.avro-forward-sink2.channel = jdbc-channel-2

# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = jdbc-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 jdbc-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1

“State”做为Header的选择检查。若是值是“CA”,而后将其发送到mem-channel-1,若是它的“AZ”的,那么jdbc-channel-2,若是它的“NY”那么发到这两个。若是“State”头未设置或不匹配的任何三个,而后去默认的mem-channel-1通道。

Flume Sources

Avro Source

Avro端口监听并接收来自外部的Avro客户流的事件。当内置AvroSink另外一个(前跳)Flume代理,它能够建立分层集合配对拓扑。

Property Name Default Description
channels  
type The component type name, needs to be avro
bind hostname or IP address to listen on
port Port # to bind to
threads Maximum number of worker threads to spawn
interceptors Space separated list of interceptors
interceptors.*    

Example for agent named agent_foo

agent_foo.sources = avrosource-1
agent_foo.channels = memoryChannel-1
agent_foo.sources.avrosource-1.type = avro
agent_foo.sources.avrosource-1.channels = memoryChannel-1
agent_foo.sources.avrosource-1.bind = 0.0.0.0
agent_foo.sources.avrosource-1.port = 4141

Exec Source

此源启动运行一个给定的Unix命令,预计这一过程当中不断产生标准输出(stderr被简单地丢弃,除非logStdErr= TRUE)上的数据。若是因任何缘由的进程退出时,源也退出,并不会产生任何进一步的数据。

Property Name Default Description
channels  
type The component type name, needs to be exec
command The command to execute
restartThrottle 10000 Amount of time (in millis) to wait before attempting a restart
restart false Whether the executed cmd should be restarted if it dies
logStdErr false Whether the command’s stderr should be logged
batchSize 20 The max number of lines to read and send to the channel at a time
selector.type replicating replicating or multiplexing
selector.*   Depends on the selector.type value
interceptors Space separated list of interceptors
interceptors.*    

Warning

 

The problem with ExecSource and other asynchronous sources is that the source can not guarantee that if there is a failure to put the event into the Channel the client knows about it. In such cases, the data will be lost. As a for instance, one of the most commonly requested features is the tail -F [file]-like use case where an application writes to a log file on disk and Flume tails the file, sending each line as an event. While this is possible, there’s an obvious problem; what happens if the channel fills up and Flume can’t send an event? Flume has no way of indicating to the application writing the log file that it needs to retain the log or that the event hasn’t been sent, for some reason. If this doesn’t make sense, you need only know this: Your application can never guarantee data has been received when using a unidirectional asynchronous interface such as ExecSource! As an extension of this warning - and to be completely clear - there is absolutely zero guarantee of event delivery when using this source. You have been warned.

备注: 在ExecSource不能保证,若是有一个失败的放入到通道的事件,客户也知道。在这种状况下,数据将丢失。

Example for agent named agent_foo:

agent_foo.sources = tailsource-1
agent_foo.channels = memoryChannel-1
agent_foo.sources.tailsource-1.type = exec
agent_foo.sources.tailsource-1.command = tail -F /var/log/secure
agent_foo.sources.tailsource-1.channels = memoryChannel-1
相关文章
相关标签/搜索