本文主要讲一下SynchronousQueue。html
SynchronousQueue,实际上它不是一个真正的队列,由于它不会为队列中元素维护存储空间。与其余队列不一样的是,它维护一组线程,这些线程在等待着把元素加入或移出队列。java
若是以洗盘子的比喻为例,那么这就至关于没有盘架,而是将洗好的盘子直接放入下一个空闲的烘干机中。这种实现队列的方式看似很奇怪,但因为能够直接交付工做,从而下降了将数据从生产者移动到消费者的延迟。(在传统的队列中,在一个工做单元能够交付以前,必须经过串行方式首先完成入列[Enqueue]或者出列[Dequeue]等操做。)缓存
直接交付方式还会将更多关于任务状态的信息反馈给生产者。当交付被接受时,它就知道消费者已经获得了任务,而不是简单地把任务放入一个队列——这种区别就比如将文件直接交给同事,仍是将文件放到她的邮箱中并但愿她能尽快拿到文件。dom
由于SynchronousQueue没有存储功能,所以put和take会一直阻塞,直到有另外一个线程已经准备好参与到交付过程当中。仅当有足够多的消费者,而且老是有一个消费者准备好获取交付的工做时,才适合使用同步队列。ide
public class SynchronousQueueExample { static class SynchronousQueueProducer implements Runnable { protected BlockingQueue<String> blockingQueue; final Random random = new Random(); public SynchronousQueueProducer(BlockingQueue<String> queue) { this.blockingQueue = queue; } @Override public void run() { while (true) { try { String data = UUID.randomUUID().toString(); System.out.println("Put: " + data); blockingQueue.put(data); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } static class SynchronousQueueConsumer implements Runnable { protected BlockingQueue<String> blockingQueue; public SynchronousQueueConsumer(BlockingQueue<String> queue) { this.blockingQueue = queue; } @Override public void run() { while (true) { try { String data = blockingQueue.take(); System.out.println(Thread.currentThread().getName() + " take(): " + data); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void main(String[] args) { final BlockingQueue<String> synchronousQueue = new SynchronousQueue<String>(); SynchronousQueueProducer queueProducer = new SynchronousQueueProducer( synchronousQueue); new Thread(queueProducer).start(); SynchronousQueueConsumer queueConsumer1 = new SynchronousQueueConsumer( synchronousQueue); new Thread(queueConsumer1).start(); SynchronousQueueConsumer queueConsumer2 = new SynchronousQueueConsumer( synchronousQueue); new Thread(queueConsumer2).start(); } }
插入数据的线程和获取数据的线程,交替执行ui
Executors.newCachedThreadPool()this
/** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available, and uses the provided * ThreadFactory to create new threads when needed. * @param threadFactory the factory to use when creating new threads * @return the newly created thread pool * @throws NullPointerException if threadFactory is null */ public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); }
因为ThreadPoolExecutor内部实现任务提交的时候调用的是工做队列(BlockingQueue接口的实现类)的非阻塞式入队列方法(offer方法),所以,在使用SynchronousQueue做为工做队列的前提下,客户端代码向线程池提交任务时,而线程池中又没有空闲的线程可以从SynchronousQueue队列实例中取一个任务,那么相应的offer方法调用就会失败(即任务没有被存入工做队列)。此时,ThreadPoolExecutor会新建一个新的工做者线程用于对这个入队列失败的任务进行处理(假设此时线程池的大小还未达到其最大线程池大小)。线程
因此,使用SynchronousQueue做为工做队列,工做队列自己并不限制待执行的任务的数量。但此时须要限定线程池的最大大小为一个合理的有限值,而不是Integer.MAX_VALUE,不然可能致使线程池中的工做者线程的数量一直增长到系统资源所没法承受为止。code
若是应用程序确实须要比较大的工做队列容量,而又想避免无界工做队列可能致使的问题,不妨考虑SynchronousQueue。SynchronousQueue实现上并不使用缓存空间。htm
使用SynchronousQueue的目的就是保证“对于提交的任务,若是有空闲线程,则使用空闲线程来处理;不然新建一个线程来处理任务”。