说明:本文不只提供两种方案,还详细的记录了一些相关信息。html
本方案的核心是flume采集数据后,按照hive表的结构,将采集数据输送到对应的地址中,达到数据实时存储的目的,这种实时其实是一种准实时。java
假设hadoop集群已经正常启动,hive也已经正常启动,而且hive的文件地址是/hive/warehouse,而后hive里存在一张由如下建表语句建立的表apache
create table flume_test(uuid string);
可推断,表flume_test地址在/hive/warehouse/flume_test,下面介绍flume:bash
flume安装步骤app
#下载 cd /opt mkdir flume wget http://archive.apache.org/dist/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz tar xvzf apache-flume-1.6.0-bin.tar.gz cd apache-flume-1.6.0-bin/conf cp flume-env.sh.template flume-env.sh
打开flume-env文件,添加java变量dom
export JAVA_HOME=/usr/java/jdk1.8.0_111
而后添加环境变量,为了一次过,分别在profile和bashrc末尾添加eclipse
export FLUME_HOME=/opt/flume/apache-flume-1.6.0-bin export FLUME_CONF_DIR=$FLUME_HOME/conf export PATH=$PATH:$FLUME_HOME/bin
而后maven
source /etc/profile
到此flume安装完毕,下面进行配置,切换到conf文件夹复制flume-conf.properties.template为agent.conf,而后编辑ide
#定义活跃列表 agent.sources=avroSrc agent.channels=memChannel agent.sinks=hdfsSink #定义source agent.sources.avroSrc.type=avro agent.sources.avroSrc.channels=memChannel agent.sources.avroSrc.bind=0.0.0.0 agent.sources.avroSrc.port=4353 agent.sources.avroSrc.interceptors=timestampinterceptor agent.sources.avroSrc.interceptors.timestampinterceptor.type=timestamp agent.sources.avroSrc.interceptors.timestampinterceptor.preserveExisting=false #定义channel agent.channels.memChannel.type=memory agent.channels.memChannel.capacity = 1000 agent.channels.memChannel.transactionCapacity = 100 #定义sink agent.sinks.hdfsSink.type=hdfs agent.sinks.hdfsSink.channel=memChannel #agent.sinks.hdfsSink.hdfs.path=hdfs://hadoop-n:9000/flume/test/%{topic}/%Y%m%d%H agent.sinks.hdfsSink.hdfs.path=hdfs://hadoop-n:9000/hive/warehouse/flume_test agent.sinks.hdfsSink.hdfs.filePrefix=stu-flume agent.sinks.hdfsSink.hdfs.inUsePrefix=inuse-stu-flume agent.sinks.hdfsSink.hdfs.inUseSuffix=.temp agent.sinks.hdfsSink.hdfs.rollInterval=0 agent.sinks.hdfsSink.hdfs.rollSize=10240000 agent.sinks.hdfsSink.hdfs.rollCount=0 agent.sinks.hdfsSink.hdfs.idleTimeout=0 agent.sinks.hdfsSink.hdfs.batchSize=100 agent.sinks.hdfsSink.hdfs.minBlockReplicas=1 # agent.sinks.hdfsSink.hdfs.writeFormat = Text agent.sinks.hdfsSink.hdfs.fileType = DataStream
具体的每一项配置可参照下面这篇博客http://lxw1234.com/archives/2015/10/527.htm,须要警戒的是rollInterval、rollSize、rollCount、idleTimeout这四个属性,若是进行了配置发现不起做用,就要检查一下minBlockReplicas这个属性是否配置,而且值是不是1,下面这个链接是缘由http://doc.okbase.net/chiweitree/archive/126197.htmloop
配置完毕后能够启动,启动命令
./flume-ng agent -f ../conf/agent.conf -n agent -c conf -Dflume.monitoring.type=http \-Dflume.monitoring.port=5653 -Dflume.root.logger=DEBUG,console
注意:-n 指的是agent的名称,须要对应到配置文件的第一个值,本启动命令还开启了监控,监控地址http://host:5653/metrics;-f 指的是配置文件的路径及名称。flume的conf修改后不用重启,默认30秒刷新一次,自动装载最新的配置。
flume安装并启动完毕后,编写测试程序。打开eclipse,建立maven项目
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>scc</groupId> <artifactId>stu-flume</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>war</packaging> <name>stu-flume</name> <dependencies> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.9</version> </dependency> <dependency> <groupId>org.apache.flume.flume-ng-clients</groupId> <artifactId>flume-ng-log4jappender</artifactId> <version>1.6.0</version> </dependency> </dependencies> </project>
测试servlet
public class GenerLogServlet extends HttpServlet { private static final Logger LOGGER = Logger.getLogger(GenerLogServlet.class); private static final long serialVersionUID = 1L; @Override protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { for (;;) { LOGGER.info(UUID.randomUUID().toString()); try { Thread.sleep(100); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } @Override protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { this.doGet(request, response); } }
log4j.properties
#log4j settings #log4j.rootLogger=debug, CONSOLE log4j.logger.scc.stu_flume.GenerLogServlet=debug,GenerLogServlet #log4j.rootLogger=INFO log4j.appender.GenerLogServlet=org.apache.flume.clients.log4jappender.Log4jAppender log4j.appender.GenerLogServlet.Hostname=10.5.3.100 log4j.appender.GenerLogServlet.Port=4353 log4j.appender.GenerLogServletUnsafeMode=false
启动项目,访问http://localhost:8080/log开始生产数据。须要注意的是,若是flume配置基于时间戳作文件分组(此种状况能够匹配hive根据时间进行分区),那么须要agent.conf中的source必定要配置
agent.sources.avroSrc.interceptors=timestampinterceptor agent.sources.avroSrc.interceptors.timestampinterceptor.type=timestamp agent.sources.avroSrc.interceptors.timestampinterceptor.preserveExisting=false
不然flume的sink会报找不到timestamp错误,由于源码org.apache.flume.clients.log4jappender.Log4jAvroHeaders中定义timestamp的key是flume.client.log4j.timestamp而不是timestamp,因此须要手动添加一个timestamp,若是对这个timestamp要求必须是数据生产的时间,能够修改源码或者为source添加拦截器手动配置。
flume具备很是灵活的使用方式,能够自定义source、sink、拦截器、channel选择器等等,适应绝大部分采集、数据缓冲等场景。
观察hadoop目录,发现flume已经按配置将数据移动到相应的hive表目录中,以下图:
打开hive客户端,数据查询命令,发现数据已可被查询!而且针对hive的分区表和桶表flume均可以实现按照hive表数据规则写入,进而达到数据实时插入,至此,方案一结束。
本方案缺点:
因为flume在写入文件的时候,独占正在写入的文件资源,致使hive不能读取正在被写入的文件的内容,也就是说假如每5分钟生成一个文件,那么正在写的文件不会被hive读取到内容,也就意味了hive存在最大5分钟的延迟。而若是把时间变小,那么延迟就会下降,可是哪怕是设置30分钟或1个小时,flume流量不大的状况下,也会生成许多零散的小文件,这点与hive的特长相悖,hive擅长处理大文件,对于零散小文件hive性能会下降不少。
对比方案一,测试程序、source不变,sink改为hbase-sink,数据实时插入到hbase中,而后在hive创建一张hbase映射表,hive从hbase中读取数据,这样可达到实时插入的效果。因为字数限制,方案二记录在以下博客链接中: