Chapter 1 Flume Overview
1.1 What Flume Is
1.2 Flume Architecture
1.2.1 Agent
1.2.2 Source
1.2.3 Channel
1.2.4 Sink
1.2.5 Event
1.3 Flume Topologies
1.4 How a Flume Agent Works Internally
1.5 The Three Major Hadoop Distributions
Chapter 2 Flume Quick Start
2.1 Flume Download Links
2.2 Installation and Deployment
Chapter 3 Flume Enterprise Development Cases
3.1 Official Case: Monitoring Port Data
3.2 Case: Reading a Local File into HDFS in Real Time
3.3 Case: Reading Directory Files into HDFS in Real Time
3.4 Case: Single Source, Multiple Outputs (Selector)
3.5 Case: Single Source, Multiple Outputs (Sink Group)
3.6 Case: Aggregating Multiple Sources
Chapter 4 Monitoring Flume with Ganglia
4.1 Installing and Deploying Ganglia
4.2 Running Flume and Testing the Monitoring
Chapter 5 Advanced Flume: A Custom MySQLSource
5.1 Why Customize a Source
5.2 What the Custom MySQLSource Consists Of
5.3 Steps for Building the Custom MySQLSource
5.4 Implementation
5.4.1 Importing the pom Dependencies
5.4.2 Adding the Configuration
5.4.3 SQLSourceHelper
5.4.4 MySQLSource
5.5 Testing
5.5.1 Preparing the Jar
5.5.2 Preparing the Configuration File
5.5.3 Preparing the MySQL Tables
5.5.4 Running the Test and Checking the Results
Chapter 6 Further Topics
6.1 Common Regular Expression Syntax
6.2 Exercise
Chapter 7 Real Flume Interview Questions (Important)
7.1 How do you monitor Flume data transfer?
7.2 What do Flume's Source, Sink and Channel do? What Source type do you use?
7.3 Flume Channel Selectors
7.4 Flume Parameter Tuning
7.5 Flume's Transaction Mechanism
7.6 Does Flume lose data during collection?
Flume is a highly available, highly reliable, distributed system provided by Cloudera for collecting, aggregating and transporting massive volumes of log data. Flume is based on a streaming architecture and is flexible and simple.
Flume was donated to the Apache Software Foundation in 2009 and became one of the Hadoop-related components. In recent years Flume has been steadily improved and new versions released, notably Flume NG, and its built-in components have grown ever richer, which has made it much more convenient to use during development; it is now one of the Apache top-level projects.
The Flume architecture is shown in the figure below:
The components of the Flume architecture are described in detail below.
An Agent is a JVM process that moves data from a source to a destination in the form of events; it is the basic unit of Flume data transfer.
An Agent consists of three main parts: the Source, the Channel and the Sink.
A Source is the component responsible for receiving data into the Flume Agent. It can handle log data of many types and formats, including avro, thrift, exec (Linux commands), jms, spooling directory, netcat, sequence generator, syslog, http and legacy.
A Channel is a buffer between the Source and the Sink, so it allows the Source and the Sink to operate at different rates. A Channel is thread-safe and can handle writes from several Sources and reads from several Sinks at the same time.
Flume ships with two Channels: the Memory Channel and the File Channel.
The Memory Channel is an in-memory queue. It is suitable when losing data is acceptable; if data loss matters, the Memory Channel should not be used, because a process crash, machine failure or reboot will lose whatever it holds.
The File Channel writes all events to disk, so no data is lost if the process exits or the machine goes down.
A Sink continuously polls the Channel for events, removes them in batches, and writes those batches to a storage or indexing system, or sends them on to another Flume Agent.
A Sink is fully transactional. Before removing a batch of data from the Channel, each Sink starts a transaction with the Channel. Once the batch of events has been successfully written out to the storage system or to the next Flume Agent, the Sink commits the transaction with the Channel, and only then does the Channel delete those events from its internal buffer.
Sink destinations include hdfs, logger, avro, thrift, ipc, file, null, HBase, solr and custom sinks.
An Event is the basic unit of Flume data transfer; data travels from source to destination in the form of events.
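To make the Event structure concrete, here is a minimal Java sketch (my own illustration using Flume's public API, not part of the original text): an Event is just an optional map of string headers plus a byte-array body.
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
public class EventDemo {
    public static void main(String[] args) {
        // optional headers travel with the event and can be used for routing decisions
        Map<String, String> headers = new HashMap<>();
        headers.put("hostname", "hadoop102"); // example header, not required by Flume
        // the body is an opaque byte array -- here, a single log line
        Event event = EventBuilder.withBody("hello flume".getBytes(StandardCharsets.UTF_8), headers);
        System.out.println(new String(event.getBody(), StandardCharsets.UTF_8)); // hello flume
        System.out.println(event.getHeaders()); // {hostname=hadoop102}
    }
}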
Flume topologies are shown in the figures below:
Flume agents connected in series
There are three major Hadoop distributions: Apache, Cloudera and Hortonworks.
The Apache version is the original (most basic) version and is best for getting started.
Cloudera is used more often in large Internet companies (known as CDH; commercial).
Hortonworks has the best documentation.
1. Apache Hadoop
Website: http://hadoop.apache.org/releases.html
Downloads: https://archive.apache.org/dist/hadoop/common/
2. Cloudera Hadoop
Website: https://www.cloudera.com/downloads/cdh/5-10-0.html
Downloads: http://archive-primary.cloudera.com/cdh5/cdh/5/
In 2009 Doug Cutting, the creator of Hadoop, joined Cloudera.
Cloudera's main products are CDH, Cloudera Manager and Cloudera Support.
3. Hortonworks Hadoop
Website: https://hortonworks.com/products/data-center/hdp/
Downloads: https://hortonworks.com/downloads/#data-platform
When it was founded, the company hired roughly 25 to 30 Yahoo engineers who specialized in Hadoop; these engineers had been helping Yahoo develop Hadoop since 2005 and contributed about 80% of the Hadoop code.
1) Flume website
http://flume.apache.org/
2) Documentation
http://flume.apache.org/FlumeUserGuide.html
3) Downloads
http://archive.apache.org/dist/flume/
1) Upload apache-flume-1.7.0-bin.tar.gz to the /opt/software directory on the Linux machine
2) Extract apache-flume-1.7.0-bin.tar.gz into the /opt/module/ directory
[atguigu@hadoop102 software]$ tar -zxf apache-flume-1.7.0-bin.tar.gz -C /opt/module/
3) Rename apache-flume-1.7.0-bin to flume
[atguigu@hadoop102 module]$ mv apache-flume-1.7.0-bin flume
4) Rename the flume-env.sh.template file under flume/conf to flume-env.sh and configure it
[atguigu@hadoop102 conf]$ mv flume-env.sh.template flume-env.sh
[atguigu@hadoop102 conf]$ vim flume-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_144
1) Requirement: Flume monitors port 44444 on the local machine; we then use the telnet tool to send messages to port 44444, and Flume displays the received data on the console in real time.
2) Analysis:
1. Install the telnet tool
[atguigu@hadoop102 software]$ sudo rpm -ivh xinetd-2.3.14-40.el6.x86_64.rpm
[atguigu@hadoop102 software]$ sudo rpm -ivh telnet-0.17-48.el6.x86_64.rpm
[atguigu@hadoop102 software]$ sudo rpm -ivh telnet-server-0.17-48.el6.x86_64.rpm
2. Check whether port 44444 is already in use
[atguigu@hadoop102 flume-telnet]$ sudo netstat -tunlp | grep 44444
Description: netstat is a very useful tool for monitoring TCP/IP networks; it can display the routing table, the actual network connections and status information for every network interface.
Basic syntax: netstat [options]
Options:
-t or --tcp: show TCP connections;
-u or --udp: show UDP connections;
-n or --numeric: show IP addresses directly instead of resolving them through DNS;
-l or --listening: show listening server sockets;
-p or --programs: show the PID and name of the program using each socket;
3. Create the Flume Agent configuration file flume-telnet-logger.conf
Create a job folder under the flume directory and cd into it.
[atguigu@hadoop102 flume]$ pwd
/opt/module/flume
[atguigu@hadoop102 flume]$ mkdir job
[atguigu@hadoop102 flume]$ cd job/
Create the Flume Agent configuration file flume-telnet-logger.conf in the job folder
[atguigu@hadoop102 job]$ touch flume-telnet-logger.conf
Add the following content to the flume-telnet-logger.conf file:
[atguigu@hadoop102 job]$ vim flume-telnet-logger.conf
The content to add:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Note: the configuration file follows the official user guide: http://flume.apache.org/FlumeUserGuide.html
4. Start the Flume agent so that it listens on the port
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-telnet-logger.conf -Dflume.root.logger=INFO,console
Parameter description:
--conf conf/ : the configuration files are stored in the conf/ directory
--name a1 : names the agent a1 (must match the name used in the configuration file)
--conf-file job/flume-telnet-logger.conf : the configuration file Flume reads for this run is flume-telnet-logger.conf in the job folder
-Dflume.root.logger=INFO,console : -D overrides the flume.root.logger property at runtime and sets the console log level to INFO. Log levels include debug, info, warn and error.
5. Use the telnet tool to send data to port 44444 on the local machine
[atguigu@hadoop102 ~]$ telnet localhost 44444
As shown in the figure below:
1) Requirement: monitor the Hive log in real time and upload it to HDFS. (In real projects this would be the logs produced in Tomcat: order logs, clickstream logs and so on.)
2) Analysis:
1. Copy the following jars
commons-configuration-1.6.jar
hadoop-auth-2.7.2.jar
hadoop-common-2.7.2.jar
hadoop-hdfs-2.7.2.jar
commons-io-2.4.jar
htrace-core-3.1.0-incubating.jar
into the /opt/module/flume/lib folder.
2. Create the flume-file-hdfs.conf file
Create the file
[atguigu@hadoop102 job]$ touch flume-file-hdfs.conf
Note:
To read a file on a Linux system, the read has to follow the rules of Linux commands. Since the Hive log lives on the Linux file system, the source type chosen for reading the file is exec (short for "execute"), meaning that a Linux command is executed to read the file.
[atguigu@hadoop102 job]$ vim flume-file-hdfs.conf
Add the following:
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
#prefix for uploaded files
a2.sinks.k2.hdfs.filePrefix = logs-
#whether to roll folders based on time
a2.sinks.k2.hdfs.round = true
#number of time units before a new folder is created
a2.sinks.k2.hdfs.roundValue = 1
#the time unit for rounding
a2.sinks.k2.hdfs.roundUnit = hour
#whether to use the local timestamp
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#number of Events to accumulate before flushing to HDFS
a2.sinks.k2.hdfs.batchSize = 1000
#file type; compression is supported
a2.sinks.k2.hdfs.fileType = DataStream
#how long before rolling a new file (seconds)
a2.sinks.k2.hdfs.rollInterval = 600
#roll the file when it reaches this size (bytes)
a2.sinks.k2.hdfs.rollSize = 134217700
#file rolling is independent of the number of Events
a2.sinks.k2.hdfs.rollCount = 0
#minimum block replication
a2.sinks.k2.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
Configuration breakdown:
3. Start the monitoring job
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
4. Start Hadoop and Hive, and run some Hive operations to produce log entries
[atguigu@hadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh
[atguigu@hadoop103 hadoop-2.7.2]$ sbin/start-yarn.sh
[atguigu@hadoop102 hive]$ bin/hive
hive (default)>
5. View the files on HDFS.
1) Requirement: use Flume to monitor all the files in a directory.
2) Analysis:
[atguigu@hadoop102 job]$ touch flume-dir-hdfs.conf
Open the file
[atguigu@hadoop102 job]$ vim flume-dir-hdfs.conf
Add the following:
# Name the components on this agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#ignore all files ending in .tmp; do not upload them
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H
#prefix for uploaded files
a3.sinks.k3.hdfs.filePrefix = upload-
#whether to roll folders based on time
a3.sinks.k3.hdfs.round = true
#number of time units before a new folder is created
a3.sinks.k3.hdfs.roundValue = 1
#the time unit for rounding
a3.sinks.k3.hdfs.roundUnit = hour
#whether to use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#number of Events to accumulate before flushing to HDFS
a3.sinks.k3.hdfs.batchSize = 100
#file type; compression is supported
a3.sinks.k3.hdfs.fileType = DataStream
#how long before rolling a new file (seconds)
a3.sinks.k3.hdfs.rollInterval = 600
#roll the file when it reaches this size (roughly 128 MB)
a3.sinks.k3.hdfs.rollSize = 134217700
#file rolling is independent of the number of Events
a3.sinks.k3.hdfs.rollCount = 0
#minimum block replication
a3.sinks.k3.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
Configuration breakdown:
2. Start the directory-monitoring job
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf
Notes on using the Spooling Directory Source:
1) Do not create and then keep modifying files inside the monitored directory
2) Files that have been uploaded are renamed with a .COMPLETED suffix
3) The monitored directory is scanned for file changes every 500 milliseconds
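If you want to check what the ignorePattern regex ([^ ]*\.tmp) from the configuration above actually matches, here is a small Java sketch (my own illustration; I am assuming the source tests the pattern against the whole file name, which is what the example below does):
import java.util.regex.Pattern;
public class IgnorePatternCheck {
    public static void main(String[] args) {
        // same regex as a3.sources.r3.ignorePattern in the configuration above
        Pattern ignore = Pattern.compile("([^ ]*\\.tmp)");
        for (String name : new String[]{"atguigu.txt", "atguigu.tmp", "atguigu.log"}) {
            // matches() tests the entire file name against the pattern
            System.out.println(name + " ignored: " + ignore.matcher(name).matches());
        }
    }
}
Only atguigu.tmp is matched (and therefore skipped), which is consistent with the directory listing shown later: atguigu.txt and atguigu.log get the .COMPLETED suffix while atguigu.tmp is left untouched.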
[atguigu@hadoop102 flume]$ mkdir upload
Add files to the upload folder
[atguigu@hadoop102 upload]$ touch atguigu.txt
[atguigu@hadoop102 upload]$ touch atguigu.tmp
[atguigu@hadoop102 upload]$ touch atguigu.log
Check the upload directory
[atguigu@hadoop102 upload]$ pwd
/opt/module/flume/upload
[atguigu@hadoop102 upload]$ ll
total 0
-rw-rw-r--. 1 atguigu atguigu 0 Mar 4 00:09 atguigu.log.COMPLETED
-rw-rw-r--. 1 atguigu atguigu 0 Mar 4 00:09 atguigu.tmp
-rw-rw-r--. 1 atguigu atguigu 0 Mar 4 00:09 atguigu.txt.COMPLETED
Single Source with multiple Channels and Sinks, as shown in the figure below:
[atguigu@hadoop102 job]$ mkdir group1
[atguigu@hadoop102 job]$ cd group1/
Create a flume3 folder under the /opt/module/datas/ directory
[atguigu@hadoop102 datas]$ mkdir flume3
1. Create flume-file-flume.conf
Configure one source that receives the log file, plus two channels and two sinks that feed flume-flume-hdfs and flume-flume-dir respectively.
Create the configuration file and open it:
[atguigu@hadoop102 group1]$ touch flume-file-flume.conf
[atguigu@hadoop102 group1]$ vim flume-file-flume.conf
Add the following:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Replicate the data stream to all channels
a1.sources.r1.selector.type = replicating
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
Note: Avro is a language-neutral data serialization and RPC framework created by Hadoop founder Doug Cutting.
Note: RPC (Remote Procedure Call) is a protocol for requesting a service from a program on a remote computer over a network, without needing to understand the underlying network technology.
2. Create flume-flume-hdfs.conf
Configure a Source that receives the output of the upstream Flume agent and a Sink that writes to HDFS.
Create the configuration file and open it
[atguigu@hadoop102 group1]$ touch flume-flume-hdfs.conf
[atguigu@hadoop102 group1]$ vim flume-flume-hdfs.conf
Add the following:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9000/flume2/%Y%m%d/%H
#prefix for uploaded files
a2.sinks.k1.hdfs.filePrefix = flume2-
#whether to roll folders based on time
a2.sinks.k1.hdfs.round = true
#number of time units before a new folder is created
a2.sinks.k1.hdfs.roundValue = 1
#the time unit for rounding
a2.sinks.k1.hdfs.roundUnit = hour
#whether to use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#number of Events to accumulate before flushing to HDFS
a2.sinks.k1.hdfs.batchSize = 100
#file type; compression is supported
a2.sinks.k1.hdfs.fileType = DataStream
#how long before rolling a new file (seconds)
a2.sinks.k1.hdfs.rollInterval = 600
#roll the file when it reaches this size (roughly 128 MB)
a2.sinks.k1.hdfs.rollSize = 134217700
#file rolling is independent of the number of Events
a2.sinks.k1.hdfs.rollCount = 0
#minimum block replication
a2.sinks.k1.hdfs.minBlockReplicas = 1
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
3. Create flume-flume-dir.conf
Configure a Source that receives the output of the upstream Flume agent and a Sink that writes to a local directory.
Create the configuration file and open it
[atguigu@hadoop102 group1]$ touch flume-flume-dir.conf
[atguigu@hadoop102 group1]$ vim flume-flume-dir.conf
Add the following:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/datas/flume3
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
Note:
The local output directory must already exist; if it does not, Flume will not create a new one.
4. Run the configuration files
Start the corresponding agents: flume-flume-dir, flume-flume-hdfs and flume-file-flume.
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group1/flume-flume-dir.conf
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group1/flume-flume-hdfs.conf
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group1/flume-file-flume.conf
5. Start Hadoop and Hive
[atguigu@hadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh
[atguigu@hadoop103 hadoop-2.7.2]$ sbin/start-yarn.sh
[atguigu@hadoop102 hive]$ bin/hive
hive (default)>
6. Check the data on HDFS
7. Check the data in the /opt/module/datas/flume3 directory
[atguigu@hadoop102 flume3]$ pwd
/opt/module/datas/flume3
[atguigu@hadoop102 flume3]$ ll
total 4
-rw-rw-r--. 1 atguigu atguigu 0 Mar 4 01:01 1551632490229-1
-rw-rw-r--. 1 atguigu atguigu 1594 Mar 4 01:02 1551632490229-2
[atguigu@hadoop102 flume3]$ ll
total 4
-rw-rw-r--. 1 atguigu atguigu 0 Mar 4 01:01 1551632490229-1
-rw-rw-r--. 1 atguigu atguigu 3808 Mar 4 01:02 1551632490229-2
-rw-rw-r--. 1 atguigu atguigu 0 Mar 4 01:02 1551632490229-3
[atguigu@hadoop102 flume3]$ ll
total 8
-rw-rw-r--. 1 atguigu atguigu 0 Mar 4 01:01 1551632490229-1
-rw-rw-r--. 1 atguigu atguigu 3808 Mar 4 01:02 1551632490229-2
-rw-rw-r--. 1 atguigu atguigu 538 Mar 4 01:02 1551632490229-3
-rw-rw-r--. 1 atguigu atguigu 0 Mar 4 01:03 1551632490229-4
-rw-rw-r--. 1 atguigu atguigu 0 Mar 4 01:03 1551632490229-5
Single Source and Channel with multiple Sinks (load balancing), as shown in the figure below.
[atguigu@hadoop102 job]$ mkdir group2
[atguigu@hadoop102 job]$ cd group2/
1. Create flume-netcat-flume.conf
Configure one source that receives the log data, plus one channel and two sinks that feed flume-flume-console1 and flume-flume-console2 respectively.
Create the configuration file and open it
[atguigu@hadoop102 group2]$ touch flume-netcat-flume.conf
[atguigu@hadoop102 group2]$ vim flume-netcat-flume.conf
Add the following:
# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Sink group configuration
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
Note: Avro is a language-neutral data serialization and RPC framework created by Hadoop founder Doug Cutting.
Note: RPC (Remote Procedure Call) is a protocol for requesting a service from a program on a remote computer over a network, without needing to understand the underlying network technology.
2. Create flume-flume-console1.conf
Configure a Source that receives the output of the upstream Flume agent and a Sink that prints to the local console.
Create the configuration file and open it
[atguigu@hadoop102 group2]$ touch flume-flume-console1.conf
[atguigu@hadoop102 group2]$ vim flume-flume-console1.conf
Add the following:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = logger
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
3. Create flume-flume-console2.conf
Configure a Source that receives the output of the upstream Flume agent and a Sink that prints to the local console.
Create the configuration file and open it
[atguigu@hadoop102 group2]$ touch flume-flume-console2.conf
[atguigu@hadoop102 group2]$ vim flume-flume-console2.conf
Add the following:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = logger
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
4. Run the configuration files
Start the corresponding agents: flume-flume-console2, flume-flume-console1 and flume-netcat-flume.
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume-netcat-flume.conf
5. Use the telnet tool to send data to port 44444 on the local machine
$ telnet localhost 44444
6. Watch the logs printed on the consoles of Flume 2 and Flume 3
Multiple Sources aggregating data into a single Flume agent, as shown in the figure below.
Distribute Flume to the other nodes:
[atguigu@hadoop102 module]$ xsync flume
Create a group3 folder under /opt/module/flume/job on hadoop102, hadoop103 and hadoop104.
[atguigu@hadoop102 job]$ mkdir group3
[atguigu@hadoop103 job]$ mkdir group3
[atguigu@hadoop104 job]$ mkdir group3
1. Create flume1-logger-flume.conf
Configure a Source that monitors the group.log file and a Sink that sends the data to the next Flume agent.
Create the configuration file on hadoop103 and open it
[atguigu@hadoop103 group3]$ touch flume1-logger-flume.conf
[atguigu@hadoop103 group3]$ vim flume1-logger-flume.conf
Add the following:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2. Create flume2-netcat-flume.conf
Configure a Source that monitors the data stream on port 44444 and a Sink that sends the data to the next Flume agent:
Create the configuration file on hadoop102 and open it
[atguigu@hadoop102 group3]$ touch flume2-netcat-flume.conf
[atguigu@hadoop102 group3]$ vim flume2-netcat-flume.conf
Add the following:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 44444
# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
3. Create flume3-flume-logger.conf
Configure a Source that receives the data streams sent by flume1 and flume2 and a Sink that writes the merged data to the console.
Create the configuration file on hadoop104 and open it
[atguigu@hadoop104 group3]$ touch flume3-flume-logger.conf
[atguigu@hadoop104 group3]$ vim flume3-flume-logger.conf
Add the following:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141
# Describe the sink
a3.sinks.k1.type = logger
# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
4. Run the configuration files
Start the corresponding agents: flume3-flume-logger.conf, flume2-netcat-flume.conf and flume1-logger-flume.conf.
[atguigu@hadoop104 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group3/flume2-netcat-flume.conf
[atguigu@hadoop103 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group3/flume1-logger-flume.conf
5. On hadoop103, append content to group.log in the /opt/module directory
[atguigu@hadoop103 module]$ echo 'hello' > group.log
6. On hadoop102, send data to port 44444
[atguigu@hadoop102 flume]$ telnet hadoop102 44444
7. Check the data received on hadoop104
1) Install the httpd service and PHP
[atguigu@hadoop102 flume]$ sudo yum -y install httpd php
2) Install the other dependencies
[atguigu@hadoop102 flume]$ sudo yum -y install rrdtool perl-rrdtool rrdtool-devel
[atguigu@hadoop102 flume]$ sudo yum -y install apr-devel
3) Install Ganglia
[atguigu@hadoop102 flume]$ sudo rpm -Uvh http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
[atguigu@hadoop102 flume]$ sudo yum -y install ganglia-gmetad
[atguigu@hadoop102 flume]$ sudo yum -y install ganglia-web
[atguigu@hadoop102 flume]$ sudo yum install -y ganglia-gmond
4) Edit the configuration file /etc/httpd/conf.d/ganglia.conf
[atguigu@hadoop102 flume]$ sudo vim /etc/httpd/conf.d/ganglia.conf
Change it to the following:
# Ganglia monitoring system php web frontend
Alias /ganglia /usr/share/ganglia
<Location /ganglia>
Order deny,allow
Deny from all
Allow from all
# Allow from 127.0.0.1
# Allow from ::1
# Allow from .example.com
</Location>
5) Edit the configuration file /etc/ganglia/gmetad.conf
[atguigu@hadoop102 flume]$ sudo vim /etc/ganglia/gmetad.conf
Change it to:
data_source "hadoop102" 192.168.25.102
6) Edit the configuration file /etc/ganglia/gmond.conf
[atguigu@hadoop102 flume]$ sudo vim /etc/ganglia/gmond.conf
Change it to:
cluster {
name = "hadoop102"
owner = "unspecified"
latlong = "unspecified"
url = "unspecified"
}
udp_send_channel {
#bind_hostname = yes # Highly recommended, soon to be default.
# This option tells gmond to use a source address
# that resolves to the machine's hostname. Without
# this, the metrics may appear to come from any
# interface and the DNS names associated with
# those IPs will be used to create the RRDs.
# mcast_join = 239.2.11.71
host = 192.168.25.102
port = 8649
ttl = 1
}
udp_recv_channel {
# mcast_join = 239.2.11.71
port = 8649
# bind = 239.2.11.71
bind = 192.168.25.102
retry_bind = true
# Size of the UDP buffer. If you are handling lots of metrics you really
# should bump it up to e.g. 10MB or even higher.
# buffer = 10485760
}
7) Edit the configuration file /etc/selinux/config
[atguigu@hadoop102 flume]$ sudo vim /etc/selinux/config
Change it to:
# This file controls the state of SELinux on the system.
# SELINUX= can take one of these three values:
# enforcing - SELinux security policy is enforced.
# permissive - SELinux prints warnings instead of enforcing.
# disabled - No SELinux policy is loaded.
# SELINUX=enforcing
SELINUX=disabled
# SELINUXTYPE= can take one of these two values:
# targeted - Targeted processes are protected,
# mls - Multi Level Security protection.
SELINUXTYPE=targeted
Tip:
The SELinux change only takes effect after a reboot; if you do not want to reboot right now, you can apply it temporarily:
[atguigu@hadoop102 flume]$ sudo setenforce 0
8) Start Ganglia
[atguigu@hadoop102 flume]$ sudo service httpd start
[atguigu@hadoop102 flume]$ sudo service gmetad start
[atguigu@hadoop102 flume]$ sudo service gmond start
9) Open the Ganglia page in a browser
http://192.168.25.102/ganglia
Tip:
If you still get a permission error after the steps above, change the permissions on the /var/lib/ganglia directory:
[atguigu@hadoop102 flume]$ sudo chmod -R 777 /var/lib/ganglia
1) Modify the flume-env.sh configuration in the /opt/module/flume/conf directory:
JAVA_OPTS="-Dflume.monitoring.type=ganglia
-Dflume.monitoring.hosts=192.168.25.102:8649
-Xms100m
-Xmx200m"
2) Start the Flume job
[atguigu@hadoop102 flume]$ bin/flume-ng agent \
--conf conf/ \
--name a1 \
--conf-file job/flume-telnet-logger.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=ganglia \
-Dflume.monitoring.hosts=192.168.25.102:8649
3) Send data and watch the Ganglia monitoring graphs
[atguigu@hadoop102 flume]$ telnet localhost 44444
The graphs look like this:
A Source is the component responsible for receiving data into the Flume Agent. It can handle log data of many types and formats, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http and legacy. The source types provided out of the box cover a lot of ground, but sometimes they do not meet the needs of real development work, and in that case we have to write a custom Source.
For example: to monitor MySQL in real time and move data from MySQL into HDFS or some other storage framework, we need to implement a MySQLSource ourselves.
The official documentation also describes the interface for custom sources:
https://flume.apache.org/FlumeDeveloperGuide.html#source
According to that documentation, the custom MySqlSource has to extend the AbstractSource class and implement the Configurable and PollableSource interfaces.
Methods to implement:
getBackOffSleepIncrement() // not used here
getMaxBackOffSleepInterval() // not used here
configure(Context context) // initialize the context
process() // fetch the data (pulling from MySQL involves fairly complex logic, so the MySQL interaction is handled by a dedicated class, SQLSourceHelper), wrap it into Events and write them to the Channel; this method is called in a loop
stop() // release the resources
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
</dependencies>
Add jdbc.properties and log4j.properties to the classpath.
jdbc.properties:
dbDriver=com.mysql.jdbc.Driver
dbUrl=jdbc:mysql://hadoop102:3306/mysqlsource?useUnicode=true&characterEncoding=utf-8
dbUser=root
dbPassword=123456
log4j.properties:
#--------console-----------
log4j.rootLogger=info,myconsole,myfile
log4j.appender.myconsole=org.apache.log4j.ConsoleAppender
log4j.appender.myconsole.layout=org.apache.log4j.SimpleLayout
#log4j.appender.myconsole.layout.ConversionPattern =%d [%t] %-5p [%c] - %m%n
#log4j.rootLogger=error,myfile
log4j.appender.myfile=org.apache.log4j.DailyRollingFileAppender
log4j.appender.myfile.File=/tmp/flume.log
log4j.appender.myfile.layout=org.apache.log4j.PatternLayout
log4j.appender.myfile.layout.ConversionPattern =%d [%t] %-5p [%c] - %m%n
1) Field descriptions (see the comments in the code below):
package com.atguigu;
import org.apache.flume.Context;
import org.apache.flume.conf.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.*;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class SQLSourceHelper {
private static final Logger LOG = LoggerFactory.getLogger(SQLSourceHelper.class);
private int runQueryDelay, // interval between two queries
startFrom, // starting id
currentIndex, // current id
recordSixe = 0, // number of rows returned by each query
maxRow; // maximum number of rows per query
private String table, // the table to operate on
columnsToSelect, // the columns to select, supplied by the user
customQuery, // the custom query supplied by the user
query, // the query that gets built
defaultCharsetResultSet; // result-set character set
// context, used to read the configuration file
private Context context;
// default values for the fields above; they can be overridden in the Flume job configuration file
private static final int DEFAULT_QUERY_DELAY = 10000;
private static final int DEFAULT_START_VALUE = 0;
private static final int DEFAULT_MAX_ROWS = 2000;
private static final String DEFAULT_COLUMNS_SELECT = "*";
private static final String DEFAULT_CHARSET_RESULTSET = "UTF-8";
private static Connection conn = null;
private static PreparedStatement ps = null;
private static String connectionURL, connectionUserName, connectionPassword;
// load static resources
static {
Properties p = new Properties();
try {
p.load(SQLSourceHelper.class.getClassLoader().getResourceAsStream("jdbc.properties"));
connectionURL = p.getProperty("dbUrl");
connectionUserName = p.getProperty("dbUser");
connectionPassword = p.getProperty("dbPassword");
Class.forName(p.getProperty("dbDriver"));
} catch (IOException | ClassNotFoundException e) {
LOG.error(e.toString());
}
}
// obtain a JDBC connection
private static Connection InitConnection(String url, String user, String pw) {
try {
Connection conn = DriverManager.getConnection(url, user, pw);
if (conn == null)
throw new SQLException();
return conn;
} catch (SQLException e) {
e.printStackTrace();
}
return null;
}
// constructor
SQLSourceHelper(Context context) throws ParseException {
// initialize the context
this.context = context;
// parameters with defaults: read them from the Flume job configuration file, falling back to the defaults
this.columnsToSelect = context.getString("columns.to.select", DEFAULT_COLUMNS_SELECT);
this.runQueryDelay = context.getInteger("run.query.delay", DEFAULT_QUERY_DELAY);
this.startFrom = context.getInteger("start.from", DEFAULT_START_VALUE);
this.defaultCharsetResultSet = context.getString("default.charset.resultset", DEFAULT_CHARSET_RESULTSET);
// parameters without defaults: read them from the Flume job configuration file
this.table = context.getString("table");
this.customQuery = context.getString("custom.query");
connectionURL = context.getString("connection.url");
connectionUserName = context.getString("connection.user");
connectionPassword = context.getString("connection.password");
conn = InitConnection(connectionURL, connectionUserName, connectionPassword);
// validate the configuration; throw if a parameter without a default has not been set
checkMandatoryProperties();
// get the current id
currentIndex = getStatusDBIndex(startFrom);
// build the query
query = buildQuery();
}
// validate the configuration (table, query and database connection parameters)
private void checkMandatoryProperties() {
if (table == null) {
throw new ConfigurationException("property table not set");
}
if (connectionURL == null) {
throw new ConfigurationException("connection.url property not set");
}
if (connectionUserName == null) {
throw new ConfigurationException("connection.user property not set");
}
if (connectionPassword == null) {
throw new ConfigurationException("connection.password property not set");
}
}
// build the SQL statement
private String buildQuery() {
String sql = "";
// get the current id
currentIndex = getStatusDBIndex(startFrom);
LOG.info(currentIndex + "");
if (customQuery == null) {
sql = "SELECT " + columnsToSelect + " FROM " + table;
} else {
sql = customQuery;
}
StringBuilder execSql = new StringBuilder(sql);
// use the id as the offset
if (!sql.contains("where")) {
execSql.append(" where ");
execSql.append("id").append(">").append(currentIndex);
return execSql.toString();
} else {
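// Note: this branch assumes the custom query already ends with the previous offset value
// (e.g. "... where id > 0"); it drops as many trailing characters as the new currentIndex
// has digits and appends currentIndex in their place, so it only works cleanly while the
// old and new offset values have the same number of digits.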
int length = execSql.toString().length();
return execSql.toString().substring(0, length - String.valueOf(currentIndex).length()) + currentIndex;
}
}
// run the query
List<List<Object>> executeQuery() {
try {
// the SQL has to be rebuilt on every query because the id changes
customQuery = buildQuery();
// collection holding the results
List<List<Object>> results = new ArrayList<>();
if (ps == null) {
// lazily create the PreparedStatement
ps = conn.prepareStatement(customQuery);
}
ResultSet result = ps.executeQuery(customQuery);
while (result.next()) {
// collection holding one row (multiple columns)
List<Object> row = new ArrayList<>();
// copy the returned columns into the collection
for (int i = 1; i <= result.getMetaData().getColumnCount(); i++) {
row.add(result.getObject(i));
}
results.add(row);
}
LOG.info("execSql:" + customQuery + "\nresultSize:" + results.size());
return results;
} catch (SQLException e) {
LOG.error(e.toString());
// reconnect
conn = InitConnection(connectionURL, connectionUserName, connectionPassword);
}
return null;
}
// convert the result set to strings: each row is a list, and each of these lists is turned into one string
List<String> getAllRows(List<List<Object>> queryResult) {
List<String> allRows = new ArrayList<>();
if (queryResult == null || queryResult.isEmpty())
return allRows;
StringBuilder row = new StringBuilder();
for (List<Object> rawRow : queryResult) {
Object value = null;
for (Object aRawRow : rawRow) {
value = aRawRow;
if (value == null) {
row.append(",");
} else {
row.append(aRawRow.toString()).append(",");
}
}
allRows.add(row.toString());
row = new StringBuilder();
}
return allRows;
}
// update the offset metadata; called after each result set is returned. The offset of every query must be recorded so the program can resume where it left off after an interruption; the id is used as the offset
void updateOffset2DB(int size) {
// use source_tab as the key: insert if it does not exist, update if it does (one record per source table)
String sql = "insert into flume_meta(source_tab,currentIndex) VALUES('" + this.table + "','" + (recordSixe += size)
+ "') on DUPLICATE key update source_tab=values(source_tab),currentIndex=values(currentIndex)";
LOG.info("updateStatus Sql:" + sql);
execSql(sql);
}
// execute a SQL statement
private void execSql(String sql) {
try {
ps = conn.prepareStatement(sql);
LOG.info("exec::" + sql);
ps.execute();
} catch (SQLException e) {
e.printStackTrace();
}
}
// get the offset of the current id
private Integer getStatusDBIndex(int startFrom) {
// query the flume_meta table for the current id
String dbIndex = queryOne("select currentIndex from flume_meta where source_tab='" + table + "'");
if (dbIndex != null) {
return Integer.parseInt(dbIndex);
}
// if there is no record, this is either the first query or the table has no data yet; return the initial value that was passed in
return startFrom;
}
// run a query that returns a single value (the current id)
private String queryOne(String sql) {
ResultSet result = null;
try {
ps = conn.prepareStatement(sql);
result = ps.executeQuery();
while (result.next()) {
return result.getString(1);
}
} catch (SQLException e) {
e.printStackTrace();
}
return null;
}
// close the resources
void close() {
try {
ps.close();
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
int getCurrentIndex() {
return currentIndex;
}
void setCurrentIndex(int newValue) {
currentIndex = newValue;
}
int getRunQueryDelay() {
return runQueryDelay;
}
String getQuery() {
return query;
}
String getConnectionURL() {
return connectionURL;
}
private boolean isCustomQuerySet() {
return (customQuery != null);
}
Context getContext() {
return context;
}
public String getConnectionUserName() {
return connectionUserName;
}
public String getConnectionPassword() {
return connectionPassword;
}
String getDefaultCharsetResultSet() {
return defaultCharsetResultSet;
}
}
Code implementation:
package com.atguigu;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
public class SQLSource extends AbstractSource implements Configurable, PollableSource {
// logger
private static final Logger LOG = LoggerFactory.getLogger(SQLSource.class);
// the SQL helper
private SQLSourceHelper sqlSourceHelper;
@Override
public long getBackOffSleepIncrement() {
return 0;
}
@Override
public long getMaxBackOffSleepInterval() {
return 0;
}
@Override
public void configure(Context context) {
try {
// initialize the helper
sqlSourceHelper = new SQLSourceHelper(context);
} catch (ParseException e) {
e.printStackTrace();
}
}
@Override
public Status process() throws EventDeliveryException {
try {
// query the data table
List<List<Object>> result = sqlSourceHelper.executeQuery();
// collection of events
List<Event> events = new ArrayList<>();
// event header map
HashMap<String, String> header = new HashMap<>();
// if data was returned, wrap it into events
if (!result.isEmpty()) {
List<String> allRows = sqlSourceHelper.getAllRows(result);
Event event = null;
for (String row : allRows) {
event = new SimpleEvent();
event.setBody(row.getBytes());
event.setHeaders(header);
events.add(event);
}
// write the events to the channel
this.getChannelProcessor().processEventBatch(events);
// update the offset stored in the metadata table
sqlSourceHelper.updateOffset2DB(result.size());
}
// wait before the next poll
Thread.sleep(sqlSourceHelper.getRunQueryDelay());
return Status.READY;
} catch (InterruptedException e) {
LOG.error("Error procesing row", e);
return Status.BACKOFF;
}
}
@Override
public synchronized void stop() {
LOG.info("Stopping sql source {} ...", getName());
try {
// release resources
sqlSourceHelper.close();
} finally {
super.stop();
}
}
}
1) Put the MySQL driver jar into Flume's lib directory
[atguigu@hadoop102 flume]$ cp \
/opt/sorfware/mysql-libs/mysql-connector-java-5.1.27/mysql-connector-java-5.1.27-bin.jar \
/opt/module/flume/lib/
2) Package the project and put the jar into Flume's lib directory
1) Create the configuration file and open it
[atguigu@hadoop102 job]$ touch mysql.conf
[atguigu@hadoop102 job]$ vim mysql.conf
2) Add the following
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = com.atguigu.source.SQLSource
a1.sources.r1.connection.url = jdbc:mysql://192.168.9.102:3306/mysqlsource
a1.sources.r1.connection.user = root
a1.sources.r1.connection.password = 000000
a1.sources.r1.table = student
a1.sources.r1.columns.to.select = *
#a1.sources.r1.incremental.column.name = id
#a1.sources.r1.incremental.value = 0
a1.sources.r1.run.query.delay=5000
# Describe the sink
a1.sinks.k1.type = logger
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1) Create the mysqlsource database
CREATE DATABASE mysqlsource;
2) In the mysqlsource database, create the data table student and the metadata table flume_meta
CREATE TABLE `student` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) NOT NULL,
PRIMARY KEY (`id`)
);
CREATE TABLE `flume_meta` (
`source_tab` varchar(255) NOT NULL,
`currentIndex` varchar(255) NOT NULL,
PRIMARY KEY (`source_tab`)
);
3) Insert data into the student table (id, name):
1 zhangsan
2 lisi
3 wangwu
4 zhaoliu
1) Run the job
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 \
--conf-file job/mysql.conf -Dflume.root.logger=INFO,console
2) The result is shown in the figure below:
Exercise requirements:
1) flume-1 monitors hive.log; flume-1 sends its data to flume-2; flume-2 appends the data to a local file and also forwards it to flume-3.
2) flume-4 monitors another file that you create yourself, any.txt, and sends its data to flume-3.
3) flume-3 writes the aggregated data to HDFS.
Draw the structure diagram first, then write the job scripts.
Use the third-party framework Ganglia to monitor Flume in real time.
1. What they do
(1) The Source component is dedicated to collecting data; it can handle log data of many types and formats, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http and legacy.
(2) The Channel component buffers the collected data; it can keep it in memory or in files.
(3) The Sink component sends the data to its destination; destinations include HDFS, Logger, avro, thrift, ipc, file, HBase, solr and custom sinks.
2. The Source types our company uses:
(1) exec, to monitor back-end logs
(2) netcat, to monitor the port on which back-end logs are produced
(exec, spooldir)
1. Source
Increasing the number of Sources (or, when using the Taildir Source, the number of FileGroups) increases the Source's capacity to read data. For example, when a single directory produces too many files, split it into several directories and configure a Source for each so that the Sources can keep up with the newly generated data.
The batchSize parameter determines how many Events the Source ships to the Channel in one batch; increasing it appropriately improves the throughput of moving Events from the Source into the Channel.
2. Channel
With type = memory the Channel performs best, but data may be lost if the Flume process dies unexpectedly. With type = file the Channel is more fault-tolerant, but its performance is lower than that of a memory Channel.
When using a file Channel, configuring dataDirs with several directories on different disks improves performance.
The capacity parameter determines the maximum number of Events the Channel can hold. The transactionCapacity parameter determines the maximum number of Events the Source writes to the Channel per transaction and the maximum number of Events the Sink reads from the Channel per transaction; transactionCapacity must be no smaller than the batchSize of the Source and the Sink.
3. Sink
Increasing the number of Sinks increases the capacity to consume Events, but more Sinks is not automatically better: enough is enough, because extra Sinks occupy system resources and waste them needlessly.
The batchSize parameter determines how many Events the Sink reads from the Channel in one batch; increasing it appropriately improves the throughput of moving Events out of the Channel.
Flume's transaction mechanism (similar to a database's): Flume uses two separate transactions to handle event delivery, one from the Source to the Channel and one from the Channel to the Sink. For example, the spooling directory source creates one event per line of a file; only once every event in a transaction has been delivered to the Channel and the transaction has committed successfully does the Source mark the file as completed. The Channel-to-Sink leg is handled the same way: if for some reason the events cannot be recorded at the destination, the transaction is rolled back and all of the events remain in the Channel, waiting to be delivered again.
As shown in the figure below:
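As a rough illustration of what those two transactions look like in code, here is a minimal Java sketch against Flume's Channel/Transaction API (my own sketch, not from the original; error handling is reduced to the essentials):
import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
public class ChannelTxSketch {
    // Source -> Channel: the "put" transaction
    static void putBatch(Channel channel, Iterable<Event> batch) {
        Transaction tx = channel.getTransaction();
        tx.begin();
        try {
            for (Event e : batch) {
                channel.put(e); // events are staged inside the transaction
            }
            tx.commit(); // only now do they become visible in the channel
        } catch (RuntimeException ex) {
            tx.rollback(); // nothing is added; the source can retry the whole batch
            throw ex;
        } finally {
            tx.close();
        }
    }
    // Channel -> Sink: the "take" transaction
    static void takeBatch(Channel channel, int batchSize) {
        Transaction tx = channel.getTransaction();
        tx.begin();
        try {
            for (int i = 0; i < batchSize; i++) {
                Event e = channel.take(); // null means the channel is currently empty
                if (e == null) {
                    break;
                }
                // ... write e to the destination (HDFS, the next agent, ...) ...
            }
            tx.commit(); // events leave the channel only after the write succeeded
        } catch (RuntimeException ex) {
            tx.rollback(); // events stay in the channel and will be redelivered
            throw ex;
        } finally {
            tx.close();
        }
    }
}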
No. The Channel can store its data in files, and the data transfer itself is transactional. However, if memory storage is used, a power failure or crash may lose data.