在AbstractConfigurationProvider类中loadSources方法会将全部的source进行封装成SourceRunner放到了Map<String, SourceRunner> sourceRunnerMap之中。相关代码以下:html
1 Map<String, String> selectorConfig = context.getSubProperties( 2 BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX); 3 4 ChannelSelector selector = ChannelSelectorFactory.create( 5 sourceChannels, selectorConfig); 6 7 ChannelProcessor channelProcessor = new ChannelProcessor(selector); 8 Configurables.configure(channelProcessor, context); 9 source.setChannelProcessor(channelProcessor); 10 sourceRunnerMap.put(sourceName, 11 SourceRunner.forSource(source));
每一个source都有selector。上述代码会获取配置文件中关于source的selector配置信息;而后构造ChannelSelector对象selector;并封装selector对象成ChannelProcessor对象channelProcessor;执行channelProcessor.configure方法进行配置;设置soure的channelprocessor,最后封装为sourceRunner和source名称一块儿放入sourceRunnerMap中。 数据结构
1、ChannelSelector selector = ChannelSelectorFactory.create(sourceChannels, selectorConfig)会根据配置文件中指定的类型实例化一个ChannelSelector(共两种ReplicatingChannelSelector复制和MultiplexingChannelSelector复用)若是没有指定类型默认是ReplicatingChannelSelector,也就是配置文件中不用配置selector会将每一个event复制发送到多个channel;selector.setChannels(channels);对此slector进行配置configure(context)。这两中selector都实现了三个方法getRequiredChannels(Event event)、getOptionalChannels(Event event) 以及configure(Context context)。其实Event要发送到的channel有两种组成:RequiredChannels和OptionalChannels,对应两个方法。app
(1)ReplicatingChannelSelector的configure(context)方法会得到经过"optional"在配置文件中指定的可选发送的channels(能够多个,经过空格分割);获取requiredChannels是此source对应的channel中能够活动的channel列表;而后获取全部channel的名字及其与channel的映射channelNameMap;而后将可选的channel加入optionalChannels并从requiredChannels去掉有对应的channel,在这里并无检查可选channel的合法性以及能够配置此source指定的channel以外的channel,requiredChannels和optionalChannels不能有交集,有交集的话会从requiredChannels中删除相交的channel,因此若是配置文件中optional指定的channel列表和source指定的列表相同getOptionalChannels方法有可能会返回所有可活动channel列表使得数据重复,因此建议optional指定的channel最好是source指定以外的其余channel(好比是其余source的channel)。getOptionalChannels方法就是直接返回optionalChannels列表,getRequiredChannels方法返回requiredChannels列表,若是requiredChannels为null,则返回所有的能够活动的channel列表。ide
(2)MultiplexingChannelSelector的configure(context)先获取要匹配的event的header的headerName,只能选择一个headerName;得到默认发送到的channel列表defaultChannels,能够指定多个默认channel;得到mapping的各个子值,及对应的channel名称mapConfig;用来存储header不一样的值及其对应的要发送到的channel列表(每一个map能够发送到多个channel中,每一个channel也能够同时对应多个mapping),存入channelMapping(这个数据结构是用来存储mapping值及对应的channel列表的);optionalChannels是配置的可选值及其要发送到的channel列表的映射关系,channelMapping中已经出现的channel不容许再次在optionalChannels出现(防止数据重复),若是channelMapping没有这个值对应的channel列表(表示可能会使用默认的channel列表)则使过滤与默认channel列表的交集,optionalChannels存储的是对应header的各个值及其等于该值的event要发送到的可选择的channel列表。getOptionalChannels(Event event)方法返回的是optionalChannels中该event的指定header对应的可选择的channel列表。getRequiredChannels(Event event)方法返回的是channelMapping中该event的指定header对应的channel列表,若是为null(表示因为该event的headers没有匹配的channel就发送到默认的channel中)就返回默认发送列表defaultChannels。须要说明的是选择器配置文件中的"default"、"mapping."、"optional."这三个是同等级的,没有匹配后二者的值时才会选择发送到default对应的channel列表,后二者的值都是event的header中对应配置文件中指定的"header"的各类值。当调用getRequiredChannels(Event event)和getOptionalChannels(Event event)方法时都会对这个event的相应header查找对应要发送到的channel列表。优化
2、 ChannelProcessor channelProcessor = new ChannelProcessor(selector)这个是封装选择器构造channelprocessor。其构造方法会赋值selector并构造一个InterceptorChain对象interceptorChain。ChannelProcessor类负责管理选择器selector和拦截器interceptor。ui
3、执行channelProcessor.configure(Context)进行必要的配置,该方法会调用channelProcessor.configureInterceptors(context)对拦截器们进行获取和配置,configureInterceptors方法会先从配置文件中获取interceptor的组件名字interceptorNames[](能够多个),而后获取全部的“interceptors.”的配置信息interceptorContexts,而后遍历全部interceptorNames从配置文件中获取属于这个interceptor的配置信息及类型(type),根据类型构建相应的interceptor并进行配置configure,加入interceptors列表(用来存放实例化的interceptor);最后将列表传递给interceptorChain。关于更多interceptor的信息能够看这篇Flume-NG源码阅读之Interceptor(原创) 。 spa
4、source.setChannelProcessor(channelProcessor)赋值。各个source经过getChannelProcessor()方法获取processor调用其processEventBatch(events)或者processEvent(event)来将event送到channel中。线程
5、sourceRunnerMap.put(sourceName,SourceRunner.forSource(source))将source封装成SourceRunner放入sourceRunnerMap。SourceRunner.forSource会根据这个source所实现的接口封装成不一样的Runner,有两种接口PollableSource和EventDrivenSource,前者是有本身线程来驱动的须要实现process方法,后者是没有单独的线程来驱动的没有process方法。code
1 public static SourceRunner forSource(Source source) { 2 SourceRunner runner = null; 3 4 if (source instanceof PollableSource) { 5 runner = new PollableSourceRunner(); 6 ((PollableSourceRunner) runner).setSource((PollableSource) source); 7 } else if (source instanceof EventDrivenSource) { 8 runner = new EventDrivenSourceRunner(); 9 ((EventDrivenSourceRunner) runner).setSource((EventDrivenSource) source); 10 } else { 11 throw new IllegalArgumentException("No known runner type for source " 12 + source); 13 } 14 15 return runner; 16 }
(1)PollableSourceRunner的start()方法会获取source的ChannelProcessor,而后执行其initialize()方法,该方法会调用interceptorChain.initialize()方法对拦截器们进行初始化(遍历全部拦截器而后执行拦截器的initialize()方法);而后执行source.start()启动source;再启动一个线程PollingRunner,它的run方法会始终执行source.process()并根据返回的状态值作一些统计工做。htm
(2)EventDrivenSourceRunner的start()方法会获取source的ChannelProcessor,而后执行其initialize()方法,该方法会调用interceptorChain.initialize()方法对拦截器们进行初始化(遍历全部拦截器而后执行拦截器的initialize()方法);而后执行source.start()启动source。
这样就完成了sourceRunnerMap的组装。当在Application中的startAllComponents方法中经过materializedConfiguration.getSourceRunners()获取全部的SourceRunner并放入supervisor.supervise中去执行,会调用到SourceRunner.start()方法,即上面刚讲到的内容。这样source就启动了。而后当将封装的Events或者Event发送到channel时,须要使用对应的方法ChannelProcessor.processEventBatch(List<Event> events)或者ChannelProcessor.processEvent(Event event)就能够将数据从source传输到channel中,这两个方法都会在开始调用interceptorChain.intercept(events)或者interceptorChain.intercept(event)对event增长headers(若是有多个interceptor会遍历interceptors处理每一个event)。ChannelProcessor都是经过在source中直接调用getChannelProcessor()(在全部的source的父类AbstractSource中实现的)得到。看一看processEventBatch(List<Event> events)代码:
1 public void processEventBatch(List<Event> events) { 2 Preconditions.checkNotNull(events, "Event list must not be null"); 3 4 events = interceptorChain.intercept(events); 5 6 Map<Channel, List<Event>> reqChannelQueue = //须要发送到的每一个channel及其要发送到这个channel的event列表 7 new LinkedHashMap<Channel, List<Event>>(); 8 9 Map<Channel, List<Event>> optChannelQueue = //可选的每一个channel及其要发送到这个channel的event列表 10 new LinkedHashMap<Channel, List<Event>>(); 11 12 for (Event event : events) { 13 List<Channel> reqChannels = selector.getRequiredChannels(event); //获取须要发送到的全部channel 14 15 for (Channel ch : reqChannels) { 16 List<Event> eventQueue = reqChannelQueue.get(ch); 17 if (eventQueue == null) { 18 eventQueue = new ArrayList<Event>(); 19 reqChannelQueue.put(ch, eventQueue); 20 } 21 eventQueue.add(event); //将event放入对应channel的event列表 22 } 23 24 List<Channel> optChannels = selector.getOptionalChannels(event); //获取可选的要发送到的全部channel 25 26 for (Channel ch: optChannels) { 27 List<Event> eventQueue = optChannelQueue.get(ch); 28 if (eventQueue == null) { 29 eventQueue = new ArrayList<Event>(); 30 optChannelQueue.put(ch, eventQueue); 31 } 32 33 eventQueue.add(event); //将event放入对应channel的event列表 34 } 35 } 36 37 // Process required channels 38 for (Channel reqChannel : reqChannelQueue.keySet()) { 39 Transaction tx = reqChannel.getTransaction(); //建立事务 40 Preconditions.checkNotNull(tx, "Transaction object must not be null"); 41 try { 42 tx.begin(); 43 44 List<Event> batch = reqChannelQueue.get(reqChannel); 45 46 for (Event event : batch) { //发送到须要发送到的channel 47 reqChannel.put(event); 48 } 49 50 tx.commit(); 51 } catch (Throwable t) { 52 tx.rollback(); //事务回滚 53 if (t instanceof Error) { 54 LOG.error("Error while writing to required channel: " + 55 reqChannel, t); 56 throw (Error) t; 57 } else { 58 throw new ChannelException("Unable to put batch on required " + 59 "channel: " + reqChannel, t); 60 } 61 } finally { 62 if (tx != null) { 63 tx.close(); 64 } 65 } 66 }
上述代码不复杂,会得到全部须要发送到的channel和全部可选的channel,而后针对每一个channel,将全部event放入一个列表与该channel组成映射;而后会遍历两种channel列表中的每一个channel将它对应的全部event发送到对应的channel中。这个方法写的不够友好,还能够再优化,由于方法的参数自己就是一个列表能够省去一层for循环,直接将reqChannelQueue.put(ch, eventQueue)和optChannelQueue.put(ch, eventQueue)中的eventQueue改成传递过来的参数List<Event> events就能够达到优化的目的。
processEvent(Event event)方法就更简单了,将这个event发送到这两种channel列表中每一个channel就能够。
在发送到channel的过程当中咱们也发现都会有事务的建立(getTransaction())、开始(tx.begin())、提交(tx.commit())、回滚(tx.rollback())、关闭(tx.close())等操做,这是必须的。在sink中这些操做须要显示的去调用,而在source端则封装在processEvent和processEventBatch方法中,不须要显示的调用了,但不是不调用。
至此,sourceRunner的配置、初始化、执行就讲解完毕了。在配置文件中看到的interceptor和selector都是在这里进行配置及执行的。经过了解上述,咱们自定义source组件是否是更容易了。呵呵
后续还有精彩内容!敬请期待哈!