【Flume】TailDirSource源码理解

TaildirSource类图以下(列出主要类)
【Flume】TailDirSource源码理解node


TailDirSource类
TailDirSource继承了AbstractSource类,而AbstractSource类中channelProcessor属性负责将Source中的Event提交给Channel组件
TailDirSource类经过配置参数匹配日志文件,获取日志文件更新内容而且将已经读取的偏移量记录到特定的文件当中(position file)linux

configure()方法:
1.判断从配置文件加载的配置是否合法,其中包括了对filegroups,以及以filegroups为单位的文件路径是否存在等条件。
2.对batchSize,skipToEnd,writePosInterval,idleTimeout等变量进行初始化工做
batchSize定义了往Channel中发送Event的批量处理大小
skipToEnd定义了每次程序启动,对文件进行读取的时候,是否从文件尾部开始读取数据,或者从文件最开始读取。
writePosInterval,TaildirSource读取每一个监控文件都在位置文件中记录监控文件的已经读取的偏移量,writePosInterval则是定义了更新位置文件的间隔。
idleTimeout日志文件在idleTimeout间隔时间,没有被修改,文件将被关闭json

start()方法:
经过configure()初始化后的变量建立了ReliableTaildirEventReader对象,同时建立两个线程池idleFileChecker和positionWriter,分别用于监控日志文件和记录日志文件读取的偏移量。
idleFileChecker实现一个Runnable接口,遍历reader全部监控的文件,检查文件最后修改时间+idleTimeout是否小于当前时间,说明日志文件在idleTimeout时间内没有被修改,该文件将被关闭。windows

private class idleFileCheckerRunnable implements Runnable {
  @Override
  public void run() {
    try {
      long now = System.currentTimeMillis();
      for (TailFile tf : reader.getTailFiles().values()) {
        if (tf.getLastUpdated() + idleTimeout < now && tf.getRaf() != null) {
          idleInodes.add(tf.getInode());
        }
      }
    } catch (Throwable t) {
      logger.error("Uncaught exception in IdleFileChecker thread", t);
    }
  }
}

positionWriter主要做用是记录日志文件读取的偏移量,以json格式("inode", inode, "pos", tf.getPos(), "file", tf.getPath()),其中inode是linux系统中特有属性,在适应其余系统(Windows等)日志采集时ReliableTaildirEventReader.getInode()方法须要修改(注意:在利用Linux系统上inode实现上,文件是经过inode记录日志读取偏移量。因此即便文件名改变了,也不影响日志读取,在我实现Window版本上,只采用了文件名对应日志读取偏移量,文件名改变影响日志读取)。pos则是记录的日志读取的偏移量,file记录了日志文件的路径dom

process()方法:
process方法记录了TailDirSource类中主要的逻辑,获取每一个监控的日志文件,调用tailFileProcess获取每一个日志文件的更新数据,并将每条记录转换为Event(具体细节要看ReliableTaildirEventReader的readEvents方法)ide

public Status process() {
  Status status = Status.READY;
  try {
    existingInodes.clear();
    existingInodes.addAll(reader.updateTailFiles());
    for (long inode : existingInodes) {
      TailFile tf = reader.getTailFiles().get(inode);
      if (tf.needTail()) {
        tailFileProcess(tf, true);
      }
    }
    closeTailFiles();
    try {
      TimeUnit.MILLISECONDS.sleep(retryInterval);
    } catch (InterruptedException e) {
      logger.info("Interrupted while sleeping");
    }
  } catch (Throwable t) {
    logger.error("Unable to tail files", t);
    status = Status.BACKOFF;
  }
  return status;
}

ReliableTaildirEventReader类
构造ReliableTaildirEventReader对象的时候,首先会判断各类必须参数是否合法等,而后加载position file获取每一个文件上次记录的日志文件读取的偏移量
loadPositionFile(String filePath) 不粘贴方法的具体代码,主要就是获取每一个监控日志文件的读取偏移量
readEvents()的各个不一样参数方法中,下面这个是最主要的,该方法获取当前日志文件的偏移量,调用TailFile.readEvents(numEvents, backoffWithoutNL, addByteOffset)方法将日志文件每行转换为Flume的消息对象Event,并循环将每一个event添加header信息。线程

