并发编程源码分析一之Log接口

并发编程实践部分源码使用github里面部分开源框架源码,从新整理,若有错误或版权问题,请指出指正。java

欢迎star、fork,读书笔记系列会同步更新mysql

gitgit

https://github.com/xuminwlt/j360-jdkgithub

模块j360-jdk-applicationsql


前言

日常开发过程当中使用并发框架的场景在SSH的框架里面直接接触的机会并非不少,基本上大量使用的就是部分原子类,然而在底层实现中,为了实现高并发的可能,jdk并发编程框架几乎所谓不在,每一个并发编程类以及其参数都有各自所擅长的场景,并发编程框架不是一卡通,可是却很像搭积木,经过组合几乎不少的并发场景都不在话下,这里就是用批量日志管理接口的场景中,如何使用并发编程类实现高并发下的日志管理。数据库


内容

业务日志在每一个系统中都是不可或缺的功能点,业务日志的输出也有多种的输出形式,会话界面、文本、数据库等等,一般业务日志在系统中的埋点会做为一个日志单元,一个业务系统天天产生的日志数量=每一个处理流程*埋点*pv(或者同单位request),在高并发系统中,单个集群节点处理日志也一般采用并发框架批量进行处理,这里分别设置两个条件来对收集的日志进行处理:
编程

1:每nSeconds批量将收集的日志队列输出并发

2:日志队列占用的内存>设置的阈值(保护n秒内的队列过大)app


类图

包结构

类说明

分别表明工厂类、日志方法接口类、代理类以及实现类的工厂类、方法实现类
框架

先看接口类的方法:

public interface BizLogger {
    public void log(BizLogPo bizLogPo);

    public void log(List<BizLogPo> bizLogPos);
}

分别处理单条和多条日志

再看会话打印实现类:

private Logger LOGGER = LoggerFactory.getLogger(ConsoleBizLogger.class.getSimpleName());

@Override
public void log(BizLogPo jobLogPo) {
    LOGGER.info(JSONUtils.toJSONString(jobLogPo));
}

@Override
public void log(List<BizLogPo> jobLogPos) {
    for (BizLogPo jobLogPo : jobLogPos) {
        log(jobLogPo);
    }
}

这里使用slf4j接口类做为会话打印的输出接口,实现类一样可使用mysql、file等输出形式

经过工厂类配置得到实现类,一般会使用多个实现类提供接口,而多个实现类难以同时对实现过程进行控制,这里引入了代理类来调用具体的实现类,而并发编程框架一般在代理类中进行集中控制管理:

public class BizLoggerDelegate implements BizLogger {

    private static final Logger LOGGER = LoggerFactory.getLogger(BizLoggerDelegate.class);

    // 3S 检查输盘一第二天志
    private int flushPeriod;

    private BizLogger jobLogger;
    private boolean lazyLog = false;
    private ScheduledExecutorService executor;
    private ScheduledFuture scheduledFuture;
    private BlockingQueue<BizLogPo> memoryQueue;
    // 日志批量刷盘数量
    private int batchFlushSize = 100;
    private int overflowSize = 10000;
    // 内存中最大的日志量阀值
    private int maxMemoryLogSize;
    private AtomicBoolean flushing = new AtomicBoolean(false);

