其余更多java基础文章:
java基础学习(目录)html
Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。
支持在日志系统中定制各种数据发送方,用于收集数据;
同时,Flume提供对数据进行简单处理,并写到各类数据接受方(好比文本、HDFS、Hbase等)的能力。名词介绍:
Flume OG:Flume original generation,即Flume0.9x版本
Flume NG:Flume next generation,即Flume1.x版本
官网:flume.apache.orgFlume体系结构java
目前,flume-ng处理数据有两种方式:avro-client、agent
avro-client:一次性将数据传输到指定的avro服务的客户端
agent:一个持续传输数据的服务数据库
Agent主要的组件包括:Source、Channel、Sink
Source:完成对日志数据的手机,分红transtion和event打入到channel之中 Channel:主要提供一个队列的功能,对source提供的数据进行简单的缓存。
Sink:取出Channel中的数据,进行相应的存储文件系统,数据库或是提交到远程服务器。
数据在组件传输的单位是Event。apache
source意为来源、源头。vim
从外界采集各类类型的数据,将数据传递给Channel。
好比:监控某个文件只要增长数据就当即采集新增的数据、监控某个目录一旦有新文件产生就采集新文件的内容、监控某个端口等等。缓存
Exec Source、Avro Source、NetCat Source、Spooling Directory Source等安全
详细查看:
flume.apache.org/FlumeUserGu… 或者自带的文档查看。bash
一个数据的存储池,中间通道。服务器
接受source传出的数据,向sink指定的目的地传输。Channel中的数据直到进入到下一个channel中或者进入终端才会被删除。当sink写入失败后,能够自动重写,不会形成数据丢失,所以很可靠。网络
channel的类型不少好比:内存中、jdbc数据源中、文件形式存储等。
Memory Channel
File Channel
Spillable Memory Channel等
详细查看: flume.apache.org/FlumeUserGu…
主要做用:接受channel写入的数据以指定的形式表现出来(或存储或展现)。
sink的表现形式不少好比:打印到控制台、hdfs上、avro服务中、文件中等。
HDFS Sink、Hive Sink、Logger Sink、Avro Sink、Thrift Sink、File Roll Sink、HBaseSink、Kafka Sink等
详细查看:
flume.apache.org/FlumeUserGu…
HDFSSink须要有hdfs的配置文件和类库。通常采起多个sink汇聚到一台采集机器负责推送到hdfs。
event是Flume NG传输的数据的基本单位,也是事务的基本单位。
在文本文件,一般是一行记录就是一个event。
网络消息传输系统中,一条消息就是一个event。
event里有header、body
Event里面的header类型:Map<String, String>
咱们能够在source中自定义header的key:value,在某些channel和sink中使用header。
解压缩:[uplooking@uplooking01 ~]$ tar -zxvf soft/apache-flume-1.8.0-bin.tar.gz -C app/
重命名:[uplooking@uplooking01 ~]$ mv app/apache-flume-1.8.0-bin/ app/flume
添加到环境变量中
vim ~/.bash_profile
export FLUME_HOME=/home/uplooking/app/flume
export PATH=$PATH:$FLUME_HOME/bin
修改配置文件,conf目录下
cp flume-env.sh.template flume-env.sh
添加JAVA_HOME
export JAVA_HOME=/opt/jdk
复制代码
#####################################################################
## this's flume log purpose is listenning a socket port which product
## data of stream
## this agent is consists of source which is r1 , sinks which is k1,
## channel which is c1
##
## 这里面的a1 是flume一个实例agent的名字
######################################################################定义了当前agent的名字叫作a1
a1.sources = r1 ##定了该agent中的sources组件叫作r1
a1.sinks = k1 ##定了该agent中的sinks组件叫作k1
a1.channels = c1 ##定了该agent中的channels组件叫作c1
# 监听数据源的方式,这里采用监听网络端口
a1.sources.r1.type = netcat #source的类型为网络字节流
a1.sources.r1.bind = uplooking01 #source监听的网络的hostname
a1.sources.r1.port = 52019 #source监听的网络的port
# 采集的数据的下沉(落地)方式 经过日志
a1.sinks.k1.type = logger #sink的类型为logger日志方式,log4j的级别有INFO、Console、file。。。
# 描述channel的部分,使用内存作数据的临时存储
a1.channels.c1.type = memory #channel的类型使用内存进行数据缓存,这是最多见的一种channel
a1.channels.c1.capacity = 1000 #定义了channel对的容量
a1.channels.c1.transactionCapacity = 100 #定义channel的最大的事务容量
# 使用channel将source和sink链接起来# 须要将source和sink使用channel链接起来,组成一个相似流水管道
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
复制代码
flume-ng agent -c conf -n a1 -f conf/flume-nc.conf -Dflume.root.logger=INFO,console
-c conf:使用配置文件的方式-n a1:指定agent的名称为a1-f:指定配置文件
由于数据落地是经过日志,因此后面须要指定日志的相关配置选项。
复制代码
yum isntall -y telent
yum install -y nc
复制代码
向端口发送数据:
# 使用telnet
[uplooking@uplooking01 ~]$ telnet uplooking01 52019
Trying 192.168.43.101...
Connected to uplooking01.
Escape character is '^]'.
wo ai ni
OK
sai bei de xue
OK
# 使用nc
[uplooking@uplooking01 ~]$ nc uplooking01 52019
heihei
OK
xpleaf
OK
复制代码
此时能够查看flume agent启动终端的输出:
2018-03-24 20:09:34,390 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:166)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/192.168.43.101:52019]
2018-03-24 20:10:13,022 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 77 6F 20 61 69 20 6E 69 0D wo ai ni. }
2018-03-24 20:10:24,139 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 73 61 69 20 62 65 69 20 64 65 20 78 75 65 0D sai bei de xue. }
2018-03-24 20:13:26,190 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 69 68 65 69 heihei }
2018-03-24 20:13:26,463 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 78 70 6C 65 61 66 xpleaf }
2018-03-24 20:17:01,694 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F hello }
复制代码
配置文件以下:
####################################################################### 监听目录中的新增文件## this agent is consists of source which is r1 , sinks which is k1,## channel which is c1## ## 这里面的a1 是flume一个实例agent的名字#####################################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 监听数据源的方式,这里采用监听目录中的新增文件
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/uplooking/data/flume
a1.sources.r1.fileSuffix = .ok
# a1.sources.r1.deletePolicy = immediate
a1.sources.r1.deletePolicy = never
a1.sources.r1.fileHeader = true
# 采集的数据的下沉(落地)方式 经过日志
a1.sinks.k1.type = logger
# 描述channel的部分,使用内存作数据的临时存储
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 使用channel将source和sink链接起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
复制代码
启动flume agent:
flume-ng agent -c conf -n a1 -f conf/flume-dir.conf -Dflume.root.logger=INFO,console
复制代码
在监听目录下新增文件,内容以下:
hello you
hello he
hello me
复制代码
能够看到flume agent终端输出:
2018-03-24 21:23:59,182 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{file=/home/uplooking/data/flume/hello.txt} body: 68 65 6C 6C 6F 20 79 6F 75 hello you }
2018-03-24 21:23:59,182 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{file=/home/uplooking/data/flume/hello.txt} body: 68 65 6C 6C 6F 20 68 65 hello he }
2018-03-24 21:23:59,182 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{file=/home/uplooking/data/flume/hello.txt} body: 68 65 6C 6C 6F 20 6D 65 hello me }
2018-03-24 21:23:59,184 (pool-3-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:324)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
2018-03-24 21:23:59,184 (pool-3-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:433)] Preparing to move file /home/uplooking/data/flume/hello.txt to /home/uplooking/data/flume/hello.txt.ok
复制代码
能够看到提示说,原来的文本文件已经被重命名为.ok,查看数据目录中的文件:
[uplooking@uplooking01 flume]$ ls
hello.txt.ok
复制代码
tail -f与tail -F的说明:
在生产环境中,为了防止日志文件过大,一般会天天生成一个新的日志文件, 这是经过重命名原来的日志文件,而后touch一个原来的日志文件的方式来实现的。
http-xxx.log
http-xxx.log.2017-03-15
http-xxx.log.2017-03-16
-f不会监听分割以后的文件,而-F则会继续监听。
配置文件:
####################################################################### 监听文件中的新增数据## ## this agent is consists of source which is r1 , sinks which is k1,## channel which is c1## ## 这里面的a1 是flume一个实例agent的名字#####################################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 监听数据源的方式,这里监听文件中的新增数据
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/uplooking/data/flume/http-flume.log
# 采集的数据的下沉(落地)方式 经过日志
a1.sinks.k1.type = logger
# 描述channel的部分,使用内存作数据的临时存储
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000000
a1.channels.c1.transactionCapacity = 1000000
# 使用channel将source和sink链接起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
复制代码
启动flume agent:
flume-ng agent -c conf -n a1 -f conf/flume-data.conf -Dflume.root.logger=INFO,console
复制代码
向监听文件中添加数据:
cat hello.txt.ok > http-flume.log
复制代码
查看flume agent终端的输出:
2018-03-25 01:28:39,359 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 79 6F 75 hello you }
2018-03-25 01:28:40,465 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 68 65 hello he }
2018-03-25 01:28:40,465 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 6D 65 hello me }
复制代码