flume--为搬砖而生,日志传输的一把好手

(一)flume的产生

<font face="stcaiyun" size=5 color="red">为何会有flume</font>java

<font size=3 color='green'>随着互联网的发展,人们对网络日志产生的信息也愈来愈重视。不只如此,咱们的服务器,好比Nginx,天天都会产生大量的日志。咱们要将这些日志收集到指定的地方,好比hdfs平台,进行分析。可是大量的日志产生的位置比较分散,可能来自于Tomcat、Nginx、甚至是数据库等等,并且存储的目的地也不同,这就致使了数据采集的复杂性。然鹅最关键的问题是,若是在采集的过程当中,出现问题了该怎么办?好比咱们要把日志收集到hdfs平台上进行分析,咱们能够经过hdfs shell的方式,可是咱们没法对日志采集的情况进行监控。针对以上种种问题,flume就诞生了,所以看到这里小伙伴也能猜到flume是干什么的了。</font>python

<font face="stcaiyun" size=5 color="red">flume是什么</font>golang

关于flume是什么?官网给出了定义,咱们来看一下。由于flume是Apache的一个顶级项目,因此官网就是flume.apache.orgweb

<font size=3 color='pink'>flume是一个分布式、高可靠、高可用的服务,用于高效地收集、聚合、移动大量的数据集。flume具备一个简单、灵活并基于数据流的架构,它具有可调节的可靠机制和许多故障转移以及恢复机制,并具备很强的健壮性和容错性。经过使用一种简单可扩展的数据模型来提供应用程序的在线分析。</font>shell

<font color='blue' size=3>咱们从图中,能够看到一个长方形,里面围住的部分就是flume的一个agent。咱们经过flume移动数据就至关于在写一个配置文件。source:源。表示咱们要从哪里收集,好比这里来自于web服务器。sink:下沉。表示咱们要把数据沉到什么地方去,这里是hdfs。那么中间还有一个channel是干什么的呢?channel:管道,咱们不会把刚收集到的日志马上sink到hdfs上面,那样效率太慢,而是会先存到channel里面,这个channel就是消息队列,通常是kafka,当管道满了,而后再把管道里面的数据一次性sink的目的地,这里是hdfs。</font>数据库

<font face="STCAIYUN" color="red" size=5>所以通俗一点的说:</font>apache

<font size=3 color="purple">flume就是一个日志采集工具,或者理解为一个搬运工,将日志从一个地方搬到另外一个地方去。</font><font size=3 color="black">特征是:具有高可用、高可靠、分布式处理模式。里面定制了不少的source和sink,支持咱们从不一样的源端读取日志文件,而后sink到不一样的地方去。</font>数组

<font face="STCAIYUN" color="red" size=5>flume的特征</font>缓存

  • <font face="" color="pink" size=3>flume能够高效率地从多个服务器中收集日志并存储到hdfs/hbase中</font>安全

  • <font face="" color="pink" size=3>除了日志信息,flume也能够用来接入收集规模宏大的社交网络节点事件数据,好比Facebook、Twitter等等</font>

  • <font face="" color="pink" size=3>支持各类不一样数据源,并可以输送到不一样的地方</font>

  • <font face="" color="pink" size=3>支持多路径,多管道输入,多管道输出,上下文路由等等</font>

    什么意思呢?意思就是咱们sink的时候,也能够做为下一个agent的source,不必定要是hdfs等平台,而且多个sink还能够到同一个source里面。

    同理,一个source也能够到多个channel,而后sink到多个地方

  • <font face="" color="pink" size=3>能够水平扩展</font>

<font face="STCAIYUN" color="red" size=5>flume的应用场景</font>

flume其实还能够有其余的功能,好比数据的处理,可是在大数据实际应用中,flume基本上只用于日志收集工做,根据不一样的业务场景,编写不一样的配置文件。

(二)flume的client

<font face="STCAIYUN" color="red" size=5>flume的客户端</font>

什么是flume的客户端呢?实际上,部署flume的机器就能够称之为一个客户端。部署flume,是否是须要一些环境呢?

