ExecSource: Collecting Logs Locally and Sending Them to a FileChannel

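There are two common ways to collect logs. One uses a local log file as an intermediary and ships its contents to a remote log store asynchronously; the other is synchronous, RPC-based collection that sends each record directly to the remote log store. This post looks at how Flume NG collects logs from a local log file. ExecSource runs a local shell command and wraps the data read from the local log file into Events that flow through Flume NG. A typical configuration is shown below: it sets the source type to exec, names the Channel downstream of the Source, and specifies the shell command to run. The most commonly used command is tail -F, which picks up lines newly appended to a local log file.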

producer.sources.s1.type = exec
producer.sources.s1.channels = channel
producer.sources.s1.command = tail -F /data/logs/test.log

Let's walk through how ExecSource is implemented.

  1. ExecSource holds a single-threaded thread pool (executor), along with the configured shell command, counters, and other properties.
public void start() {
  logger.info("Exec source starting with command:{}", command);
  executor = Executors.newSingleThreadExecutor();
  runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter,
      restart, restartThrottle, logStderr, bufferCount, batchTimeout, charset);
  // FIXME: Use a callback-like executor / future to signal us upon failure.
  runnerFuture = executor.submit(runner);
  sourceCounter.start();
  super.start();
  logger.debug("Exec source started");
}
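  2. ExecRunnable implements the Runnable interface and is run by the executor thread pool. It contains the main logic for reading the local log.
  3. ExecRunnable maintains a scheduled thread pool, timedFlushService, which checks the Event list every batchTimeout milliseconds (3 s by default) and flushes the events as a batch if the batching condition is met. The run() method below is where the shell command is executed and its output is read into reader as an input stream. InputStreamReader is the bridge from byte streams to character streams: it reads bytes and decodes them into characters using the specified charset, and each call to read may pull one or more bytes from the underlying input stream.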
public void run() {
  do {
    String exitCode = "unknown";
    BufferedReader reader = null;
    String line = null;
    final List<Event> eventList = new ArrayList<Event>();
    timedFlushService = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder().setNameFormat(
            "timedFlushExecService" +
            Thread.currentThread().getId() + "-%d").build());
    try {
      if(shell != null) {
        String[] commandArgs = formulateShellCommand(shell, command);
        process = Runtime.getRuntime().exec(commandArgs);
      }  else {
        String[] commandArgs = command.split("\\s+");
        process = new ProcessBuilder(commandArgs).start();
      }
      reader = new BufferedReader(
          new InputStreamReader(process.getInputStream(), charset));
      StderrReader stderrReader = new StderrReader(new BufferedReader(
          new InputStreamReader(process.getErrorStream(), charset)), logStderr);
      stderrReader.setName("StderrReader-[" + command + "]");
      stderrReader.setDaemon(true);
      stderrReader.start();

      future = timedFlushService.scheduleWithFixedDelay(new Runnable() {
          @Override
          public void run() {
            try {
              synchronized (eventList) {
                if(!eventList.isEmpty() && timeout()) {
                  flushEventBatch(eventList);
                }
              }
            } catch (Exception e) {
              logger.error("Exception occured when processing event batch", e);
              if(e instanceof InterruptedException) {
                  Thread.currentThread().interrupt();
              }
            }
          }
      },
      batchTimeout, batchTimeout, TimeUnit.MILLISECONDS);
      while ((line = reader.readLine()) != null) {
        synchronized (eventList) {
          sourceCounter.incrementEventReceivedCount();
          eventList.add(EventBuilder.withBody(line.getBytes(charset)));
          if(eventList.size() >= bufferCount || timeout()) {
            flushEventBatch(eventList);
          }
        }
      }
      synchronized (eventList) {
          if(!eventList.isEmpty()) {
            flushEventBatch(eventList);   // send the remaining events to the channel
          }
      }
    } catch (Exception e) {
      logger.error("Failed while running command: " + command, e);
      if(e instanceof InterruptedException) {
        Thread.currentThread().interrupt();
      }
    } finally {
      if (reader != null) {
        try {
          reader.close();
        } catch (IOException ex) {
          logger.error("Failed to close reader for exec source", ex);
        }
      }
      exitCode = String.valueOf(kill());
    }
    if(restart) {
      logger.info("Restarting in {}ms, exit code {}", restartThrottle,
          exitCode);
      try {
        Thread.sleep(restartThrottle);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    } else {
      logger.info("Command [" + command + "] exited with " + exitCode);
    }
  } while(restart);
}
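  4. ExecRunnable uses Runtime.getRuntime().exec or java.lang.ProcessBuilder to run an operating-system shell command from Java, and reads the output stream of the process that command creates through a Java input stream, so the data in the local log file becomes available on the Java side. The shell command here is tail -F.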
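
The steps above can be sketched as a small standalone program. This is only an illustration under assumptions, not Flume's code: the class name ExecReadSketch and the method readLines are hypothetical, it assumes a Unix-like system with /bin/sh, and it runs a short printf command in place of tail -F so that the process actually exits.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecReadSketch {

    /** Runs the command through /bin/sh and returns every line it writes to stdout. */
    static List<String> readLines(String command) throws IOException, InterruptedException {
        // Hand the whole command string to a shell (assumption: /bin/sh exists).
        Process process = new ProcessBuilder("/bin/sh", "-c", command).start();
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            // readLine() blocks until a full line is available; null means end of stream.
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        process.waitFor();
        return lines;
    }

    public static void main(String[] args) throws Exception {
        // A single worker thread runs the reading task, as in ExecSource.start().
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<List<String>> future =
                    executor.submit(() -> readLines("printf 'a\\nb\\n'"));
            for (String line : future.get()) {
                System.out.println(line);
            }
        } finally {
            executor.shutdown();
        }
    }
}
```

With tail -F the loop would never see end of stream on its own, which is why ExecSource keeps the reader on a dedicated thread.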

The most important step here is using a shell command from Java to read the data in the local log file. The main code is as follows:

try {
  if(shell != null) {
    String[] commandArgs = formulateShellCommand(shell, command);
    process = Runtime.getRuntime().exec(commandArgs);
  }  else {
    String[] commandArgs = command.split("\\s+");
    process = new ProcessBuilder(commandArgs).start();
  }
  reader = new BufferedReader(
      new InputStreamReader(process.getInputStream(), charset));
  // When tail -F has no new data, reader.readLine() blocks until data arrives
  while ((line = reader.readLine()) != null) {
    synchronized (eventList) {
      sourceCounter.incrementEventReceivedCount();
      eventList.add(EventBuilder.withBody(line.getBytes(charset))); // an Event is created here
      if(eventList.size() >= bufferCount || timeout()) {
        flushEventBatch(eventList);
      }
    }
  }
  // ... (catch and finally blocks omitted; see the full run() method above)

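The output stream of the local process represented by java.lang.Process is read through a Java input stream. When tail -F has no new data, reader.readLine() blocks until new data arrives. Each new line is first wrapped in an Event; once the batch limit is reached (bufferCount events buffered, or the batch timeout elapsed since the last flush), flushEventBatch is called. flushEventBatch hands the Event list to the ChannelProcessor for batch processing.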
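
The flush-on-size-or-timeout rule can be illustrated in isolation. The following is a minimal sketch, not Flume's implementation: BatchBufferSketch, its fields, and the injectable clock are hypothetical names introduced for the example, and a Consumer stands in for the ChannelProcessor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

public class BatchBufferSketch {
    private final List<String> buffer = new ArrayList<>();
    private final int batchSize;               // plays the role of bufferCount
    private final long batchTimeoutMs;         // plays the role of batchTimeout
    private final LongSupplier clock;          // injectable clock (assumption, for testability)
    private final Consumer<List<String>> sink; // stand-in for the ChannelProcessor
    private long lastFlush;

    BatchBufferSketch(int batchSize, long batchTimeoutMs,
                      LongSupplier clock, Consumer<List<String>> sink) {
        this.batchSize = batchSize;
        this.batchTimeoutMs = batchTimeoutMs;
        this.clock = clock;
        this.sink = sink;
        this.lastFlush = clock.getAsLong();
    }

    /** Called by the reading thread for every new line. */
    synchronized void add(String line) {
        buffer.add(line);
        if (buffer.size() >= batchSize || timedOut()) {
            flush();
        }
    }

    /** Called periodically by a timer thread so a slow trickle still gets flushed. */
    synchronized void flushIfTimedOut() {
        if (!buffer.isEmpty() && timedOut()) {
            flush();
        }
    }

    private boolean timedOut() {
        return clock.getAsLong() - lastFlush >= batchTimeoutMs;
    }

    private void flush() {
        sink.accept(new ArrayList<>(buffer)); // pass a copy, then reset the buffer
        buffer.clear();
        lastFlush = clock.getAsLong();
    }

    public static void main(String[] args) {
        long[] now = {0};
        List<List<String>> batches = new ArrayList<>();
        BatchBufferSketch b = new BatchBufferSketch(2, 3000, () -> now[0], batches::add);
        b.add("a");
        b.add("b");          // size threshold reached, first batch flushed
        b.add("c");
        now[0] = 3000;       // simulate 3 s passing
        b.flushIfTimedOut(); // timeout reached, second batch flushed
        System.out.println(batches); // prints [[a, b], [c]]
    }
}
```

In a real program flushIfTimedOut would be scheduled on a ScheduledExecutorService with scheduleWithFixedDelay, and both methods synchronize on the same lock because the reader thread and the timer thread share the buffer.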

// EventBuilder.withBody
public static Event withBody(byte[] body, Map<String, String> headers) {
  Event event = new SimpleEvent();
  if(body == null) {
    body = new byte[0];
  }
  event.setBody(body);
  if (headers != null) {
    event.setHeaders(new HashMap<String, String>(headers));
  }
  return event;
}

// ExecSource.flushEventBatch
private void flushEventBatch(List<Event> eventList){
  channelProcessor.processEventBatch(eventList);
  sourceCounter.addToEventAcceptedCount(eventList.size());
  eventList.clear();
  lastPushToChannel = systemClock.currentTimeMillis();
}
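
To make the behaviour of withBody concrete, here is a small self-contained sketch. It does not use Flume's classes: the nested Event type is a hypothetical stand-in for SimpleEvent, and the one-argument overload is assumed to delegate to the two-argument version with null headers.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class EventSketch {

    /** Hypothetical stand-in for Flume's SimpleEvent: a body plus string headers. */
    static final class Event {
        byte[] body = new byte[0];
        Map<String, String> headers = new HashMap<>();
    }

    static Event withBody(byte[] body, Map<String, String> headers) {
        Event event = new Event();
        // A null body becomes an empty array, so consumers never see null.
        event.body = (body == null) ? new byte[0] : body;
        if (headers != null) {
            // Copy the map so later changes by the caller do not leak into the event.
            event.headers = new HashMap<>(headers);
        }
        return event;
    }

    /** Convenience overload, as used for each log line: no headers. */
    static Event withBody(byte[] body) {
        return withBody(body, null);
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("host", "a");
        Event event = withBody("line".getBytes(StandardCharsets.UTF_8), headers);
        headers.put("host", "b");                        // mutate the caller's map afterwards
        System.out.println(event.headers.get("host"));   // prints a
        System.out.println(withBody(null).body.length);  // prints 0
    }
}
```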

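ExecSource collects local logs asynchronously and does not guarantee reliability: if, for example, the tail -F process spawned by Java runs into trouble, collection of the target log file is affected. Its advantages are that it performs better than the RPC approach, reduces network traffic, and is non-intrusive to the application, so it can be plugged in seamlessly.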
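
The restart and restartThrottle fields in the run() method shown earlier are how ExecSource reacts when the command exits. The loop can be sketched roughly as follows. This is an illustration under assumptions: RestartLoopSketch and runWithRestart are hypothetical names, /bin/sh is assumed to exist, and the maxRuns bound is added only so the example terminates (ExecSource itself keeps looping while restart is true).

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class RestartLoopSketch {

    /** Runs the command up to maxRuns times, pausing throttleMs between runs. */
    static List<Integer> runWithRestart(String command, int maxRuns, long throttleMs)
            throws IOException, InterruptedException {
        List<Integer> exitCodes = new ArrayList<>();
        for (int run = 0; run < maxRuns; run++) {
            // inheritIO() forwards the child's output so it cannot block on a full pipe.
            Process process = new ProcessBuilder("/bin/sh", "-c", command)
                    .inheritIO()
                    .start();
            exitCodes.add(process.waitFor());
            if (run < maxRuns - 1) {
                Thread.sleep(throttleMs); // throttle before starting the command again
            }
        }
        return exitCodes;
    }

    public static void main(String[] args) throws Exception {
        // "exit 3" stands in for a tail -F process that dies unexpectedly.
        System.out.println(runWithRestart("exit 3", 2, 10)); // prints [3, 3]
    }
}
```

Restarting brings the command back, but nothing in this loop recovers lines written while it was down.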
