你所不知道的日志异步落库

在互联网设计架构过程当中,日志异步落库,俨然已是高并发环节中不可缺乏的一环。为何说是高并发环节中不可缺乏的呢? 缘由在于,若是直接用mq进行日志落库的时候,低并发下,生产端生产数据,而后由消费端异步落库,是没有什么问题的,并且性能也都是异常的好,估计tp99应该都在1ms之内。可是一旦并发增加起来,慢慢的你就发现生产端的tp99一直在增加,从1ms,变为2ms,4ms,直至send timeout。尤为在大促的时候,我司的系统就经历过这个状况,当时mq的发送耗时超过200ms,甚至一度有很多timeout产生。html

考虑到这种状况在高并发的状况下才出现,因此今天咱们就来探索更加可靠的方法来进行异步日志落库,保证所使用的方式不会由于太高的并发而出现接口ops持续降低甚至到不可用的状况。apache

 

方案一: 基于log4j的异步appender实现架构

此种方案,依赖于log4j。在log4j的异步appender中,经过mq进行生产消费入库。至关于在接口和mq之间创建了一个缓冲区,使得接口和mq的依赖分离,从而不让mq的操做影响接口的ops。并发

此种方案因为使用了异步方式,且因为异步的discard policy策略,当大量数据过来,缓冲区满了以后,会抛弃部分数据。此种方案适用于可以容忍数据丢失的业务场景,不适用于对数据完整有严格要求的业务场景。app

来看看具体的实现方式:less

首先,咱们须要自定义一个Appender,继承自log4j的AppenderSkeleton类,实现方式以下:dom

public class AsyncJmqAppender extends AppenderSkeleton {

    @Resource(name = "messageProducer")
    private MessageProducer messageProducer;

    @Override
    protected void append(LoggingEvent loggingEvent) {
        asyncPushMessage(loggingEvent.getMessage());
    }

    /**
     * 异步调用jmq输出日志
     * @param message
     */
    private void asyncPushMessage(Object message) {

        CompletableFuture.runAsync(() -> {

            Message messageConverted = (Message) message;

            try {
                messageProducer.send(messageConverted);
            } catch (JMQException e) {
                e.printStackTrace();
            }

        });
    }


    @Override
    public boolean requiresLayout() {
        return false;
    }

    @Override
    public void close() {

    }
}

而后在log4j.xml中,为此类进行配置:异步

<!--异步JMQ appender-->
<appender name="async_mq_appender" class="com.jd.limitbuy.common.util.AsyncJmqAppender">
    <!-- 设置File参数:日志输出文件名 -->
    <param name="File" value="D:/export/Instances/order/server1/logs/order.async.jmq" />
    <!-- 设置是否在从新启动服务时,在原有日志的基础添加新日志 -->
    <param name="Append" value="true" />
    <!-- 设置文件大小 -->
    <param name="MaxFileSize" value="10KB" />
    <!-- 设置文件备份 -->
    <param name="MaxBackupIndex" value="10000" />
    <!-- 设置输出文件项目和格式 -->
    <layout class="org.apache.log4j.PatternLayout">
        <param name="ConversionPattern" value="%m%n" />
    </layout>
</appender>
<logger name="async_mq_appender_logger">
    <appender-ref ref="async_mq_appender"/>
</logger>

最后就能够按照以下的方式进行正常使用了:async

private static Logger logger = LoggerFactory.getLogger("filelog_appender_logger");

注意: 此处须要注意log4j的一个性能问题。在log4j的conversionPattern中,匹配符最好不要出现 C% L%通配符,压测实践代表,这两个通配符会致使log4j打日志的效率下降10倍。ide

方案一很简便,且剥离了接口直接依赖mq致使的性能问题。可是没法解决数据丢失的问题(可是咱们其实能够在本地搞个策略落盘来不及处理的数据,能够大大的减小数据丢失的概率)。可是不少的业务场景,是须要数据不丢失的,因此这就衍生出咱们的另外一套方案来。

 

方案二:增量消费log4j日志

此种方式,是开启worker在后台增量消费log4j的日志信息,和接口彻底脱离。此种方式相比方案一,能够保证数据的不丢失,且能够作到彻底不影响接口的ops。可是此种方式,因为是后台worker在后台启动进行扫描,会致使落库的数据慢一些,好比一分钟以后才落库完毕。因此适用于对落库数据实时性不高的场景。

具体的实现步骤以下:

首先,将须要进行增量消费的日志统一打到一个文件夹,以天为单位,天天生成一个带时间戳日志文件。因为log4j不支持直接带时间戳的日志文件生成,因此这里须要引入log4j.extras组件,而后配置log4j.xml以下:

image

以后在代码中的申明方式以下:

private static Logger businessLogger = LoggerFactory.getLogger("file_rolling_logger");

最后在须要记录日志的地方使用方式以下:

businessLogger.error(JsonUtils.toJSONString(myMessage))

这样就能够将日志打印到一个单独的文件中,且按照日期,天天生成一个。

