| 好看请赞,养成习惯css
你有一个思想,我有一个思想,咱们交换后,一我的就有两个思想web
If you can NOT explain it simply, you do NOT understand it well enough面试
现陆续将Demo代码和技术文章整理在一块儿 Github实践精选 ,方便你们阅读查看,本文一样收录在此,以为不错,还请Star🌟算法
前言
若是按照用途与特性进行粗略的划分,JUC 包中包含的工具大致能够分为 6 类:编程
-
执行者与线程池 -
并发队列 -
同步工具 -
并发集合 -
锁 -
原子变量
在【并发系列】中,主要讲解了 执行者与线程池
,同步工具
,锁
, 在分析源码时,或多或少的说起到了「队列」,队列在 JUC 中也是多种多样存在,因此本文就以「远看」视角,帮助你们快速了解与区分这些看似「杂乱」的队列数组
并发队列
Java 并发队列按照实现方式来进行划分能够分为 2 种:缓存
-
阻塞队列 -
非阻塞队列
若是你已经看完并发系列锁的实现,你已经可以知道他们实现的区别:微信
前者就是基于锁实现的,后者则是基于 CAS 非阻塞算法实现的多线程
常见的队列有下面这几种:并发

瞬间懵逼?看到这个没有人性的图想直接走人?客观先别急,一会就柳暗花明了
当下你也许有个问题:
为何会有这么多种队列的存在?
锁有应对各类情形的锁,队列也天然有应对各类情形的队列了, 是否是也有点单一职责原则的意思呢?
因此咱们要了解这些队列究竟是怎么设计的?以及用在了哪些地方?
先来看下图

若是你在 IDE 中打开以上非阻塞队列和阻塞队列,查看其实现方法,你就会发现,阻塞队列
较非阻塞队列
额外支持两种操做:
-
阻塞的插入
当队列满时,队列会阻塞插入元素的线程,直到队列不满
-
阻塞的移除
当队列为空时,获取元素的线程会阻塞,直到队列变为非空
综合说明入队/出队操做,看似杂乱的方法,用一个表格就能归纳了

抛出异常
-
当队列满时,此时若是再向队列中插入元素,会抛出 IllegalStateException (这很好理解) -
当队列空时,此时若是再从队列中获取元素,会抛出 NoSuchElementException (这也很好理解)
返回特殊值
-
当向队列插入元素时,会返回元素是否插入成功,成功则返回 true -
当从队列移除元素时,若是没有则返回 null
一直阻塞
-
当队列满时,若是 生产者线程向队列 put 元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出 -
当队列为空时,若是 消费者线程 从队列里面 take 元素,队列会阻塞消费者线程,直到队列不为空
关于阻塞,咱们其实早在 并发编程之等待通知机制 就已经充分说明过了,你还记得下面这张图吗?原理实际上是同样同样滴

超时退出
和锁同样,由于有阻塞,为了灵活使用,就必定支持超时退出,阻塞时间达到超时时间,就会直接返回
至于为啥插入和移除这么多种单词表示形式,我也不知道,为了方便记忆,只须要记住阻塞的方法形式便可:
单词
put
和take
字母t
首位相连,一个放,一个拿
到这里你应该对 Java 并发队列有了个初步的认识了,原来看似杂乱的方法貌似也有了规律。接下来就到了疯狂串知识点的时刻了,借助前序章节的知识,分分钟就理解所有队列了

ArrayBlockingQueue
以前也说过,JDK中的命名仍是很讲究滴,一看这名字,底层就是数组实现了,是否有界,那就看在构造的时候是否须要指定 capacity 值了
填鸭式的说明也容易忘,这些都是哪看到的呢?在全部队列的 Java docs 的第一段,一句话就归纳了该队列的主要特性,因此强烈建议你们本身在看源码时,简单瞄一眼 docs 开头,心中就有多半个数了

在讲 Java AQS队列同步器以及ReentrantLock的应用 时咱们介绍了公平锁与非公平锁的概念,ArrayBlockingQueue 也有一样的概念,看它的构造方法,就有 ReentrantLock 来辅助实现
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
默认状况下,依旧是不保证线程公平访问队列(公平与否是指阻塞的线程可否按照阻塞的前后顺序访问队列,先阻塞线访问,后阻塞后访问)
到这我也要临时问一个说过屡次的面试送分题了:
为何默认采用非公平锁的方式?它较公平锁方式有什么好处,又可能带来哪些问题?
知道了以上内容,结合上面表格中的方法,ArrayBlockingQueue 就能够轻松过关了

和数组相对的天然是链表了
LinkedBlockingQueue

