收集、移动、聚合大量日志数据的服务。 基于流数据的架构,用于在线日志分析。 基于事件。 在生产和消费者之间启动协调做用。 提供了事务保证,确保消息必定被分发。 Source 多种 sink多种. multihop //多级跃点. 水平扩展: //加节点 竖直扩展 //增长硬件。
接受数据,类型有多种。
临时存放地,对Source中来的数据进行缓冲,直到sink消费掉。
从channel提取数据存放到中央化存储(hadoop / hbase)。
1.下载 2.tar 3.环境变量 4.验证flume是否成功 $>flume-ng version //next generation.下一代.
1.建立配置文件 [/soft/flume/conf/hello.conf] #声明三种组件 a1.sources = r1 a1.channels = c1 a1.sinks = k1 #定义source信息 a1.sources.r1.type=netcat a1.sources.r1.bind=localhost a1.sources.r1.port=8888 #定义sink信息 a1.sinks.k1.type=logger #定义channel信息 a1.channels.c1.type=memory #绑定在一块儿 a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1 2.运行 a)启动flume agent $>bin/flume-ng agent -f ../conf/helloworld.conf -n a1 -Dflume.root.logger=INFO,console b)启动nc的客户端 $>nc localhost 8888 $nc>hello world c)在flume的终端输出hello world.
$>sudo yum install nmap-ncat.x86_64
$>修改ali.repo --> ali.repo.bak文件。 $>sudo yum clean all $>sudo yum makecache #例如阿里基本源 $>sudo wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo #阿里epel源 $>sudo wget -O /etc/yum.repos.d/epel.repo http://mirrors.aliyun.com/repo/epel-7.repo
1.netcat nc .. 2.exec 实时日志收集,实时收集日志。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type=exec a1.sources.r1.command=tail -F /home/centos/test.txt a1.sinks.k1.type=logger a1.channels.c1.type=memory a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1 3.批量收集 监控一个文件夹,静态文件。 收集完以后,会重命名文件成新文件。.compeleted. a)配置文件 [spooldir_r.conf] a1.sources = r1 a1.channels = c1 a1.sinks = k1 a1.sources.r1.type=spooldir a1.sources.r1.spoolDir=/home/centos/spool a1.sources.r1.fileHeader=true a1.sinks.k1.type=logger a1.channels.c1.type=memory a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1 b)建立目录 $>mkdir ~/spool c)启动flume $>bin/flume-ng agent -f ../conf/helloworld.conf -n a1 -Dflume.root.logger=INFO,console 4.序列source [seq] a1.sources = r1 a1.channels = c1 a1.sinks = k1 a1.sources.r1.type=seq a1.sources.r1.totalEvents=1000 a1.sinks.k1.type=logger a1.channels.c1.type=memory a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1 [运行] $>bin/flume-ng agent -f ../conf/helloworld.conf -n a1 -Dflume.root.logger=INFO,console 5.StressSource a1.sources = stresssource-1 a1.channels = memoryChannel-1 a1.sources.stresssource-1.type = org.apache.flume.source.StressSource a1.sources.stresssource-1.size = 10240 a1.sources.stresssource-1.maxTotalEvents = 1000000 a1.sources.stresssource-1.channels = memoryChannel-1
1.hdfs a1.sources = r1 a1.channels = c1 a1.sinks = k1 a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 8888 a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H/%M/%S a1.sinks.k1.hdfs.filePrefix = events- #是不是产生新目录,每十分钟产生一个新目录,通常控制的目录方面。 #2017-12-12 --> #2017-12-12 -->%H%M%S a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 10 a1.sinks.k1.hdfs.roundUnit = second a1.sinks.k1.hdfs.useLocalTimeStamp=true #是否产生新文件。 a1.sinks.k1.hdfs.rollInterval=10 a1.sinks.k1.hdfs.rollSize=10 a1.sinks.k1.hdfs.rollCount=3 a1.channels.c1.type=memory a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 2.hive 略 3.hbase a1.sources = r1 a1.channels = c1 a1.sinks = k1 a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 8888 a1.sinks.k1.type = hbase a1.sinks.k1.table = ns1:t12 a1.sinks.k1.columnFamily = f1 a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer a1.channels.c1.type=memory a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 4.kafka
1.建立配置文件 [avro_hop.conf] #a1 a1.sources = r1 a1.sinks= k1 a1.channels = c1 a1.sources.r1.type=netcat a1.sources.r1.bind=localhost a1.sources.r1.port=8888 a1.sinks.k1.type = avro a1.sinks.k1.hostname=localhost a1.sinks.k1.port=9999 a1.channels.c1.type=memory a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 #a2 a2.sources = r2 a2.sinks= k2 a2.channels = c2 a2.sources.r2.type=avro a2.sources.r2.bind=localhost a2.sources.r2.port=9999 a2.sinks.k2.type = logger a2.channels.c2.type=memory a2.sources.r2.channels = c2 a2.sinks.k2.channel = c2 2.启动a2 $>flume-ng agent -f /soft/flume/conf/avro_hop.conf -n a2 -Dflume.root.logger=INFO,console 3.验证a2 $>netstat -anop | grep 9999 4.启动a1 $>flume-ng agent -f /soft/flume/conf/avro_hop.conf -n a1 5.验证a1 $>netstat -anop | grep 8888
1.MemoryChannel 略 2.FileChannel
a1.sources = r1
a1.sinks= k1
a1.channels = c1web
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=8888apache
a1.sinks.k1.type=loggercentos
a1.channels.c1.type = file a1.channels.c1.checkpointDir = /home/centos/flume/fc_check a1.channels.c1.dataDirs = /home/centos/flume/fc_data
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1缓存
a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
#0表示禁用内存通道,等价于文件通道
a1.channels.c1.memoryCapacity = 0
#0,禁用文件通道,等价内存通道。
a1.channels.c1.overflowCapacity = 2000架构
a1.channels.c1.byteCapacity = 800000
a1.channels.c1.checkpointDir = /user/centos/flume/fc_check
a1.channels.c1.dataDirs = /user/centos/flume/fc_datamaven
1.添加pom.xml <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.it18zhang</groupId> <artifactId>FluemDemo</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.7.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> </dependency> </dependencies> </project>