<font face="" color="green" size=3>根据官网的要求,至少要知足如下几点</font>

  • <font face="" color="pink" size=3>1.jdk1.8或更高,大数据的这些组件都和java脱不开关系</font>
  • <font face="" color="pink" size=3>2.内存。由于flume是基于流式的,所以内存也是必须的</font>
  • <font face="" color="pink" size=3>3.磁盘空间,咱们不会把数据一直保存到内存里面,最终要刷到磁盘上,所以磁盘空间也是必须的。</font>
  • <font face="" color="pink" size=3>4.目录权限,你要往一个目录里面写文件的话,总要有写或者读的权限吧</font>

感受二、三、4都是废话,这不是显然的吗?

(三)flume的source

咱们以前说过,一个agent就至关于一个配置文件,里面有三剑客,分别是source、channel、sink。经过编写配置文件,配置source、channel、sink,来将数据从一端传输到另外一端。
咱们来看看source。

<font face="" color="green" size=3>source就是咱们的源端,从不一样的数据源读取日志信息放到channel里,而后sink取出channel里面的日志信息到其余的地方,好比hdfs、hbase等等。</font>

<font face="" color="pink" size=3>所以flume的架构很好理解,就这三个组成部分。既然flume的source支持不一样的数据源,那么咱们就来看看,它都支持哪些不一样的数据源呢?</font>

<font face="STCAIYUN" color="red" size=5>source支持的数据源</font>

<font face="" color="blue" size=3>支持如下几种source</font>

  • Avro Source

    <font face="" color="pink" size=3>flume第一种数据来源,能够监听IP和端口,并不必定是本机的IP,能够监听其余机器的IP和port,用于获取数据。从外部的Avro client streams接收events,当与另外一个flume agent内置的avro sink配对时,能够建立分层搜集的拓扑结构。感受官网说的好阳春白雪,其实说白了,仍是下面这张图,一个agent的source能够是上一个agent的sink,之间经过avro去链接</font>

    <font face="" color="red" size=3>字体加粗的属性必须进行设置</font>

    由于source的类型是avro,所以type也要指定为avro

  • Thrift Source

    和avro source基本相似,可是avro能够支持ip过滤,可是thrift不支持。

    <font face="" color="red" size=3>由于source是thrift,因此type要改为thrift。是什么样的source,那么type就改为什么。</font>

  • Exec Source

    <font face="" color="red" size=3>比较经常使用的一种source,能够运行Unix命令,而后该命令会一直执行,读取文件,做为flume数据的来源,须要配置相应的channel、type、以及运行的命令。而且根据官网介绍,本方式不保证数据百分百会传送到指定的位置,若是出现问题,数据可能丢失。可使用Spooling Directory等其余手段,保证数据传输</font>

  • JMS Source

    <font face="" color="red" size=3>本方式会从JMS消息队列的目的地,好比一个队列或者一个topic中读取数据;须要在plugins.d即插件目录下放置相应的jar包;不过此来源通常用的较少,由于局限性比较强。</font>

  • Spooling Directory Source

    <font face="" color="red" size=3>Spooling Directory Source监测配置的目录下新增的文件,并将文件中的数据读取出来。其中,Spool Source有2个注意地方,第一个是拷贝到spoolDir指定的目录下的文件不能够再打开编辑,第二个是spool目录下不可包含相应的子目录。这个主要用途做为对日志的准实时监控。</font>

    <font face="" color="red" size=3>加粗的表示必须填写,其中spooldir,就是咱们指定的目录,固然type也要指定为spooldir</font>

  • 自定制source

    其实flume提供的source仍是不能彻底知足咱们的需求,所以咱们也能够自定义source。

<font face="stcaiyun" color="blue" size=5>固然还有其它source,不过不经常使用。这里可能有点抽象,先来清楚一下概念,了解一下flume的架构、各个组成部分的功能,后续会进行演示</font>

(四)flume的channel

