前言:老刘不敢保证说的有多好,但绝对是很是良心地讲述自学大数据开发路上的一些经历和感悟,保证会讲述一些不一样于别人技术博客的细节。node
老刘如今想写点有本身特点的东西,讲讲自学大数据遇到的一些事情,保证讲一些别人技术博客里忽略的知识点。面试
不少自学编程的人都会有一个问题,特别是研二即将找工做的小伙伴,由于立刻就要找工做了,自学时间很少了,因此在自学的路上,经常会忽略不少细小但很重要的知识点,不少伙伴都是直接背一些机构的资料。编程
本身没有静下心来好好研究各个知识点,也没有考虑这些机构写的知识点的对错,彻底照搬资料上的知识点,没有造成本身的理解,这是很是危险的!缓存
从今天开始,老刘就来给你们讲讲自学大数据开发路上的那些容易被人忽略的细节,让你们对知识点造成本身的理解。服务器
在解释什么是flume这类知识点上,不少机构的资料或者网上的技术博客讲的都很差,不少培训机构的资料是这样形容的“Flume是一个高可用,高可靠,分布式的海量日志采集、聚合和传输的系统”。
通常都会以为这句话没啥问题,可是好好想一想,这句话是否是至关于这个场景,男女相亲,男方说我有车有房,可是没有说是什么车,什么房,自行车也是车,租的房也是房啊!因此在说有车有房的时候,必定要拿出确凿的证据。网络
因此呢,当面试的时候,直接说flume是一个高可用,高可靠,分布式的海量日志采集、聚合和传输的系统是很是没有说服力的,很是典型的照搬资料,没有本身的理解。架构
它是如何作到高可用,高可靠,分布式也须要讲一讲,这样才以为可靠!这就是老刘说的和别人不同的地方,真的良心分享!框架
老刘以为能够这样说在一个完整的离线大数据处理系统中,除了hdfs+mapreduce+hive组成分析系统的核心以外,还须要数据采集、结果数据导出、任务调度等不可或缺的辅助系统,而这些辅助工具在hadoop生态体系中都有便捷的开源框架。分布式
其中,flume就是一个日志采集、聚合和传输系统的开源框架,它的高可用、高可靠、分布式这些特色,通常都是经过部署多个服务器,而后在每一个服务器上部署flume agent模式造成的,而且flume经过事务机制保证了数据传输的完整性和准确性,flume事务在后面讲。工具
flume的概念就讲这么多,这样说的目的主要是不想让你们照搬机构资料的内容,本身多想一想,要有本身的理解。
看到这个架构图,老刘直接先说说flume是怎么工做的?
外部数据源以特定格式向flume发送events事件,当source接收到events时,它将其存储到一个或多个channel,channel会一直保存events直到它被sink消费。sink的主要功能是从channel中读取events,并将其存入外部存储系统或转发到下一个source,成功后再从channe移除events。
接着讲讲各个组件agent、source、channe、sink。
agent
它是一个JVM进程,它以事件的形式将数据从源头送至目的。
source
它是一个采集组件,用来获取数据。
channel
它是一个传输通道组件,用来缓存数据,用于从将source的数据传递到sink。
sink它
是一个下沉组件,它将数据发送给最终的存储系统或者下一个agent。
flume事务是很是很是重要的,以前就说过经过flume事务,实现了传输数据的完整性和准确性。
先看看这张图:
flume它有两个事务,分别是put事务、take事务。
put事务的步骤分为两步:
doput,它先将此数据写入到临时缓冲区putlist里面;
docommit,它会去检查channel里面有没有空位置,若是有空位置就会传入数据;若是channel里面没有空位置,那就会把数据回滚到putlist里面。
take事务也分为两步:
dotake,它会将数据读取到临时缓冲区takelist,并把数据传到hdfs上;
docommit,它会去判断数据是否上传成功,若成功那么就会清除临时缓冲区takelist里的数据;若不成功,好比hdfs发生崩溃啥的,那就会回滚数据到channel里面。
经过讲述两个事务的步骤,是否是就知道了为何flume会保证传输数据的完整和准确。
老刘总结一下就是,数据在传输到下一个节点时,假设接收节点出现异常,好比网络异常之类的,那就会回滚这一批数据,所以就会致使数据重发。
那在同一个节点内,source写入数据到channel,数据在一个批次内出现异常,那就会不写入到channel中,已经接收到的部分数据会被直接抛弃,靠上一个节点重发数据。
经过这两个事务,flume就提升了数据传输的完整性、准确性。
这部分是flume最重要的,做为一个日志采集框架,flume的应用比它的概念还要重要,必定要知道flume要怎么用!老刘最开始压根就没看这部分,光看知识点了,如今才发现实战的重要性!
可是flume实战案例数不胜数,咱们难道要记住每个案例吗?
固然不是,这个flume案例咱们能够根据官网里的配置文件进行配置,以下图:
看左下角蓝色方框里的内容,就能够查询到相关配置文件。在这里老刘有句话说,若是想学习一个新的框架,我们的学习资料就是官网,经过官网学习,不只能提高技术,还能提升英语,且不美滋滋!
如今开始讲案例,第一个是采集文件到HDFS,需求就是监控一个文件若是有新增的内容就把数据采集到HDFS上。
根据官网资料,flume配置文件开发须要在flume安装目录下建立一个文件夹,后期存放flume开发的配置文件。
根据需求的描述,source的配置应该选择为exec;为了保证数据不丢失,channel的配置应该选择file;sink的配置应该选择为hdfs。
这样虽然已知足需求,可是咱们作数据开发,确定会存在很是多的小文件,必定要作相关的优化。例如,文件小,文件多怎么解决?文件目录多怎么解决?
因此咱们还要选择一些参数来控制参数和目录。
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 #配置source #指定source的类型为exec,经过unix命令来传输结果数据 a1.sources.r1.type = exec #监控这个文件,有新的数据产生就不断采集 a1.sources.r1.command = tail -F /opt/bigdata/flumeData/tail.log #指定source的数据流入到channel中 a1.sources.r1.channels = c1 #配置channel #选择file,就是保证数据不丢失,即便出现火灾或者洪灾 a1.channels.c1.type = file #设置检查点目录--该目录是记录下event在数据目录下的位置 a1.channels.c1.checkpointDir=/kkb/data/flume_checkpoint #数据存储所在的目录 a1.channels.c1.dataDirs=/kkb/data/flume_data #配置sink a1.sinks.k1.channel = c1 #指定sink类型为hdfs a1.sinks.k1.type = hdfs #指定数据收集到hdfs目录 a1.sinks.k1.hdfs.path = hdfs://node01:9000/tailFile/%Y-%m-%d/%H%M #指定生成文件名的前缀 a1.sinks.k1.hdfs.filePrefix = events- #是否启用时间上的”舍弃” -->控制目录 a1.sinks.k1.hdfs.round = true #时间上进行“舍弃”的值 # 如 12:10 -- 12:19 => 12:10 # 如 12:20 -- 12:29 => 12:20 a1.sinks.k1.hdfs.roundValue = 10 #时间上进行“舍弃”的单位 a1.sinks.k1.hdfs.roundUnit = minute # 控制文件个数 #60s或者50字节或者10条数据,谁先知足,就开始滚动生成新文件 a1.sinks.k1.hdfs.rollInterval = 60 a1.sinks.k1.hdfs.rollSize = 50 a1.sinks.k1.hdfs.rollCount = 10 #每一个批次写入的数据量 a1.sinks.k1.hdfs.batchSize = 100 #开始本地时间戳--开启后就可使用%Y-%m-%d去解析时间 a1.sinks.k1.hdfs.useLocalTimeStamp = true #生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本 a1.sinks.k1.hdfs.fileType = DataStream
第二个是采集目录到HDFS,若是一个目录中不断产生新的文件,就须要把目录中的文件不断地进行数据传输到HDFS上。
采集目录的话,source的配置通常采用spooldir;channel的配置能够设置为file,也能够设置为别的,通常为memory;sink的配置仍是设置为hdfs。
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置source ##注意:不能往监控目中重复丢同名文件 a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /opt/bigdata/flumeData/files # 是否将文件的绝对路径添加到header a1.sources.r1.fileHeader = true a1.sources.r1.channels = c1 #配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #配置sink a1.sinks.k1.type = hdfs a1.sinks.k1.channel = c1 a1.sinks.k1.hdfs.path = hdfs://node01:9000/spooldir/%Y-%m-%d/%H%M a1.sinks.k1.hdfs.filePrefix = events- a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 10 a1.sinks.k1.hdfs.roundUnit = minute a1.sinks.k1.hdfs.rollInterval = 60 a1.sinks.k1.hdfs.rollSize = 50 a1.sinks.k1.hdfs.rollCount = 10 a1.sinks.k1.hdfs.batchSize = 100 a1.sinks.k1.hdfs.useLocalTimeStamp = true #生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本 a1.sinks.k1.hdfs.fileType = DataStream
最后再说一个两个agent串联,就是第一个agent负责监控某个目录中新增的文件进行数据收集,经过网络发送到第二个agent当中去,第二个agent负责接收第一个agent发送的数据,并将数据保存到hdfs上面去。
虽然是两个agent串联,但只要看了官网的配置文件以及通过以前的两个案例,这两个agent串联难度也是很通常的。
首先是agent的配置文件是这样的:
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置source ##注意:不能往监控目中重复丢同名文件 a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /opt/bigdata/flumeData/files a1.sources.r1.fileHeader = true a1.sources.r1.channels = c1 #配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #配置sink a1.sinks.k1.channel = c1 #AvroSink是用来经过网络来传输数据的,能够将event发送到RPC服务器(好比AvroSource) a1.sinks.k1.type = avro #node02 注意修改成本身的hostname a1.sinks.k1.hostname = node02 a1.sinks.k1.port = 4141
agent2的配置文件是这样的:
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 #配置source #经过AvroSource接受AvroSink的网络数据 a1.sources.r1.type = avro a1.sources.r1.channels = c1 #AvroSource服务的ip地址 a1.sources.r1.bind = node02 #AvroSource服务的端口 a1.sources.r1.port = 4141 #配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #配置sink a1.sinks.k1.channel = c1 a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = hdfs://node01:9000/avro-hdfs/%Y-%m-%d/%H-%M a1.sinks.k1.hdfs.filePrefix = events- a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 10 a1.sinks.k1.hdfs.roundUnit = minute a1.sinks.k1.hdfs.rollInterval = 60 a1.sinks.k1.hdfs.rollSize = 50 a1.sinks.k1.hdfs.rollCount = 10 a1.sinks.k1.hdfs.batchSize = 100 a1.sinks.k1.hdfs.useLocalTimeStamp = true #生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本 a1.sinks.k1.hdfs.fileType = DataStream
最后运行的时候,先启动node02上的flume,而后在启动node01上的flume。
老刘此次讲了flume的四个容易被忽略的细节,就是想提醒自学的小伙伴们要注意细节,绝对不能彻底照搬资料上说的内容,对每一个知识点必定要有本身的理解。
最后,若是以为有哪里写的很差或者有错误的地方,能够联系公众号:努力的老刘,进行交流。但愿可以对大数据开发感兴趣的同窗有帮助,但愿可以获得同窗们的指导。
若是以为写的不错,给老刘点个赞!