public List<Event> readEvents(int numEvents, boolean backoffWithoutNL)
    throws IOException {
  if (!committed) {
    if (currentFile == null) {
      throw new IllegalStateException("current file does not exist. " + currentFile.getPath());
    }
    logger.info("Last read was never committed - resetting position");
    long lastPos = currentFile.getPos();
    currentFile.updateFilePos(lastPos);
  }
  List<Event> events = currentFile.readEvents(numEvents, backoffWithoutNL, addByteOffset);
  if (events.isEmpty()) {
    return events;
  }

  Map<String, String> headers = currentFile.getHeaders();
  if (annotateFileName || (headers != null && !headers.isEmpty())) {
    for (Event event : events) {
      if (headers != null && !headers.isEmpty()) {
        event.getHeaders().putAll(headers);
      }
      if (annotateFileName) {
        event.getHeaders().put(fileNameHeader, currentFile.getPath());
      }
    }
  }
  committed = false;
  return events;
}

openFile(File file, Map<String, String> headers, long inode, long pos) 方法根据日志文件对象,headers,inode和偏移量pos建立一个TailFile对象指针


TailFile类
TaildirSource经过TailFile类操做处理每一个日志文件,包含了RandomAccessFile类,以及记录日志文件偏移量pos,最新更新时间lastUpdated等属性
RandomAccessFile完美的符合TaildirSource的应用场景,RandomAccessFile支持使用seek()方法随机访问文件,配合position file中记录的日志文件读取偏移量,可以轻松简单的seek到文件偏移量,而后向后读取日志内容,并从新将新的偏移量记录到position file中。日志

readEvent(boolean backoffWithoutNL, boolean addByteOffset)方法:
下图描述了该方法的调用层级,readEvent简单的理解就是将每行日志转为Event消息体,方法最终调用的是readFile()方法。code

【Flume】TailDirSource源码理解

readLine()方法,有点难还在研究

public LineResult readLine() throws IOException {
  LineResult lineResult = null;
  while (true) {
    if (bufferPos == NEED_READING) {
      if (raf.getFilePointer() < raf.length()) {//当文件指针位置小于文件总长度的时候,就须要读取指针位置到文件最后的数据
        readFile();
      } else {
        if (oldBuffer.length > 0) {
          lineResult = new LineResult(false, oldBuffer);
          oldBuffer = new byte[0];
          setLineReadPos(lineReadPos + lineResult.line.length);
        }
        break;
      }
    }
    for (int i = bufferPos; i < buffer.length; i++) {
      if (buffer[i] == BYTE_NL) {
        int oldLen = oldBuffer.length;
        // Don't copy last byte(NEW_LINE)
        int lineLen = i - bufferPos;
        // For windows, check for CR
        if (i > 0 && buffer[i - 1] == BYTE_CR) {
          lineLen -= 1;
        } else if (oldBuffer.length > 0 && oldBuffer[oldBuffer.length - 1] == BYTE_CR) {
          oldLen -= 1;
        }
        lineResult = new LineResult(true,
            concatByteArrays(oldBuffer, 0, oldLen, buffer, bufferPos, lineLen));
        setLineReadPos(lineReadPos + (oldBuffer.length + (i - bufferPos + 1)));
        oldBuffer = new byte[0];
        if (i + 1 < buffer.length) {
          bufferPos = i + 1;
        } else {
          bufferPos = NEED_READING;
        }
        break;
      }
    }
    if (lineResult != null) {
      break;
    }
    // NEW_LINE not showed up at the end of the buffer
    oldBuffer = concatByteArrays(oldBuffer, 0, oldBuffer.length,
                                 buffer, bufferPos, buffer.length - bufferPos);
    bufferPos = NEED_READING;
  }
  return lineResult;
}

readFile()按BUFFER_SIZE(默认8KB)做为缓冲读取日志文件数据

private void readFile() throws IOException {
  if ((raf.length() - raf.getFilePointer()) < BUFFER_SIZE) {
    buffer = new byte[(int) (raf.length() - raf.getFilePointer())];
  } else {
    buffer = new byte[BUFFER_SIZE];
  }
  raf.read(buffer, 0, buffer.length);
  bufferPos = 0;
}
相关文章
相关标签/搜索