仍是这张放了好几遍的图,source咱们已经介绍过了,下面介绍channel。
channel咱们其实咱们以前也说过了,是一个管道,相似于golang里面的channel。或者咱们把它理解为一个缓存也行,source收集过来的日志并不直接sink到hdfs上面,而是会放到缓存里面,等缓存满了再一次性sink到指定的地方。
为何要这么设计呢?介绍source收集日志的速度很快,可是sink的速度很慢,若是source直接对接sink的话,那么就会致使读写速度不一致了,这就跟双十一要引入消息队列同样,若是上游的订单系统直接对接下游的库存系统,那么上游爆发的流量很容易将下游冲垮,因此才须要引入消息队列进行削峰填谷,二者的道理是相似的。

关于缓存,有两种。一种是基于内存的缓存,另外一种是基于磁盘的缓存。
基于内存的缓存:速度快,可是不安全,并且容量有限
基于磁盘的缓存:可靠性高,容量大,可是读写速度会慢一些。

<font face="stcaiyun" size=5 color="red">flume官方为咱们提供了以下channel</font>

<font face="" size=3 color="green">基于内存的channel,咱们不多使用,由于数据容易丢失。File channle和kafka channel很经常使用</font>

<font face="" size=3 color="green">好比file  channel,那么这里的type要指定为file,由于这里的缓存使用文件来实现的。配置的话能够指定大小</font>

(五)flume的sink

<font face="" size=3 color="red">咱们以前说过,source是用于收集数据,channel是用于缓存数据,sink是用于将数据写到外部的存储设备。那咱们外部的存储设备不同,那么sink的类型就会不同,所以sink实际上也是可定制的,可配置的。对于不一样的外部地址,那么sink就有不一样的实现。</font>

<font face="" size=3 color="pink">flume也为咱们提供了不少sink,好比hdfs sink、hive sink等等。固然还有avro sink,这个在以前介绍过,是为了对接下一个agent的avro source</font>

<font face="" size=3 color="green">固然每个sink都有不少额外的配置,好比文件的前缀啊、回滚间隔、回滚策略啊等等</font>

(六)flume的interceptor

<font face="stcaiyun" size=5 color="blue">flume的拦截器,这是什么?</font>

flume能够在移动数据时修改/删除事件,这是在拦截器的帮助下完成的。全部的拦截器都是实现了org.apache.flume.interceptor.Interceptor接口的类,拦截器能够基于开发人员所选择的任意条件来修改、甚至删除事件。而且flume还支持链式的拦截器(补充:意思就是能够设置多个拦截器,而后顺序的调用。),这能够经过在配置中指定多个拦截器的类名组成的列表来实现。拦截器在源配置中经过一系列的空格进行分隔,拦截器被指定的顺序也是它们被调用的顺序,每个拦截器所返回的事件在列表中会链式的传递到下一个拦截器,而且能够修改或者删除事件。若是一个拦截器须要删除事件,那么它只须要在列表中将该返回的事件不进行返回便可。若是要删除全部事件,只须要简单地返回一个空列表便可。拦截器是一个命名组件,如下是如何经过配置建立拦截器的一个例子。

<font face="" size=3 color="blue">拦截器主要用于source端,咱们能够将获取的数据拦截下来,而后加上一个键值对,能够指定文件的名字、来源、时间戳等等,而后sink到不一样的地方去。好比咱们 收集了不少日志,好比登录日志、行为日志等等,那么咱们在拦截的时候,即可以指定哪些日志都是什么类型,而后输出的时候,不一样的日志输出到不一样的地方去</font>

(七)flume的selector

<font face="" size=3 color="purple">咱们的source在收集完数据以后要写到哪个channel里面呢?好比咱们搜集的日志有三种,一种是只包含字母,另外一种只包含数字,最后一种只包含符号(假设是这样蛤)。hdfs上面有三个文件,正好对应三种日志类型。如今咱们的问题是,不一样类型的日志怎么存到不一样的文件里面去呢?既然要存到不一样的文件里面,那么首先要存到不一样的channel里面,channel1只包含字母、channel2只包含数字、channel3只包含符号,而后sink1对接channel一、sink2对接channel二、sink3对接channel3,理论上状况是这样的。</font>

<font face="" size=3 color="blue">可是如今问题又来了,怎么存到不一样的channel里面呢?这个时候选择器就出现了</font>

<font face="stcaiyun" size=5 color="blue">官网提供了哪些selector呢</font>

  • Replicating Channel Selector (default)

    这也是默认的channel,type指定为replicating。意思是数据会复制到每个channel中。
  • Multiplexing Channel Selector

    复分模式的channel,不会把数据拷贝了,而是将数据总体分发到不一样的channel中,固然要指定type为multiplexing。能够看到无论是source、channel、sink仍是这里的selector,都有不一样的类型,而后在各自的type中指定便可。
  • Custom Channel Selector

    本身实现的channel,不一样的数据分发到不一样的channel中,至关于对Multiplexing Channel Selector进行了自定制,这样就能够按照咱们本身的逻辑来决定数据分发到哪个channel当中。

(八)flume的event

<font face="" color="purple" size=3>例如一些消息队列,发送的数据并非原生的数据,而是对数据要进行一个封装,咱们能够经过python往activemq里面发送数据,而后使用golang去接。可是python最终发到mq里面不是一个简单的数据,golang接收的也不是一个简单的数据,而是对数据进行了一个封装。同理flume也是同样,在flume中,有一个event,咱们好像在拦截器的时候提到过,那么它是什么呢?event在flume当中叫作<font face="stcaiyun" color="red" size=5>事件</font>,flume在传输日志的时候,每一条数据都会被封装成一个event。</font>

<font face="stcaiyun" color="green" size=5>为何要有event呢?</font>

首先咱们能够想一下,数据格式有不少种,source也不一样,channel不一样、sink也各不相同,那么我如何才能将其统一块儿来呢?这个时候就须要event了,将每一条记录都封装为你们都认识event不就好了吗?这就是为何要有event。固然event主要包含两个部分,一个是header,一个body。body是一个字节数组,里面记录了日志信息,header则是一个hashmap,咱们能够添加一些本身的信息,若是只有body没有header的话,那么咱们的信息就会打到body里面,这样就对咱们要传送的数据形成污染了,具备侵入性,因此要有header

(九)flume的agent

agent是flume的运行核心,是flume运行的最小单位。一个flume的agent就是包含了source、channel、sink的一个jvm进程,用于日志的收集和传输。

<font face="" size=3 color="red">首先咱们到目前为止介绍的都是概念,其实flume的概念也很好理解,主要就是各个组件的那些配置尚未介绍,后面会介绍,目前先了解flume的基础架构便可。所以以前也说过,用flume传输日志,就至关于在指定agent中各个组件的配置。一个agent能够包含多个source、多个channel、多个sink</font>

A Flume event is defined as a unit of data flow having a byte payload and an optional set of string attributes. A Flume agent is a (JVM) process that hosts the components through which events flow from an external source to the next destination (hop).
一个flume事件被定义为一个数据流单元,具备字节负载和可选的字符属性集。一个flume agent就是一个jvm进程,承载着各个组件(source、channel、sink),事件能够经过这些组件从外部源流向下一个目的地。

<font face="" size=3 color="red">固然不要忘记,agent仍是能够串联的,一个agent的sink对接下一个agent的source</font>

(十)flume的安装

flume下载地址:http://archive.cloudera.com/cdh5/cdh/5
选择flume便可,为何不到官网下载,由于cdh版本的话能够避免不少问题,好比jar包的冲突。
所以大数据组件安装的话,推荐去这个网站下载,不建议使用社区版本的。

<font face="" color="green" size=3>我用的是阿里云的云服务器,全部大数据相关的软件都放在/opt目录下,所以咱们在/opt下面新建一个flume目录,而后把相应的安装包放进去解压</font>

解压完成以后,咱们来看看flume的结构

<font face="" size=3 color="red">咱们进入conf目录下,修改一下配置。若是咱们只修改一个配置的话,那么须要修改什么呢?不用想,确定是jdk的路径。对,咱们须要在flume-env.sh中指定JAVA_HOME,可是进入目录发现没有这个文件,可是有一个flume-env.sh.template,这是模板,咱们用这个模板生成一个便可</font>

