大数据学习笔记(五)

1、Flume

1.1 概述

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume支持从各类数据源中(如文件、文件夹、Socket数据包、Kafka等)收集数据;同时,Flume提供对数据进行简单处理,并把处理后的数据写出到HDFS、hbase、hive、kafka等众多外部存储系统。node

1.2 运行原理

Flume里面的几个比较重要的概念:nginx

  • Agent:它是Flume的核心角色,Flume采集系统是由一个个的Agent链接起来。一个Agent里面包含了Source、Sink和Channel组件;
  • Source:采集组件,负责从数据源中获取采集数据;
  • Sink:下沉组件,负责向下一级Agent传递数据,或者存储到存储设备(如HDFS);
  • Channel:通道组件,负责从Source将数据传递到Sink;

Source、Sink、Channel组件的关系以下图所示:
在这里插入图片描述web

1.3 Flume的安装

提示:安装Flume以前,先要准备好Hadoop环境。正则表达式

目前Flume最新版本为1.9.0。下载地址:http://archive.apache.org/dist/flume/1.9.0/。shell

下载完成后,将压缩包上传到服务器的/export/softwares目录下,而后解压缩到/export/servers中。apache

解压完成后,进入cd /export/servers/apache-flume-1.8.0-bin/conf目录下,编辑flume-env.sh文件,设置JAVA_HOME环境变量。缓存

cd /export/servers/apache-flume-1.8.0-bin/conf
cp flume-env.sh.template flume-env.sh
vi flume-env.sh
export JAVA_HOME=/export/servers/jdk1.8.0_141

1.4 Flume应用

1.4.1 从终端设备采集数据

在这里插入图片描述

  • 需求分析:
    1)启动Flume,并绑定IP和端口;
    2)启动终端,使用telnet向Flume发送数据;
    3)Flume把采集到的数据输出到Console上;bash

  • 实现步骤:服务器

第一步:新建配置文件/export/servers/apache-flume-1.8.0-bin/conf/netcat-logger.conf,在配置文件中设置数据采集的方案;网络

# 定义agent中各组件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 描述和配置source组件:r1
a1.sources.r1.type = netcat
# 绑定数据源提供方的地址
a1.sources.r1.bind = 192.168.31.9
# 绑定数据源提供方的端口
a1.sources.r1.port = 44444

# 描述和配置sink组件:k1
a1.sinks.k1.type = logger

# 描述和配置channel组件,此处使用是内存缓存的方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 创建链接关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

上面配置中的a1表明agent的名字,启动flume时候自行指定。

第二步:启动Flume;

cd /export/servers/apache-flume-1.8.0-bin/bin
flume-ng agent -c ../conf -f ../conf/netcat-logger.conf -n a1 -Dflume.root.logger=INFO,console

-c:指定配置文件所在目录;
-f:指定配置文件的路径;
-n:指定agent的名字;

第三步:启动终端,使用telnet测试;

telnet 192.168.31.9 44444

1.3.2 采集文件数据

在这里插入图片描述

  • 需求分析:
    好比有一个业务使用使用的日志文件,该日志文件的内容会不断变化。如今咱们须要把日志文件的数据进行实时采集,而后存储到HDFS中。

  • 实现思路:
    1)Source:监控文件内容更新,其命令格式为exec ‘tail -F file’
    2)Sink:使用HDFS;
    3)Channel:能够是file或memory类型;

  • 实现步骤:

第一步:新建配置文件/export/servers/apache-flume-1.8.0-bin/conf/tail-file.conf,在配置文件中设置数据采集的方案;

# 设置agent中各组件的名字
a1.sources = source1
a1.sinks = sink1
a1.channels = channel1

# 描述source组件
a1.sources.source1.type = exec
a1.sources.source1.command = tail -F /export/servers/taillogs/access_log

# 描述Sink组件
a1.sinks.sink1.type = hdfs
a1.sinks.sink1.hdfs.path = hdfs://node01:8020/weblog/flume-collection/%y-%m-%d/%H%M/
a1.sinks.sink1.hdfs.filePrefix = access_log
a1.sinks.sink1.hdfs.maxOpenFiles = 5000
a1.sinks.sink1.hdfs.batchSize= 100
a1.sinks.sink1.hdfs.fileType = DataStream
a1.sinks.sink1.hdfs.writeFormat =Text
a1.sinks.sink1.hdfs.round = true
a1.sinks.sink1.hdfs.roundValue = 10
a1.sinks.sink1.hdfs.roundUnit = minute
a1.sinks.sink1.hdfs.useLocalTimeStamp = true

