以前由于找实习的缘故,博客1个多月没有写了。找实习的经历总算告一段落,如今从新更新博客,此次的内容是分析Java并发包中的阻塞队列
关于阻塞队列,我以前是一直充满好奇,很好奇这个阻塞是怎么实现。如今咱们先看一个该抽象类的实现类ArrayBlockingQueue。下面所有的代码均在githubjava
ArrayBlockingQueue顾名思义是一种数组形式的阻塞队列,其天然就有数组的特色,即队列的长度不可改变,只有初始化的时候指定。
下面,咱们看一下例子。node
public class ArrayBlock { private BlockingQueue<String> blockingQueue; public ArrayBlock(){ blockingQueue = new ArrayBlockingQueue<String>(3); } public BlockingQueue<String> getBlockingQueue() { return blockingQueue; } }
建立一个大小为3的ArrayBlockingQueue,下面是一个生产者和消费者,经过ArrayBlockingQueue实现生产者/消费者模型。git
public class Producer extends Thread { private BlockingQueue<String> blockingQueue; @Override public void run() { super.run(); for (int i = 0 ; i < 5;i++) { try { blockingQueue.put(i + ""); System.out.println(getName() + " 生产数据"); } catch (InterruptedException e) { e.printStackTrace(); } } } public Producer(ArrayBlock arrayBlock){ this.setName("Producer"); blockingQueue = arrayBlock.getBlockingQueue(); } } public class Costumer extends Thread{ private BlockingQueue<String> blockingQueue; public Costumer(ArrayBlock arrayBlock) { blockingQueue = arrayBlock.getBlockingQueue(); this.setName("Costumer"); } @Override public void run() { super.run(); while (true) { try { Thread.sleep(6000); String str = blockingQueue.take(); System.out.println(getName() + " 取出数据 " + str); } catch (InterruptedException e) { e.printStackTrace(); } } } }
测试过程就不放了,直接放出结果:github
Producer 生产数据 Producer 生产数据 Producer 生产数据 Costumer 取出数据 0 Producer 生产数据 Costumer 取出数据 1 Producer 生产数据 Costumer 取出数据 2 Costumer 取出数据 3 Costumer 取出数据 4
这能够看出put方法与take方法均是阻塞的方法。当队列已经满的时候,就会阻塞放入方法,当队列为空的时候,就会阻塞取出方法。
下面,咱们主要看这个两个方法,到底是如何实现阻塞的。算法
** put方法 **数组
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
put方法是将元素放入到队列中,这里面能够看出是用过Lock类与Condition类来实现的,即经过等待/通知机制实现的阻塞队列。这里notFull是一个条件,当队列已经满的时候,就会执行await方法,若是没有满就执行入队(enqueue)方法。这里,判断队列已满用的是count == items.length
。接下来,咱们看一下take方法,来看看取数据的阻塞。缓存
** take方法**安全
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
这里,与put方法相似,当元素为0时,就会执行await方法,上面方法中都没有直接说明signal方法的执行。其实该方法是入队与出队的方法中实现的。也就是当执行notFull.await()
时,是经过dequeue()
方法来通知中止等待的,能够放入元素。当执行到notEmpty.await()
时,是经过enqueue
来通知结束阻塞,能够取出元素。并发
LinkedBlockingQueue顾名思义是一个链表形式的阻塞队列,不一样于ArrayBlockingQueue。若是不指定容量,则默认是Integer.MAX_VALUE。也就是说他是一个无界阻塞队列。他的例子与上面的相似,可是其put与take方法实现不一样于ArrayBlockingQueue,但二者大体思路一致。咱们只看一下put实现:less
** put方法**
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
这里阻塞的本质实现也是经过Condition类的等待/通知机制。可是有几点不一样:
第一 这里用了一个原子类的count计数,官方的给的注释是即便没有锁来提供保护,也能保证线程安全,实现wait guard。
第二 ArrayBlockingQueue的通知是在入队与出队的方法中,LinkedBlockingQueue则不是,而且插入以后不满的时候,还有通知其余await的线程。
第三 ArrayBlockingQueue的lock一直是一个,也就是put/take是用的一个锁,放与取没法实现并行。可是LinkedBlockingQueue是两个锁,放一个锁,取一个锁,能够实现put/take的并行,要高效一些。
SynchronousQueue顾名思义是同步队列,特色不一样于上面的阻塞队列,他是一个无界非缓存的队列,准确说他不存储元素,放入的元素,只有等待取走元素以后才能放入。也就是说任意时刻:
元素并不会被生产者存在队列中,而是直接生产者与消费者进行交互。
其实现是利用无锁算法,能够参考SynchronousQueue实现
还有一点须要注意,同步队列支持公平性与非公平性。公平性是利用队列来管理多余生产者与消费者,非公平性是利用栈来管理多余生产者与消费者。