Flume is a distributed, reliable, and highly available system for collecting, aggregating, and transporting large volumes of log data. It supports custom data senders within a logging system for collecting data, and it can also perform simple processing on the data and write it to various data receivers (such as plain text, HDFS, HBase, and so on). Before setting up the environment and using it, please familiarize yourselves with Flume, especially its core components: Source, Channel, and Sink. Below I describe the environment setup for a few common usage patterns.
Main environment: virtual machines slave01, slave02, and slave03, built on top of the previously prepared environment, including JDK, Zookeeper, and Hadoop; see my earlier blog posts for details.
Download: http://flume.apache.org/download.html
1. Data transfer with a netcat source + channel (memory) + sink (logger)
(1) Configure the FLUME_HOME environment variable
Move the extracted files to a directory such as /usr/local/, then add the environment variables via the command vim /etc/profile:
JAVA_HOME=/usr/java/jdk1.8.0_161
JRE_HOME=/usr/java/jdk1.8.0_161/jre
SCALA_HOME=/usr/local/scala
HADOOP_HOME=/usr/local/hadoop
SPARK_HOME=/usr/local/spark
ZOOKEEPER_HOME=/usr/local/zookeeper
HBASE_HOME=/usr/local/hbase
KAFKA_HOME=/usr/local/kafka
HIVE_HOME=/usr/local/hive
DERBY_HOME=/usr/local/derby
FLUME_HOME=/usr/local/flume
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin:$SCALA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$SPARK_HOME/bin:$SPARK_HOME/sbin:$ZOOKEEPER_HOME/bin:$HBASE_HOME/bin:$KAFKA_HOME/bin:$HIVE_HOME/bin:$DERBY_HOME/bin:$FLUME_HOME/bin
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib:$DERBY_HOME/lib/derby.jar:$DERBY_HOME/lib/derbyclient.jar:$DERBY_HOME/lib/derbytools.jar:$DERBY_HOME/lib/derbynet.jar
export JAVA_HOME JRE_HOME SCALA_HOME HADOOP_HOME SPARK_HOME ZOOKEEPER_HOME HBASE_HOME KAFKA_HOME HIVE_HOME DERBY_HOME FLUME_HOME PATH CLASSPATH
Run the command source /etc/profile to make the variables take effect.
To verify that the configuration succeeded, cd into the flume/bin directory and run the command: flume-ng version
[hadoop@slave01 bin]$ flume-ng version
Error: Could not find or load main class org.apache.flume.tools.GetJavaProperty
Flume 1.8.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 99f591994468633fc6f8701c5fc53e0214b6da4f
Compiled by denes on Fri Sep 15 14:58:00 CEST 2017
From source with checksum fbb44c8c8fb63a49be0a59e27316833d
The verification succeeded, but an error message appeared. Most explanations attribute it either to the JDK version (changing it to a version below 1.8) or to the HBase configuration (commenting out export HBASE_CLASSPATH in hbase/conf/hbase-env.sh). However, if it does not affect anything, you do not need to worry about it.
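For reference, the HBase workaround mentioned above amounts to commenting out a single line in hbase-env.sh; a minimal sketch, assuming HBase lives under /usr/local/hbase (the exact value of the variable may differ on your machine):

# /usr/local/hbase/conf/hbase-env.sh
# comment out this line so flume-ng no longer picks up the extra classpath
# export HBASE_CLASSPATH=...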
(2) Modify the configuration files under the conf/ directory
Copy flume-env.sh.template and rename the copy to flume-env.sh:
[hadoop@slave01 conf]$ cp flume-env.sh.template flume-env.sh
Add the JAVA_HOME setting, with the following content:
export JAVA_HOME=/usr/java/jdk1.8.0_161
Copy flume-conf.properties.template and rename the copy to flume-conf.properties:
[hadoop@slave01 conf]$ cp flume-conf.properties.template flume-conf.properties
Change it to the simplest, most basic configuration, as follows:
a1.sources = so1
a1.channels = c1
a1.sinks = s1

# For each one of the sources, the type is defined
a1.sources.so1.type = netcat
a1.sources.so1.bind = slave01
a1.sources.so1.port = 8888

# The channel can be defined as follows.
a1.sources.so1.channels = c1

# Each sink's type must be defined
a1.sinks.s1.type = logger

# Specify the channel the sink should use
a1.sinks.s1.channel = c1

# Each channel's type is defined.
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100
a1.channels.c1.transactionCapacity = 100
Where:
a1 is the name of the agent, which is what --name a1 refers to in the startup command below;
a1.sinks.s1.type = logger means the collected log data is printed directly by Flume's logger; you can try other sink types yourselves (see the sketch after this list);
a1.sources.so1.type = netcat means the Source component uses netcat;
a1.sources.so1.bind = slave01 is the hostname or IP address the log data is sent to; a netcat source on that host is listening;
a1.sources.so1.port = 8888 is the port the log data is sent to; a netcat source must be listening on that port;
you can look up the other configuration parameters yourselves.
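As an example of another sink type, here is a small untested sketch that replaces the logger sink above with Flume's file_roll sink, which writes events to local files; the directory /tmp/flume-out is my own choice and must exist beforehand:

a1.sinks.s1.type = file_roll
# Local directory the sink rolls files into (must already exist)
a1.sinks.s1.sink.directory = /tmp/flume-out
# Start a new file every 30 seconds (0 would disable time-based rolling)
a1.sinks.s1.sink.rollInterval = 30
a1.sinks.s1.channel = c1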
(3) Start and test
Run the following in the bin directory to start the Flume agent a1 server side, where flume-conf.properties is the configuration file being loaded; mind the directory it is in:
flume-ng agent --conf conf --conf-file ../conf/flume-conf.properties --name a1 -Dflume.root.logger=INFO,console
You can look up the command-line options yourselves; the output looks like this:
Info: Including Hadoop libraries found via (/usr/local/hadoop/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/usr/local/hbase/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/usr/local/hive) for Hive access
..... (middle part omitted) .....
18/06/22 16:20:20 INFO node.Application: Starting new configuration:{ sourceRunners:{so1=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:so1,state:IDLE} }} sinkRunners:{s1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@7060583 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
18/06/22 16:20:20 INFO node.Application: Starting Channel c1
18/06/22 16:20:20 INFO node.Application: Waiting for channel: c1 to start. Sleeping for 500 ms
18/06/22 16:20:21 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
18/06/22 16:20:21 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
18/06/22 16:20:21 INFO node.Application: Starting Sink s1
18/06/22 16:20:21 INFO node.Application: Starting Source so1
18/06/22 16:20:21 INFO source.NetcatSource: Source starting
18/06/22 16:20:21 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:8888]
At this point, Flume has started successfully. Now for the test:
Install telnet; skip this if it is already installed. Check whether it is installed with rpm -qa | grep telnet; no output means it is not installed. Install it with yum install telnet, then check again in the same way that the installation succeeded. You can refer to the following link: https://blog.csdn.net/typa01_kk/article/details/46604967
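As a side note of my own (not required by the steps above): if telnet is inconvenient, the nc (netcat) client can be used to send test data to the source instead, assuming an nc package is installed:

# Connect interactively and type lines to send them as events
nc slave01 8888

# Or send a single test line non-interactively
echo "hello" | nc slave01 8888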
Open another terminal and run the following command to connect to the port, then type an arbitrary string such as "hello":
[hadoop@slave01 conf]$ telnet slave01 8888
Trying 127.0.0.1...
Connected to slave01.
Escape character is '^]'.
hello
OK
You can see the following output in the first terminal:
18/06/22 16:20:36 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F 0D hello. }
That completes this netcat source + channel (memory) + logger data transfer setup. Of course, this is only single-machine Flume data transfer; data can also be transferred across multiple nodes, which you can try yourselves (a sketch follows below).
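For reference, here is a minimal multi-node sketch of my own (not part of the setup above): an agent a2 on slave02 collects netcat input and forwards it over Avro to an agent a1 on slave01, which prints it with the logger sink. The port 4545 and the agent names are assumptions.

# agent a2 on slave02: netcat source -> memory channel -> avro sink
a2.sources = r1
a2.channels = c1
a2.sinks = k1
a2.sources.r1.type = netcat
a2.sources.r1.bind = slave02
a2.sources.r1.port = 8888
a2.channels.c1.type = memory
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = slave01
a2.sinks.k1.port = 4545
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

# agent a1 on slave01: avro source -> memory channel -> logger sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = avro
a1.sources.r1.bind = slave01
a1.sources.r1.port = 4545
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start agent a1 on slave01 first, then agent a2 on slave02; lines typed into telnet slave02 8888 should then show up in the logger output on slave01.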
2. Data transfer with a netcat source + channel (file) + sink (hdfs)
(1) The environment variables stay the same as above
(2) Modify the configuration files under the conf/ directory
As above, only modify the configuration file flume-conf.properties, as follows:
a.sources = r1
a.sinks = k1
a.channels = c1

# Describe/configure the source
a.sources.r1.type = netcat
a.sources.r1.bind = slave01
a.sources.r1.port = 8889

# Describe the sink
a.sinks.k1.type = hdfs
# Specify the output directory on HDFS
a.sinks.k1.hdfs.path = hdfs://slave01:9000/output
a.sinks.k1.hdfs.writeFormat = Text
a.sinks.k1.hdfs.fileType = DataStream
a.sinks.k1.hdfs.rollInterval = 10
a.sinks.k1.hdfs.rollSize = 0
a.sinks.k1.hdfs.rollCount = 0
a.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
a.sinks.k1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in file
a.channels.c1.type = file
a.channels.c1.checkpointDir = /usr/local/flume/checkpoint
a.channels.c1.dataDirs = /usr/local/flume/data

# Bind the source and sink to the channel
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
Where:
a.sinks.k1.type = hdfs means the log data collected by Flume is viewed in HDFS; the two folders checkpoint and data can be created manually;
the output directory in the HDFS path is required, because it is the directory we will access in HDFS after starting Hadoop (one way to create these directories up front is sketched below).
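A small sketch of creating these directories ahead of time, assuming HDFS at hdfs://slave01:9000 is already running and the current user may write to it:

# Local directories used by the file channel
mkdir -p /usr/local/flume/checkpoint /usr/local/flume/data

# Output directory on HDFS used by the hdfs sink
hdfs dfs -mkdir -p /output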
(3) Start and test
Before this, start the Hadoop cluster first; after it starts successfully, the DataNode process must appear among the running processes. Make sure of this, otherwise errors will occur later. You can refer to my earlier blog posts about the Hadoop cluster (a quick check is sketched after this paragraph):
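A minimal way to bring up HDFS and confirm the DataNode is running, assuming the Hadoop sbin scripts from the environment variables above are on the PATH:

# Start HDFS (NameNode, DataNode(s), SecondaryNameNode)
start-dfs.sh

# The DataNode process should appear in this list
jps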
Run the command to start the Flume agent a server side:
flume-ng agent --conf conf --conf-file ../conf/flume-conf.properties --name a -Dflume.root.logger=INFO,console
Then connect to the port with the telnet command and enter a string:
[hadoop@slave01 sbin]$ telnet slave01 8889
Trying 127.0.0.1...
Connected to slave01.
Escape character is '^]'.
qwer1234
OK
In the server terminal you just started, you can see the following output:
18/06/25 10:17:51 INFO hdfs.HDFSEventSink: Writer callback called.
18/06/25 10:17:51 INFO hdfs.BucketWriter: Closing hdfs://slave01:9000/output/2018-06-25-10-17-41.1529893061596.tmp
18/06/25 10:17:51 INFO hdfs.BucketWriter: Renaming hdfs://slave01:9000/output/2018-06-25-10-17-41.1529893061596.tmp to hdfs://slave01:9000/output/2018-06-25-10-17-41.1529893061596
18/06/25 10:17:51 INFO hdfs.HDFSEventSink: Writer callback called.
18/06/25 10:17:52 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
18/06/25 10:17:52 INFO hdfs.BucketWriter: Creating hdfs://slave01:9000/output/2018-06-25-10-17-52.1529893072686.tmp
18/06/25 10:18:02 INFO hdfs.BucketWriter: Closing hdfs://slave01:9000/output/2018-06-25-10-17-52.1529893072686.tmp
18/06/25 10:18:02 INFO hdfs.BucketWriter: Renaming hdfs://slave01:9000/output/2018-06-25-10-17-52.1529893072686.tmp to hdfs://slave01:9000/output/2018-06-25-10-17-52.1529893072686
18/06/25 10:18:02 INFO hdfs.HDFSEventSink: Writer callback called.
18/06/25 10:18:08 INFO file.EventQueueBackingStoreFile: Start checkpoint for /usr/local/flume/checkpoint/checkpoint, elements to sync = 3
18/06/25 10:18:08 INFO file.EventQueueBackingStoreFile: Updating checkpoint metadata: logWriteOrderID: 1529893058855, queueSize: 0, queueHead: 0
18/06/25 10:18:08 INFO file.Log: Updated checkpoint for file: /usr/local/flume/data/log-8 position: 325 logWriteOrderID: 1529893058855
Now visit slave01:50070, open the file browser menu, and enter the storage path:
In the HDFS output directory you can see that a new file named with a timestamp has appeared, and your test data (qwer1234) has been written into it. Click the file name above and download it to your local machine:
View the file content in the download directory; you can see that the file holds exactly the data you entered:
[hadoop@slave01 下载]$ cat 2018-06-25-10-17-52.1529893072686
qwer1234
[hadoop@slave01 下载]$
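Instead of downloading the file through the web UI, you can also read it directly from HDFS on the command line, for example:

# List the files produced by the hdfs sink
hdfs dfs -ls /output

# Print the content of the rolled file directly
hdfs dfs -cat /output/2018-06-25-10-17-52.1529893072686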
The above are two of the more commonly used data transfer targets: printing to the console and writing to HDFS. Both are very convenient for log collection, especially HDFS, which is widely used when running on top of a Hadoop cluster.