而后,当日志文件生成完毕后,咱们就能够开启咱们的worker进行增量消费了,这里的增量消费方式,咱们选择RandomAccessFile这个类来进行,因为其独特的位点读取方式,可使得咱们很是方便的根据位点的位置来消费增量文件,从而避免了逐行读取这种低效率的实现方式。

注意,为每一个日志文件都单首创建了一个位点文件,里面存储了对应的文件的位点读取信息。当worker扫描开始的时候,会首先读取位点文件里面的位点信息,而后找到相应的日志文件,从位点信息位置开始进行消费。这就是整个增量消费worker的核心。具体代码实现以下(代码太长,作了折叠):

/**
 * @Description: 增量日志扫描worker
 * @Detail: 此worker主要用来扫描增量日志,日志自己会在不停的插入中,此worker会不停的扫描此日志来将数据上传到kafka集群
 * @date 2018-04-08 10:30
 */
public class LimitBuyScanWorker {

    /**
     * 日志和位点文件保存的目录
     */
    private static final String FILE_DIRECTORY = "D:\\export\\Instances\\order\\server1\\logs\\";

    /**
     * 每次步进的长度,此处为1000行
     */
    private static final int SCAN_STEP = 1000;

    /**
     * 日志文件名前缀
     */
    private static final String LOG_FILE_PREFIX = "limitbuy.soa.order.";

    /**
     * 位点文件名后缀
     */
    private static final String OFT_FILE_APPENDIX = ".offset";

    public void logScanner() {

        //当前时间
        Date currentDate = new Date();

        //今日
        String currentDay = DateUtil.formatDate("yyyy-MM-dd", currentDate);
        //今日日志文件路径
        String currentLogFilePath = FILE_DIRECTORY + LOG_FILE_PREFIX + currentDay;
        logger.error("今日的日志文件路径:" + currentLogFilePath);
        //今日位点文件路径
        String currentOffsetFilePath = FILE_DIRECTORY + LOG_FILE_PREFIX + currentDay + OFT_FILE_APPENDIX;

        //昨日
        String yesterDay = DateUtil.formatDate("yyyy-MM-dd", DateUtil.queryPlusDay(currentDate, -1));
        //昨日日志文件路径
        String yesterdayLogFilePath = FILE_DIRECTORY + LOG_FILE_PREFIX + yesterDay;
        logger.error("昨日的日志文件路径:" + yesterdayLogFilePath);
        //昨日位点文件路径
        String yesterdayOffsetFilePath = FILE_DIRECTORY + LOG_FILE_PREFIX + yesterDay + OFT_FILE_APPENDIX;

        //先检测昨日位点和文件体积是否一致,不一致则表明未消费完毕
        boolean yesterdayConsumedOK = checkIfConsumeOK(yesterdayLogFilePath, yesterdayOffsetFilePath);
        logger.error("昨日的日志文件已被消费完毕:" + yesterdayConsumedOK);

        //昨日的文件已扫描完毕
        if (yesterdayConsumedOK) {
            //扫描并消费今日增量日志
            scanAndConsumeLog(currentLogFilePath, currentOffsetFilePath);
        }
        //昨日的文件未扫描完毕
        else {
            //扫描并消费昨日增量日志
            scanAndConsumeLog(yesterdayLogFilePath, yesterdayOffsetFilePath);
        }
    }

    /**
     * 检测日志是否被扫描消费完毕,true:消费完毕;false:未消费完毕
     * @Description 此举主要防止log4j在零点大促开始的时候,忽然的滚动文件形成的部分增量日志不会被消费的问题
     * @param logFilePath
     * @param offsetFilePath
     */
    private boolean checkIfConsumeOK(String logFilePath, String offsetFilePath) {
        try {

            //打开文件
            RandomAccessFile randomAccessFile = new RandomAccessFile(logFilePath, "r");

            //获得当前位点
            long currentOffset = checkOffset(offsetFilePath);

            //获得文件总长
            long currentFileLength = randomAccessFile.length();

            //比对
            if (currentOffset >= currentFileLength) {
                return true;
            }
            return false;
        } catch (FileNotFoundException e) {
            logger.error("com.jd.limitbuy.service.worker.logScanner 出错(FileNotFoundException):", e);
            AlarmUtil.alarm("com.jd.limitbuy.service.worker.logScanner 出错:" + e.getMessage());
            return false;
        } catch (IOException e) {
            logger.error("com.jd.limitbuy.service.worker.logScanner 出错(IOException):", e);
            AlarmUtil.alarm("com.jd.limitbuy.service.worker.logScanner 出错:" + e.getMessage());
            return false;
        }
    }

