原文地址:https://www.xilidou.com/2018/01/22/merge-request/java
在高并发系统中,咱们常常遇到这样的需求:系统产生大量的请求,可是这些请求实时性要求不高。咱们就能够将这些请求合并,达到必定数量咱们统一提交。最大化的利用系统性IO,提高系统的吞吐性能。git
因此请求合并框架须要考虑如下两个需求:github
咱们就聊聊一如何实现这样一个需求。spring
阅读这篇文章你将会了解到:shell
咱们就聊一聊实现这个东西的具体思路是什么。但愿你们可以学习到分析问题,设计模块的一些套路。编程
底层使用什么数据结构来持有须要合并的请求?安全
ArrayList
来持有。咱们可使用阻塞队列来持有须要合并的请求。LockSuppor.park()
和LockSuppor.unpark
来暂停和激活操做线程。通过上面的分析,咱们就有了这样一个数据结构:数据结构
private static class FlushThread<Item> implements Runnable{ private final String name; //队列大小 private final int bufferSize; //操做间隔 private int flushInterval; //上一次提交的时间。 private volatile long lastFlushTime; private volatile Thread writer; //持有数据的阻塞队列 private final BlockingQueue<Item> queue; //达成条件后具体执行的方法 private final Processor<Item> processor; //构造函数 public FlushThread(String name, int bufferSize, int flushInterval,int queueSize,Processor<Item> processor) { this.name = name; this.bufferSize = bufferSize; this.flushInterval = flushInterval; this.lastFlushTime = System.currentTimeMillis(); this.processor = processor; this.queue = new ArrayBlockingQueue<>(queueSize); } //外部提交数据的方法 public boolean add(Item item){ boolean result = queue.offer(item); flushOnDemand(); return result; } //提供给外部的超时方法 public void timeOut(){ //超过两次提交超过期间间隔 if(System.currentTimeMillis() - lastFlushTime >= flushInterval){ start(); } } //解除线程的阻塞 private void start(){ LockSupport.unpark(writer); } //当前的数据是否大于提交的条件 private void flushOnDemand(){ if(queue.size() >= bufferSize){ start(); } } //执行提交数据的方法 public void flush(){ lastFlushTime = System.currentTimeMillis(); List<Item> temp = new ArrayList<>(bufferSize); int size = queue.drainTo(temp,bufferSize); if(size > 0){ try { processor.process(temp); }catch (Throwable e){ log.error("process error",e); } } } //根据数据的尺寸和时间间隔判断是否提交 private boolean canFlush(){ return queue.size() > bufferSize || System.currentTimeMillis() - lastFlushTime > flushInterval; } @Override public void run() { writer = Thread.currentThread(); writer.setName(name); while (!writer.isInterrupted()){ while (!canFlush()){ //若是线程没有被打断,且不达到执行的条件,则阻塞线程 LockSupport.park(this); } flush(); } } }
一般咱们遇到定时相关的需求,首先想到的应该是使用 ScheduledThreadPoolExecutor
定时来调用FlushThread 的 timeOut 方法,若是你想到的是 Thread.sleep()
...那须要再努力学习,多看源码了。多线程
咱们使用的FlushThread
实现了 Runnable
因此咱们能够考虑使用线程池来持有多个FlushThread
。并发
因此咱们就有这样的代码:
public class Flusher<Item> { private final FlushThread<Item>[] flushThreads; private AtomicInteger index; //防止多个线程同时执行。增长一个随机数间隔 private static final Random r = new Random(); private static final int delta = 50; private static ScheduledExecutorService TIMER = new ScheduledThreadPoolExecutor(1); private static ExecutorService POOL = Executors.newCachedThreadPool(); public Flusher(String name,int bufferSiz,int flushInterval,int queueSize,int threads,Processor<Item> processor) { this.flushThreads = new FlushThread[threads]; if(threads > 1){ index = new AtomicInteger(); } for (int i = 0; i < threads; i++) { final FlushThread<Item> flushThread = new FlushThread<Item>(name+ "-" + i,bufferSiz,flushInterval,queueSize,processor); flushThreads[i] = flushThread; POOL.submit(flushThread); //定时调用 timeOut()方法。 TIMER.scheduleAtFixedRate(flushThread::timeOut, r.nextInt(delta), flushInterval, TimeUnit.MILLISECONDS); } } // 对 index 取模,保证多线程都能被add public boolean add(Item item){ int len = flushThreads.length; if(len == 1){ return flushThreads[0].add(item); } int mod = index.incrementAndGet() % len; return flushThreads[mod].add(item); } //上文已经描述 private static class FlushThread<Item> implements Runnable{ ...省略 } }
public interface Processor<T> { void process(List<T> list); }
咱们写个测试方法测试一下:
//实现 Processor 将 String 所有输出 public class PrintOutProcessor implements Processor<String>{ @Override public void process(List<String> list) { System.out.println("start flush"); list.forEach(System.out::println); System.out.println("end flush"); } }
public class Test { public static void main(String[] args) throws InterruptedException { Flusher<String> stringFlusher = new Flusher<>("test",5,1000,30,1,new PrintOutProcessor()); int index = 1; while (true){ stringFlusher.add(String.valueOf(index++)); Thread.sleep(1000); } } }
执行的结果:
start flush 1 2 3 end flush start flush 4 5 6 7 end flush
咱们发现并无达到10个数字就触发了flush。由于出发了超时提交,虽然尚未达到规定的5
个数据,但仍是执行了 flush。
若是咱们去除 Thread.sleep(1000);
再看看结果:
start flush 1 2 3 4 5 end flush start flush 6 7 8 9 10 end flush
每5个数一次提交。完美。。。。
一个比较生动的例子给你们讲解了一些多线程的具体运用。学习多线程应该多思考多动手,才会有比较好的效果。但愿这篇文章你们读完之后有所收获,欢迎交流。
github地址:https://github.com/diaozxin007/framework
徒手撸框架系列文章地址: