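If you haven't yet read the earlier articles in the Flume-ng source code analysis series on the startup process, the Channel component, and the Sink component, see: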
Flume-ng Source Code Analysis: The Startup Process
Flume-ng Source Code Analysis: The Channel Component
Flume-ng Source Code Analysis: The Sink Component
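In the previous three articles we took a first look at Flume's startup process, the Channel component, and the Sink component. Now let's look at the last of the agent's three core components, the Source.
The Source is the agent's event-origin component. Let's see how it passes events on to the channel, and what characteristics it has.
As before, let's start with the code: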
```java
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Source extends LifecycleAware, NamedComponent {

  public void setChannelProcessor(ChannelProcessor channelProcessor);

  public ChannelProcessor getChannelProcessor();
}
```
The interface defines two methods that implementations must provide: setChannelProcessor and getChannelProcessor. From this we can guess that a source hands its events to the channel through a ChannelProcessor.
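The following minimal sketch makes that guess concrete. It uses hypothetical, simplified stand-in types (SimpleEvent, ListChannel, SimpleChannelProcessor, SimpleSource, EchoSource) rather than the real org.apache.flume classes. The source knows only the processor injected through its setter, and the processor is what talks to the channel. The real ChannelProcessor also runs interceptors and consults a ChannelSelector, and this sketch leaves both out.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins; NOT the org.apache.flume API.
class SourceWiringDemo {
    public static void main(String[] args) {
        ListChannel channel = new ListChannel();
        EchoSource source = new EchoSource();
        // The wiring happens outside the source, the way the agent injects
        // a ChannelProcessor through setChannelProcessor().
        source.setChannelProcessor(new SimpleChannelProcessor(channel));
        source.receive("hello");
        System.out.println(channel.events.get(0).bodyAsString());
    }
}

final class SimpleEvent {
    private final byte[] body;
    SimpleEvent(byte[] body) { this.body = body; }
    String bodyAsString() { return new String(body, StandardCharsets.UTF_8); }
}

final class ListChannel {
    final List<SimpleEvent> events = new ArrayList<>();
    void put(SimpleEvent event) { events.add(event); }
}

final class SimpleChannelProcessor {
    private final ListChannel channel;
    SimpleChannelProcessor(ListChannel channel) { this.channel = channel; }
    // The real ChannelProcessor also runs interceptors and asks a
    // ChannelSelector which channels to use; this sketch has one channel.
    void processEvent(SimpleEvent event) { channel.put(event); }
}

interface SimpleSource {
    void setChannelProcessor(SimpleChannelProcessor processor);
    SimpleChannelProcessor getChannelProcessor();
}

final class EchoSource implements SimpleSource {
    private SimpleChannelProcessor processor;

    @Override
    public void setChannelProcessor(SimpleChannelProcessor processor) { this.processor = processor; }

    @Override
    public SimpleChannelProcessor getChannelProcessor() { return processor; }

    // The source never touches the channel directly. It builds an event
    // and hands it to whichever processor was injected.
    void receive(String text) {
        getChannelProcessor().processEvent(new SimpleEvent(text.getBytes(StandardCharsets.UTF_8)));
    }
}
```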
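First, the types of Source. Flume divides sources into two categories based on how their data arrives. Sources such as HTTP, netcat, and exec are event-driven (EventDrivenSource), while sources such as Kafka and JMS are pollable (PollableSource).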
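The difference between the two categories lies in who drives the flow. The sketch below illustrates it under some assumptions. PushStyleSource and PullStyleSource are hypothetical classes, a Consumer<String> stands in for the ChannelProcessor, and a Deque stands in for an upstream client such as a Kafka or JMS consumer. Only the READY/BACKOFF idea mirrors PollableSource.Status from the Flume code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch contrasting push and pull sources; NOT Flume code.
class PushPullDemo {
    public static void main(String[] args) {
        List<String> delivered = new ArrayList<>();
        // Event-driven: the outside world calls in.
        PushStyleSource push = new PushStyleSource(delivered::add);
        push.onData("pushed-1");
        // Pollable: something must keep calling process().
        Deque<String> upstream = new ArrayDeque<>(List.of("pulled-1", "pulled-2", "pulled-3"));
        PullStyleSource pull = new PullStyleSource(upstream, delivered::add, 2);
        PollStatus status;
        do {
            status = pull.process();
            System.out.println(status);
        } while (status == PollStatus.READY);
        System.out.println(delivered);
    }
}

// Mirrors the READY/BACKOFF idea of PollableSource.Status; hypothetical type.
enum PollStatus { READY, BACKOFF }

// Event-driven style: an external trigger (an HTTP request, a line of
// exec output) calls onData(), and the event is forwarded immediately.
final class PushStyleSource {
    private final Consumer<String> processor;
    PushStyleSource(Consumer<String> processor) { this.processor = processor; }
    void onData(String data) { processor.accept(data); }
}

// Pollable style: a runner calls process() in a loop. The source pulls up
// to batchSize items and reports BACKOFF when there was nothing to pull.
final class PullStyleSource {
    private final Deque<String> upstream;   // stands in for a Kafka/JMS client
    private final Consumer<String> processor;
    private final int batchSize;

    PullStyleSource(Deque<String> upstream, Consumer<String> processor, int batchSize) {
        this.upstream = upstream;
        this.processor = processor;
        this.batchSize = batchSize;
    }

    PollStatus process() {
        int pulled = 0;
        while (pulled < batchSize && !upstream.isEmpty()) {
            processor.accept(upstream.poll());
            pulled++;
        }
        return pulled == 0 ? PollStatus.BACKOFF : PollStatus.READY;
    }
}
```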
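As we saw in the startup-process article, the Application first starts the SourceRunner, and the SourceRunner then starts the source. Because there are two kinds of source, there are also two kinds of SourceRunner: EventDrivenSourceRunner and PollableSourceRunner. Let's look at their start() methods: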
EventDrivenSourceRunner
```java
public class EventDrivenSourceRunner extends SourceRunner {
  …
  @Override
  public void start() {
    Source source = getSource();
    ChannelProcessor cp = source.getChannelProcessor();
    cp.initialize();
    source.start();
    lifecycleState = LifecycleState.START;
  }
  …
}
```
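EventDrivenSourceRunner creates no thread of its own. It initializes the ChannelProcessor, starts the source, and records the START state. One plausible reason for this order is that an event-driven source may begin pushing events as soon as start() returns, so the processor has to be ready first. The sketch below models the ordering with hypothetical classes (SketchProcessor, SketchEventSource, SketchRunner). The check inside SketchEventSource.start() is an assumption added for illustration. The Flume runner does not perform it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical classes modelling the ordering in
// EventDrivenSourceRunner.start(); NOT Flume code.
class RunnerOrderDemo {
    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        SketchRunner runner = new SketchRunner(new SketchEventSource(new SketchProcessor(log), log));
        runner.start();
        System.out.println(log + " -> " + runner.state());
    }
}

enum SketchState { IDLE, START }

final class SketchProcessor {
    private final List<String> log;
    private boolean initialized;
    SketchProcessor(List<String> log) { this.log = log; }
    void initialize() {
        initialized = true;
        log.add("processor.initialize");
    }
    boolean isInitialized() { return initialized; }
}

final class SketchEventSource {
    private final SketchProcessor processor;
    private final List<String> log;
    SketchEventSource(SketchProcessor processor, List<String> log) {
        this.processor = processor;
        this.log = log;
    }
    SketchProcessor getChannelProcessor() { return processor; }
    // Assumption for illustration: once started, this source could receive
    // events at any moment, so it refuses to start without a ready processor.
    void start() {
        if (!processor.isInitialized()) {
            throw new IllegalStateException("ChannelProcessor not initialized");
        }
        log.add("source.start");
    }
}

// Same three steps as EventDrivenSourceRunner.start(), with no polling thread.
final class SketchRunner {
    private final SketchEventSource source;
    private SketchState state = SketchState.IDLE;
    SketchRunner(SketchEventSource source) { this.source = source; }
    void start() {
        source.getChannelProcessor().initialize();
        source.start();
        state = SketchState.START;
    }
    SketchState state() { return state; }
}
```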
PollableSourceRunner
```java
public class PollableSourceRunner extends SourceRunner {
  …
  @Override
  public void start() {
    PollableSource source = (PollableSource) getSource();
    ChannelProcessor cp = source.getChannelProcessor();
    cp.initialize();
    source.start();

    // Unlike the event-driven runner, a dedicated thread drives the source.
    runner = new PollingRunner();

    runner.source = source;
    runner.counterGroup = counterGroup;
    runner.shouldStop = shouldStop;

    runnerThread = new Thread(runner);
    runnerThread.setName(getClass().getSimpleName() + "-" +
        source.getClass().getSimpleName() + "-" + source.getName());
    runnerThread.start();

    lifecycleState = LifecycleState.START;
  }
  …
  public static class PollingRunner implements Runnable {

    private PollableSource source;
    private AtomicBoolean shouldStop;
    private CounterGroup counterGroup;

    @Override
    public void run() {
      logger.debug("Polling runner starting. Source:{}", source);

      // Keep polling until the runner is asked to stop.
      while (!shouldStop.get()) {
        counterGroup.incrementAndGet("runner.polls");

        try {
          if (source.process().equals(PollableSource.Status.BACKOFF)) {
            counterGroup.incrementAndGet("runner.backoffs");

            // Linear backoff: consecutive backoffs * increment, capped at the max interval.
            Thread.sleep(Math.min(
                counterGroup.incrementAndGet("runner.backoffs.consecutive")
                * source.getBackOffSleepIncrement(),
                source.getMaxBackOffSleepInterval()));
          } else {
            // A successful poll resets the consecutive-backoff counter.
            counterGroup.set("runner.backoffs.consecutive", 0L);
          }
        } catch (InterruptedException e) {
          logger.info("Source runner interrupted. Exiting");
          counterGroup.incrementAndGet("runner.interruptions");
        } catch (EventDeliveryException e) {
          logger.error("Unable to deliver event. Exception follows.", e);
          counterGroup.incrementAndGet("runner.deliveryErrors");
        } catch (Exception e) {
          counterGroup.incrementAndGet("runner.errors");
          logger.error("Unhandled exception, logging and sleeping for " +
              source.getMaxBackOffSleepInterval() + "ms", e);
          try {
            Thread.sleep(source.getMaxBackOffSleepInterval());
          } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
          }
        }
      }

      logger.debug("Polling runner exiting. Metrics:{}", counterGroup);
    }
  }
}
```
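The backoff in PollingRunner.run() is easy to misread, so the sketch below isolates the arithmetic. It is not Flume code. BackoffSchedule and PollResult are hypothetical names, and the increment and maximum are example values passed in as parameters. In the runner, those values come from source.getBackOffSleepIncrement() and source.getMaxBackOffSleepInterval(). The sketch replays only the BACKOFF/READY branches. It omits the unhandled-exception branch, which always sleeps for the maximum interval.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper reproducing the sleep arithmetic of
// PollingRunner.run(); NOT Flume code.
class BackoffDemo {
    public static void main(String[] args) {
        PollResult b = PollResult.BACKOFF;
        PollResult r = PollResult.READY;
        List<PollResult> results = List.of(b, b, b, r, b, b, b, b, b, b);
        // Example values: 1000 ms increment, 5000 ms cap.
        System.out.println(BackoffSchedule.simulate(results, 1000, 5000));
        // prints [1000, 2000, 3000, 0, 1000, 2000, 3000, 4000, 5000, 5000]
    }
}

enum PollResult { READY, BACKOFF }

final class BackoffSchedule {
    private BackoffSchedule() {}

    // Same formula as the runner: min(consecutive * increment, max).
    static long sleepMillis(long consecutiveBackoffs, long incrementMillis, long maxMillis) {
        return Math.min(consecutiveBackoffs * incrementMillis, maxMillis);
    }

    // Replays a sequence of process() results and returns the sleep taken
    // after each one. READY means no sleep and resets the counter.
    static List<Long> simulate(List<PollResult> results, long incrementMillis, long maxMillis) {
        List<Long> sleeps = new ArrayList<>();
        long consecutive = 0;
        for (PollResult result : results) {
            if (result == PollResult.BACKOFF) {
                consecutive++;
                sleeps.add(sleepMillis(consecutive, incrementMillis, maxMillis));
            } else {
                consecutive = 0;
                sleeps.add(0L);
            }
        }
        return sleeps;
    }
}
```

In this run, the wait grows linearly while the source keeps reporting BACKOFF. It stays at the 5000 ms cap once the product exceeds it, and a single READY result starts the sequence over at 1000 ms.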
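Both PollableSourceRunner and EventDrivenSourceRunner first call initialize() on the source's ChannelProcessor and then call start() on the source itself. PollableSourceRunner also starts a PollingRunner thread that keeps calling the source's process() until shouldStop is set. Whenever process() returns BACKOFF, that thread sleeps for min(consecutive backoffs × getBackOffSleepIncrement(), getMaxBackOffSleepInterval()) milliseconds, and a READY result resets the consecutive-backoff counter. The appearance of ChannelProcessor here raises a question: where does it come from? To answer it, we go back to AbstractConfigurationProvider and look at its loadSources() method, which contains the following code: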
```java
ChannelSelectorConfiguration selectorConfig = config.getSelectorConfiguration();

ChannelSelector selector = ChannelSelectorFactory.create(sourceChannels, selectorConfig);

ChannelProcessor channelProcessor = new ChannelProcessor(selector);
Configurables.configure(channelProcessor, config);

source.setChannelProcessor(channelProcessor);
```
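In other words, loadSources() builds a ChannelSelector from the source's channels and its selector configuration, wraps the selector in a ChannelProcessor, and injects that processor into the source through setChannelProcessor(). The sketch below imitates the two selector behaviors most commonly used with Flume. A replicating selector sends every event to every channel. A multiplexing selector uses a header value to pick the channels and falls back to a default set. All types here are hypothetical simplifications. Events are reduced to header maps, and only required channels are modelled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simplifications of ChannelSelector/ChannelProcessor;
// events are reduced to header maps. NOT the Flume API.
class SelectorDemo {
    public static void main(String[] args) {
        MemChannel c1 = new MemChannel("c1");
        MemChannel c2 = new MemChannel("c2");
        Map<String, List<MemChannel>> mapping = new HashMap<>();
        mapping.put("web", List.of(c1));
        RoutingProcessor processor = new RoutingProcessor(
            new MultiplexingSelectorSketch("type", mapping, List.of(c2)));
        processor.processEvent(Map.of("type", "web"));
        processor.processEvent(Map.of("type", "db"));
        System.out.println(c1.name + "=" + c1.events.size() + ", " + c2.name + "=" + c2.events.size());
    }
}

final class MemChannel {
    final String name;
    final List<Map<String, String>> events = new ArrayList<>();
    MemChannel(String name) { this.name = name; }
    void put(Map<String, String> event) { events.add(event); }
}

interface ChannelSelectorSketch {
    List<MemChannel> getRequiredChannels(Map<String, String> headers);
}

// Replicating: every configured channel receives every event.
final class ReplicatingSelectorSketch implements ChannelSelectorSketch {
    private final List<MemChannel> channels;
    ReplicatingSelectorSketch(List<MemChannel> channels) { this.channels = channels; }

    @Override
    public List<MemChannel> getRequiredChannels(Map<String, String> headers) { return channels; }
}

// Multiplexing: one header's value picks the channels; unknown or
// missing values fall back to the default channels.
final class MultiplexingSelectorSketch implements ChannelSelectorSketch {
    private final String header;
    private final Map<String, List<MemChannel>> mapping;
    private final List<MemChannel> defaults;

    MultiplexingSelectorSketch(String header, Map<String, List<MemChannel>> mapping,
                               List<MemChannel> defaults) {
        this.header = header;
        this.mapping = mapping;
        this.defaults = defaults;
    }

    @Override
    public List<MemChannel> getRequiredChannels(Map<String, String> headers) {
        String value = headers.get(header);
        List<MemChannel> target = value == null ? null : mapping.get(value);
        return target != null ? target : defaults;
    }
}

// The processor only asks the selector where an event goes, then puts it there.
final class RoutingProcessor {
    private final ChannelSelectorSketch selector;
    RoutingProcessor(ChannelSelectorSketch selector) { this.selector = selector; }
    void processEvent(Map<String, String> headers) {
        for (MemChannel channel : selector.getRequiredChannels(headers)) {
            channel.put(headers);
        }
    }
}
```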
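That covers the basic startup flow of a Source. Next, taking AvroSource as an example, let's see where a source calls the ChannelProcessor methods that insert events into channels.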
```java
public class AvroSource extends AbstractSource implements EventDrivenSource,
    Configurable, AvroSourceProtocol {
  …
  @Override
  public Status append(AvroFlumeEvent avroEvent) {
    if (logger.isDebugEnabled()) {
      if (LogPrivacyUtil.allowLogRawData()) {
        logger.debug("Avro source {}: Received avro event: {}", getName(), avroEvent);
      } else {
        logger.debug("Avro source {}: Received avro event", getName());
      }
    }

    sourceCounter.incrementAppendReceivedCount();
    sourceCounter.incrementEventReceivedCount();

    Event event = EventBuilder.withBody(avroEvent.getBody().array(),
        toStringMap(avroEvent.getHeaders()));

    try {
      getChannelProcessor().processEvent(event);
    } catch (ChannelException ex) {
      logger.warn("Avro source " + getName() + ": Unable to process event. " +
          "Exception follows.", ex);
      return Status.FAILED;
    }

    sourceCounter.incrementAppendAcceptedCount();
    sourceCounter.incrementEventAcceptedCount();

    return Status.OK;
  }

  @Override
  public Status appendBatch(List<AvroFlumeEvent> events) {
    logger.debug("Avro source {}: Received avro event batch of {} events.",
        getName(), events.size());
    sourceCounter.incrementAppendBatchReceivedCount();
    sourceCounter.addToEventReceivedCount(events.size());

    List<Event> batch = new ArrayList<Event>();

    for (AvroFlumeEvent avroEvent : events) {
      Event event = EventBuilder.withBody(avroEvent.getBody().array(),
          toStringMap(avroEvent.getHeaders()));

      batch.add(event);
    }

    try {
      getChannelProcessor().processEventBatch(batch);
    } catch (Throwable t) {
      logger.error("Avro source " + getName() + ": Unable to process event " +
          "batch. Exception follows.", t);
      if (t instanceof Error) {
        throw (Error) t;
      }
      return Status.FAILED;
    }

    sourceCounter.incrementAppendBatchAcceptedCount();
    sourceCounter.addToEventAcceptedCount(events.size());

    return Status.OK;
  }
  …
}
```
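In append() we can see the call getChannelProcessor().processEvent(event), and appendBatch() likewise calls processEventBatch(batch). Each Source therefore calls the ChannelProcessor at the appropriate moment to insert events into the channel, according to its own mechanism: triggering for event-driven sources, pulling for pollable ones.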
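What happens inside processEvent() and processEventBatch() explains why AvroSource maps a channel failure to Status.FAILED. Roughly, for each required channel the processor begins a transaction, puts the event or events, and commits. If the put fails, it rolls back and throws a ChannelException. The sketch below models this with hypothetical classes (TxChannel with a small fixed capacity, TxProcessor, SketchChannelException, SketchAvroLikeSource). It is a simplification, not Flume's implementation, and it leaves out interceptors and optional channels.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of transactional puts; NOT Flume's ChannelProcessor.
class TxDemo {
    public static void main(String[] args) {
        TxChannel channel = new TxChannel(2);
        SketchAvroLikeSource source = new SketchAvroLikeSource(new TxProcessor(List.of(channel)));
        System.out.println(source.appendBatch(List.of("e1", "e2")));  // fits
        System.out.println(source.appendBatch(List.of("e3")));        // channel full
        System.out.println(channel.committed());
    }
}

final class SketchChannelException extends RuntimeException {
    SketchChannelException(String message) { super(message); }
    SketchChannelException(String message, Throwable cause) { super(message, cause); }
}

// A bounded channel whose puts become visible only on commit.
final class TxChannel {
    private final int capacity;
    private final List<String> committed = new ArrayList<>();
    private final List<String> pending = new ArrayList<>();
    TxChannel(int capacity) { this.capacity = capacity; }
    void begin() { pending.clear(); }
    void put(String event) {
        if (committed.size() + pending.size() >= capacity) {
            throw new SketchChannelException("channel full");
        }
        pending.add(event);
    }
    void commit() { committed.addAll(pending); pending.clear(); }
    void rollback() { pending.clear(); }
    List<String> committed() { return List.copyOf(committed); }
}

// One transaction per required channel: put the whole batch and commit,
// or roll back and rethrow the failure wrapped in a channel exception.
final class TxProcessor {
    private final List<TxChannel> requiredChannels;
    TxProcessor(List<TxChannel> requiredChannels) { this.requiredChannels = requiredChannels; }
    void processEventBatch(List<String> batch) {
        for (TxChannel channel : requiredChannels) {
            channel.begin();
            try {
                for (String event : batch) {
                    channel.put(event);
                }
                channel.commit();
            } catch (RuntimeException e) {
                channel.rollback();
                throw new SketchChannelException("Unable to put batch on required channel", e);
            }
        }
    }
}

enum AppendStatus { OK, FAILED }

// Same shape as AvroSource.appendBatch(): delegate, and map a failure to FAILED.
final class SketchAvroLikeSource {
    private final TxProcessor processor;
    SketchAvroLikeSource(TxProcessor processor) { this.processor = processor; }
    AppendStatus appendBatch(List<String> events) {
        try {
            processor.processEventBatch(events);
        } catch (SketchChannelException e) {
            return AppendStatus.FAILED;
        }
        return AppendStatus.OK;
    }
}
```

Because each channel in this sketch has its own transaction, a batch that overflows the channel is rolled back as a whole: none of its events are committed, and the earlier committed events stay untouched.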
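This completes our study of Flume's startup process and its three core components. Some details were beyond what I could dig into this time, and I hope to find time to continue the analysis in more depth later.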