CentOS6安装各类大数据软件 第一章:各个软件版本介绍html
CentOS6安装各类大数据软件 第二章:Linux各个软件启动命令java
CentOS6安装各类大数据软件 第三章:Linux基础软件的安装node
CentOS6安装各类大数据软件 第四章:Hadoop分布式集群配置数据库
CentOS6安装各类大数据软件 第五章:Kafka集群的配置apache
CentOS6安装各类大数据软件 第六章:HBase分布式集群的配置数组
CentOS6安装各类大数据软件 第七章:Flume安装与配置bash
CentOS6安装各类大数据软件 第八章:Hive安装和配置服务器
CentOS6安装各类大数据软件 第九章:Hue大数据可视化工具安装和配置网络
CentOS6安装各类大数据软件 第十章:Spark集群安装和部署dom
此flume安装以用户点击行为实时安装为例(2台flume从日志系统中获取数据,并汇总到一台flume上,并由这台flume对数据进行分发,分别分发到Kafka和HBase等其余应用上),安装步骤以下所示
步骤一:上次压缩包 步骤二:解压到安装目录下 tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /export/servers/
#修改文件名称 mv flume-env.sh.template flume-env.sh #配置Java环境变量 export JAVA_HOME=/opt/modules/jdk1.8.0_144
#步骤一:修改文件名称 mv flume-conf.properties.template flume-conf.properties #步骤二:进行具体配置 #给三个线程起一个别名(数据来源,管道,下沉地) a2.sources = r1 a2.channels = c1 a2.sinks = k1 #设置Flume源(类型,数据来源的地址,数据经过什么通道传输) a2.sources.r1.type = exec a2.sources.r1.command = tail -F /export/datas/access/access.log a2.sources.r1.channels = c1 #设置Flume通道类型 a2.channels.c1.type = memory #设置Flume通道大小 a2.channels.c1.capacity = 10000 #每次从源抓取数据的大小 a2.channels.c1.transactionCapacity = 10000 #超时事件的设定 a2.channels.c1.keep-alive = 5 #设置flume的sink(sink的类型,通道来自哪里,往哪台服务器发送数据,下沉的端口号) a2.sinks.k1.type = avro a2.sinks.k1.channel = c1 #下沉的服务器的ip地址 a2.sinks.k1.hostname = node01.ouyang.com #下沉的服务器的端口号 a2.sinks.k1.port = 5555
同上agent2配置便可,为区别不一样服务器,将a2修改a3便可(不修改不影响实际使用,但为了区别不一样服务器,建议修改)
说明:agent2和agent3节点的数据下沉地为node01服务器的5555端口,因此agent1的数据来源只有一个,为本机的5555端口,agent1的flume只须要从本机的5555端口获取数据便可。但此flume的下沉地有2个,分别是HBase和Kafka,因此相应的此flume的通道也要有2个。
agent1具体配置请看下文的Flume整合HBase和Flume整合Kafka。
在实际工做中,Flume收集到的数据通常下层到HBase,Kafka,HDFS等应用,此处演示将Flume下层到HBase和Kafka的开发。以下图:
从上述能够看到咱们有两个Flume服务器用来收集四台WEB应用服务器的日志信息,这两台服务器将数据汇总到另一台总的Flume服务器上,也就是说另外两台的输出做为这台总的服务器的输入.这里的输入也就是avro.这台总的服务器能够将日志信息直接推送到 Kafka消息系统中,或者通过清洗以后,存入HBase数据库中.也就说咱们会在存入HBase数据库以前进行二次开发.由于咱们的Hbase数据库是非关系型数据库,它的列是不固定的.因此,咱们须要对hbase sink进行自定义开发。
官网:http://flume.apache.org/download.html
请注意:源码版本请和安装的flume版本匹配,以下图:
步骤一:解压源码压缩包
步骤二:打开IDEA,点击Open
步骤三:选择导入flume-ng-sinks这个模块
步骤四:选择flume-ng-sinks模块下的flume-ng-hbase-sink这个子模块
步骤五:进入源码
上述咱们首先将源码导入IDEA中,待会咱们会针对Flume和Hbase的整合进行sink的自定义。
#修改文件名称 mv flume-env.sh.template flume-env.sh #配置Java环境变量 export JAVA_HOME=/opt/modules/jdk1.8.0_144
#步骤一:修改文件名称 mv flume-conf.properties.template flume-conf.properties #步骤二:进行具体配置 #定义三个线程的别称(由于下沉地有2个,因此设置2个管道和2个下沉地的别称) a1.sources = r1 a1.channels = hbaseChannel kafkaChannel a1.sinks = hbaseSink kfkSink #设置源 #设置源的格式 a1.sources.r1.type = avro #设置源接收的数据须要前往哪些管道 a1.sources.r1.channels = hbaseChannel kafkaChannel #设置数据来源的ip地址 a1.sources.r1.bind = node01.ouyang.com #设置数据来源的端口号 a1.sources.r1.port = 5555 #超时设置 a1.sources.r1.threads = 5 #设置hbaseChannel(配置hbase的管道信息) a1.channels.hbaseChannel.type = memory a1.channels.hbaseChannel.capacity = 100000 a1.channels.hbaseChannel.transactionCapacity = 100000 a1.channels.hbaseChannel.keep-alive = 20 #设置hbaseSink(配置hbase的下沉地信息) a1.sinks.hbaseSink.type = asynchbase a1.sinks.hbaseSink.table = access a1.sinks.hbaseSink.columnFamily = info a1.sinks.hbaseSink.serializer = [待定] #根据数据定义列的名称 a1.sinks.hbaseSink.serializer.payloadColumn = datatime,userid,searchname,retorder,cliorder,cliurl #设置sink来源的管道 a1.sinks.hbaseSink.channel = hbaseChannel
Flume官方提供的HbaseSink的实现是SimpleAsyncHbaseEventSerializer,这个实现不符合咱们本次项目中的要求,因此,咱们须要自定义HbaseSink.自定义仿照SimpleAsyncHbaseEventSerializer便可。
package org.apache.flume.sink.hbase; import com.google.common.base.Charsets; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.FlumeException; import org.apache.flume.conf.ComponentConfiguration; import org.apache.flume.sink.hbase.SimpleHbaseEventSerializer.KeyType; import org.hbase.async.AtomicIncrementRequest; import org.hbase.async.PutRequest; import java.util.ArrayList; import java.util.List; @SuppressWarnings("all") public class HeimaAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer { //表的名称 private byte[] table; //列簇名 private byte[] cf; //列的数据 private byte[] payload; //列的Column private byte[] payloadColumn; private byte[] incrementColumn; //rowKey的前缀 private String rowPrefix; private byte[] incrementRow; //告诉咱们的keyType是哪个 private KeyType keyType; @Override public void initialize(byte[] table, byte[] cf) { this.table = table; this.cf = cf; } @Override public List<PutRequest> getActions() { List<PutRequest> actions = new ArrayList<PutRequest>(); if (payloadColumn != null) { byte[] rowKey; try { //获取每一列 String[] columns = String.valueOf(payloadColumn).split(","); //获取每一列的值 String[] values = String.valueOf(payload).split(","); for (int i = 0; i < columns.length; i++) { //获取列的字节数组 byte[] colColumns = columns[i].getBytes(); //获取每一类的值的字节数组 byte[] colValues = values[i].getBytes(Charsets.UTF_8); //对列和值的长度进行判断,若是两者不一致,直接能够跳过,也就是不会进行数据落地 if(columns.length!=values.length){ continue; } //获取userid和datatime String datatime = values[0].toString(); String userid = values[1].toString(); rowKey = SimpleRowKeyGenerator.getHeimaRowKey(userid,datatime); PutRequest putRequest = new PutRequest(table, rowKey, cf, colColumns, colValues); actions.add(putRequest); } } catch (Exception e) { throw new FlumeException("Could not get row key!", e); } } return actions; } @Override public List<AtomicIncrementRequest> getIncrements() { List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>(); if (incrementColumn != null) { AtomicIncrementRequest inc = new AtomicIncrementRequest(table, incrementRow, cf, incrementColumn); actions.add(inc); } return actions; } @Override public void cleanUp() { // TODO Auto-generated method stub } @Override public void configure(Context context) { String pCol = context.getString("payloadColumn", "pCol"); String iCol = context.getString("incrementColumn", "iCol"); rowPrefix = context.getString("rowPrefix", "default"); String suffix = context.getString("suffix", "uuid"); if (pCol != null && !pCol.isEmpty()) { if (suffix.equals("timestamp")) { keyType = KeyType.TS; } else if (suffix.equals("random")) { keyType = KeyType.RANDOM; } else if (suffix.equals("nano")) { keyType = KeyType.TSNANO; } else { keyType = KeyType.UUID; } payloadColumn = pCol.getBytes(Charsets.UTF_8); } if (iCol != null && !iCol.isEmpty()) { incrementColumn = iCol.getBytes(Charsets.UTF_8); } incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8); } @Override public void setEvent(Event event) { this.payload = event.getBody(); } @Override public void configure(ComponentConfiguration conf) { // TODO Auto-generated method stub } }
public static byte[] getHeimaRowKey(String userid,String datatime) throws UnsupportedEncodingException { return (userid +"-"+ datatime +"-"+ String.valueOf(System.currentTimeMillis())).getBytes("UTF8"); }
将自定义的类放置到导入的flume-sink工程中,跟示例类SimpleAsyncHbaseEventSerializer同一个目录下,而后打包该工程便可。
步骤一:首先打开Project Structure
步骤二:点击Artifacts这一项,增长一个jar
步骤三:选择须要打包的模块
步骤四:对须要打包的模块进行配置
步骤五:确认配置
步骤六:进行打包,选择build这个菜单项
步骤七:获得打包的结果
步骤一:删除flume目录的lib目录下的flume-ng-hbase-sink-1.7.0.jar包
rm -rf flume-ng-hbase-sink-1.7.0.jar
步骤二:将打包好的jar包重命名为flume-ng-hbase-sink-1.7.0.jar,并上传到lib下
步骤三:修改flume-conf.properties,将自定义sink类的全限定名添加上去
a1.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.HeimaAsyncHbaseEventSerializer
flume-conf.properties
Flume和Kafka集成只要配置flume-conf.properties配置文件便可,该配置文件的数据来源能够参考上述flume和HBase的集合,下述为Flume和Kafka集成的管道和下沉地的配置。
#设置kafkaChannel(配置Kafka的管道信息) a1.channels.kafkaChannel.type = memory a1.channels.kafkaChannel.capacity = 100000 a1.channels.kafkaChannel.transactionCapacity = 100000 a1.channels.kafkaChannel.keep-alive = 20 #设置kfkSink(下沉地设置) #设置下沉的数据来源(来自kafkaChannel管道) a1.sinks.kfkSink.channel = kafkaChannel #设置下沉的类型 a1.sinks.kfkSink.type = org.apache.flume.sink.kafka.KafkaSink #设置下沉到kafka中的主题名 a1.sinks.kfkSink.topic = access #设置kafka服务的ip和端口号 a1.sinks.kfkSink.brokerList = node01.ouyang.com:9092,node02.ouyang.com:9092,node03.ouyang.com:9092 #设置zookeeper的ip和端口号(由于要使用kafka需基于zookeeper) a1.sinks.kfkSink.zookeeperConnect = node01.ouyang.com:2181,node02.ouyang.com:2181,node03.ouyang.com:2181 #设置kafka的生产者消息防丢失机制(默认为1) a1.sinks.kfkSink.requiredAcks = 1 #设置一次传输数据的大小 a1.sinks.kfkSink.batchSize = 1 #设置序列化类,让数据能够进行网络传输 a1.sinks.kfkSink.serializer.class = kafka.serializer.StringEncoder
Flume启动前请先启动Flume所依赖的应用服务,如上述配置,需先启动HBase和Kafka,而HBase和Kafka又依赖Hadoop和Zookeeper,因此请先将这些依赖服务启动;Flume启动时请先启动分节点,再启动聚会节点,若是上述配置,请先启动agent2和agent3,再启动agent1。
bin/flume-ng agent -c conf -f conf/netcat-logger.conf -n a1 -Dflume.root.logger=INFO,console
-c conf 指定 flume 自身的配置文件所在目录
-f conf/netcat-logger.con 指定咱们所描述的采集方案
-n a1 指定咱们这个 agent 的名字
以agent2和agent3收集数据,agent1汇总数据,并将数据分别分发到HBase和Kafka中为例:
#步骤一:给每台服务器的flume配置好环境变量 export FLUME_HOME=/export/servers/flume export PATH=${FLUME_HOME}/bin:$PATH #步骤二:在agent2和agent3服务器的flume的bin目录下编写启动脚本: #/bin/bash echo "................flume-2 starting......................" /export/servers/flume/bin/flume-ng agent --conf /export/servers/flume/conf/ -f /export/servers/flume/conf/flume-conf.properties -n a2 -Dflume.root.logger=INFO,console agent2和agent3启动脚本基本一致,就名字不一样,该名字为在配置文件中配置的别名 编写完启动脚本后能够进行启动测试 #步骤三:在agent1服务器的flume的bin目录下编写启动脚本: #/bin/bash echo "................flume-1 starting......................" /export/servers/flume/bin/flume-ng agent --conf /export/servers/flume/conf/ -f /export/servers/flume/conf/flume-conf.properties -n a1 -Dflume.root.logger=INFO,console 由于Flume节点1是聚合节点,因此,须要依赖其余不少服务,因此在这里咱们先不作任何测试验证,待会再进行统一的测试验证。 #步骤四:在onekye目录下编写一键启动脚本: cat /export/onekey/slave | while read line do { echo "Flume开始启动 --> "$line ssh $line "source /etc/profile;nohup sh ${FLUME_HOME}/bin/flume-access-start.sh >/dev/null 2>&1 &" }& wait done echo "★★★Flume启动完成★★★" #保存退出