首先、这节水的东西就比较少了,大部分是例子。html
Avro端口监听并接收来自外部的Avro客户流的事件。当内置Avro 去Sinks另外一个配对Flume代理,它就能够建立分层采集的拓扑结构。官网说的比较绕,固然个人翻译也很弱,其实就是flume能够多级代理,而后代理与代理之间用Avro去链接java
下面是官网给出的source的配置,加粗的参数是必选,描述就不解释了。linux
Property Nameshell |
Defaultapache |
Descriptionapp |
channelsdom |
–curl |
|
typetcp |
–ide |
The component type name, needs to be avro |
bind |
– |
hostname or IP address to listen on |
port |
– |
Port # to bind to |
threads |
– |
Maximum number of worker threads to spawn |
selector.type |
||
selector.* |
||
interceptors |
– |
Space-separated list of interceptors |
interceptors.* |
||
compression-type |
none |
This can be “none” or “deflate”. The compression-type must match the compression-type of matching AvroSource |
ssl |
FALSE |
Set this to true to enable SSL encryption. You must also specify a “keystore” and a “keystore-password”. |
keystore |
– |
This is the path to a Java keystore file. Required for SSL. |
keystore-password |
– |
The password for the Java keystore. Required for SSL. |
keystore-type |
JKS |
The type of the Java keystore. This can be “JKS” or “PKCS12”. |
ipFilter |
FALSE |
Set this to true to enable ipFiltering for netty |
ipFilter.rules |
– |
Define N netty ipFilter pattern rules with this config. |
官网的例子就不放了,这边用实际例子显示。
[html] view plain copy
#配置文件avro_case2.conf 其实和第二节的pull.conf 如出一辙
#Name the components on this agent
a1.sources= r1
a1.sinks= k1
a1.channels= c1
#Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 192.168.233.128
a1.sources.r1.port = 55555
#Describe the sink
a1.sinks.k1.type= logger
a1.sinks.k1.channel= c1
#Use a channel which buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.capacity= 1000
a1.channels.c1.transactionCapacity= 100
#敲命令
flume-ng agent -cconf -f conf/avro_case2.conf -n a1 -Dflume.root.logger=INFO,console
成功与否就不说明,与第二节的pull.conf 同。。。
#而后在另外一个终端进行测试
flume-ng avro-client -cconf -H 192.168.233.128 -p 44444 -F /tmp/logs/test.log
这个就是模拟第二节push代理费pull代理发数据,这里不写配置直接命令方式测试。
发送事件成功,这里和push代理不同的是没有用spool,因此日志文件名不会被更名称。
看接受终端显示
ok数据发送成功。
ThriftSource 与Avro Source 基本一致。只要把source的类型改为thrift便可,例如a1.sources.r1.type = thrift
比较简单,不作赘述。
ExecSource的配置就是设定一个Unix(Linux)命令,而后经过这个命令不断输出数据。若是进程退出,Exec Source也一块儿退出,不会产生进一步的数据。
下面是官网给出的source的配置,加粗的参数是必选,描述就不解释了。
Property Name |
Default |
Description |
channels |
– |
|
type |
– |
The component type name, needs to be exec |
command |
– |
The command to execute |
shell |
– |
A shell invocation used to run the command. e.g. /bin/sh -c. Required only for commands relying on shell features like wildcards, back ticks, pipes etc. |
restartThrottle |
10000 |
Amount of time (in millis) to wait before attempting a restart |
restart |
FALSE |
Whether the executed cmd should be restarted if it dies |
logStdErr |
FALSE |
Whether the command’s stderr should be logged |
batchSize |
20 |
The max number of lines to read and send to the channel at a time |
selector.type |
replicating |
replicating or multiplexing |
selector.* |
Depends on the selector.type value |
|
interceptors |
– |
Space-separated list of interceptors |
interceptors.* |
下面是实际例子显示。
[html] view plain copy
#配置文件exec_case3.conf
#Name the components on this agent
a1.sources= r1
a1.sinks= k1
a1.channels= c1
#Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /tmp/logs/test.log
a1.sources.r1.channels = c1
#Describe the sink
a1.sinks.k1.type= logger
a1.sinks.k1.channel= c1
#Use a channel which buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.capacity= 1000
a1.channels.c1.transactionCapacity= 100
这里咱们用tail –F命令去一直都日志的尾部。
#敲命令
flume-ng agent -cconf -f conf/exec_case3.conf -n a1 -Dflume.root.logger=INFO,console
这边会显示读取日志的全部数据
上图是日志,这边咱们继续往日志里添加数据
echo"looklook5" >> test.log ,会发现终端也在输出数据。
官网说:JMS Sourcereads messages from a JMS destination such as a queue or topic. Being a JMSapplication it should work with any JMS provider but has only been tested withActiveMQ.
简单说的,官网JMSsource 就测试了ActiveMQ,其余的尚未。下面是官网的例子:
a1.sources=r1
a1.channels=c1
a1.sources.r1.type=jms
a1.sources.r1.channels=c1
a1.sources.r1.initialContextFactory=org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory=GenericConnectionFactory
a1.sources.r1.providerURL=tcp://mqserver:61616
a1.sources.r1.destinationName=BUSINESS_DATA
a1.sources.r1.destinationType=QUEUE
下面是官网给出的source的配置,加粗的参数是必选,描述就不解释了
Property Name |
Default |
Description |
channels |
– |
|
type |
– |
The component type name, needs to be jms |
initialContextFactory |
– |
Inital Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactory |
connectionFactory |
– |
The JNDI name the connection factory shoulld appear as |
providerURL |
– |
The JMS provider URL |
destinationName |
– |
Destination name |
destinationType |
– |
Destination type (queue or topic) |
messageSelector |
– |
Message selector to use when creating the consumer |
userName |
– |
Username for the destination/provider |
passwordFile |
– |
File containing the password for the destination/provider |
batchSize |
100 |
Number of messages to consume in one batch |
converter.type |
DEFAULT |
Class to use to convert messages to flume events. See below. |
converter.* |
– |
Converter properties. |
converter.charset |
UTF-8 |
Default converter only. Charset to use when converting JMS TextMessages to byte arrays. |
介于这个源目前还不成熟,那咱们等他成熟了再来研究吧,这里偷点懒。
Spooling Directory Source在第二节的时候已经讲过,这里复述一下:监测配置的目录下新增的文件,并将文件中的数据读取出来。其中,Spool Source有2个注意地方,第一个是拷贝到spool目录下的文件不能够再打开编辑,第二个是spool目录下不可包含相应的子目录。这个主要用途做为对日志的准实时监控。
下面是官网给出的source的配置,加粗的参数是必选。可选项太多,这边就加一个fileSuffix,即文件读取后添加的后缀名,这个是能够更改。
Property Name |
Default |
Description |
channels |
– |
|
type |
– |
The component type name, needs to be spooldir. |
spoolDir |
– |
The directory from which to read files from. |
fileSuffix |
.COMPLETED |
Suffix to append to completely ingested files |
下面给出例子,这个与第二节的push.conf 类似
[html] view plain copy
#配置文件:spool_case4.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type =spooldir
a1.sources.r1.spoolDir =/tmp/logs
a1.sources.r1.fileHeader= true
a1.sources.r1.channels =c1
# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
# Use a channel which buffers events inmemory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
这里咱们监控日志目录/tmp/logs
#敲命令
flume-ng agent -cconf -f conf/spool_case4.conf -n a1 -Dflume.root.logger=INFO,console
终端将数据都显示出来了。咱们查看监控日志目录/tmp/logs
被读取的文件已经被加上后缀名,表示已经完成读取。
Netcat source 在某一端口上进行侦听,它将每一行文字变成一个事件源,也就是数据是基于换行符分隔。它的工做就像命令nc -k -l [host] [port] 换句话说,它打开一个指定端口,侦听数据将每一行文字变成Flume事件,并经过链接通道发送。
下面是官网给出的source的配置,加粗的参数是必选
Property Name |
Default |
Description |
channels |
– |
|
type |
– |
The component type name, needs to be netcat |
bind |
– |
Host name or IP address to bind to |
port |
– |
Port # to bind to |
max-line-length |
512 |
Max line length per event body (in bytes) |
ack-every-event |
TRUE |
Respond with an “OK” for every event received |
selector.type |
replicating |
replicating or multiplexing |
selector.* |
Depends on the selector.type value |
|
interceptors |
– |
Space-separated list of interceptors |
interceptors.* |
实际例子话,第二节的第一个例子就是Netcat source,这里不演示了。
一个简单的序列发生器,不断产成与事件计数器0和1的增量开始。主要用于测试(官网说),这里也不作赘述。
读取syslog数据,并生成Flume 事件。 这个Source分红三类SyslogTCP Source、
Multiport Syslog TCP Source(多端口)与SyslogUDP Source。其中TCP Source为每个用回车(\ n)来分隔的字符串建立一个新的事件。而UDP Source将整个消息做为一个单一的事件。
这个是最初的Syslog Sources
下面是官网给出的source的配置,加粗的参数是必选,这里可选我省略了。
Property Name |
Default |
Description |
channels |
– |
|
type |
– |
The component type name, needs to be syslogtcp |
host |
– |
Host name or IP address to bind to |
port |
– |
Port # to bind to |
官网案例
a1.sources=r1
a1.channels=c1
a1.sources.r1.type=syslogtcp
a1.sources.r1.port=5140
a1.sources.r1.host=localhost
a1.sources.r1.channels=c1
下面是实际的例子
[html] view plain copy
#配置文件:syslog_case5.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type =syslogtcp
a1.sources.r1.port =50000
a1.sources.r1.host =192.168.233.128
a1.sources.r1.channels =c1
# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
# Use a channel which buffers events inmemory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
这里咱们设置的侦听端口为192.168.233.128 50000
#敲命令
flume-ng agent -cconf -f conf/syslog_case5.conf -n a1 -Dflume.root.logger=INFO,console
启动成功后
打开另外一个终端输入,往侦听端口送数据
echo "hellolooklook5" | nc 192.168.233.128 50000
而后看以前的终端,将会有以下显示:
数据已经发送过来了。
这是一个更新,更快,支持多端口版本的SyslogTCP Source。他不只仅监控一个端口,还能够监控多个端口。官网配置基本差很少,就是可选配置比较多
Property Name |
Default |
Description |
channels |
– |
|
type |
– |
The component type name, needs to be multiport_syslogtcp |
host |
– |
Host name or IP address to bind to. |
ports |
– |
Space-separated list (one or more) of ports to bind to. |
portHeader |
– |
If specified, the port number will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the incoming port. |
这里说明下须要注意的是这里ports设置已经取代tcp 的port,这个千万注意。还有portHeader这个能够与后面的interceptors 与 channel selectors自定义逻辑路由使用。
下面是官网例子:
a1.sources=r1
a1.channels=c1
a1.sources.r1.type=multiport_syslogtcp
a1.sources.r1.channels=c1
a1.sources.r1.host=0.0.0.0
a1.sources.r1.ports=10001 10002 10003
a1.sources.r1.portHeader=port
下面是实际例子
[html] view plain copy
#配置文件:syslog_case6.conf
# Name thecomponents on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#Describe/configure the source
a1.sources.r1.type = multiport_syslogtcp
a1.sources.r1.ports = 50000 60000
a1.sources.r1.host = 192.168.233.128
a1.sources.r1.channels = c1
# Describe thesink
a1.sinks.k1.type= logger
a1.sinks.k1.channel = c1
# Use a channelwhich buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.capacity= 1000
a1.channels.c1.transactionCapacity= 100
这里咱们侦探192.168.233.128的2个端口50000与60000
#敲命令
flume-ng agent -cconf -f conf/syslog_case6.conf -n a1 -Dflume.root.logger=INFO,console
启动成功后
打开另外一个终端输入,往侦听端口送数据
echo "hellolooklook5" | nc 192.168.233.128 50000
echo "hello looklook6"| nc 192.168.233.128 60000
而后看以前的终端,将会有以下显示:
2个端口的数据已经发送过来了。
关于这个官网都懒的介绍了,其实就是与TCP不一样的协议而已。
官网配置与TCP一致,就不说了。下面是官网例子
a1.sources=r1
a1.channels=c1
a1.sources.r1.type=syslogudp
a1.sources.r1.port=5140
a1.sources.r1.host=localhost
a1.sources.r1.channels=c1
下面是实际例子
[html] view plain copy
#配置文件:syslog_case7.conf
# Name thecomponents on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#Describe/configure the source
a1.sources.r1.type = syslogudp
a1.sources.r1.port = 50000
a1.sources.r1.host = 192.168.233.128
a1.sources.r1.channels = c1
# Describe thesink
a1.sinks.k1.type= logger
a1.sinks.k1.channel = c1
# Use a channelwhich buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.capacity= 1000
a1.channels.c1.transactionCapacity= 100
#敲命令
flume-ng agent -cconf -f conf/syslog_case7.conf -n a1 -Dflume.root.logger=INFO,console
启动成功后
打开另外一个终端输入,往侦听端口送数据
echo "hellolooklook5" | nc –u 192.168.233.128 50000
#在启动的终端查看console输出
Ok,数据已经发送过来了
8、HTTP Source
HTTP Source是HTTP POST和GET来发送事件数据的,官网说GET应只用于实验。Flume 事件使用一个可插拔的“handler”程序来实现转换,它必须实现的HTTPSourceHandler接口。此处理程序须要一个HttpServletRequest和返回一个flume 事件列表。
全部在一个POST请求发送的事件被认为是在一个事务里,一个批量插入flume 通道的行为。
下面是官网给出的source的配置,加粗的参数是必选
Property Name |
Default |
Description |
type |
The component type name, needs to be http |
|
port |
– |
The port the source should bind to. |
bind |
0.0.0.0 |
The hostname or IP address to listen on |
handler |
org.apache.flume.source.http.JSONHandler |
The FQCN of the handler class. |
官网例子
a1.sources=r1
a1.channels=c1
a1.sources.r1.type=http
a1.sources.r1.port=5140
a1.sources.r1.channels=c1
a1.sources.r1.handler=org.example.rest.RestHandler
a1.sources.r1.handler.nickname=random props
下面是实际用例:
[html] view plain copy
#配置文件:http_case8.conf
#Name the components on this agent
a1.sources= r1
a1.sinks= k1
a1.channels= c1
#Describe/configure the source
a1.sources.r1.type= http
a1.sources.r1.port= 50000
a1.sources.r1.channels= c1
#Describe the sink
a1.sinks.k1.type= logger
a1.sinks.k1.channel = c1
#Use a channel which buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.capacity= 1000
a1.channels.c1.transactionCapacity= 100
#敲命令
flume-ng agent -cconf -f conf/http_case8.conf -n a1 -Dflume.root.logger=INFO,console
启动成功后
#咱们用生成JSON 格式的POSTrequest发数据
curl -X POST -d '[{"headers" :{"looklook1" : "looklook1 isheader","looklook2" : "looklook2 isheader"},"body" : "hello looklook5"}]' http://192.168.233.128:50000
#在启动的终端查看console输出
这里headers与body都正常输出。
9、Twitter 1%firehose Source(实验的)
官网警告,慎用,说不定下个版本就木有了
这个实验source 是经过时搜索服务,从Twitter的1%样本信息中获取事件数据。须要Twitter开发者帐号。好吧,对于400网站,咱们迫不得已用不到,就很少解释了。
10、自定义Source
一个自定义 Source实际上是对Source接口的实现。当咱们开始flume代理的时候必须将自定义 Source和相依赖的jar包放到代理的classpath下面。自定义 Source的type就是咱们实现Source接口对应的类全路径。
这里后面的内容里会详细介绍,这里不作赘述。