LinkedBlockingQueue 也算是一个有界阻塞队列 ,从下面的构造函数中你也能够看出,该队列的默认和最大长度为 Integer.MAX_VALUE ,这也就 docs 说 optionally-bounded 的缘由了
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
正如 Java 集合同样,链表形式的队列,其存取效率要比数组形式的队列高。可是在一些并发程序中,数组形式的队列因为具备必定的可预测性,所以能够在某些场景中得到更高的效率
看到 LinkedBlockingQueue 是否是也有些熟悉呢?为何要使用线程池? 就已经和它屡次照面了
建立单个线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
建立固定个数线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
面试送分题又来了
使用 Executors 建立线程池很简单,为何大厂严格要求禁用这种建立方式呢?
PriorityBlockingQueue
PriorityBlockingQueue 是一个支持优先级的无界的阻塞队列,默认状况下采用天然顺序升序排列,固然也有非默认状况自定义优先级,须要排序,那天然要用到 Comparator 来定义排序规则了

能够定义优先级,天然也就有相应的限制,以及使用的注意事项
-
按照上图说明,队列中不容许存在 null 值,也不容许存在不能排序的元素
-
对于排序值相同的元素,其序列是不保证的,但你能够继续自定义其余能够区分出来优先级的值,若是你有严格的优先级区分,建议有更完善的比较规则,就像 Java docs 这样
class FIFOEntry<E extends Comparable<? super E>>
implements Comparable<FIFOEntry<E>> {
static final AtomicLong seq = new AtomicLong(0);
final long seqNum;
final E entry;
public FIFOEntry(E entry) {
seqNum = seq.getAndIncrement();
this.entry = entry;
}
public E getEntry() { return entry; }
public int compareTo(FIFOEntry<E> other) {
int res = entry.compareTo(other.entry);
if (res == 0 && other.entry != this.entry)
res = (seqNum < other.seqNum ? -1 : 1);
return res;
}
} -
队列容量是没有上限的,可是若是插入的元素超过负载,有可能会引发OutOfMemory异常(这是确定的),这也是为何咱们一般所说,队列无界,心中有界
-
PriorityBlockingQueue 也有 put 方法,这是一个阻塞的方法,由于它是无界的,天然不会阻塞,因此就有了下面比较聪明的作法
public void put(E e) {
offer(e); // never need to block 请自行对照上面表格
} -
能够给定初始容量,这个容量会按照必定的算法自动扩充
// Default array capacity.
private static final int DEFAULT_INITIAL_CAPACITY = 11;
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}这里默认的容量是 11,因为也是基于数组,那面试送分题又来了
你一般是怎样定义容器/集合初始容量的?有哪些依据?
DelayQueue
DelayQueue 是一个支持延时获取元素的无界阻塞队列
-
是否延时确定是和某个时间(一般和当前时间) 进行 比较 -
比较事后还要进行排序,因此也是存在必定的 优先级
看到这也许以为这有点和 PriorityBlockingQueue
很像,没错,DelayQueue
的内部也是使用 PriorityQueue

上图绿色框线也告诉你,DelayQueue 队列的元素必需要实现 Depayed 接口:

因此从上图能够看出使用 DelayQueue 很是简单,只须要两步:
实现 getDelay() 方法,返回元素要延时多长时间
public long getDelay(TimeUnit unit) {
// 最好采用纳秒形式,这样更精确
return unit.convert(time - now(), NANOSECONDS);
}
实现 compareTo() 方法,比较元素顺序
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
上面的代码哪来的呢?若是你打开 ScheduledThreadPoolExecutor 里的 ScheduledFutureTask,你就看到了 (ScheduledThreadPoolExecutor 内部就是应用 DelayQueue)
因此综合来讲,下面两种状况很是适合使用 DelayQueue
-
缓存系统的设计:用 DelayQueue 保存缓存元素的有效期,使用一个线程循环查询 DelayQueue,若是能从 DelayQueue 中获取元素,说明缓存有效期到了 -
定时任务调度:用 DelayQueue 保存当天会执行的任务以及时间,若是能从 DelayQueue 中获取元素,任务就能够开始执行了。好比 TimerQueue 就是这样实现的
SynchronousQueue

这是一个不存储元素的阻塞队列,不存储元素还叫队列?

