[ETL] Flume 理论与demo(Taildir Source & Hdfs Sink)

1、Flume简介

1. Flume概述

  Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各种数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各类数据接受方(可定制)的能力。html

2. Flume系统功能

  • 日志收集

  Flume最先是Cloudera提供的日志收集系统,目前是Apache下的一个孵化项目,Flume支持在日志系统中定制各种数据发送方,用于收集数据。node

  • 数据处理

  Flume提供对数据进行简单处理,并写到各类数据接受方(可定制)的能力, Flume提供了从console(控制台)、RPC(Thrift-RPC)、text(文件)、tail(UNIX tail)、syslog(syslog日志系统,支持TCP和UDP2种模式),exec(命令执行)等数据源上收集数据的能力。apache

3. Flume的工做方式

  Flume-og采用了多Master的方式。为了保证配置数据的一致性,Flume引入了ZooKeeper,用于保存配置数据,ZooKeeper自己可保证配置数据的一致性和高可用,另外,在配置数据发生变化时,ZooKeeper能够通知Flume Master节点。Flume Master间使用gossip协议同步数据。json

  Flume-ng最明显的改动就是取消了集中管理配置的 Master 和 Zookeeper,变为一个纯粹的传输工具。Flume-ng另外一个主要的不一样点是读入数据和写出数据如今由不一样的工做线程处理(称为 Runner)。 在 Flume-og 中,读入线程一样作写出工做(除了故障重试)。若是写出慢的话(不是彻底失败),它将阻塞 Flume 接收数据的能力。这种异步的设计使读入线程能够顺畅的工做而无需关注下游的任何问题。服务器

4. Flume的安装

  若是使用Apache-Flume的话只要上传安装包到服务器,而后解压,再配置一下环境变量便可。本文使用CDH-5.12.2安装Flume。异步

2、Flume工做原理及组件详解

1. Flume工做原理

 

  Flume的主要工做就是启动一个Agent,一个Agent由三个部分组成,Source、Sink和Channel。Source表示数据从哪里来,Sink表示数据收集到哪里去,Channel是一个缓冲区。更详细介绍能够参看官方文档tcp

2. Source组件

  对于Source和Sink则根据不一样的需求有不少种写法。例如Source能够直接从一个文件/目录中去取数据,也能够别人直接给你传数据。因此,总的来讲Source只有两类,一类是主动的Source、一类是被动的Source。分布式

  主动的Source就是将Source配置成主动的到别人那里去拿,主动的Source与被动的Source不一样,它不是一个服务。主动的source有Exec Source、Spooling Directory Source、Taildir Source、Kafka Source等。ide

  被动的Source就是别人给发过来,前提是Flume机器的source必定是一个服务,能够是Http协议、tcp协议、Rcp协议的服务,规定是哪一种协议的服务,B机器就按那种协议去发。总之,被动的Source就是一种服务,至于使用什么协议就看实际需求。被动的source有Avro Source、Thrift Source、JMS Source、NetCat Source等。 工具

3. Sink组件

  通常来讲,Sink没有主动和被动之分。若是要将收集到的数据放到HDFS上的话,那么Sink就是Hdfs的客户端;若是放到Kafka中,那么Sink就是Kafka的客户端。总之,Sink通常都是作客户端的。

4. Channel组件

  对于Channel。只有两种状况,一种缓冲在内存中,一种缓冲在磁盘中。 

3、Demo(Taildir Source & Hdfs Sink)

1. Taildir Source

  相比于Spooldir Source,Taildir Source作了一些优化。Spooldir Source读取目录时,文件在很短的时间内不能修改,不然会报错,致使Flume终止。而咱们常常须要上传较大文件,当文件达到几MB或者十几MB,Flume就会报错。固然,能够对Flume的源代码进行修改,来解决这个问题(可参见Flume Spooldir 源的一些问题)。Flume 1.7以后增长了Taildir Source,这个Source也能够解决这个问题。

  其中,channels,type,filegroups,filegroups.<filegroupName>是必配属性。type=TAILDIR;filegroups是给若干个目录取别名,例如 g1 g2;filegroups.<filegroupName>是设置对应目录(g1或g2)下的文件匹配规则。

2. HDFS Sink

  • 属性的配置

  配置基本属性。其中,channel、type、hdfs.path是必配属性。type=hdfs;hdfs.path=hdfs://namenode:rpc端口/path。

  设置写文件的方式。hdfs.rollInterval,间隔时间,每间隔多少秒写一个文件;hdfs.rollSize,写入一个文件的最大大小;hdfs.rollCount,设置往一个文件中写数据的最大次数。HDFS Sink写数据到HDFS中,有三种不一样的方式来写文件。第一种是按设置的hdfs.rollInterval间隔时间来写,达到这么长时间就写一个文件;第二种是根据文件的大小来生成文件,当文件达到hdfs.rollSize设置的大小以后从新写一个文件;第三种是按照设置的hdfs.rollCount每次写的次数,当达到这个极限时关闭流,生成一个文件。若是三个都配了,不管哪一个属性先知足设置,就会关闭流,生成一个文件。若是不考虑某种写文件的方式,就将其属性值设置为0。另外,hdfs.idleTimeout,设置超时时间,能够设置超过多少秒都没有数据过来,不管是否知足写文件三个方式的设置,都会关闭流。

  配置生成的文件名属性。hdfs.filePrefix,生成的文件的前缀。hdfs.fileSuffix,生成文件的后缀。hdfs.inUsePrefix,是否使用用户的前缀。

  配置动态目录属性。须要用到的属性为hdfs.rund,hdfs.roundValue,hdfs.roundUnit,hdfs.useLocalTimeStamp等。例如,本文的flume.conf最后四行配置,能够在hdfs上动态生成目录,每隔10分钟生成一个。

  • 变量的使用

  以下图所示,可使用这里的变量来设置一些动态的sink目录,例如按照不一样的时间(日期)来做为不一样的sink目录。

 

3. 配置flume.conf文件

a1.sources=s
a1.channels=c
a1.sinks=k

a1.sources.s.type=TAILDIR
a1.sources.s.filegroups=g1
a1.sources.s.filegroups.g1=/data/.*csv.*
a1.sources.s.positionFile=/tmp/myflume/taildir_position.json
a1.sources.s.channels=c
a1.sources.s.fileHeader=true

a1.channels.c.type=memory
a1.channels.c.capacity=100
a1.channels.c.transactionCapacity=100
a1.channels.c.keep-alive=10
a1.channels.c.byteCapacity=0

a1.sinks.k.type=hdfs
a1.sinks.k.channel=c
a1.sinks.k.hdfs.path=hdfs://cdh01:8020/flume/%Y-%m-%d/%H%M
a1.sinks.k.hdfs.rollInterval=60
a1.sinks.k.hdfs.rollSize=20971520
a1.sinks.k.hdfs.rollCount=0
a1.sinks.k.hdfs.idleTimeout=60
a1.sinks.k.hdfs.fileType=DataStream
a1.sinks.k.hdfs.round=true
a1.sinks.k.hdfs.roundValue=10
a1.sinks.k.hdfs.roundUnit=minute
a1.sinks.k.hdfs.useLocalTimeStamp=true
相关文章
相关标签/搜索