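If you have not yet read the posts on the startup flow and the Channel component in this Flume-ng source code analysis series, see the earlier entries:
Flume-ng Source Code Analysis: Startup Flow
Flume-ng Source Code Analysis: The Channel Component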
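Sink is the second component started in the startup flow, so today we look at its details.
Within the agent, a Sink plays the role of consumer: it delivers events to a specific destination.
As usual, we start with the code. It shows that Sink is an interface whose most important method is process(), which handles the data taken from the Channel. Sink instances are created by SinkFactory.create().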
    @InterfaceAudience.Public
    @InterfaceStability.Stable
    public interface Sink extends LifecycleAware, NamedComponent {

        public void setChannel(Channel channel);

        public Channel getChannel();

        /* Processes the events taken from the channel. */
        public Status process() throws EventDeliveryException;

        public static enum Status {
            READY, BACKOFF
        }
    }
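To make the process() contract concrete, here is a minimal sketch in plain Java. It does not use the Flume classes: `SinkContractSketch`, `ListSink` and the queue standing in for a Channel are hypothetical names made up for illustration, and the channel transaction a real sink would work inside is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Simplified stand-ins for illustration only; these are not the Flume classes. */
final class SinkContractSketch {

    enum Status { READY, BACKOFF }

    /** Hypothetical sink that drains a queue standing in for a Channel. */
    static final class ListSink {
        private final Queue<String> channel;                       // stand-in for Channel
        private final List<String> delivered = new ArrayList<>();  // stand-in destination

        ListSink(Queue<String> channel) {
            this.channel = channel;
        }

        /** Handles at most one event per call and reports whether work was found. */
        Status process() {
            String event = channel.poll();
            if (event == null) {
                return Status.BACKOFF;  // channel is empty: tell the caller to slow down
            }
            delivered.add(event);       // "deliver" the event to its destination
            return Status.READY;        // an event was handled, more may be waiting
        }

        List<String> delivered() {
            return delivered;
        }
    }

    public static void main(String[] args) {
        Queue<String> channel = new ArrayDeque<>();
        channel.add("e1");
        ListSink sink = new ListSink(channel);
        System.out.println(sink.process());  // READY: one event was consumed
        System.out.println(sink.process());  // BACKOFF: the queue is now empty
    }
}
```

The two status values are what lets the caller decide between polling again immediately and waiting a little before the next attempt.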
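In the startup flow we saw that what Application starts is not the Sink itself but a SinkRunner. As the name suggests, this is a driver class. Let's look at its code, focusing on start().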
    public class SinkRunner implements LifecycleAware {
        ...
        @Override
        public void start() {
            SinkProcessor policy = getPolicy();

            policy.start();

            runner = new PollingRunner();

            runner.policy = policy;
            runner.counterGroup = counterGroup;
            runner.shouldStop = new AtomicBoolean();

            runnerThread = new Thread(runner);
            runnerThread.setName("SinkRunner-PollingRunner-" +
                policy.getClass().getSimpleName());
            runnerThread.start();

            lifecycleState = LifecycleState.START;
        }
        ...
    }
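The body of PollingRunner is not shown in the excerpt above. The following is a simplified sketch of what such a polling loop can look like, not the Flume implementation: `PollingRunnerSketch`, `Processor`, `Runner`, `runFor` and the fixed back-off sleep are all assumptions made for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Simplified sketch of a polling driver; not the Flume PollingRunner. */
final class PollingRunnerSketch {

    enum Status { READY, BACKOFF }

    /** Stand-in for SinkProcessor: only the process() step matters here. */
    interface Processor {
        Status process();
    }

    /** Loop that runs on a dedicated thread until stop() is called. */
    static final class Runner implements Runnable {
        private final Processor policy;
        private final long backoffMillis;  // assumed fixed sleep, chosen for the sketch
        private final AtomicBoolean shouldStop = new AtomicBoolean();

        Runner(Processor policy, long backoffMillis) {
            this.policy = policy;
            this.backoffMillis = backoffMillis;
        }

        void stop() {
            shouldStop.set(true);
        }

        @Override
        public void run() {
            while (!shouldStop.get()) {
                boolean backoff;
                try {
                    backoff = policy.process() == Status.BACKOFF;
                } catch (RuntimeException e) {
                    backoff = true;  // delivery failed: keep the thread alive, retry later
                }
                if (backoff) {
                    try {
                        Thread.sleep(backoffMillis);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }
    }

    /** Drives a processor holding the given number of events; returns how many were handled. */
    static int runFor(int events) {
        AtomicInteger remaining = new AtomicInteger(events);
        AtomicInteger processed = new AtomicInteger();
        Processor policy = () -> {
            if (remaining.get() > 0) {
                processed.incrementAndGet();
                remaining.decrementAndGet();
                return Status.READY;
            }
            return Status.BACKOFF;
        };
        Runner runner = new Runner(policy, 5);
        Thread thread = new Thread(runner, "SinkRunner-PollingRunner-Sketch");
        thread.start();
        try {
            while (remaining.get() > 0) {
                Thread.sleep(1);
            }
            runner.stop();
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(runFor(3));
    }
}
```

The stop flag is checked once per iteration, so the thread exits after finishing the step it is currently in rather than being cut off in the middle of one.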
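Starting a SinkRunner really means calling its start(), and start() mainly starts a SinkProcessor and a polling thread that drives it. That SinkProcessor was already specified when the SinkRunner was created. If you want to know how the configuration file is handled, look at the classes in the conf package, and at getConfiguration() in org.apache.flume.node.AbstractConfigurationProvider.
Next, let's look at SinkProcessor.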
    public interface SinkProcessor extends LifecycleAware, Configurable {

        Status process() throws EventDeliveryException;

        void setSinks(List<Sink> sinks);
    }
SinkProcessor is an interface. Its implementations are produced by getProcessor() in SinkProcessorFactory, which is called from configure() in SinkGroup, which in turn is invoked by loadSinkGroup() in AbstractConfigurationProvider.
    public class SinkGroup implements Configurable, ConfigurableComponent {
        List<Sink> sinks;
        SinkProcessor processor;
        SinkGroupConfiguration conf;

        public SinkGroup(List<Sink> groupSinks) {
            sinks = groupSinks;
        }

        public SinkProcessor getProcessor() {
            return processor;
        }

        @Override
        public void configure(ComponentConfiguration conf) {
            this.conf = (SinkGroupConfiguration) conf;
            processor =
                SinkProcessorFactory.getProcessor(this.conf.getProcessorContext(),
                    sinks);
        }
    }
Let's take DefaultSinkProcessor as an example.
    public class DefaultSinkProcessor implements SinkProcessor, ConfigurableComponent {
        private Sink sink;
        private LifecycleState lifecycleState;

        @Override
        public void start() {
            Preconditions.checkNotNull(sink, "DefaultSinkProcessor sink not set");
            sink.start();
            lifecycleState = LifecycleState.START;
        }

        @Override
        public void stop() {
            Preconditions.checkNotNull(sink, "DefaultSinkProcessor sink not set");
            sink.stop();
            lifecycleState = LifecycleState.STOP;
        }

        @Override
        public LifecycleState getLifecycleState() {
            return lifecycleState;
        }

        @Override
        public void configure(Context context) {
        }

        @Override
        public Status process() throws EventDeliveryException {
            return sink.process();
        }

        @Override
        public void setSinks(List<Sink> sinks) {
            Preconditions.checkNotNull(sinks);
            Preconditions.checkArgument(sinks.size() == 1, "DefaultSinkPolicy can "
                + "only handle one sink, "
                + "try using a policy that supports multiple sinks");
            sink = sinks.get(0);
        }

        @Override
        public void configure(ComponentConfiguration conf) {
        }
    }
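The code above shows that DefaultSinkProcessor simply delegates to the sink's start, stop and process methods. So what is SinkProcessor for? Flume also provides FailoverSinkProcessor and LoadBalancingSinkProcessor; as the names suggest, one handles failover and the other load balancing. The different SinkProcessor implementations exist to apply different dispatch operations and strategies across sinks. A sink's start(), in turn, usually starts a thread that performs the consuming work.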
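The difference between the two strategies can be sketched in a few lines. This is a deliberately simplified illustration and not the code of FailoverSinkProcessor or LoadBalancingSinkProcessor: the class names, the `sink(...)` helper and the `lastUsed` field are hypothetical, and the sketch ignores details such as penalizing a failed sink for a cool-down period.

```java
import java.util.List;

/** Simplified selection strategies; not the Flume processor implementations. */
final class SinkSelectionSketch {

    enum Status { READY, BACKOFF }

    /** Stand-in for a sink; process() throws an unchecked exception on failure. */
    interface Sink {
        String name();
        Status process();
    }

    /** Hypothetical helper: a sink that either always succeeds or always fails. */
    static Sink sink(String name, boolean healthy) {
        return new Sink() {
            @Override public String name() { return name; }
            @Override public Status process() {
                if (!healthy) {
                    throw new IllegalStateException(name + " is down");
                }
                return Status.READY;
            }
        };
    }

    /** Failover: always start from the highest-priority sink, fall through on failure. */
    static final class FailoverProcessor {
        private final List<Sink> byPriority;  // highest priority first
        String lastUsed;

        FailoverProcessor(List<Sink> byPriority) {
            this.byPriority = byPriority;
        }

        Status process() {
            for (Sink s : byPriority) {
                try {
                    Status status = s.process();
                    lastUsed = s.name();
                    return status;
                } catch (RuntimeException e) {
                    // this sink failed: try the next one in priority order
                }
            }
            throw new IllegalStateException("all sinks failed");
        }
    }

    /** Load balancing: rotate the starting sink on every call (round robin). */
    static final class RoundRobinProcessor {
        private final List<Sink> sinks;
        private int next;
        String lastUsed;

        RoundRobinProcessor(List<Sink> sinks) {
            this.sinks = sinks;
        }

        Status process() {
            for (int i = 0; i < sinks.size(); i++) {
                int index = (next + i) % sinks.size();
                try {
                    Status status = sinks.get(index).process();
                    lastUsed = sinks.get(index).name();
                    next = (index + 1) % sinks.size();  // the following call starts after this sink
                    return status;
                } catch (RuntimeException e) {
                    // this sink failed: try the next one in rotation
                }
            }
            throw new IllegalStateException("all sinks failed");
        }
    }
}
```

With one failed sink at the head of the list, the failover variant keeps landing on the same healthy sink, while the round-robin variant spreads consecutive calls over all healthy ones.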