# 描述channel组件
a1.channels.channel1.type = memory
a1.channels.channel1.keep-alive = 120
a1.channels.channel1.capacity = 500000
a1.channels.channel1.transactionCapacity = 600

# 创建链接
a1.sources.source1.channels = channel1
a1.sinks.sink1.channel = channel1

第二步:启动Flume;

cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/bin
flume-ng agent -c ../conf -f ../conf/tail-file.conf -n a1 -Dflume.root.logger=INFO,console

第三步:编写脚本,不断向日志文件写入数据;

# 新建shells文件夹,用于存放脚本文件
mkdir -p /export/servers/shells/
cd /export/servers/shells/
vi tail-file.sh

#!/bin/bash
while true
do
  date >> /export/servers/taillogs/access_log;
  sleep 0.5;
done

第四步:启动脚本;

mkdir -p /export/servers/taillogs
sh /export/servers/shells/tail-file.sh

1.3.3 采集文件夹

  • 需求分析:
    例如采集应用服务器的日志目录。每当产生新的日志文件,就须要把日志文件采集到HDFS中。

  • 实现思路:
    1)Source:监控文件目录,其命令格式为spooldir’
    2)Sink:使用HDFS;
    3)Channel:能够是file或memory类型;

  • 实现步骤:

第一步:新建配置文件/export/servers/apache-flume-1.8.0-bin/conf/spooldir.conf,在配置文件中设置数据采集的方案;

# 设置Agent中各个组件的名字
a1.sources = source1
a1.sinks = sink1
a1.channels = channel1

# 描述和配置Source组件
# 注意:监控目录不可以出现同名文件
a1.sources.source1.type = spooldir
a1.sources.source1.spoolDir = /export/servers/dirfile
a1.sources.source1.fileHeader = true

# 描述和配置Sink组件
a1.sinks.sink1.type = hdfs

a1.sinks.sink1.hdfs.path = hdfs://node01:8020/spooldir/files/%y-%m-%d/%H%M/
a1.sinks.sink1.hdfs.filePrefix = events-
a1.sinks.sink1.hdfs.round = true
a1.sinks.sink1.hdfs.roundValue = 10
a1.sinks.sink1.hdfs.roundUnit = minute
a1.sinks.sink1.hdfs.rollInterval = 3
a1.sinks.sink1.hdfs.rollSize = 20
a1.sinks.sink1.hdfs.rollCount = 5
a1.sinks.sink1.hdfs.batchSize = 1
a1.sinks.sink1.hdfs.useLocalTimeStamp = true

# 生成的文件类型,默认是Sequencefile,DataStream表明普通文本类型
a1.sinks.sink1.hdfs.fileType = DataStream

# 描述和配置通道
a1.channels.channel1.type = memory
a1.channels.channel1.capacity = 1000
a1.channels.channel1.transactionCapacity = 100

# 创建链接
a1.sources.source1.channels = channel1
a1.sinks.sink1.channel = channel1

第二步:启动Flume;

cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/bin
flume-ng agent -c ../conf -f ../conf/spooldir.conf -n a1 -Dflume.root.logger=INFO,console

启动完成后,能够往/export/servers/dirfile目录不断添加文件,而后在HDFS的/spooldir路径下看到采集到的日志文件。

1.3.4 Agent级联

在这里插入图片描述

  • 需求分析:
    1)第一个agent负责从指定文件收集数据,而后经过网络发送到下一个agent;
    2)第二个agent负责接收从第一个agent发送的数据,并将数据保存到HDFS中;

  • 实现步骤:

第一步:准备两台安装hadoop和flume环境的主机,他们分别是node01和node02;

第二步:在node01和node02分别设置使用avro协议传输数据;

cd /export/servers/ apache-flume-1.8.0-bin/conf
vi tail-avro-avro-logger.conf

node01的配置:

# 设置agent中各个组件的名字
a1.sources = source1
a1.sinks = sink1
a1.channels = channel1

a1.sources.source1.type = exec
a1.sources.source1.command = tail -F /export/servers/taillogs/access_log
a1.sources.source1.channels = channel1

# 设置Sink的类型为avro
a1.sinks.sink1.type = avro
# 指定下沉到下一个agent的主机地址
a1.sinks.sink1.hostname = node02
a1.sinks.sink1.port = 4141
a1.sinks.sink1.batch-size = 10

