数据采集组件:Flume基础用法和Kafka集成

本文源码:GitHub || GitEEgit

1、Flume简介

一、基础描述

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各种数据发送方,用于收集数据;github

特色:分布式、高可用、基于流式架构,一般用来收集、聚合、搬运不一样数据源的大量日志到数据仓库。算法

二、架构模型

image

Agent包括三个核心组成,Source、Channel、Sink。Source负责接收数据源,并兼容多种类型,Channel是数据的缓冲区,Sink处理数据输出的方式和目的地。数据库

Event是Flume定义的一个数据流传输的基本单元,将数据从源头送至目的地。apache

image

Flume能够设置多级Agent链接的方式传输Event数据,从最初的source开始到最终sink传送的目的存储系统,若是数量过多会影响传输速率,而且传输过程当中单节点故障也会影响整个传输通道。bootstrap

image

Flume支持多路复用数据流到一个或多个目的地,这种模式能够将相同数据复制到多个channel中,或者将不一样数据分发到不一样的channel中,而且sink能够选择传送到不一样的目的地。vim

image

Agent1理解为路由节点负责Channel的Event均衡到多个Sink组件,每一个Sink组件分別链接到独立的Agent上,实现负载均衡和错误恢复的功能。segmentfault

image

Flume的使用组合方式作数据聚合,每台服务器部署一个flume节点采集日志数据,再汇聚传输到存储系统,例如HDFS、Hbase等组件,高效且稳定的解决集群数据的采集。设计模式

2、安装过程

一、安装包

apache-flume-1.7.0-bin.tar.gz服务器

二、解压命名

[root@hop01 opt]# pwd
/opt
[root@hop01 opt]# tar -zxf apache-flume-1.7.0-bin.tar.gz
[root@hop01 opt]# mv apache-flume-1.7.0-bin flume1.7

三、配置文件

配置路径:/opt/flume1.7/conf

mv flume-env.sh.template flume-env.sh

四、修改配置

添加JDK依赖

vim flume-env.sh
export JAVA_HOME=/opt/jdk1.8

五、环境测试

安装netcat工具

sudo yum install -y nc

建立任务配置

[root@hop01 flume1.7]# cd job/
[root@hop01 job]# vim flume-netcat-test01.conf

添加基础任务配置

注意:a1表示agent名称。

# this agent
a1.sources = sr1
a1.sinks = sk1
a1.channels = sc1

# the source
a1.sources.sr1.type = netcat
a1.sources.sr1.bind = localhost
a1.sources.sr1.port = 55555

# the sink
a1.sinks.sk1.type = logger

# events in memory
a1.channels.sc1.type = memory
a1.channels.sc1.capacity = 1000
a1.channels.sc1.transactionCapacity = 100

# Bind the source and sink
a1.sources.sr1.channels = sc1
a1.sinks.sk1.channel = sc1

开启flume监听端口

/opt/flume1.7/bin/flume-ng agent --conf /opt/flume1.7/conf/ --name a1 --conf-file /opt/flume1.7/job/flume-netcat-test01.conf -Dflume.root.logger=INFO,console

使用netcat工具向55555端口发送数据

[root@hop01 ~]# nc localhost 55555
hello,flume

查看flume控制面

image

3、应用案例

一、案例描述

image

基于flume在各个集群服务进行数据采集,而后数据传到kafka服务,再考虑数据的消费策略。

采集:基于flume组件的便捷采集能力,若是直接使用kafka会产生大量的埋点动做很差维护。

消费:基于kafka容器的数据临时存储能力,避免系统高度活跃期间采集数据过大冲垮数据采集通道,而且能够基于kafka作数据隔离并针对化处理。

二、建立kafka配置

[root@hop01 job]# pwd
/opt/flume1.7/job
[root@hop01 job]# vim kafka-flume-test01.conf

三、修改sink配置

# the sink
a1.sinks.sk1.type = org.apache.flume.sink.kafka.KafkaSink
# topic
a1.sinks.sk1.topic = kafkatest
# broker地址、端口号
a1.sinks.sk1.kafka.bootstrap.servers = hop01:9092
# 序列化方式
a1.sinks.sk1.serializer.class = kafka.serializer.StringEncoder

四、建立kafka的Topic

上述配置文件中名称:kafkatest,下面执行建立命令以后查看topic信息。

[root@hop01 bin]# pwd
/opt/kafka2.11
[root@hop01 kafka2.11]# bin/kafka-topics.sh --create --zookeeper hop01:2181 --replication-factor 1 --partitions 1 --topic kafkatest
[root@hop01 kafka2.11]# bin/kafka-topics.sh --describe --zookeeper hop01:2181 --topic kafkatest

五、启动Kakfa消费

[root@hop01 kafka2.11]# bin/kafka-console-consumer.sh --bootstrap-server hop01:2181 --topic kafkatest --from-beginning

这里指定topic是kafkatest。

六、启动flume配置

/opt/flume1.7/bin/flume-ng agent --conf /opt/flume1.7/conf/ --name a1 --conf-file /opt/flume1.7/job/kafka-flume-test01.conf -Dflume.root.logger=INFO,console

4、源代码地址

GitHub·地址
https://github.com/cicadasmile/big-data-parent
GitEE·地址
https://gitee.com/cicadasmile/big-data-parent

阅读标签

Java基础】【设计模式】【结构与算法】【Linux系统】【数据库

分布式架构】【微服务】【大数据组件】【SpringBoot进阶】【Spring&Boot基础

数据分析】【技术导图】【 职场

相关文章
相关标签/搜索