    public BizLoggerDelegate(Config config) {
        BizLoggerFactory jobLoggerFactory = new BizLoggerFactory() {
            @Override
            public BizLogger getJobLogger() {
                return new ConsoleBizLogger();
            }
        };
        jobLogger = jobLoggerFactory.getJobLogger();
        lazyLog = config.getParameter(Constants.LAZY_JOB_LOGGER, false);
        if (lazyLog) {

            // 无界Queue
            memoryQueue = new LinkedBlockingQueue<BizLogPo>();
            maxMemoryLogSize = config.getParameter(Constants.LAZY_JOB_LOGGER_MEM_SIZE, 1000);
            flushPeriod = config.getParameter(Constants.LAZY_JOB_LOGGER_CHECK_PERIOD, 3);

            executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LazyJobLogger"));
            scheduledFuture = executor.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    try {
                        if (flushing.compareAndSet(false, true)) {
                            checkAndFlush();
                        }
                    } catch (Throwable t) {
                        LOGGER.error("CheckAndFlush log error", t);
                    }
                }
            }, flushPeriod, flushPeriod, TimeUnit.SECONDS);

        }
    }

    /**
     * 检查内存中是否有日志,若是有就批量刷盘
     */
    private void checkAndFlush() {
        try {
            int nowSize = memoryQueue.size();
            if (nowSize == 0) {
                return;
            }
            List<BizLogPo> batch = new ArrayList<BizLogPo>();
            for (int i = 0; i < nowSize; i++) {
                BizLogPo jobLogPo = memoryQueue.poll();
                batch.add(jobLogPo);

                if (batch.size() >= batchFlushSize) {
                    flush(batch);
                }
            }
            if (batch.size() > 0) {
                flush(batch);
            }

        } finally {
            flushing.compareAndSet(true, false);
        }
    }

    private void checkOverflowSize() {
        if (memoryQueue.size() > overflowSize) {
            throw new BizLogException("Memory Log size is " + memoryQueue.size() + " , please check the JobLogger is available");
        }
    }

    private void flush(List<BizLogPo> batch) {
        boolean flushSuccess = false;
        try {
            jobLogger.log(batch);
            flushSuccess = true;
        } finally {
            if (!flushSuccess) {
                memoryQueue.addAll(batch);
            }
            batch.clear();
        }
    }

    /**
     * 检查内存中的日志量是否超过阀值,若是超过须要批量刷盘日志
     */
    private void checkCapacity() {
        if (memoryQueue.size() > maxMemoryLogSize) {
            // 超过阀值,须要批量刷盘
            if (flushing.compareAndSet(false, true)) {
                // 这里能够采用new Thread, 由于这里只会同时new一个
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            checkAndFlush();
                        } catch (Throwable t) {
                            LOGGER.error("Capacity full flush error", t);
                        }
                    }
                }).start();
            }
        }
    }

    @Override
    public void log(BizLogPo jobLogPo) {
        if (jobLogPo == null) {
            return;
        }
        if (lazyLog) {
            checkOverflowSize();
            memoryQueue.offer(jobLogPo);
            checkCapacity();
        } else {
            jobLogger.log(jobLogPo);
        }
    }

    @Override
    public void log(List<BizLogPo> jobLogPos) {
        if (CollectionUtils.isEmpty(jobLogPos)) {
            return;
        }
        if (lazyLog) {
            checkOverflowSize();
            for (BizLogPo jobLogPo : jobLogPos) {
                memoryQueue.offer(jobLogPo);
            }
            // checkCapacity
            checkCapacity();
        } else {
            jobLogger.log(jobLogPos);
        }
    }

}


这里写一个测试类测试下结果:

@Test
public void loggerTest() throws InterruptedException {
    Config config = new Config();
    config.setParameter("biz.logger","console");

    List<BizLogPo> list = new ArrayList<BizLogPo>();
    for(int i =0;i<=10;i++){
        BizLogPo jobLogPo = new BizLogPo();
        jobLogPo.setMsg("hello" + i);
        list.add(jobLogPo);
    }
    TimeUnit.SECONDS.sleep(5);
    BizLoggerDelegate jobLoggerDelegate = new BizLoggerDelegate(config);
    jobLoggerDelegate.log(list);
}

一般开发使用Spring环境时,新增Spring适配工厂类,经过Spring配置下:

public class BizLoggerFactoryBean implements FactoryBean<BizLogger>,
        InitializingBean, DisposableBean {

        public BizLogger getBizLogger() {
                return bizLogger;
        }

        public void setBizLogger(BizLogger bizLogger) {
                this.bizLogger = bizLogger;
        }

        private BizLogger bizLogger;

        @Override
        public void destroy() throws Exception {

        }

        @Override
        public BizLogger getObject() throws Exception {
                return bizLogger;
        }

        @Override
        public Class<?> getObjectType() {
                return bizLogger.getClass();
        }

        @Override
        public boolean isSingleton() {
                return true;
        }

        @Override
        public void afterPropertiesSet() throws Exception {

        }
}

新增Spring的配置类:

LoggerSpringConfig ApplicationContextAware {

    ApplicationContext (ApplicationContext applicationContext) BeansException {
        .= applicationContext}

    (=)
    BizLogger () Exception {
        BizLoggerFactoryBean bizLoggerFactoryBean = BizLoggerFactoryBean()bizLoggerFactoryBean.setBizLogger(ConsoleBizLogger())bizLoggerFactoryBean.getObject()}

}

新增Spring的测试类:

@Test
public void loggerSpringTest(){
    ApplicationContext context = new AnnotationConfigApplicationContext(LoggerSpringConfig.class);
    BizLogger bizLogger = (BizLogger) context.getBean("bizLogger");
    List<BizLogPo> list = new ArrayList<BizLogPo>();
    for(int i =0;i<=10;i++){
        BizLogPo jobLogPo = new BizLogPo();
        jobLogPo.setMsg("hello" + i);
        list.add(jobLogPo);
    }
    bizLogger.log(list);
}


实际使用中,把LoggerDelelate配置到Bean里面便可。

到这里一个简单的使用并发框架实现的日志处理接口完成了,固然实现类中可使用批量SQL的形式进行处理,加强sql的吞吐量。

相关文章
相关标签/搜索