常见的日志收集方式有两种,一种是经由本地日志文件作媒介,异步地发送到远程日志仓库,一种是基于RPC方式的同步日志收集,直接发送到远程日志仓库。这篇讲讲Flume NG如何从本地日志文件中收集日志。 ExecSource是用来执行本地shell命令,并把本地日志文件中的数据封装成Event事件流在Flume NG中流动。它的典型配置以下,指定source类型是exec,指定Source下游的Channel是哪一个,指定要执行的shell命令。最经常使用的命令就是tail -F命令,能够从本地日志文件中获取新追加的日志。java
producer.sources.s1.type = exec producer.sources.s1.channels = channel producer.sources.s1.command = tail -F /data/logs/test.log
看一下ExecSource的实现流程shell
public void start() { logger.info("Exec source starting with command:{}", command); executor = Executors.newSingleThreadExecutor(); runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter, restart, restartThrottle, logStderr, bufferCount, batchTimeout, charset); // FIXME: Use a callback-like executor / future to signal us upon failure. runnerFuture = executor.submit(runner); sourceCounter.start(); super.start(); logger.debug("Exec source started"); }
public void run() { do { String exitCode = "unknown"; BufferedReader reader = null; String line = null; final List<Event> eventList = new ArrayList<Event>(); timedFlushService = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat( "timedFlushExecService" + Thread.currentThread().getId() + "-%d").build()); try { if(shell != null) { String[] commandArgs = formulateShellCommand(shell, command); process = Runtime.getRuntime().exec(commandArgs); } else { String[] commandArgs = command.split("\\s+"); process = new ProcessBuilder(commandArgs).start(); } reader = new BufferedReader( new InputStreamReader(process.getInputStream(), charset)); StderrReader stderrReader = new StderrReader(new BufferedReader( new InputStreamReader(process.getErrorStream(), charset)), logStderr); stderrReader.setName("StderrReader-[" + command + "]"); stderrReader.setDaemon(true); stderrReader.start(); future = timedFlushService.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { synchronized (eventList) { if(!eventList.isEmpty() && timeout()) { flushEventBatch(eventList); } } } catch (Exception e) { logger.error("Exception occured when processing event batch", e); if(e instanceof InterruptedException) { Thread.currentThread().interrupt(); } } } }, batchTimeout, batchTimeout, TimeUnit.MILLISECONDS); while ((line = reader.readLine()) != null) { synchronized (eventList) { sourceCounter.incrementEventReceivedCount(); eventList.add(EventBuilder.withBody(line.getBytes(charset))); if(eventList.size() >= bufferCount || timeout()) { flushEventBatch(eventList); } } } synchronized (eventList) { if(!eventList.isEmpty()) { flushEventBatch(eventList); //此将event发送到channel中 } } } catch (Exception e) { logger.error("Failed while running command: " + command, e); if(e instanceof InterruptedException) { Thread.currentThread().interrupt(); } } finally { if (reader != null) { try { reader.close(); } catch (IOException ex) { logger.error("Failed to close reader for exec source", ex); } } exitCode = String.valueOf(kill()); } if(restart) { logger.info("Restarting in {}ms, exit code {}", restartThrottle, exitCode); try { Thread.sleep(restartThrottle); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } else { logger.info("Command [" + command + "] exited with " + exitCode); } } while(restart); }
这里最主要的是步骤是在Java平台中使用Shell命令来获取本地日志文件的数据,主要的代码以下网络
try { if(shell != null) { String[] commandArgs = formulateShellCommand(shell, command); process = Runtime.getRuntime().exec(commandArgs); } else { String[] commandArgs = command.split("\\s+"); process = new ProcessBuilder(commandArgs).start(); } reader = new BufferedReader( new InputStreamReader(process.getInputStream(), charset)); // 当tail -F没有数据时,reader.readLine会阻塞,直到有数据到达 while ((line = reader.readLine()) != null) { synchronized (eventList) { sourceCounter.incrementEventReceivedCount(); eventList.add(EventBuilder.withBody(line.getBytes(charset)));//event产生 if(eventList.size() >= bufferCount || timeout()) { flushEventBatch(eventList); } } }
将java.lang.Process表明的本地进程的输出流重定向到Java的输入流中,当tail -F没有数据时,Java输入流的reader.readLine会阻塞,直到有新数据到达。获取到新数据后,首先是将数据封装成Event,若是超过了批量限制,就flushEventBatch flushEventBatch会将Event列表交给ChannelProcessor批量处理。异步
public static Event withBody(byte[] body, Map<String, String> headers) { Event event = new SimpleEvent(); if(body == null) { body = new byte[0]; } event.setBody(body); if (headers != null) { event.setHeaders(new HashMap<String, String>(headers)); } return event; } // ExecSource.flushEventBatch private void flushEventBatch(List<Event> eventList){ channelProcessor.processEventBatch(eventList); sourceCounter.addToEventAcceptedCount(eventList.size()); eventList.clear(); lastPushToChannel = systemClock.currentTimeMillis(); }
ExecSource是异步收集本地日志的实现,它不保证可靠性,好比Java平台建立的tail -F进程出问题了,那么目标日志文件的收集会收到影响。ExecSource的好处是性能比RPC方式要好,减小了网络的流量,同时避免了对应用程序的倾入性,能够无缝地接入。ide