并发编程实践部分源码使用github里面部分开源框架源码,从新整理,若有错误或版权问题,请指出指正。java
欢迎star、fork,读书笔记系列会同步更新mysql
gitgit
https://github.com/xuminwlt/j360-jdkgithub
模块j360-jdk-applicationsql
日常开发过程当中使用并发框架的场景在SSH的框架里面直接接触的机会并非不少,基本上大量使用的就是部分原子类,然而在底层实现中,为了实现高并发的可能,jdk并发编程框架几乎所谓不在,每一个并发编程类以及其参数都有各自所擅长的场景,并发编程框架不是一卡通,可是却很像搭积木,经过组合几乎不少的并发场景都不在话下,这里就是用批量日志管理接口的场景中,如何使用并发编程类实现高并发下的日志管理。数据库
业务日志在每一个系统中都是不可或缺的功能点,业务日志的输出也有多种的输出形式,会话界面、文本、数据库等等,一般业务日志在系统中的埋点会做为一个日志单元,一个业务系统天天产生的日志数量=每一个处理流程*埋点*pv(或者同单位request),在高并发系统中,单个集群节点处理日志也一般采用并发框架批量进行处理,这里分别设置两个条件来对收集的日志进行处理:
编程
1:每nSeconds批量将收集的日志队列输出并发
2:日志队列占用的内存>设置的阈值(保护n秒内的队列过大)app
分别表明工厂类、日志方法接口类、代理类以及实现类的工厂类、方法实现类
框架
先看接口类的方法:
public interface BizLogger { public void log(BizLogPo bizLogPo); public void log(List<BizLogPo> bizLogPos); }
分别处理单条和多条日志
再看会话打印实现类:
private Logger LOGGER = LoggerFactory.getLogger(ConsoleBizLogger.class.getSimpleName()); @Override public void log(BizLogPo jobLogPo) { LOGGER.info(JSONUtils.toJSONString(jobLogPo)); } @Override public void log(List<BizLogPo> jobLogPos) { for (BizLogPo jobLogPo : jobLogPos) { log(jobLogPo); } }
这里使用slf4j接口类做为会话打印的输出接口,实现类一样可使用mysql、file等输出形式
经过工厂类配置得到实现类,一般会使用多个实现类提供接口,而多个实现类难以同时对实现过程进行控制,这里引入了代理类来调用具体的实现类,而并发编程框架一般在代理类中进行集中控制管理:
public class BizLoggerDelegate implements BizLogger { private static final Logger LOGGER = LoggerFactory.getLogger(BizLoggerDelegate.class); // 3S 检查输盘一第二天志 private int flushPeriod; private BizLogger jobLogger; private boolean lazyLog = false; private ScheduledExecutorService executor; private ScheduledFuture scheduledFuture; private BlockingQueue<BizLogPo> memoryQueue; // 日志批量刷盘数量 private int batchFlushSize = 100; private int overflowSize = 10000; // 内存中最大的日志量阀值 private int maxMemoryLogSize; private AtomicBoolean flushing = new AtomicBoolean(false); public BizLoggerDelegate(Config config) { BizLoggerFactory jobLoggerFactory = new BizLoggerFactory() { @Override public BizLogger getJobLogger() { return new ConsoleBizLogger(); } }; jobLogger = jobLoggerFactory.getJobLogger(); lazyLog = config.getParameter(Constants.LAZY_JOB_LOGGER, false); if (lazyLog) { // 无界Queue memoryQueue = new LinkedBlockingQueue<BizLogPo>(); maxMemoryLogSize = config.getParameter(Constants.LAZY_JOB_LOGGER_MEM_SIZE, 1000); flushPeriod = config.getParameter(Constants.LAZY_JOB_LOGGER_CHECK_PERIOD, 3); executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LazyJobLogger")); scheduledFuture = executor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { if (flushing.compareAndSet(false, true)) { checkAndFlush(); } } catch (Throwable t) { LOGGER.error("CheckAndFlush log error", t); } } }, flushPeriod, flushPeriod, TimeUnit.SECONDS); } } /** * 检查内存中是否有日志,若是有就批量刷盘 */ private void checkAndFlush() { try { int nowSize = memoryQueue.size(); if (nowSize == 0) { return; } List<BizLogPo> batch = new ArrayList<BizLogPo>(); for (int i = 0; i < nowSize; i++) { BizLogPo jobLogPo = memoryQueue.poll(); batch.add(jobLogPo); if (batch.size() >= batchFlushSize) { flush(batch); } } if (batch.size() > 0) { flush(batch); } } finally { flushing.compareAndSet(true, false); } } private void checkOverflowSize() { if (memoryQueue.size() > overflowSize) { throw new BizLogException("Memory Log size is " + memoryQueue.size() + " , please check the JobLogger is available"); } } private void flush(List<BizLogPo> batch) { boolean flushSuccess = false; try { jobLogger.log(batch); flushSuccess = true; } finally { if (!flushSuccess) { memoryQueue.addAll(batch); } batch.clear(); } } /** * 检查内存中的日志量是否超过阀值,若是超过须要批量刷盘日志 */ private void checkCapacity() { if (memoryQueue.size() > maxMemoryLogSize) { // 超过阀值,须要批量刷盘 if (flushing.compareAndSet(false, true)) { // 这里能够采用new Thread, 由于这里只会同时new一个 new Thread(new Runnable() { @Override public void run() { try { checkAndFlush(); } catch (Throwable t) { LOGGER.error("Capacity full flush error", t); } } }).start(); } } } @Override public void log(BizLogPo jobLogPo) { if (jobLogPo == null) { return; } if (lazyLog) { checkOverflowSize(); memoryQueue.offer(jobLogPo); checkCapacity(); } else { jobLogger.log(jobLogPo); } } @Override public void log(List<BizLogPo> jobLogPos) { if (CollectionUtils.isEmpty(jobLogPos)) { return; } if (lazyLog) { checkOverflowSize(); for (BizLogPo jobLogPo : jobLogPos) { memoryQueue.offer(jobLogPo); } // checkCapacity checkCapacity(); } else { jobLogger.log(jobLogPos); } } }
这里写一个测试类测试下结果:
@Test public void loggerTest() throws InterruptedException { Config config = new Config(); config.setParameter("biz.logger","console"); List<BizLogPo> list = new ArrayList<BizLogPo>(); for(int i =0;i<=10;i++){ BizLogPo jobLogPo = new BizLogPo(); jobLogPo.setMsg("hello" + i); list.add(jobLogPo); } TimeUnit.SECONDS.sleep(5); BizLoggerDelegate jobLoggerDelegate = new BizLoggerDelegate(config); jobLoggerDelegate.log(list); }
一般开发使用Spring环境时,新增Spring适配工厂类,经过Spring配置下:
public class BizLoggerFactoryBean implements FactoryBean<BizLogger>, InitializingBean, DisposableBean { public BizLogger getBizLogger() { return bizLogger; } public void setBizLogger(BizLogger bizLogger) { this.bizLogger = bizLogger; } private BizLogger bizLogger; @Override public void destroy() throws Exception { } @Override public BizLogger getObject() throws Exception { return bizLogger; } @Override public Class<?> getObjectType() { return bizLogger.getClass(); } @Override public boolean isSingleton() { return true; } @Override public void afterPropertiesSet() throws Exception { } }
新增Spring的配置类:
LoggerSpringConfig ApplicationContextAware { ApplicationContext (ApplicationContext applicationContext) BeansException { .= applicationContext} (=) BizLogger () Exception { BizLoggerFactoryBean bizLoggerFactoryBean = BizLoggerFactoryBean()bizLoggerFactoryBean.setBizLogger(ConsoleBizLogger())bizLoggerFactoryBean.getObject()} }
新增Spring的测试类:
@Test public void loggerSpringTest(){ ApplicationContext context = new AnnotationConfigApplicationContext(LoggerSpringConfig.class); BizLogger bizLogger = (BizLogger) context.getBean("bizLogger"); List<BizLogPo> list = new ArrayList<BizLogPo>(); for(int i =0;i<=10;i++){ BizLogPo jobLogPo = new BizLogPo(); jobLogPo.setMsg("hello" + i); list.add(jobLogPo); } bizLogger.log(list); }
实际使用中,把LoggerDelelate配置到Bean里面便可。
到这里一个简单的使用并发框架实现的日志处理接口完成了,固然实现类中可使用批量SQL的形式进行处理,加强sql的吞吐量。