flume介绍

前言

本文经过对flume的架构,数据链路和数据的可靠性来分析flume的原理,并在文末提供了demo(官网搬运)。html

 

架构

flume能够理解为是一个ETL工具,自己是单点的,也就是只有agent,没有server,可是经过强大的source-channel-sink-source…机制,能够经过在多个节点上起agent构成一个DAG,从而造成分布式形态。参考图1,图2(其余图就不贴了,参考官网文档),flume支持在任意节点启动任意数量的agent,而且agent与agent之间能够经过rpc链接。须要注意的是,1个source能够下发数据到多个channel(经过`ChannelSelector`,最多见的就是广播,或者动态路由),1个sink只能从1个channel获取数据,可是多个sink又能够从1个channel拉取数据(经过`SinkProcessor`)。这样的DAG设计意图很明显,source生产数据一般都是比sink处理数据快的,因此channel起到数据缓冲做用,而且经过事务机制保证数据的可靠性。试想一下场景,假如数据量很大,1个source消费速度跟不上,也就是说达到了单进程的性能瓶颈,那么能够启动多个agent;假如数据量通常,1个source就足够,可是处理很复杂好比IO密集型的操做,那么能够经过多sink的方式从channel拉数据,也就是sink端作负载均衡,好比`LoadBalancingSinkProcessor`。flume这样的设计很好的知足了各类场景需求。node

 

图1apache

图2架构

数据链路

参考图3,flume的三个核心组件的数据链路是基于推送和拉取模式,其中Channel能够理解为列队,也是实现数据可靠性的关键组件。好比常见的,采用FileChannel之类落盘的channel场景中,当source推送数据到channel失败,那么就会触发source重试,当sink拉取数据操做失败,那么回通知channel回滚,直到sink操做成功才提交以前的那些数据,从而使得channel移除那些已经被成功处理的数据。负载均衡

图3分布式

 

flume的组件也不只仅只有source,channel,sink这三个。参考图4,完整的数据链路还有ChannelProcessor和SinkeProcessor。ChannelProcessor主要功能有两个,1.对事件进行拦截,提供修改事件的入口; 2.对channel的选择,咱们知道1个source能够发送数据到多个channel,可是具体发送到哪些channel呢?这里就是选择器决定了source发送到哪些channel。SinkProcessor的功能其实也相似,因为一个channel的事件能够被多个sink拉取,那么SinkProcessor决定了sink拉取的策略,这里flume衍生出了sinkGroup的概念,通常状况下,1个sink对应1个线程,而sinkGroup能够包含多个sink共用1个线程。ide

 

图4工具

 

数据可靠性

数据可靠性是很是重要的一个特性,因此拉出来单独作一下说明。性能

flume基于Channel和Transaction接口实现数据不丢失的特性。其中channel负责对数据持久化,维护了全部没有被事务提交的事件,Transaction负责实现事务语义,相似jdbc的事务语法,以下大数据

Transaction tx = ch.getTransaction();
try {
  tx.begin();
  ...
  // ch.put(event) or ch.take()
  ...
  tx.commit();
 } catch (ChannelException ex) {
  tx.rollback();
   ...
 } finally {
  tx.close();
}

经过这两个接口的保证,flume能够实现at leaset once的语义,这是因为sink能够出现rpc超时等一些问题,致使误觉得失败致使事件被重复拉取。这个问题能够经过对事件分配惟一id,再经过其余大数据组件去重。

 

总结

flume提供一个灵活的设计思路,能够经过agent组合构建出符合本身需求的DAG,有点相似storm,可是程序更加轻量。而且提供了不少开箱即用的插件,能够说是很良心了。

 

demo

下面经过一个案例来了解flume,思路是构建基于netcat的agent,而后经过telnet进行验证。

# 建立一个新的flume配置文件

vi example.conf

# example.conf: A single-node Flume configuration

 

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

 

# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = localhost

a1.sources.r1.port = 44444

 

# Describe the sink

a1.sinks.k1.type = logger

 

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

 

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

 

# 启动flume agent,并开启http的度量监控,能够经过http请求获取相关度量数据

flume-ng agent --name a1 --conf-file example.conf -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=9999

 

# 经过telnet进行调试

telnet localhost 44444

# 发现消息,能够看到flume agent能够成功接收

 

参考

// flume官网文档以及源码

http://flume.apache.org/FlumeUserGuide.html

 

// 书乃本也~

《Flume  构建高可用、可扩展的海量日志采集系统》

相关文章
相关标签/搜索