在java集合中还有一个重要的分支是concurrent包中的阻塞队列,这些队列都是线程安全的,有基于数组建立的队列,也有基于单链表或者双链表建立的队列,比较常见的一些队列有:java
ArrayBlockingQueue:基于数组的阻塞队列(添加与获取均可阻塞)数组
LinkedBlockingQueue:基于链表的阻塞队列(添加与获取均可阻塞)安全
ArrayBlockingQueue与LinkedBlockingQueue放一块儿说,由于可能常常使用到这两个阻塞队列,主区别以下:ide
1.锁的方式不一样测试
ArrayBlockingQueue读写只有一个锁this
/** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull;
notEmpty和notFull都是从lock获取的,不适用于多生产者和消费者的场景,锁的争用致使效率下降线程
LinkedBlockingQueue读写锁是分离的code
private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
notEmpty由takeLock获取,notFull由putLock获取对象
2.添加的对象不一样排序
ArrayBlockingQueue直接将对象添加到数组中
LinkedBlockingQueue须要将对象从新构形成NODE(单链表节点)对象,再链接到链表中
3.使用场景
虽然ArrayBlockingQueue锁的争用会下降效率,可是LinkedBlockingQueue在建立Node也会增长开销,在本身电脑上实际测试中,ArrayBlockingQueue速度更优于LinkedBlockingQueue
总结:在实际开发中我认为任意选择他们中的一个都是能够的(但愿有朋友来指正)
测试代码以下:
package com.jv.queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; import com.jv.queue.TestLinkedBlockingQueue.Consumer; import com.jv.queue.TestLinkedBlockingQueue.Productor; public class TestArrayBlockingQueue { public static void main(String[] args) { ArrayBlockingQueue<Integer> q = new ArrayBlockingQueue<>(100000000); new Thread(new Productor(q)).start(); new Thread(new Productor(q)).start(); new Thread(new Productor(q)).start(); new Thread(new Productor(q)).start(); new Thread(new Productor(q)).start(); new Thread(new Consumer(q)).start(); new Thread(new Consumer(q)).start(); new Thread(new Consumer(q)).start(); new Thread(new Consumer(q)).start(); new Thread(new Consumer(q)).start(); new Thread(new Consumer(q)).start(); new Thread(new Consumer(q)).start(); } static class Productor implements Runnable{ ArrayBlockingQueue<Integer> q ; public Productor(ArrayBlockingQueue<Integer> q) { this.q = q; } @Override public void run() { long start = System.currentTimeMillis(); for(int i = 0 ; i < 10000000 ; i++) { try { q.offer(i, 2000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); } } long end = System.currentTimeMillis(); System.out.println(end-start); } } static class Consumer implements Runnable{ ArrayBlockingQueue<Integer> q ; public Consumer(ArrayBlockingQueue<Integer> q) { this.q = q; } @Override public void run() { while(true) { try { q.poll(2000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
package com.jv.queue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; public class TestLinkedBlockingQueue { public static void main(String[] args) { LinkedBlockingQueue<Integer> q = new LinkedBlockingQueue<>(100000000); new Thread(new Productor(q)).start(); new Thread(new Productor(q)).start(); new Thread(new Productor(q)).start(); new Thread(new Productor(q)).start(); new Thread(new Productor(q)).start(); new Thread(new Consumer(q)).start(); new Thread(new Consumer(q)).start(); new Thread(new Consumer(q)).start(); new Thread(new Consumer(q)).start(); new Thread(new Consumer(q)).start(); new Thread(new Consumer(q)).start(); new Thread(new Consumer(q)).start(); } static class Productor implements Runnable{ LinkedBlockingQueue<Integer> q ; public Productor(LinkedBlockingQueue<Integer> q) { this.q = q; } @Override public void run() { long start = System.currentTimeMillis(); for(int i = 0 ; i < 10000000 ; i++) { try { q.offer(i, 2000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); } } long end = System.currentTimeMillis(); System.out.println(end-start); } } static class Consumer implements Runnable{ LinkedBlockingQueue<Integer> q ; public Consumer(LinkedBlockingQueue<Integer> q) { this.q = q; } @Override public void run() { while(true) { try { q.poll(2000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
PriorityBlockingQueue:可排序的阻塞队列(添加不阻塞,获取可阻塞)
能够输出排序后的队列元素,PriorityBlockingQueue是无界队列(刚开始设定一个队列的大小,当队列不够长时会自动增加), 该队列的元素必须实现comparable接口,使用迭代器queue.iterator是不能保证顺序的,应该使用for或者while循环
LinkedBlockingDeque:基于双向链表的阻塞队列(添加与获取均可阻塞)
能够双向添加、双向获取
DelayQueue:延迟到期队列(添加不阻塞,获取可阻塞)
核心是一个基于数组的PriorityQueue(并非concurrent包中的类),它包含的对象必须实现Delayed接口,重写getDelay(TimeUnit unit)方法---获得到期时间,DelayQueue是无界队列(刚开始设定一个队列的大小,当队列不够长时会自动增加)。这种队列能够用来实现任务计划,不过JDK都有本身的任务计划API了,可能会不多使用
API使用举例:
package com.jv.queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import org.junit.Test; public class TestBlockingQueue { /* * ArrayBlockQueue 是基于数组实现的有界队列,支持阻塞、非阻塞添加和获取数据 * */ @Test public void test1() throws InterruptedException { ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(5); queue.add("1"); queue.add("2"); queue.add("3"); queue.add("4"); queue.add("5"); //queue.add("6");//不阻塞 queue.offer("6");//不阻塞 queue.offer("6", 3000, TimeUnit.MILLISECONDS);//阻塞指定时间 //queue.put("6");//一直阻塞 System.out.println(queue.take());//会阻塞 System.out.println(queue.poll());//不阻塞 System.out.println(queue.poll()); System.out.println(queue.poll()); System.out.println(queue.peek()); System.out.println(queue.poll()); //System.out.println(queue.take()); System.out.println(queue.poll(3000,TimeUnit.MILLISECONDS));//阻塞特定时间 System.out.println(queue.peek());//获取队列当前index对应的元素,不会将元素从队列中移除,本质上是调用itemAt方法,使用数组下标访问 } /* * LinkedBlockQueue 是基于链表实现的有界队列,支持阻塞、非阻塞添加和获取数据 * */ @Test public void test2() throws InterruptedException { LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(5); queue.add("1"); queue.add("2"); queue.add("3"); queue.add("4"); queue.add("5"); //queue.add("6");//不阻塞 queue.offer("6");//不阻塞 queue.offer("6", 3000, TimeUnit.MILLISECONDS);//阻塞指定时间 //queue.put("6");//一直阻塞 System.out.println(queue.take());//会阻塞 System.out.println(queue.poll());//不阻塞 System.out.println(queue.poll()); System.out.println(queue.peek()); System.out.println(queue.poll()); System.out.println(queue.poll()); //System.out.println(queue.take()); System.out.println(queue.poll(3000,TimeUnit.MILLISECONDS));//阻塞特定时间 System.out.println(queue.peek());//获取队列index的元素,不会讲元素从队列中移除,本质上是调用itemAt方法,使用数组下标访问 } /* * PriorityBlockingQueue 是基于数组实现的有界队列,支持非阻塞添加数据,支持阻塞和非阻塞获取数据 * 从类名能够看出该队列是支持根据优先级排序访问的,那么元素必须实现comparable接口 * 若是使用迭代器queue.iterator是不能保证顺序的,应该使用for或者while循环 */ @Test public void test3() throws InterruptedException { PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>(5); queue.add("1");//不阻塞,调用的offer() queue.add("2"); queue.add("3"); queue.add("4"); queue.add("5"); queue.offer("6");//不阻塞 queue.offer("7", 3000, TimeUnit.MILLISECONDS);//不阻塞,调用的offer(),超时设置无效 queue.put("8");//不阻塞,调用的offer() System.out.println(queue.take());//会阻塞 System.out.println(queue.poll()); System.out.println(queue.peek()); System.out.println(queue.poll()); System.out.println(queue.poll()); System.out.println(queue.take()); System.out.println(queue.take()); System.out.println(queue.poll(3000,TimeUnit.MILLISECONDS));//阻塞特定时间 System.out.println(queue.poll(3000,TimeUnit.MILLISECONDS));//阻塞特定时间 System.out.println(queue.poll(3000,TimeUnit.MILLISECONDS));//阻塞特定时间 System.out.println(queue.take()); System.out.println(queue.peek());//获取队列index的元素,不会将元素从队列中移除,本质上是数组下标访问 } }