# 配置channel
a1.channels.channel1.type = memory
a1.channels.channel1.capacity = 1000
a1.channels.channel1.transactionCapacity = 100

# 创建链接
a1.sources.source1.channels = channel1
a1.sinks.sink1.channel = channel1

node02的配置:

a1.sources = source1
a1.sinks = sink1
a1.channels = channel1

# 设置source的类型为avro
a1.sources.source1.type = avro
# 指定从哪一个source获取数据
a1.sources.source1.bind = node01
a1.sources.source1.port = 4141

a1.sinks.sink1.type = hdfs
a1.sinks.sink1.hdfs.path = hdfs://node01:8020/av/%y-%m-%d/%H%M/
a1.sinks.sink1.hdfs.filePrefix = events-
a1.sinks.sink1.hdfs.round = true
a1.sinks.sink1.hdfs.roundValue = 10
a1.sinks.sink1.hdfs.roundUnit = minute
a1.sinks.sink1.hdfs.rollInterval = 3
a1.sinks.sink1.hdfs.rollSize = 20
a1.sinks.sink1.hdfs.rollCount = 5
a1.sinks.sink1.hdfs.batchSize = 1
a1.sinks.sink1.hdfs.useLocalTimeStamp = true

#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
a1.sinks.sink1.hdfs.fileType = DataStream

a1.channels.channel1.type = memory
a1.channels.channel1.capacity = 1000
a1.channels.channel1.transactionCapacity = 100

a1.sources.source1.channels = channel1
a1.sinks.sink1.channel = channel1

第三步:分别启动node01和node2的flume;

cd /export/servers/apache-flume-1.8.0-bin/bin
flume-ng agent -c ../conf -f ../conf/avro-hdfs.conf -n a1 -Dflume.root.logger=INFO,console

启动完成后,运行上面编写的脚本测试。

mkdir -p /export/servers/taillogs
sh /export/servers/shells/tail-file.sh

1.3.5 高可用

在这里插入图片描述
上面有三台主机,分别是node0一、node02和node03。node01负责从外采集数据,而后把采集到的数据下沉到node02或node03。Flume NG自己提供了熔断机制实现高可用。因此,即便node02或node03发生熔断,Flume NG也能够自动进行切换或恢复操做。

Flume发行时的版本被统称为Flume OG。但随着Flume功能的不断扩展,Flume OG代码工程臃肿、核心组件设计不合理、核心配置不标准等缺点暴露出来。2011 年 10 月,Flume开发团队对Flume NG进行重构,重构后的版本统称为Flume NG。通过架构重构后,Flume NG变成了一个轻量级的日志采集工具,并支持熔断和负载均衡。

配置高可用只须要在node01的采集方案中定义两个sink,分别指向node02和node03。
在这里插入图片描述
sink配置:

# sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = node02
agent1.sinks.k1.port = 52020

# sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = node03
agent1.sinks.k2.port = 52020

# 将sink添加到sink group里面
agent1.sinkgroups.g1.sinks = k1 k2

启用熔断须要将sink gourp的处理类型设置为failover

agent1.sinkgroups.g1.processor.type = failover
# 设置权重,权重越高,优先级也就越高
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
# 设置failover time的上限,单位是毫秒,若是没有设置,默认为30秒
agent1.sinkgroups.g1.processor.maxpenalty = 10000
  • 测试工做:

首先咱们在Node01上传文件,Node01负责从指定目录日志目录采集数据。因为sink1的权重比sink2的权重大,因此 Node02的Agent优先采集并上传到存储系统。而后咱们kill掉node02,此时有Node03负责日志的采集上传工做,接着咱们手动恢复Node02节点的Flume服务,而后再次在Node01上传文件,发现Node02恢复优先级别的采集工做。

1.3.6 负载均衡

假如Agent1 是一个路由节点,负责将 Channel 暂存的Event 均衡到对应的多个 Sink组件上,而每一个 Sink 组件分别链接到一个独立的 Agent 上。以下所示:
在这里插入图片描述
Flume NG自动负载均衡的功能。若是要启动负载均衡,只须要将sink group的处理类型设置为load_balance

agent1.sinkgroups.g1.processor.type = load_balance
agent1.sinkgroups.g1.processor.backoff = true
agent1.sinkgroups.g1.processor.selector = round_robin
agent1.sinkgroups.g1.processor.selector.maxTimeOut = 10000

