采集需求:某服务器的某特定目录下,会不断产生新的文件,每当有新文件出现,就须要把文件采集到HDFS中去web
根据需求,首先定义如下3大要素服务器
l 采集源,即source——监控文件目录 : spooldirspa
l 下沉目标,即sink——HDFS文件系统 : hdfs sink3d
l source和sink之间的传递通道——channel,可用file channel 也能够用内存memory channel日志
配置文件编写:code
vi spooldir-hdfs-sink.conform
#定义三大组件的名称blog
agent1.sources = source1内存
agent1.sinks = sink1ci
agent1.channels = channel1
# 配置source组件
agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /root/data/
agent1.sources.source1.fileHeader = false
#配置拦截器
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = timestamp
# 配置sink组件
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path =/weblog/flume-collection/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize= 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
#滚动生成的文件按大小生成
agent1.sinks.sink1.hdfs.rollSize = 102400
#滚动生成的文件按行数生成
agent1.sinks.sink1.hdfs.rollCount = 1000000
#滚动生成的文件按时间生成
agent1.sinks.sink1.hdfs.rollInterval = 60
#开启滚动生成目录
agent1.sinks.sink1.hdfs.round = true
#以10为一梯度滚动生成
agent1.sinks.sink1.hdfs.roundValue = 10
#单位为分钟
agent1.sinks.sink1.hdfs.roundUnit = minute
# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600
agent1.channels.channel1.keep-alive = 120
# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
添加数据
aaa.txt
13601249301 100 200 300 400 500 600 700
13601249302 100 200 300 400 500 600 700
13601249303 100 200 300 400 500 600 700
13601249304 100 200 300 400 500 600 700
13601249305 100 200 300 400 500 600 700
执行命令
bin/flume-ng agent -c conf -f conf/spooldir-hdfs-sink.conf -n agent1 -Dflume.root.logger=INFO,console
flume的source采用spoodir时! 目录下面不容许存放同名的文件,不然报错!
Channel参数解释:
capacity:默认该通道中最大的能够存储的event数量
trasactionCapacity:每次最大能够从source中拿到或者送到sink中的event数量
keep-alive:event添加到通道中或者移出的容许时间
其余组件:Interceptor(拦截器)
用于Source的一组Interceptor,按照预设的顺序在必要地方装饰和过滤events。
内建的Interceptors容许增长event的headers好比:时间戳、主机名、静态标记等等
定制的interceptors能够经过内省event payload(读取原始日志),实现本身的业务逻辑(很强大)