<font face="" size=3 color="purple">而后将flume加到环境变量里面去</font>

export FLUME_HOME=/opt/flume/apache-flume-1.6.0-cdh5.7.5-bin
export PATH=$FLUME_HOME/bin:$PATH

<font face="" size=3 color="purple">而后source一下</font>

输入flume-ng version查看版本,和其余框架不同,flume的话须要加上-ng,至于缘由后面会说。

(十一)flume的两种版本

flume分为两个版本,一个1.x以前的版本,叫作flume og,另外一个是1.x版本,叫作flume ng
至于差异就不介绍了,总之如今用的话就是用flume 1.x版本的就好了。

(十二)flume的avro_source

<font face="" size=3 color="purple">任务:使用avro_source收集数据,打印到控制台</font>

终于到实战部分了,咱们下面就经过编写配置文件的方式来移动数据,以前也说了,使用flume就是编写配置文件。
至于channel,咱们就无所谓了,使用内存channel仍是磁盘channel都无所谓,咱们这里主要是介绍source。而sink则就是控制台

那么配置怎么写呢?

# 这里的a1就是咱们agent的名字
a1.sources = r1  #  命名agent的source为r1
a1.channels = c1  # 同理c1是channel的名字
a1.sinks = k1  # 同理k1是sink的名字

a1.sources.r1.type = avro  # 配置r1的的类型avro
a1.sources.r1.bind = localhost  # 绑定的端口为本地
a1.sources.r1.port = 4141  # 端口为4141

a1.channels.c1.type = memory  # 指定c1的类型为内存
a1.channels.c1.capacity = 1000  # 指定c1的容量为1000个event
a1.channels.c1.transcactionCapacity = 1000  # 指定c1的每次处理容量为1000个event

a1.sinks.k1.type = logger  # 表示不产生实体文件,就打印在控制台

# 将source和sink与channel绑定起来
a1.sources.r1.channels = c1  # source r1的channel指定为c1 
a1.sinks.k1.channel = c1  # sink k1的channel指定为c1。可是这里再也不是channels,而是channel,要注意

<font face="" color="blue" size=3>咱们随便在一个目录,新建一个文件,将上面的内容去掉注释而后写进去</font>

那么如何启动flume呢?
flume-ng agent -c $FLUME_HOME/conf -f 咱们写的配置文件的路径 -n a1 -Dflume.root.logger=INFO,console

参数解释:
-c $FLUME_HOME/conf:首先flume启动是有默认配置的,这个配置就在$FLUME_HOME/conf下面
-f 咱们写的配置文件的路径:这个很好理解,咱们本身写的配置能够经过-f来指定
-n a1:咱们在配置文件中指定的agent名字
-Dflume.root.logger=INFO,console:表示flume自身运行状态的日志,按需配置,详细信息控制台打印

咱们启动服务,因为咱们本身的conf就在当前目录下,因此直接flume-ng agent -c $FLUME_HOME/conf -f ./conf -n a1 -Dflume.root.logger=INFO,console启动便可

<font face="" size=3 color="red">服务已经启动了,监听本地的4141端口,那么咱们就往这个端口里面发送数据。</font>

发送数据,咱们能够发送一个文件,可使用flume自带的一个方法。
flume-ng avro-client -c $FLUME_HOME/conf -H localhost -p 4141 -F ~/.bashrc

avro-client:建立一个客户端
-c $FLUME_HOME/conf:指定配置,刚才说了
-H localhost:指定目的ip
-p 4141:指定目的端口
-F ~/.bashrc:指定发送的文件

<font face="" size=3 color="green">显示内容已经接受到了,会将文件打开,每一条都是一个event,包含header(咱们这里没有指定)和event(是一个字节数组)</font>

(十三)flume的exec_source

下面来看看flume的exec_source,flume之因此有这么多source、channel、sink,主要是为了解决不一样场景的业务,那么这个exec_source是用来干什么的呢?

<font face="" size=3 color="green">exec_source能够实时监控一个文本文件,若是文件有变化,能够打印出来</font>

下面就来写配置文件,在当前目录新建一个conf1,写上以下内容(不含注释)

