Apache Flume 是一个分布式、可靠且可用的系统,用于高效地收集、汇总来自多个不同数据源的大量日志数据,并将其移动到集中式数据存储中。Apache Flume 的用途不仅仅是日志数据聚合:由于数据源可以定制,Flume 也被用于传输大规模数据,包括但不限于网络流量数据、社交媒体生成的数据、电子邮件数据以及几乎任何可能的数据源。
flume-ng-channels、flume-ng-sources、flume-ng-sinks:这三个项目主要包含各类插件。flume-ng-channels 包括 file、jdbc 等;flume-ng-sources 包括 kafka、jms 等插件;flume-ng-sinks 包括 hdfs、hive 等。大家学习插件时可以看看这里是怎样实现的,自己也可以写一些插件来满足自己的需求。flume-ng-client:这里面包括把 log4j 日志直接发送给 Flume 的实现,还包括负载均衡的实现。flume-ng-configuration:这个模块比较重要,主要负责加载配置文件中的参数,大家必须详细看一下它是怎样加载配置文件的,以及如何生成 SourceRunner、SinkRunner 等。flume-ng-core:这是核心模块,这里有 ChannelFactory、SinkFactory、SourceFactory 等,看源码必须看懂这里,这也是它被称为核心的原因;我 debug 了好多遍才看懂这里,这里有很多接口,大家应该好好理解。flume-ng-node:这个模块是 Flume 程序的入口,入口类是 Application。
// Excerpt from Application.main(): choose a configuration provider for the
// agent, build the Application around it, and start it.

// Reload the configuration automatically when the file changes, unless the
// "no-reload-conf" option is given.
boolean reload = !commandLine.hasOption("no-reload-conf");
if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {
  // Use ZooKeeper-based configuration. The article's author notes this is
  // still experimental and does not recommend it.
  isZkConfigured = true;
}
Application application = null;
if (isZkConfigured) {
  // Load the configuration from ZooKeeper.
  // get options
  String zkConnectionStr = commandLine.getOptionValue('z');
  String baseZkPath = commandLine.getOptionValue('p');

  if (reload) {
    // Polling provider: configuration updates are posted on this EventBus,
    // and the Application is registered below as its subscriber.
    EventBus eventBus = new EventBus(agentName + "-event-bus");
    List<LifecycleAware> components = Lists.newArrayList();
    PollingZooKeeperConfigurationProvider zookeeperConfigurationProvider =
        new PollingZooKeeperConfigurationProvider(
            agentName, zkConnectionStr, baseZkPath, eventBus);
    components.add(zookeeperConfigurationProvider);
    application = new Application(components);
    eventBus.register(application);
  } else {
    // Static provider: read the configuration once and apply it directly.
    StaticZooKeeperConfigurationProvider zookeeperConfigurationProvider =
        new StaticZooKeeperConfigurationProvider(
            agentName, zkConnectionStr, baseZkPath);
    application = new Application();
    application.handleConfigurationEvent(zookeeperConfigurationProvider
        .getConfiguration());
  }
} else {
  File configurationFile = new File(commandLine.getOptionValue('f'));

  /*
   * The following is to ensure that by default the agent will fail on
   * startup if the file does not exist.
   */
  if (!configurationFile.exists()) {
    // If command line invocation, then need to fail fast
    if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) ==
        null) {
      String path = configurationFile.getPath();
      try {
        path = configurationFile.getCanonicalPath();
      } catch (IOException ex) {
        logger.error("Failed to read canonical path for file: " + path,
            ex);
      }
      throw new ParseException(
          "The specified configuration file does not exist: " + path);
    }
  }
  List<LifecycleAware> components = Lists.newArrayList();

  if (reload) {
    // Observer pattern: the polling provider re-reads the file when its
    // modification time changes and posts the new configuration to this
    // EventBus; the Application is registered as the subscriber.
    EventBus eventBus = new EventBus(agentName + "-event-bus");
    // NOTE(review): 30 appears to be the polling interval in seconds
    // (the provider schedules its watcher with TimeUnit.SECONDS) - confirm.
    PollingPropertiesFileConfigurationProvider configurationProvider =
        new PollingPropertiesFileConfigurationProvider(
            agentName, configurationFile, eventBus, 30);
    components.add(configurationProvider);
    application = new Application(components);
    eventBus.register(application);
  } else {
    // Static configuration: changes to the file require restarting Flume.
    PropertiesFileConfigurationProvider configurationProvider =
        new PropertiesFileConfigurationProvider(agentName, configurationFile);
    application = new Application();
    // Load the configuration file once and apply it.
    application.handleConfigurationEvent(configurationProvider.getConfiguration());
  }
}
// Start the application. With reload enabled, this starts the polling
// provider, whose first poll loads the configuration.
application.start();

// Stop the agent cleanly when the JVM shuts down.
final Application appReference = application;
Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
  @Override
  public void run() {
    appReference.stop();
  }
});
public synchronized void start() { for (LifecycleAware component : components) { supervisor.supervise(component, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } } public synchronized void supervise(LifecycleAware lifecycleAware, SupervisorPolicy policy, LifecycleState desiredState) {//监督程序 if (this.monitorService.isShutdown() || this.monitorService.isTerminated() || this.monitorService.isTerminating()) { throw new FlumeException("Supervise called on " + lifecycleAware + " " + "after shutdown has been initiated. " + lifecycleAware + " will not" + " be started"); } Preconditions.checkState(!supervisedProcesses.containsKey(lifecycleAware), "Refusing to supervise " + lifecycleAware + " more than once"); if (logger.isDebugEnabled()) { logger.debug("Supervising service:{} policy:{} desiredState:{}", new Object[] { lifecycleAware, policy, desiredState }); } Supervisoree process = new Supervisoree(); process.status = new Status(); process.policy = policy; process.status.desiredState = desiredState; process.status.error = false; MonitorRunnable monitorRunnable = new MonitorRunnable();//声明一个线程 monitorRunnable.lifecycleAware = lifecycleAware; monitorRunnable.supervisoree = process; monitorRunnable.monitorService = monitorService; supervisedProcesses.put(lifecycleAware, process); ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay( monitorRunnable, 0, 3, TimeUnit.SECONDS);//每三秒检查一下程序是否在 monitorFutures.put(lifecycleAware, future); } public static class MonitorRunnable implements Runnable {//定时器会每隔三秒调用一次 public ScheduledExecutorService monitorService; public LifecycleAware lifecycleAware; public Supervisoree supervisoree; @Override public void run() { logger.debug("checking process:{} supervisoree:{}", lifecycleAware, supervisoree); long now = System.currentTimeMillis(); try { if (supervisoree.status.firstSeen == null) { logger.debug("first time seeing {}", lifecycleAware); supervisoree.status.firstSeen = now; } supervisoree.status.lastSeen = 
now; synchronized (lifecycleAware) { if (supervisoree.status.discard) { // Unsupervise has already been called on this. logger.info("Component has already been stopped {}", lifecycleAware); return; } else if (supervisoree.status.error) { logger.info("Component {} is in error state, and Flume will not" + "attempt to change its state", lifecycleAware); return; } supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState(); if (!lifecycleAware.getLifecycleState().equals( supervisoree.status.desiredState)) { logger.debug("Want to transition {} from {} to {} (failures:{})", new Object[] { lifecycleAware, supervisoree.status.lastSeenState, supervisoree.status.desiredState, supervisoree.status.failures }); switch (supervisoree.status.desiredState) { case START: try { lifecycleAware.start();//全部组件的启动都是在这在这个位置启动,首先你要确认lifecycleAware的实现类,若是你是用观察者模式调用配置文件,第一次先是调用配置文件,你能够看一下LifecycleAware实现关系 } catch (Throwable e) { logger.error("Unable to start " + lifecycleAware + " - Exception follows.", e); if (e instanceof Error) { // This component can never recover, shut it down. supervisoree.status.desiredState = LifecycleState.STOP; try { lifecycleAware.stop(); logger.warn("Component {} stopped, since it could not be" + "successfully started due to missing dependencies", lifecycleAware); } catch (Throwable e1) { logger.error("Unsuccessful attempt to " + "shutdown component: {} due to missing dependencies." + " Please shutdown the agent" + "or disable this component, or the agent will be" + "in an undefined state.", e1); supervisoree.status.error = true; if (e1 instanceof Error) { throw (Error) e1; } // Set the state to stop, so that the conf poller can // proceed. 
} } supervisoree.status.failures++; } break; case STOP: try { lifecycleAware.stop(); } catch (Throwable e) { logger.error("Unable to stop " + lifecycleAware + " - Exception follows.", e); if (e instanceof Error) { throw (Error) e; } supervisoree.status.failures++; } break; default: logger.warn("I refuse to acknowledge {} as a desired state", supervisoree.status.desiredState); } if (!supervisoree.policy.isValid(lifecycleAware, supervisoree.status)) { logger.error( "Policy {} of {} has been violated - supervisor should exit!", supervisoree.policy, lifecycleAware); } } } } catch (Throwable t) { logger.error("Unexpected error", t); } logger.debug("Status check complete"); } }
这个函数比较重要,channel、source 和 sink 的启动都会用到它,它是负责监督组件状态的 supervise 函数。
从图中我们可以看到,PollingPropertiesFileConfigurationProvider 类实现了 LifecycleAware 接口,现在就开始加载相关的参数。
/**
 * Starts the configuration provider.
 *
 * <p>Creates a single-threaded scheduler whose thread is named from the
 * pattern "conf-file-poller-%d". It then schedules a FileWatcherRunnable on
 * that scheduler with no initial delay and a fixed delay of
 * {@code interval} seconds, and marks the provider as started.
 *
 * @throws IllegalStateException if no configuration file was set
 */
public void start() {
  LOGGER.info("Configuration provider starting");

  Preconditions.checkState(file != null,
      "The parameter file must not be null");

  executorService = Executors.newSingleThreadScheduledExecutor(
      new ThreadFactoryBuilder()
          .setNameFormat("conf-file-poller-%d")
          .build());

  // Poll right away, then `interval` seconds after each check finishes.
  executorService.scheduleWithFixedDelay(
      new FileWatcherRunnable(file, counterGroup),
      0, interval, TimeUnit.SECONDS);

  lifecycleState = LifecycleState.START;

  LOGGER.debug("Configuration provider started");
}
这里启动了一个定时执行的线程 FileWatcherRunnable:
public void run() { LOGGER.debug("Checking file:{} for changes", file); counterGroup.incrementAndGet("file.checks"); long lastModified = file.lastModified(); if (lastModified > lastChange) { LOGGER.info("Reloading configuration file:{}", file); counterGroup.incrementAndGet("file.loads"); lastChange = lastModified; try { eventBus.post(getConfiguration());//在这里加载配置文件和启动观察者模式 } catch (Exception e) { LOGGER.error("Failed to load configuration data. Exception follows.", e); } catch (NoClassDefFoundError e) { LOGGER.error("Failed to start agent because dependencies were not " + "found in classpath. Error follows.", e); } catch (Throwable t) { // caught because the caller does not handle or log Throwables LOGGER.error("Unhandled error", t); } } } }
eventBus.post(getConfiguration()) 这一行首先加载配置文件,然后通过观察者模式把新配置发布给订阅者;
//这是启动观察者模式入口 @Subscribe public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) { stopAllComponents(); startAllComponents(conf); }
我们看一下加载配置文件的过程:
public MaterializedConfiguration getConfiguration() { MaterializedConfiguration conf = new SimpleMaterializedConfiguration(); FlumeConfiguration fconfig = getFlumeConfiguration(); AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName()); if (agentConf != null) { Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap(); Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap(); Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap(); try { loadChannels(agentConf, channelComponentMap);//加载channle相关的参数 loadSources(agentConf, channelComponentMap, sourceRunnerMap);//sourcerunner和指定source的相关参数 loadSinks(agentConf, channelComponentMap, sinkRunnerMap);//sinkrunner和指定sink相关参数 Set<String> channelNames = new HashSet<String>(channelComponentMap.keySet()); for (String channelName : channelNames) { ChannelComponent channelComponent = channelComponentMap.get(channelName); if (channelComponent.components.isEmpty()) { LOGGER.warn(String.format("Channel %s has no components connected" + " and has been removed.", channelName)); channelComponentMap.remove(channelName); Map<String, Channel> nameChannelMap = channelCache.get(channelComponent.channel.getClass()); if (nameChannelMap != null) { nameChannelMap.remove(channelName); } } else { LOGGER.info(String.format("Channel %s connected to %s", channelName, channelComponent.components.toString())); conf.addChannel(channelName, channelComponent.channel); } } for (Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) { conf.addSourceRunner(entry.getKey(), entry.getValue()); } for (Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) { conf.addSinkRunner(entry.getKey(), entry.getValue()); } } catch (InstantiationException ex) { LOGGER.error("Failed to instantiate component", ex); } finally { channelComponentMap.clear(); sourceRunnerMap.clear(); sinkRunnerMap.clear(); } } else { LOGGER.warn("No configuration found for this host:{}", getAgentName()); } return conf; }
// Excerpt, presumably from loadChannels(): instantiate and configure each
// channel named in the agent configuration.
for (String chName : channelNames) {
  ComponentConfiguration comp = compMap.get(chName);
  if (comp != null) {
    // Obtain the channel instance for this name and type.
    Channel channel = getOrCreateChannel(channelsNotReused,
        comp.getComponentName(), comp.getType());
    try {
      Configurables.configure(channel, comp);
      channelComponentMap.put(comp.getComponentName(),
          new ChannelComponent(channel));
      LOGGER.info("Created channel " + chName);
    } catch (Exception e) {
      // A channel that fails to configure is logged and left out of
      // channelComponentMap instead of aborting the whole load.
      String msg = String.format("Channel %s has been removed due to an " +
          "error during configuration", chName);
      LOGGER.error(msg, e);
    }
  }
}

// Excerpt from getOrCreateChannel(): a channel class annotated @Disposable
// is created through the channel factory (factory pattern), named, and
// returned directly.
if (channelClass.isAnnotationPresent(Disposable.class)) {
  Channel channel = channelFactory.create(name, type);
  channel.setName(name);
  return channel;
}
// Excerpt, presumably from loadSources(): create, configure and wire up each
// configured source.
Map<String, Context> sourceContexts = agentConf.getSourceContext();
for (String sourceName : sourceNames) {
  Context context = sourceContexts.get(sourceName);
  if (context != null) {
    // Instantiate the source through the source factory by its configured type.
    Source source =
        sourceFactory.create(sourceName,
            context.getString(BasicConfigurationConstants.CONFIG_TYPE));
    try {
      // Apply this source's parameters from its Context (presumably by
      // calling the source's own configure method).
      Configurables.configure(source, context);
      // Resolve the whitespace-separated channel names to loaded channels;
      // names with no loaded channel are skipped.
      List<Channel> sourceChannels = new ArrayList<Channel>();
      String[] channelNames = context.getString(
          BasicConfigurationConstants.CONFIG_CHANNELS).split("\\s+");
      for (String chName : channelNames) {
        ChannelComponent channelComponent = channelComponentMap.get(chName);
        if (channelComponent != null) {
          sourceChannels.add(channelComponent.channel);
        }
      }
      if (sourceChannels.isEmpty()) {
        String msg = String.format("Source %s is not connected to a " +
            "channel", sourceName);
        throw new IllegalStateException(msg);
      }
      Map<String, String> selectorConfig = context.getSubProperties(
          BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX);

      // Build the channel selector from the source's channels and its
      // selector settings.
      ChannelSelector selector = ChannelSelectorFactory.create(
          sourceChannels, selectorConfig);

      ChannelProcessor channelProcessor = new ChannelProcessor(selector);
      Configurables.configure(channelProcessor, context);
      source.setChannelProcessor(channelProcessor);
      // Wrap the source in a SourceRunner, which is what actually starts it.
      sourceRunnerMap.put(sourceName,
          SourceRunner.forSource(source));
      // Register the source on each of its channels; getConfiguration()
      // later drops channels that end up with no components.
      for (Channel channel : sourceChannels) {
        ChannelComponent channelComponent =
            Preconditions.checkNotNull(channelComponentMap.get(channel.getName()),
                String.format("Channel %s", channel.getName()));
        channelComponent.components.add(sourceName);
        // ... (excerpt ends here; the enclosing blocks continue in the full source)
Flume 中的 source 必须由 SourceRunner 来启动,而 SourceRunner 分为 PollableSourceRunner 和 EventDrivenSourceRunner 两种,具体使用哪一种,取决于 source 属于哪种实现。
// Excerpt, presumably from loadSinks(): create, configure and wire up each
// configured sink.
Map<String, Context> sinkContexts = agentConf.getSinkContext();
for (String sinkName : sinkNames) {
  Context context = sinkContexts.get(sinkName);
  if (context != null) {
    // Instantiate the sink through the sink factory by its configured type.
    Sink sink = sinkFactory.create(sinkName, context.getString(
        BasicConfigurationConstants.CONFIG_TYPE));
    try {
      // Apply this sink's parameters from its Context.
      Configurables.configure(sink, context);
      // A sink is bound to a single channel, looked up via CONFIG_CHANNEL.
      ChannelComponent channelComponent = channelComponentMap.get(
          context.getString(BasicConfigurationConstants.CONFIG_CHANNEL));
      if (channelComponent == null) {
        String msg = String.format("Sink %s is not connected to a " +
            "channel",  sinkName);
        throw new IllegalStateException(msg);
      }
      sink.setChannel(channelComponent.channel);
      sinks.put(sinkName, sink);
      channelComponent.components.add(sinkName);
    } catch (Exception e) {
      // Any configuration failure (including the IllegalStateException
      // above) drops this sink with an error log instead of failing the load.
      String msg = String.format("Sink %s has been removed due to an " +
          "error during configuration", sinkName);
      LOGGER.error(msg, e);
    }
  }
}
// Group the configured sinks; presumably this is where sinkRunnerMap is
// filled with the SinkRunners.
loadSinkGroups(agentConf, sinks, sinkRunnerMap);
SinkRunner 只有这一种,它会调用指定的 sink。
由于用的是观察模式,因此会从这里执行,这是guava中实现 @Subscribe public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) { stopAllComponents(); startAllComponents(conf); } private void startAllComponents(MaterializedConfiguration materializedConfiguration) { logger.info("Starting new configuration:{}", materializedConfiguration); this.materializedConfiguration = materializedConfiguration; for (Entry<String, Channel> entry : materializedConfiguration.getChannels().entrySet()) { try { logger.info("Starting Channel " + entry.getKey()); supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } catch (Exception e) { logger.error("Error while starting {}", entry.getValue(), e); } } /* * Wait for all channels to start. */ for (Channel ch : materializedConfiguration.getChannels().values()) { while (ch.getLifecycleState() != LifecycleState.START && !supervisor.isComponentInErrorState(ch)) { try { logger.info("Waiting for channel: " + ch.getName() + " to start. Sleeping for 500 ms"); Thread.sleep(500); } catch (InterruptedException e) { logger.error("Interrupted while waiting for channel to start.", e); Throwables.propagate(e); } } } for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners().entrySet()) { try { logger.info("Starting Sink " + entry.getKey()); supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } catch (Exception e) { logger.error("Error while starting {}", entry.getValue(), e); } } for (Entry<String, SourceRunner> entry : materializedConfiguration.getSourceRunners().entrySet()) { try { logger.info("Starting Source " + entry.getKey()); supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } catch (Exception e) { logger.error("Error while starting {}", entry.getValue(), e); } } this.loadMonitoring(); }
从这里开始依次启动全部的 channel、sink 和 source 组件。