什么是阻塞队列 BlockingQueuejava
队列是一种数据结构,它的特色是先进先出(First In First Out),它有两个基本操做:在队列尾部加入一个元素,从队列头部移除一个元素。队列在多线程应用中,经常使用于生产-消费场景。编程
BlockingQueue 是 Java util.concurrent 包下重要的数据结构,BlockingQueue 提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,若是队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,若是队列已空,线程将会阻塞等待直到队列非空。并发包下不少高级同步类的实现都是基于 BlockingQueue 实现的。数组
BlockingQueue 具备 4 组不一样的方法用于插入、移除以及对队列中的元素进行检查。若是请求的操做不能获得当即执行的话,每一个方法的表现也不一样。这些方法以下: 安全
BlockingQueue 是个接口,你须要使用它的实现之一来使用 BlockingQueue,Java.util.concurrent 包下具备如下 BlockingQueue 接口的实现类:数据结构
ArrayBlockingQueue:ArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里。有界也就意味着,它不可以存储无限多数量的元素。它有一个同一时间可以存储元素数量的上限。你能够在对其初始化的时候设定这个上限,但以后就没法对这个上限进行修改了。多线程
DelayQueue:DelayQueue 对元素进行持有直到一个特定的延迟到期。注入其中的元素必须实现 java.util.concurrent.Delayed 接口。并发
LinkedBlockingQueue:LinkedBlockingQueue 内部以一个链式结构对其元素进行存储。若是须要的话,这一链式结构能够选择一个上限。若是没有定义上限,将使用 Integer.MAX_VALUE 做为上限。this
PriorityBlockingQueue:PriorityBlockingQueue 是一个无界的并发队列。它使用了和类 java.util.PriorityQueue 同样的排序规则。你没法向这个队列中插入 null 值。全部插入到 PriorityBlockingQueue 的元素必须实现 java.lang.Comparable 接口。所以该队列中元素的排序就取决于你本身的 Comparable 实现。线程
SynchronousQueue:SynchronousQueue 是一个特殊的队列,它的内部同时只可以容纳单个元素。若是该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另外一个线程将该元素从队列中抽走。一样,若是该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另外一个线程向队列中插入了一条新的元素。据此,把这个类称做一个队列显然是夸大其词了。它更多像是一个汇合点。3d
下面用 BlockQueue 技术来实现一下:点击连接加入群【Java并发编程交流组】:https://jq.qq.com/?_wv=1027&k=5mOvK7L
/** 定义一个盘子类,能够放鸡蛋和取鸡蛋 */ public class BigPlate { /** 装鸡蛋的盘子,大小为5 */ private BlockingQueue<Object> eggs = new ArrayBlockingQueue<Object>(5); /** 放鸡蛋 */ public void putEgg(Object egg) { try { eggs.put(egg);// 向盘子末尾放一个鸡蛋,若是盘子满了,当前线程阻塞 } catch (InterruptedException e) { e.printStackTrace(); } // 下面输出有时不许确,由于与put操做不是一个原子操做 System.out.println("放入鸡蛋"); } /** 取鸡蛋 */ public Object getEgg() { Object egg = null; try { egg = eggs.take();// 从盘子开始取一个鸡蛋,若是盘子空了,当前线程阻塞 } catch (InterruptedException e) { e.printStackTrace(); } // 下面输出有时不许确,由于与take操做不是一个原子操做 System.out.println("拿到鸡蛋"); return egg; } /** 放鸡蛋线程 */ static class AddThread extends Thread { private BigPlate plate; private Object egg = new Object(); public AddThread(BigPlate plate) { this.plate = plate; } public void run() { plate.putEgg(egg); } } /** 取鸡蛋线程 */ static class GetThread extends Thread { private BigPlate plate; public GetThread(BigPlate plate) { this.plate = plate; } public void run() { plate.getEgg(); } } public static void main(String[] args) { BigPlate plate = new BigPlate(); // 先启动10个放鸡蛋线程 for(int i = 0; i < 10; i++) { new Thread(new AddThread(plate)).start(); } // 再启动10个取鸡蛋线程 for(int i = 0; i < 10; i++) { new Thread(new GetThread(plate)).start(); } } }
利用 Condition 来实现阻塞队列
Java 1.5 以后新增了显式锁的接口 java.util.concurrent.locks.Lock 接口,一样提供了显式的条件接口 Condition,并对条件队列进行了加强。
Condition 对象能够提供和 Object 的 wait 和 notify 同样的行为,可是后者必须使用 synchronized 这个内置的monitor锁,而 Condition 使用的是 RenentranceLock 。这两种方式在阻塞等待时都会将相应的锁释放掉,可是 Condition 的等待能够中断,这是两者惟一的区别。
下面就用 Condition 技术来实现一下:点击连接加入群【Java并发编程交流组】:https://jq.qq.com/?_wv=1027&k=5mOvK7L
class Buffer { final Lock lock = new ReentrantLock(); //定义一个锁 final Condition notFull = lock.newCondition(); //定义阻塞队列满了的Condition final Condition notEmpty = lock.newCondition();//定义阻塞队列空了的Condition final Object[] items = new Object[10]; //为了下面模拟,设置阻塞队列的大小为10,不要设太大 int putptr, takeptr, count; //数组下标,用来标定位置的 //往队列中存数据 public void put(Object x) throws InterruptedException { lock.lock(); //上锁 try { while (count == items.length) { System.out.println(Thread.currentThread().getName() + " 被阻塞了,暂时没法存数据!"); notFull.await(); //若是队列满了,那么阻塞存数据这个线程,等待被唤醒 } //若是没满,按顺序往数组中存 items[putptr] = x; if (++putptr == items.length) //这是到达数组末端的判断,若是到了,再回到始端 putptr = 0; ++count; //消息数量 System.out.println(Thread.currentThread().getName() + " 存好了值: " + x); notEmpty.signal(); //好了,如今队列中有数据了,唤醒队列空的那个线程,能够取数据啦 } finally { lock.unlock(); //放锁 } } //从队列中取数据 public Object take() throws InterruptedException { lock.lock(); //上锁 try { while (count == 0) { System.out.println(Thread.currentThread().getName() + " 被阻塞了,暂时没法取数据!"); notEmpty.await(); //若是队列是空,那么阻塞取数据这个线程,等待被唤醒 } //若是没空,按顺序从数组中取 Object x = items[takeptr]; if (++takeptr == items.length) //判断是否到达末端,若是到了,再回到始端 takeptr = 0; --count; //消息数量 System.out.println(Thread.currentThread().getName() + " 取出了值: " + x); notFull.signal(); //好了,如今队列中有位置了,唤醒队列满的那个线程,能够存数据啦 return x; } finally { lock.unlock(); //放锁 } } }