我们已经在前面的《flume1.8源码阅读(一)》中讲过如何加载Flume的配置文件,其中包含channel、sink和source的组件参数。前面只是讲了加载相关的参数,下面讲source的启动。首先看EventDrivenSourceRunner:
public class EventDrivenSourceRunner extends SourceRunner { private LifecycleState lifecycleState; public EventDrivenSourceRunner() { lifecycleState = LifecycleState.IDLE; } @Override public void start() { Source source = getSource(); ChannelProcessor cp = source.getChannelProcessor(); cp.initialize(); source.start();//这是启动source lifecycleState = LifecycleState.START; } @Override public void stop() { Source source = getSource(); source.stop(); ChannelProcessor cp = source.getChannelProcessor(); cp.close(); lifecycleState = LifecycleState.STOP; } @Override public String toString() { return "EventDrivenSourceRunner: { source:" + getSource() + " }"; } @Override public LifecycleState getLifecycleState() { return lifecycleState; } }
这里以SpoolDirectorySource为例,来看source的启动过程:
@Override public synchronized void start() { logger.info("SpoolDirectorySource source starting with directory: {}", spoolDirectory); executor = Executors.newSingleThreadScheduledExecutor(); File directory = new File(spoolDirectory); try { reader = new ReliableSpoolingFileEventReader.Builder() .spoolDirectory(directory) .completedSuffix(completedSuffix) .includePattern(includePattern) .ignorePattern(ignorePattern) .trackerDirPath(trackerDirPath) .annotateFileName(fileHeader) .fileNameHeader(fileHeaderKey) .annotateBaseName(basenameHeader) .baseNameHeader(basenameHeaderKey) .deserializerType(deserializerType) .deserializerContext(deserializerContext) .deletePolicy(deletePolicy) .inputCharset(inputCharset) .decodeErrorPolicy(decodeErrorPolicy) .consumeOrder(consumeOrder) .recursiveDirectorySearch(recursiveDirectorySearch) .build(); } catch (IOException ioe) { throw new FlumeException("Error instantiating spooling event parser", ioe); } Runnable runner = new SpoolDirectoryRunnable(reader, sourceCounter); executor.scheduleWithFixedDelay( runner, 0, pollDelay, TimeUnit.MILLISECONDS);//在这里启动线程来读取数据 super.start(); logger.debug("SpoolDirectorySource source started"); sourceCounter.start(); }
public void start() { PollableSource source = (PollableSource) getSource(); ChannelProcessor cp = source.getChannelProcessor(); cp.initialize(); source.start();//启动source组件,加载参数和得到文件 runner = new PollingRunner();//读取数据 runner.source = source; runner.counterGroup = counterGroup; runner.shouldStop = shouldStop; runnerThread = new Thread(runner); runnerThread.setName(getClass().getSimpleName() + "-" + source.getClass().getSimpleName() + "-" + source.getName()); runnerThread.start(); lifecycleState = LifecycleState.START; }
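上面最后一段start()是PollableSourceRunner的启动方法。PollableSourceRunner比EventDrivenSourceRunner多了一个自己的事件处理线程(PollingRunner),由runner来驱动数据的读取;而EventDrivenSourceRunner对应的source自己本身带有处理方法(例如SpoolDirectorySource在start()中自己调度读取线程)。这里就是source的启动,我只是简单地介绍了一下启动的流程,里面详细的内容后续再说明。今天就写到这,明天继续写。加油,天天写一点,天天就会有一点收获,再难也要坚持下去。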