官方参考文档html
https://flume.apache.org/FlumeUserGuide.html#file-channel前端
Flume NG是一个分布式、可靠、可用的系统,它可以将不一样数据源的海量日志数据进行高效收集、聚合、移动,最后存储到一个中心化数据存储系统中。由原来的Flume OG到如今的Flume NG,进行了架构重构,而且如今NG版本彻底不兼容原来的OG版本。通过架构重构后,Flume NG更像是一个轻量的小工具,很是简单,容易适应各类方式日志收集,并支持failover和负载均衡。nginx
架构设计要点shell
Flume的架构主要有一下几个核心概念:数据库
Event:一个数据单元,带有一个可选的消息头apache
Flow:Event从源点到达目的点的迁移的抽象tomcat
Client:操做位于源点处的Event,将其发送到Flume Agentbash
Agent:一个独立的Flume进程,包含组件Source、Channel、Sink服务器
Source:用来消费传递到该组件的Event架构
Channel:中转Event的一个临时存储,保存有Source组件传递过来的Event
Sink:从Channel中读取并移除Event,将Event传递到Flow Pipeline中的下一个Agent(若是有的话)
Flume NG架构,如图所示:
外部系统产生日志,直接经过Flume的Agent的Source组件将事件(如日志行)发送到中间临时的channel组件,最后传递给Sink组件,HDFS Sink组件能够直接把数据存储到HDFS集群上。
一个最基本Flow的配置,格式以下:
# list the sources, sinks and channels for the agent <Agent>.sources = <Source1> <Source2> <Agent>.sinks = <Sink1> <Sink2> <Agent>.channels = <Channel1> <Channel2> # set channel for source <Agent>.sources.<Source1>.channels = <Channel1> <Channel2> ... <Agent>.sources.<Source2>.channels = <Channel1> <Channel2> ... # set channel for sink <Agent>.sinks.<Sink1>.channel = <Channel1> <Agent>.sinks.<Sink2>.channel = <Channel2>
尖括号里面的,咱们能够根据实际需求或业务来修更名称。下面详细说明:
表示配置一个Agent的名称,一个Agent确定有一个名称。与是Agent的Source组件的名称,消费传递过来的Event。与是Agent的Channel组件的名称。与是Agent的Sink组件的名称,从Channel中消费(移除)Event。
上面配置内容中,第一组中配置Source、Sink、Channel,它们的值能够有1个或者多个;第二组中配置Source将把数据存储(Put)到哪个Channel中,能够存储到1个或多个Channel中,同一个Source将数据存储到多个Channel中,其实是Replication;第三组中配置Sink从哪个Channel中取(Task)数据,一个Sink只能从一个Channel中取数据。
下面,根据官网文档,咱们展现几种Flow Pipeline,各自适应于什么样的应用场景:
多个Agent顺序链接
能够将多个Agent顺序链接起来,将最初的数据源通过收集,存储到最终的存储系统中。这是最简单的状况,通常状况下,应该控制这种顺序链接的Agent的数量,由于数据流经的路径变长了,若是不考虑failover的话,出现故障将影响整个Flow上的Agent收集服务。
多个Agent的数据汇聚到同一个Agent
这种状况应用的场景比较多,好比要收集Web网站的用户行为日志,Web网站为了可用性使用的负载均衡的集群模式,每一个节点都产生用户行为日志,能够为每一个节点都配置一个Agent来单独收集日志数据,而后多个Agent将数据最终汇聚到一个用来存储数据存储系统,如HDFS上。
多路(Multiplexing)Agent
这种模式,有两种方式,一种是用来复制(Replication),另外一种是用来分流(Multiplexing)。Replication方式,能够将最前端的数据源复制多份,分别传递到多个channel中,每一个channel接收到的数据都是相同的,配置格式,以下所示:
# List the sources, sinks and channels for the agent <Agent>.sources = <Source1> <Agent>.sinks = <Sink1> <Sink2> <Agent>.channels = <Channel1> <Channel2> # set list of channels for source (separated by space) <Agent>.sources.<Source1>.channels = <Channel1> <Channel2> # set channel for sinks <Agent>.sinks.<Sink1>.channel = <Channel1> <Agent>.sinks.<Sink2>.channel = <Channel2> <Agent>.sources.<Source1>.selector.type = replicating
上面指定了selector的type的值为replication,其余的配置没有指定,使用的Replication方式,Source1会将数据分别存储到Channel1和Channel2,这两个channel里面存储的数据是相同的,而后数据被传递到Sink1和Sink2。
Multiplexing方式,selector能够根据header的值来肯定数据传递到哪个channel,配置格式,以下所示:
# Mapping for multiplexing selector <Agent>.sources.<Source1>.selector.type = multiplexing <Agent>.sources.<Source1>.selector.header = <someHeader> <Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1> <Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2> <Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2> #... <Agent>.sources.<Source1>.selector.default = <Channel2>
上面selector的type的值为multiplexing,同时配置selector的header信息,还配置了多个selector的mapping的值,即header的值:若是header的值为Value一、Value2,数据从Source1路由到Channel1;若是header的值为Value二、Value3,数据从Source1路由到Channel2。
实现load balance功能
Load balancing Sink Processor可以实现load balance功能,上图Agent1是一个路由节点,负责将Channel暂存的Event均衡到对应的多个Sink组件上,而每一个Sink组件分别链接到一个独立的Agent上,示例配置,以下所示:
a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 k3 a1.sinkgroups.g1.processor.type = load_balance a1.sinkgroups.g1.processor.backoff = true a1.sinkgroups.g1.processor.selector = round_robin a1.sinkgroups.g1.processor.selector.maxTimeOut=10000
实现failover能
Failover Sink Processor可以实现failover功能,具体流程相似load balance,可是内部处理机制与load balance彻底不一样:Failover Sink Processor维护一个优先级Sink组件列表,只要有一个Sink组件可用,Event就被传递到下一个组件。若是一个Sink可以成功处理Event,则会加入到一个Pool中,不然会被移出Pool并计算失败次数,设置一个惩罚因子,示例配置以下所示:
a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 k3 a1.sinkgroups.g1.processor.type = failover a1.sinkgroups.g1.processor.priority.k1 = 5 a1.sinkgroups.g1.processor.priority.k2 = 7 a1.sinkgroups.g1.processor.priority.k3 = 6 a1.sinkgroups.g1.processor.maxpenalty = 20000
基本功能
咱们看一下,Flume NG都支持哪些功能(目前最新版本是1.5.0.1),了解它的功能集合,可以让咱们在应用中更好地选择使用哪种方案。说明Flume NG的功能,实际仍是围绕着Agent的三个组件Source、Channel、Sink来看它可以支持哪些技术或协议。咱们再也不对各个组件支持的协议详细配置进行说明,经过列表的方式分别对三个组件进行概要说明:
Source类型 说明 Avro Source 支持Avro协议(其实是Avro RPC),内置支持 Thrift Source 支持Thrift协议,内置支持 Exec Source 基于Unix的command在标准输出上生产数据 JMS Source 从JMS系统(消息、主题)中读取数据,ActiveMQ已经测试过 Spooling Directory Source 监控指定目录内数据变动 Twitter 1% firehose Source 经过API持续下载Twitter数据,试验性质 Netcat Source 监控某个端口,将流经端口的每个文本行数据做为Event输入 Sequence Generator Source 序列生成器数据源,生产序列数据 Syslog Sources 读取syslog数据,产生Event,支持UDP和TCP两种协议 HTTP Source 基于HTTP POST或GET方式的数据源,支持JSON、BLOB表示形式 Legacy Sources 兼容老的Flume OG中Source(0.9.x版本) Flume Channel ############################################################### Channel类型 说明 Memory Channel Event数据存储在内存中 JDBC Channel Event数据存储在持久化存储中,当前Flume Channel内置支持Derby File Channel Event数据存储在磁盘文件中 Spillable Memory Channel Event数据存储在内存中和磁盘上,当内存队列满了,会持久化到磁盘文件(当前试验性的,不建议生产环境使用) Pseudo Transaction Channel 测试用途 Custom Channel 自定义Channel实现 Flume Sink ################################################################### Sink类型 说明 HDFS Sink 数据写入HDFS Logger Sink 数据写入日志文件 Avro Sink 数据被转换成Avro Event,而后发送到配置的RPC端口上 Thrift Sink 数据被转换成Thrift Event,而后发送到配置的RPC端口上 IRC Sink 数据在IRC上进行回放 File Roll Sink 存储数据到本地文件系统 Null Sink 丢弃到全部数据 HBase Sink 数据写入HBase数据库 Morphline Solr Sink 数据发送到Solr搜索服务器(集群) ElasticSearch Sink 数据发送到Elastic Search搜索服务器(集群) Kite Dataset Sink 写数据到Kite Dataset,试验性质的 Custom Sink 自定义Sink实现 ################################################################# 另外还有Channel Selector、Sink Processor、Event Serializer、Interceptor等组件,能够参考官网提供的用户手册。
安装配置略,能够参考网上教程
下面是测试的配置文件
agent 配置文件以下
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sinks.k1.type = avro a1.sinks.k1.hostname = 127.0.0.1 a1.sinks.k1.port = 44444 a1.sinks.k1.channel = c1 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.type = exec a1.sources.r1.command = tail -F /var/log/nginx/access.log a1.sources.r1.channels = c1 a1.sources.s.deserializer.maxLineLength=65535
server端配置文件以下: 测试复制(Replication)1个source 复制到多个channels 输出到多个sink
# Name the components on this agent b1.sources = r1 b1.sinks = k1 k2 k3 b1.channels = c1 c2 c3 b1.sources.r1.selector.type = replicating b1.sources.r1.type = avro b1.sources.r1.channels = c1 c2 c3 b1.sources.r1.bind = 0.0.0.0 b1.sources.r1.port = 44444 b1.channels.c1.type = file b1.channels.c1.write-timeout = 10 b1.channels.c1.keep-alive = 10 b1.channels.c1.checkpointDir = /flume/check b1.channels.c1.useDualCheckpoints = true b1.channels.c1.backupCheckpointDir = /flume/backup b1.channels.c1.dataDirs = /flume b1.channels.c2.type=memory b1.channels.c2.capacity=2000000 b1.channels.c2.transactionCapacity=10000 b1.channels.c3.type=memory b1.channels.c3.capacity=2000000 b1.channels.c3.transactionCapacity=10000 # Describe the sink b1.sinks.k1.type = hdfs b1.sinks.k1.channel = c1 b1.sinks.k1.hdfs.path = hdfs://localhost:9000/user/hadoop/flume/collected/ b1.sinks.k1.hdfs.filePrefix = chen_test b1.sinks.k1.hdfs.round = true b1.sinks.k1.hdfs.roundValue = 10 b1.sinks.k1.hdfs.roundUnit = minute b1.sinks.k2.channel = c2 b1.sinks.k2.type = file_roll b1.sinks.k2.batchSize = 100000000 b1.sinks.k2.rollInterval = 1000000 b1.sinks.k2.serializer = TEXT b1.sinks.k2.sink.directory = /var/log/flume b1.sinks.k3.channel = c3 b1.sinks.k3.type = logger
启动测试命令
flume-ng agent -c . -f test.conf -n b1 -Dflume.root.logger=INFO,console
-c 配置文件目录 -f配置文件 -n 节点名字 和配置文件对应 console打到终端
5.flume-ng负载均衡load-balance、failover集群搭建
参考连接
http://blog.csdn.net/lskyne/article/details/37662835
6. 测试 区分 flume日志合并在一块儿的日志
a1配置 [root@host_12 test]# cat a1.conf # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type=static a1.sources.r1.interceptors.i1.key=nginx a1.sources.r1.interceptors.i1.value=nginx_1 a1.sources.r1.interceptors.i1.preserveExisting=false #a1.sources.r1.interceptors = i1 #a1.sources.r1.interceptors.i1.type = host #a1.sources.r1.interceptors.i1.hostHeader = hostname a1.sinks.k1.type = avro a1.sinks.k1.hostname = 127.0.0.1 a1.sinks.k1.port = 44444 a1.sinks.k1.channel = c1 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.type = exec ###匹配shell tomcat yyyy:mm:dd:hh格式的日志 a1.sources.r1.shell = /bin/bash -c a1.sources.r1.command = tail -f /var/log/nginx_1/access_`date +%Y%m%d%H`.log a1.sources.r1.channels = c1 #########匹配替换行里的文本的内容 #a1.sources.r1.interceptors = i1 #a1.sources.r1.interceptors.i1.type = search_replace #a1.sources.r1.interceptors.i1.searchPattern = [0-9]+ #a1.sources.r1.interceptors.i1.replaceString = lxw1234 #a1.sources.r1.interceptors.i1.charset = UTF-8 ########################################### a2配置 [root@host_12 test]# cat a2.conf # Name the components on this agent a2.sources = r1 a2.sinks = k1 a2.channels = c1 a2.sources.r1.interceptors = i1 a2.sources.r1.interceptors.i1.type=static a2.sources.r1.interceptors.i1.key=nginx a2.sources.r1.interceptors.i1.value=nginx_2 a2.sources.r1.interceptors.i1.preserveExisting=false a2.sinks.k1.type = avro a2.sinks.k1.hostname = 127.0.0.1 a2.sinks.k1.port = 44444 a2.sinks.k1.channel = c1 # Use a channel which buffers events in memory a2.channels.c1.type = memory a2.channels.c1.capacity = 1000 a2.channels.c1.transactionCapacity = 100 a2.sources.r1.type = exec a2.sources.r1.shell = /bin/bash -c a2.sources.r1.command = tail -f /var/log/nginx_2/access_`date +%Y%m%d%H`.log a2.sources.r1.channels = c1 #################################### server配置 [root@host_12 test]# cat h1.conf # Name the components on this agent serv_1.sources = r1 serv_1.sinks = k2 k3 serv_1.channels = c2 c3 #serv_1.sources.r1.selector.type = replicating serv_1.sources.r1.selector.type = multiplexing serv_1.sources.r1.selector.header = nginx serv_1.sources.r1.selector.mapping.nginx_1 = c2 serv_1.sources.r1.selector.mapping.nginx_2 = c3 serv_1.sources.r1.type = avro serv_1.sources.r1.channels = c2 c3 serv_1.sources.r1.bind = 0.0.0.0 serv_1.sources.r1.port = 44444 serv_1.channels.c2.type=memory serv_1.channels.c2.capacity=2000000 serv_1.channels.c2.transactionCapacity=10000 serv_1.channels.c3.type=memory serv_1.channels.c3.capacity=2000000 serv_1.channels.c3.transactionCapacity=10000 serv_1.sinks.k2.channel = c2 serv_1.sinks.k2.type = file_roll serv_1.sinks.k2.batchSize = 100000000 serv_1.sinks.k2.rollInterval = 1000000 serv_1.sinks.k2.serializer = TEXT serv_1.sinks.k2.sink.directory = /var/log/flume/ serv_1.sinks.k3.channel = c3 serv_1.sinks.k3.type = logger