1.3.7 拦截器

假设有AB两台主机实时地采集日志文件的数据。最后在主机C上做汇总,而后统一保存到HDFS上。
在这里插入图片描述
上面部署了两个Logserver,用于采集日志数据。他们把采集到的数据发送给Flume-collector进行汇总,而后把汇总后的数据存储到Hbase或HDFS上。

问题:由于有两台主机时采集日志文件数据,那么Flume-Collector是如何知道采集到的数据是来自于哪一个日志文件呢?????答案是使用拦截器。

1.3.7.1 什么是拦截器

拦截器是设置在Source和Channel之间的一个组件。source接收到的时间,在写入channel以前,拦截器均可以进行转换或者删除这些事件。每个拦截器只处理同一个Source接收到的事件。

  • 内置拦截器:
    1)Timestamp Interceptor:使用该拦截器,Agent会将时间戳插入到event header中。若是不使用任何拦截器,flume接受到的只有message。
    2)Host Interceptor:该拦截器会将主机的ip地址或者主机名等内容插入到event header中;
    3)Static Interceptor:该拦截器将k/v插入到event header中;
    4)Regex Filtering Interceptor:该拦截器能够将一些不须要的日志过滤掉,也能够根据须要收集知足正则条件的日志数据;
    5)Regex Extractor Interceptor:在event header中添加指定的k/v为知足正则的内容;
    6)UUID Interceptor:在每一个events header中生成一个UUID字符串,生成的UUID能够在sink中读取;
    7)Morphline Interceptor:该拦截器使用Morphline对每一个events数据作相应的转换;
    8)Search and Replace Interceptor:此拦截器基于Java正则表达式提供简单的基于字符串的搜索和替换功能;

1.3.7.2 拦截器的使用

在这里插入图片描述
在主机a和主机b上定义三个Source,分别从不一样的日志文件采集数据。由于在主机a和主机b上定义了static interceptor拦截器,该拦截器会把指定的k/v(如:type=access、type=nginx、type=web)添加到event header中。而后将采集到的数据下沉到主机c中做汇总。汇总后,主机c从event header中获取type的值,该type做为hdfs上存储文件的目录,最后把汇总的数据存储到hdfs指定目录下(数据存放格式以下)。
在这里插入图片描述

1.3.7.3 拦截器的配置

  • node01和node02的配置:
# 定义3个source
a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1

# 第一个source采集access.log文件数据
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /export/servers/taillogs/access.log
# 定义静态拦截器,每个source定义一个拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = access

# 第二个source采集nginx.log文件数据
a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /export/servers/taillogs/nginx.log
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = type
a1.sources.r2.interceptors.i2.value = nginx

# 第三个source采集web.log文件数据
a1.sources.r3.type = exec
a1.sources.r3.command = tail -F /export/servers/taillogs/web.log
a1.sources.r3.interceptors = i3
a1.sources.r3.interceptors.i3.type = static
a1.sources.r3.interceptors.i3.key = type
a1.sources.r3.interceptors.i3.value = web

# 定义Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node03
a1.sinks.k1.port = 41414

# 定义channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000

# 与channel创建链接
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
a1.sinks.k1.channel = c1
  • node03的配置:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 定义source
a1.sources.r1.type = avro
a1.sources.r1.bind = node03
a1.sources.r1.port =41414

# 添加时间拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

# 定义channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000

# 定义sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=hdfs://node01:8020/source/logs/%{type}/%Y%m%d
a1.sinks.k1.hdfs.filePrefix =events
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
# 时间类型
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 不按条数生成
a1.sinks.k1.hdfs.rollCount = 0
# 按时间生成,单位为秒
a1.sinks.k1.hdfs.rollInterval = 30
# 按大小生成
a1.sinks.k1.hdfs.rollSize = 10485760
# 批量写入hdfs的个数
a1.sinks.k1.hdfs.batchSize = 10000
# 操做hdfs的线程数
a1.sinks.k1.hdfs.threadsPoolSize=10
# 操做hdfs超时时间,单位为毫秒
a1.sinks.k1.hdfs.callTimeout=30000

# 与channel创建链接
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  • 采集数据脚本:
# !/bin/bash
while true
do
date >> /export/servers/taillogs/access.log;
date >> /export/servers/taillogs/web.log;
date >> /export/servers/taillogs/nginx.log;
sleep 0.5;
done