    /**
     * 扫描并消费增量日志
     * @param logFilePath
     * @param offsetFilePath
     */
    private void scanAndConsumeLog(String logFilePath, String offsetFilePath) {
        try {

            RandomAccessFile randomAccessFile = new RandomAccessFile(logFilePath, "r");

            //获得当前位点
            long currentOffset = checkOffset(offsetFilePath);
            logger.error("开始位点==>" + currentOffset);

            //重置位点到当前位点
            if (currentOffset <= randomAccessFile.length()) {
                randomAccessFile.seek(currentOffset);
            }

            //读取@SCAN_STEP行
            for (long i = currentOffset; i < currentOffset + SCAN_STEP; i++) {
                //获得行
                String result = randomAccessFile.readLine();
                //若是内容不为空
                if (StringUtil.isNotBlank(result)) {
                    //TODO 逻辑实现
                }
            }

            //读取@SCAN_STEP行以后的位点
            logger.error("读取" + SCAN_STEP + "行以后位点==>" + randomAccessFile.getFilePointer());

            //若是update不成功,能够不处理,后面扫描进来从新过一遍便可
            updateOffset(randomAccessFile.getFilePointer(), offsetFilePath);

            logger.error("文件总长==>" + randomAccessFile.length());

        } catch (FileNotFoundException e) {
            logger.error("com.jd.limitbuy.service.worker.logScanner 出错(FileNotFoundException):", e);
            AlarmUtil.alarm("com.jd.limitbuy.service.worker.logScanner 出错:" + e.getMessage());
        } catch (IOException e) {
            logger.error("com.jd.limitbuy.service.worker.logScanner 出错(IOException):", e);
            AlarmUtil.alarm("com.jd.limitbuy.service.worker.logScanner 出错:" + e.getMessage());
        }
    }

    /**
     * 校验位点
     *     不存在则建立并赋值为0
     *     已存在则更新位点
     * @param offsetFilePath
     * @return
     * @throws IOException
     */
    private long checkOffset(String offsetFilePath) throws IOException {

        File offsetFile = new File(offsetFilePath);

        //若是位点文件不存在,则建立位点文件并返回0
        if (!offsetFile.exists()) {
            updateOffset(0, offsetFilePath);
            return 0;
        }
        //若是位点文件存在,则返回位点文件内容
        else {
            FileReader fileReader = new FileReader(offsetFilePath);
            StringBuilder stringBuilder = new StringBuilder();
            char[] bytesChar = new char[50];
            fileReader.read(bytesChar);
            fileReader.close();

            for (char c : bytesChar) {
                stringBuilder.append(c);
            }

            String filteredOffset = stringBuilder.toString().trim();

            if (StringUtil.isNotBlank(filteredOffset)) {
                return Long.parseLong(filteredOffset);
            } else {
                return 0;
            }
        }
    }

    /**
     * 更新位点信息
     * @param offset
     * @param offsetFilePath
     */
    private void updateOffset(long offset, String offsetFilePath) throws IOException {
        FileWriter fileWriter = new FileWriter(offsetFilePath);
        fileWriter.write(offset + "");
        fileWriter.flush();
        fileWriter.close();
    }
}

此种方式因为worker扫描是每隔一段时间启动一次进行消费,因此致使数据从产生到入库,可能经历时间超过一分钟以上,可是在一些对数据延迟要求比较高的业务场景,好比库存扣减,是不能容忍的,因此这里咱们就引伸出第三种作法,基于内存文件队列的异步日志消费。

 

方案三:基于内存文件队列的异步日志消费

因为方案一和方案二都严重依赖log4j,且方案自己都存在着要么丢数据,要么入库时间长的缺点,因此都并非那么尽如人意。可是本方案的作法,既解决了数据丢失的问题,又解决了数据入库时间被拉长的尴尬,因此是终极解决之道。并且在大促销过程当中,此种方式经历了实战检验,能够大面积的推广使用。

此方案中提到的内存文件队列,是我司自研的一款基于RandomAccessFile和MappedByteBuffer实现的内存文件队列。队列核心使用了ArrayBlockingQueue,并提供了produce方法,进行数据入管道操做,提供了consume方法,进行数据出管道操做。并且后台有一个worker一直启动着,每隔5ms或者遍历了100条数据以后,就将数据落盘一次,以防数据丢失。具体的设计,就这么多,感兴趣的能够根据我提供的信息,本身实践一下。

因为有此中间件的加持,数据生产的时候,只须要入压入管道,而后消费端进行消费便可。未被消费的数据,会进行落盘操做,谨防数据丢失。当大促的时候,大量数据涌来的时候,管道满了的状况下会阻塞接口,数据不会被抛弃。虽然可能会致使接口在那一瞬间无响应,可是因为有落盘操做和消费操做(此操做操控的是JVM堆外内存数据,不受GC的影响,因此不会出现操做暂停的状况,为何呢?由于用了MappedByteBuffer),此种阻塞并未影响到接口总体的ops。

在实际使用的时候,ArrayBlockingQueue做为核心队列,显然是全局加锁的,后续咱们考虑升级为无锁队列,因此将会参考Netty中的有界无锁队列:MpscArrayQueue。预计性能将会再好一些。

受限于公司政策,我仅提供大体思路,可是不会提供具体代码,有问题评论区交流吧。

 

上面就是在进行异步日志消费的时候,我所经历的三个阶段,而且一步一步的优化到目前的方式。虽然过程曲折,可是结果使人欢欣鼓舞。若是喜欢就给个推荐,后续我将会持续更新你所不知道的系列,以期达到抛砖引玉的效果。

相关文章
相关标签/搜索