**本文正在参加「Java主题月 - Java Debug笔记活动」,详情查看 活动连接 **java
随便搜了一下,全是“深刻剖析阻塞队列”、“架构师带你手写阻塞队列”、“阻塞队列居然有8种”这一类的文章。恕我直言,关注点偏了,你越关注阻塞队列自己,越学很差阻塞队列。数组
提到阻塞队列,你们脑海中就会冒出:markdown
但JDK阻塞队列自己是很是简单的,难的是阻塞队列内部的AQS。多线程
若是你以前对阻塞队列一无所知又刚好想要学习,但愿能耐心看完下面的内容。仍是那句话,学习阻塞队列的重点不是阻塞队列自己...我本身均可以手写阻塞队列。架构
为了打破你们对阻塞队列“难”、“晦涩”、“神秘”的印象,我会重新的角度切入,重构你们对阻塞队列的认识。分布式
主要内容:ide
定义:post
针对同一个资源的操做有不一样种类的线程。学习
说人话就是:共享资源+多线程,最典型的例子就是锁和生产者消费者(本文以生产者-消费者为例子讲解)。测试
以现实生活为例。消费者和生产者就像两个线程,本来作着各自的事情,厂家管本身生产,消费者管本身买,通常状况下彼此互不影响。
但当物资到达某个临界点时,就须要根据供需关系适看成出调整。
当厂家作了一大堆东西,产能过剩时,应该暂停生产,扩大宣传,让消费者过来消费。
当消费者发现某个热销商品售罄,应该提醒厂家尽快生产。
在上面的案例中,生产者和消费者是不一样种类的线程,一个负责存入,另外一个负责取出,且它们操做的是同一个资源。但最难的部分在于:
你会发现,本来互不打扰的两个线程之间开始“沟通”了:
这种线程间的相互调度,也就是线程间通讯。
看到这,你内心暗暗想道:我擦,我只会new Thread().start(),怎么让A线程去喊B线程工做呢?
仍是以上面的生产者-消费者为例,有不少种方式能够实现线程间通讯。
设计理念:生产者和消费者线程各自使用while循环,每隔片刻就去判断Queue的状态,队列为空时生产者才可插入数据,队列不为空时消费者才能取出数据,不然一概sleep等待。
/** * 轮询版本 */
public class WhileQueue<T> {
// 容器,用来装东西
private final LinkedList<T> queue = new LinkedList<>();
public void put(T resource) throws InterruptedException {
while (queue.size() >= 1) {
// 队列满了,不能再塞东西了,轮询等待消费者取出数据
System.out.println("生产者:队列已满,没法插入...");
TimeUnit.MILLISECONDS.sleep(1000);
}
System.out.println("生产者:插入" + resource + "!!!");
queue.addFirst(resource);
}
public void take() throws InterruptedException {
while (queue.size() <= 0) {
// 队列空了,不能再取东西,轮询等待生产者插入数据
System.out.println("消费者:队列为空,没法取出...");
TimeUnit.MILLISECONDS.sleep(1000);
}
System.out.println("消费者:取出消息!!!");
queue.removeLast();
TimeUnit.MILLISECONDS.sleep(5000);
}
}
复制代码
测试
public class Test {
public static void main(String[] args) {
// 队列
WhileQueue<String> queue = new WhileQueue<>();
// 生产者
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 100; i++) {
try {
queue.put("消息" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
// 消费者
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 100; i++) {
try {
queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
}
复制代码
因为设定了队列最多只能存1个消息,因此只有当队列为空时,生产者才能插入数据。这是最简单的线程间通讯:
多个线程不断轮询共享资源,经过共享资源的状态判断本身下一步该作什么。
看到这,你发现本身被骗了:哦,原来要实现线程间通讯,并不是真的须要A线程直接去叫B线程干什么,只要能按实际状况完成线程切换便可!
但上面的实现方式存在一些缺点:
相对而言,等待唤醒机制则要优雅得多,底层经过维护线程队列的方式,避免了过多线程同时自旋形成的CPU资源浪费,很有点“用空间换时间”的味道。当一个生产者线程没法插入数据时,就让它在队列里休眠(阻塞),此时生产者线程会释放CPU资源,等到消费者抢到CPU执行权并取出数据后,再由消费者唤醒生产者继续生产。
举个例子,本来生产者和消费者都要时不时去店里看一下:
而如今,生产者去店里看了下,发现还有货,就管本身去后厨睡觉了,等店里货都卖完了,天然会有消费者过来喊他补货,不须要付出额外的精力在店里盯着。
Java有多种方式能够实现等待唤醒机制,最经典的就是wait和notify。
/** * wait/notify版本 */
public class WaitNotifyQueue<T> {
// 容器,用来装东西
private final LinkedList<T> queue = new LinkedList<>();
public synchronized void put(T resource) throws InterruptedException {
while (queue.size() >= 1) {
// 队列满了,不能再塞东西了,轮询等待消费者取出数据
System.out.println("生产者:队列已满,没法插入...");
this.wait();
}
System.out.println("生产者:插入" + resource + "!!!");
queue.addFirst(resource);
this.notify();
}
public synchronized void take() throws InterruptedException {
while (queue.size() <= 0) {
// 队列空了,不能再取东西,轮询等待生产者插入数据
System.out.println("消费者:队列为空,没法取出...");
this.wait();
}
System.out.println("消费者:取出消息!!!");
queue.removeLast();
this.notify();
}
}
复制代码
对比WhileQueue作了哪些改进:
但通常推荐使用notifyAll(为何?)。咱们给测试程序再加一个生产者线程就知道了:
开始不久后,整个程序全部线程都阻塞了
缘由是:在synchronized机制下,全部等待的线程都在同一个队列里,而notify又恰巧是随机唤醒线程(也就是说,有可能生产者唤醒生产者)。
最终结果是:全部线程都睡觉了...表如今程序上,就是卡住了。
解决办法是改用notifyAll,**把全部线程都唤醒,而后你们一块儿参与执行权的竞争。**你是否有疑问:若是和上面同样,生产者1仍是唤醒生产者2呢?
其实这个假设不成立...使用notifyAll之后就再也不是随机唤醒某一个线程了,而是唤醒全部线程并从新抢夺执行权。 也就是说,每个线程在进入阻塞以前,都会叫醒其余全部线程!
wait/notify版本的缺点是随机唤醒容易出现“己方唤醒己方,最终致使所有线程阻塞”的乌龙事件,虽然wait/notifyAll能解决这个问题,但唤醒所有线程又不够精确,会形成无谓的线程竞争(实际只须要唤醒敌方线程便可)。
做为改进版,可使用ReentrantLock的Condition替代synchronized的wait/notify:
/** * Condition版本 */
public class ConditionQueue<T> {
// 容器,用来装东西
private final LinkedList<T> queue = new LinkedList<>();
// 显式锁(相对地,synchronized锁被称为隐式锁)
private final ReentrantLock lock = new ReentrantLock();
private final Condition producerCondition = lock.newCondition();
private final Condition consumerCondition = lock.newCondition();
public void put(T resource) throws InterruptedException {
lock.lock();
try {
while (queue.size() >= 1) {
// 队列满了,不能再塞东西了,轮询等待消费者取出数据
System.out.println("生产者:队列已满,没法插入...");
// 生产者阻塞
producerCondition.await();
}
System.out.println("生产者:插入" + resource + "!!!");
queue.addFirst(resource);
// 生产完毕,唤醒消费者
consumerCondition.signal();
} finally {
lock.unlock();
}
}
public void take() throws InterruptedException {
lock.lock();
try {
while (queue.size() <= 0) {
// 队列空了,不能再取东西,轮询等待生产者插入数据
System.out.println("消费者:队列为空,没法取出...");
// 消费者阻塞
consumerCondition.await();
}
System.out.println("消费者:取出消息!!!");
queue.removeLast();
// 消费完毕,唤醒生产者
producerCondition.signal();
} finally {
lock.unlock();
}
}
}
复制代码
如何理解Condition呢?你能够认为lock.newCondition()建立了一个队列,调用producerCondition.await()会把生产者线程放入生产者的等待队列中,当消费者调用producerCondition.signal()时会唤醒从生产者的等待队列中唤醒一个生产者线程出来工做。
也就是说,ReentrantLock的Condition经过拆分线程等待队列,让线程的等待唤醒更加精确了,想唤醒哪一方就唤醒哪一方。
至此,你们应该对线程间通讯有了大体了解。若是你仔细观察,会发现上面其实都采用了阻塞队列实现。咱们都是先构造一个Queue,而后生产者和消费者直接操做Queue,至因而否阻塞,由Queue内部判断。这样封装的好处是,将生产者和消费者解耦的同时,不暴露过多细节,使用起来更简单。
你们应该都听过JDK的阻塞队列吧?基于上面的案例,咱们改进一下,抽取出一个自定义的阻塞队列(使用wait/nofityAll实现):
public class BlockingQueue<T> {
// 模拟队列
private final LinkedList<T> queue = new LinkedList<>();
private int MAX_SIZE = 1;
private int remainCount = 0;
public BlockingQueue(int capacity) {
if (capacity <= 0) {
throw new IllegalArgumentException("size最小为1");
}
this.MAX_SIZE = capacity;
}
public synchronized void put(T resource) throws InterruptedException {
while (queue.size() >= MAX_SIZE) {
// 队列满了,不能再塞东西了,阻塞生产者
System.out.println("插入阻塞...");
this.wait();
}
queue.addFirst(resource);
remainCount++;
printMsg(resource, "被插入");
this.notifyAll();
}
public synchronized T take() throws InterruptedException {
while (queue.size() <= 0) {
// 队列空了,不能再取东西了,阻塞消费者
System.out.println("取出阻塞...");
this.wait();
}
T resource = queue.removeLast();
remainCount--;
printMsg(resource, "被取出");
this.notifyAll();
return resource;
}
private void printMsg(T resource, String operation) throws InterruptedException {
System.out.println(resource + operation);
System.out.println("队列容量:" + remainCount);
}
}
复制代码
虽然不少人开口闭口“阻塞队列”,但“阻塞队列”在他脑中只是个很模糊的概念。连“阻塞队列”的前因后果都不甚清楚,又怎么能说了解呢?
实际上,和List、Set同样,“阻塞队列”也有本身的一脉。在JDK的util包下有一个Queue接口:
若是你继续往下扒,就会发现Queue和List其实很像,也是集合的一个分支罢了:
为何不少人会以为阻塞队列(好比ArrayBlockingQueue)高大上,听起来比ArrayList牛逼呢?主要在于“阻塞”二字!由于你们不了解阻塞,本身也不知道怎么实现阻塞,因此会以为阻塞队列很神秘,很牛逼。但仔细观察上面的继承关系你会发现,若是ArrayBlockingQueue没有实现BlockingQueue接口,那么它本应该是个普普统统的队列,而不是阻塞队列,也就没有那么惊艳了。
那么BlockingQueue作了啥呢?其实啥也没作,毕竟BlockingQueue只是个接口,而接口只能定义方法...就比如一栋摩天大厦建成了,楼顶有个空中泳池,你以为很牛逼。那么,你以为是当初说“我要楼顶有个大花园”的老板牛逼仍是把这个方案实现的设计师牛逼呢?
扯远了,其实BlockingQueue继承Queue接口后,就定义了几个方法:
BlockingQueue金口一开,后面的小弟只能知足,因此几个阻塞队列的实现类都有上面的几个方法。
那么阻塞队列的“阻塞”是怎么实现的呢?以ArrayBlockingQueue为例,经过上面的继承关系分析,Queue和BlockingQueue是接口,里面只有方法定义没有具体实现,有可能实现“阻塞”功能的要么在AbstractQueue,要么就是ArrayBlockingQueue自身。咱们查看AbstractQueue发现这家伙几乎啥都没写...
也就是说,当初老板发话“我但愿这个队列能阻塞”,经理微笑着满口答应,结果转手就交给3个小弟本身整了。好在3个小弟争气,还真给他们搞出来了...
经常使用的3个阻塞队列:
仍是以ArrayBlockingQueue为例,它是怎么实现阻塞的呢?
好家伙...居然用了ReentrantLock,这和咱们上面案例中写的ConditionQueue好像啊!
可是ArrayBlockingQueue只有notFull.await(),没看到signal(),不合理。仔细找找,惟一的多是ArrayBlockingQueue把signal()藏在enqueue(e)方法里了:
其余两个阻塞队列LinkedBlockingQueue和SynchronousQueue同理,也是用ReentrantLock实现阻塞的。
看到这里,相信阻塞队列在你们心中已经再也不那么神圣了,有什么了不得啊,咱们本身也能写啊,还用了好几种方式实现呢!可是扪心自问,阻塞队列总共也就:
而咱们所谓的手写阻塞队列,实际上是这样的:队列直接用了LinkedList,阻塞也是借用wait/notify和ReentrantLock实现的。也就是说,咱们其实只是作了组装工做,拿现成的队列+阻塞功能拼出了一个阻塞队列。
世间路千万条,总有人不走寻常路。按理说现成的List+wait/notifyAll已经能够造出阻塞队列了,但就是有大佬不知足。
Doug Lea老爷子震惊的说:
What?! Why you don't say earlly ya! I have already finished the AQS le...
是的,又是这个男人,他整出了一个AQS,再把AQS塞到ReentrantLock中,最后用ReentrantLock+数组、ReentrantLock+链表、ReentrantLock+Transfer搞出了ArrayBlockingQueue、LinkedBlockingQueue和SynchronousQueue...阻塞队列只能算顺便的,他的初衷实际上是利用AQS统一并简化锁的实现,屏蔽同步状态管理、阻塞线程的排队和通知、唤醒机制等,让后续的二次开发更简便。
换句话说:
若是你纠结于阻塞队列怎么实现,那你的格局就过小了...JDK的阻塞队列依赖于ReentrantLock,而ReentrantLock只是对AQS的浅封装,真正须要咱们花功夫学习的其实有且只有AQS。
我是bravo1988,点个赞吧,求你了。短短半个月,我从“壮志凌云,想把知乎3w关注带到掘金”,转变为“垂头丧气,想把掘金30关注带回知乎”。
よろしく・つづく
往期文章: