flume1.8源码阅读(二)

咱们已经在前面讲过加载flume的配置文件,其中有channel sink和source的组件参数,flume1.8源码阅读(一),前面知识讲加载相关的参数,下面将source的启动ide

以这种EventDrivenSourceRunner方式启动source

public class EventDrivenSourceRunner extends SourceRunner {

  private LifecycleState lifecycleState;

  public EventDrivenSourceRunner() {
    lifecycleState = LifecycleState.IDLE;
  }

  @Override
  public void start() {
    Source source = getSource();
    ChannelProcessor cp = source.getChannelProcessor();
    cp.initialize();
    source.start();//这是启动source
    lifecycleState = LifecycleState.START;
  }

  @Override
  public void stop() {
    Source source = getSource();
    source.stop();
    ChannelProcessor cp = source.getChannelProcessor();
    cp.close();
    lifecycleState = LifecycleState.STOP;
  }

  @Override
  public String toString() {
    return "EventDrivenSourceRunner: { source:" + getSource() + " }";
  }

  @Override
  public LifecycleState getLifecycleState() {
    return lifecycleState;
  }

}

在这里以SpoolDirectorySource为例来启动ui

@Override
  public synchronized void start() {
    logger.info("SpoolDirectorySource source starting with directory: {}",
        spoolDirectory);

    executor = Executors.newSingleThreadScheduledExecutor();

    File directory = new File(spoolDirectory);
    try {
      reader = new ReliableSpoolingFileEventReader.Builder()
          .spoolDirectory(directory)
          .completedSuffix(completedSuffix)
          .includePattern(includePattern)
          .ignorePattern(ignorePattern)
          .trackerDirPath(trackerDirPath)
          .annotateFileName(fileHeader)
          .fileNameHeader(fileHeaderKey)
          .annotateBaseName(basenameHeader)
          .baseNameHeader(basenameHeaderKey)
          .deserializerType(deserializerType)
          .deserializerContext(deserializerContext)
          .deletePolicy(deletePolicy)
          .inputCharset(inputCharset)
          .decodeErrorPolicy(decodeErrorPolicy)
          .consumeOrder(consumeOrder)
          .recursiveDirectorySearch(recursiveDirectorySearch)
          .build();
    } catch (IOException ioe) {
      throw new FlumeException("Error instantiating spooling event parser",
          ioe);
    }

    Runnable runner = new SpoolDirectoryRunnable(reader, sourceCounter);
    executor.scheduleWithFixedDelay(
        runner, 0, pollDelay, TimeUnit.MILLISECONDS);//在这里启动线程来读取数据

    super.start();
    logger.debug("SpoolDirectorySource source started");
    sourceCounter.start();
  }

以这种PollableSourceRunner方式启动source

public void start() {
    PollableSource source = (PollableSource) getSource();
    ChannelProcessor cp = source.getChannelProcessor();
    cp.initialize();
    source.start();//启动source组件,加载参数和得到文件

    runner = new PollingRunner();//读取数据

    runner.source = source;
    runner.counterGroup = counterGroup;
    runner.shouldStop = shouldStop;

    runnerThread = new Thread(runner);
    runnerThread.setName(getClass().getSimpleName() + "-" + 
        source.getClass().getSimpleName() + "-" + source.getName());
    runnerThread.start();

    lifecycleState = LifecycleState.START;
  }

PollableSourceRunner比EventDrivenSourceRunner多了一个本身处理的事件处理方法,EventDrivenSourceRunner自己本身带一个方法 这里就是source的启动,我只是简单的介绍一下启动的流程,里面详细的内容会后续再说明,今天就写到这,明天继续写。。。。加油,天天写一点,天天就会有一点收获,再难也要坚持先去。。。。.net

相关文章
相关标签/搜索