a1.sources = r1  #  命名agent的source为r1
a1.channels = c1  # 同理c1是channel的名字
a1.sinks = k1  # 同理k1是sink的名字

a1.sources.r1.type = exec  # 配置r1的的类型exec
# a1.sources.r1.bind = localhost  # 绑定的端口为本地
# a1.sources.r1.port = 4141  # 端口为4141
# 因为是监控一个文件,因此不须要在绑定ip和端口了
a1.sources.r1.command = tail -F /root/mashiro.txt  # 指定命令,监控这个文件

a1.channels.c1.type = memory  # 指定c1的类型为内存
a1.channels.c1.capacity = 1000  # 指定c1的容量为1000个event
a1.channels.c1.transcactionCapacity = 1000  # 指定c1的每次处理容量为1000个event

a1.sinks.k1.type = logger  # 表示不产生实体文件,就打印在控制台

# 将source和sink与channel绑定起来
a1.sources.r1.channels = c1  # source r1的channel指定为c1 
a1.sinks.k1.channel = c1  # sink k1的channel指定为c1。可是这里再也不是channels,而是channel,要注意

启动flume,flume-ng agent -c $FLUME_HOME/conf -f ./conf1 -n a1 -Dflume.root.logger=INFO,console

<font face="stcaiyun" size=4 color="red">此时服务启动成功,可是尚未东西过来</font>

(十四)file channel

基于内存的channel,咱们在介绍source的时候,演示过了,下面来看看基于文件的channel。
a1.sources = r1  
a1.channels = c1 
a1.sinks = k1  

a1.sources.r1.type = exec 
a1.sources.r1.command = tail -F /root/mashiro.txt  

a1.channels.c1.type = file  # 指定类型为file
a1.channels.c1.checkpointDir = /root/checkPoint  # checkpoint将被存储的目录
a1.channels.c1.dataDirs = /root/data  # 数据的存储位置

a1.sinks.k1.type = logger  # 表示不产生实体文件,就打印在控制台

# 将source和sink与channel绑定起来
a1.sources.r1.channels = c1  # source r1的channel指定为c1 
a1.sinks.k1.channel = c1  # sink k1的channel指定为c1。可是这里再也不是channels,而是channel,要注意

先来看看/root目录的内容

<font face="" color="blue" size=3>而后执行命令,flume-ng agent -c $FLUME_HOME/conf -f ./conf2 -n a1 -Dflume.root.logger=INFO,console</font>

<font face="" color="blue" size=3>再来查看当前目录</font>

<font face="" size=3 color="pink">能够看到checkPoint和data两个目录,并且这是flume帮咱们建立的,咱们来往文件里面写点东西</font>

(十五)hdfs sink

下面介绍sink,这里介绍一下hdfs sink,由于收集过来的日志主要放到hdfs平台上面。
既然要把收集的数据放大hdfs上面,那么flume就必须能链接hdfs,须要把依赖的jar包,放到$FLUME_HOME/lib下面,须要哪些jar包呢?又从哪里找呢?咱们能够从$HADOOP_HOME/share/hadoop目录里面找。
commons-configuration-x.x.jar  
hadoop-auth-x.x.x.jar
hadoop-common-x.x.x.jar
hadoop-hdfs-x.x.x.jar
# jar前面的x表示版本号,在找的时候可使用模糊查询,好比我如今在$HADOOP_HOME/share/hadoop目录下,要找commons-configuration-x.x.jar  的话,即可以经过find ./ -name commons-configuration*便可。

若是你的flume版本是1.99的话,那么还须要如下两个jar包,不然的话就不须要
commons-io-x.x.jar
htrace-core-.x.x.x-incubating.jar

找到以后,cp到$FLUME_HOME/lib下面

<font face="" size=3 color="green">下面就是编写配置文件</font>

# 给source、channel、sink指定名字
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# source的配置
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/test.log

# channel的配置
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# sink的配置
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://localhost:9000
# 这条配置是给文件起一个前缀
a1.sinks.k1.hdfs.prefix = hive-  

# 绑定channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
相关文章
相关标签/搜索