没错,SynchronousQueue 直译过来叫同步队列,若是在队列里面呆久了应该就算是“异步”了吧
因此使用它,每一个put() 操做必需要等待一个 take() 操做,反之亦然,不然不能继续添加元素
实际中怎么用呢?假如你须要两个线程之间同步共享变量,若是不用 SynchronousQueue 你可能会选择用 CountDownLatch 来完成,就像这样:
ExecutorService executor = Executors.newFixedThreadPool(2);
AtomicInteger sharedState = new AtomicInteger();
CountDownLatch countDownLatch = new CountDownLatch(1);
Runnable producer = () -> {
Integer producedElement = ThreadLocalRandom
.current()
.nextInt();
sharedState.set(producedElement);
countDownLatch.countDown();
};
Runnable consumer = () -> {
try {
countDownLatch.await();
Integer consumedElement = sharedState.get();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
};
这点小事就用计数器来实现,显然很不合适,用 SynchronousQueue 改造一下,感受瞬间就不同了
ExecutorService executor = Executors.newFixedThreadPool(2);
SynchronousQueue<Integer> queue = new SynchronousQueue<>();
Runnable producer = () -> {
Integer producedElement = ThreadLocalRandom
.current()
.nextInt();
try {
queue.put(producedElement);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
};
Runnable consumer = () -> {
try {
Integer consumedElement = queue.take();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
};
其实 Executors.newCachedThreadPool() 方法里面使用的就是 SynchronousQueue
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
看到前面
LinkedBlockingQueue
用在newSingleThreadExecutor
和newFixedThreadPool
上,而newCachedThreadPool
却用SynchronousQueue
,这是为何呢?
由于单线程池和固定线程池中,线程数量是有限的,所以提交的任务须要在LinkedBlockingQueue
队列中等待空余的线程;
而缓存线程池中,线程数量几乎无限(上限为Integer.MAX_VALUE
),所以提交的任务只须要在SynchronousQueue
队列中同步移交给空余线程便可, 因此有时也会说 SynchronousQueue
的吞吐量要高于 LinkedBlockingQueue
和 ArrayBlockingQueue
LinkedTransferQueue
简单来讲,TransferQueue提供了一个场所,生产者线程使用 transfer
方法传入一些对象并阻塞,直至这些对象被消费者线程所有取出。
你有没有以为,刚刚介绍的 SynchronousQueue 是否很像一个容量为 0 的 TransferQueue。
但 LinkedTransferQueue 相比其余阻塞队列多了三个方法
-
transfer(E e)
若是当前有消费者正在等待消费元素,transfer 方法就能够直接将生产者传入的元素马上 transfer (传输) 给消费者;若是没有消费者等待消费元素,那么 transfer 方法会把元素放到队列的 tail(尾部)
节点,一直阻塞,直到该元素被消费者消费才返回
-
tryTransfer(E e)
tryTransfer,很显然是一种尝试,若是没有消费者等待消费元素,则立刻返回 false ,程序不会阻塞
-
tryTransfer(E e, long timeout, TimeUnit unit)
带有超时限制,尝试将生产者传入的元素 transfer 给消费者,若是超时时间到,尚未消费者消费元素,则返回 false
你瞧,全部阻塞的方法都是一个套路:
-
阻塞方式 -
带有 try 的非阻塞方式 -
带有 try 和超时时间的非阻塞方式
看到这你也许感受 LinkedTransferQueue 没啥特色,其实它和其余阻塞队列的差异还挺大的:
BlockingQueue 是若是队列满了,线程才会阻塞;可是 TransferQueue 是若是没有消费元素,则会阻塞 (transfer 方法)
这也就应了 Doug Lea 说的那句话:
LinkedTransferQueue
is actually a superset ofConcurrentLinkedQueue
,SynchronousQueue
(in “fair” mode), and unboundedLinkedBlockingQueues
. And it’s made better by allowing you to mix and match those features as well as take advantage of higher-performance i mplementation techniques.简单翻译:
LinkedTransferQueue
是ConcurrentLinkedQueue
,SynchronousQueue
(在公平模式下), 无界的LinkedBlockingQueues
等的超集; 容许你混合使用阻塞队列的多种特性因此,在合适的场景中,请尽可能使用
LinkedTransferQueue
上面都看的是单向队列 FIFO,接下来咱们看看双向队列
LinkedBlockingDeque
LinkedBlockingDeque
是一个由链表结构组成的双向阻塞队列,凡是后缀为 Deque 的都是双向队列意思,后缀的发音为deck——/dek/
, 刚接触它时我觉得是这个冰激凌的发音

所谓双向队列值得就是能够从队列的两端插入和移除元素。因此:
双向队列由于多了一个操做队列的入口,在多线程同时入队是,也就会减小一半的竞争
队列有头,有尾,所以它又比其余阻塞队列多了几个特殊的方法
-
addFirst -
addLast -
xxxxFirst -
xxxxLast -
... ...

这么一看,双向阻塞队列确实很高效,
那双向阻塞队列应用在什么地方了呢?
不知道你是否听过 “工做窃取”模式,看似不太厚道的一种方法,实则是高效利用线程的好办法。下一篇文章,咱们就来看看 ForkJoinPool 是如何应用 “工做窃取”模式的
总结
到这关于 Java 队列(其实主要介绍了阻塞队列)就快速的区分完了,将看似杂乱的方法作了分类整理,方便快速理解其用途,同时也说明了这些队列的实际用途。相信你带着更高的视角来阅读源码会更加轻松,最后也但愿你们认真看两个队列的源码实现,在遇到队列的问题,脑海中的画面分分钟就能够搞定了
参考
-
Java 并发编程的艺术 -
Java 并发编程之美 -
https://zhuanlan.zhihu.com/p/27148381
<<< 左右滑动见更多 >>>
往期推荐
本文分享自微信公众号 - 日拱一兵(gh_6235a38420b9)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。