较长一段时间以来我都发现很多开发者对 jdk 中的 J.U.C
(java.util.concurrent)也就是 Java 并发包的使用甚少,更别谈对它的理解了;但这却也是咱们进阶的必备关卡。java
以前或多或少也分享过相关内容,但都不成体系;因而便想整理一套与并发包相关的系列文章。git
其中的内容主要包含如下几个部分:github
基于这三点我相信你们对这部份内容不至于一问三不知。数据库
既然开了一个新坑,就不想作的太差;因此我打算将这个列表下的大部分类都讲到。api
因此本次重点讨论 ArrayBlockingQueue
。数组
在本身实现以前先搞清楚阻塞队列的几个特色:安全
实现队列的方式多种,总的来讲就是数组和链表;其实咱们只须要搞清楚其中一个便可,不一样的特性主要表现为数组和链表的区别。网络
这里的 ArrayBlockingQueue
看名字很明显是由数组实现。多线程
咱们先根据它这三个特性尝试本身实现试试。并发
我这里自定义了一个类:ArrayQueue
,它的构造函数以下:
public ArrayQueue(int size) { items = new Object[size]; }
很明显这里的 items
就是存放数据的数组;在初始化时须要根据大小建立数组。
写入队列比较简单,只须要依次把数据存放到这个数组中便可,以下图:
但仍是有几个须要注意的点:
先看第一个队列满的时候,写入的线程须要被阻塞
,先来考虑下如何才能使一个线程被阻塞,看起来的表象线程卡住啥事也作不了。
有几种方案能够实现这个效果:
Thread.sleep(timeout)
线程休眠。object.wait()
让线程进入 waiting
状态。固然还有一些
join、LockSupport.part
等不在本次的讨论范围。
阻塞队列还有一个很是重要的特性是:当队列空间可用时(取出队列),写入线程须要被唤醒让数据能够写入进去。
因此很明显Thread.sleep(timeout)
不合适,它在到达超时时间以后便会继续运行;达不到空间可用时才唤醒继续运行这个特色。
其实这样的一个特色很容易让咱们想到 Java 的等待通知机制来实现线程间通讯;更多线程见通讯的方案能够参考这里:深刻理解线程通讯
因此我这里的作法是,一旦队列满时就将写入线程调用 object.wait()
进入 waiting
状态,直到空间可用时再进行唤醒。
/** * 队列满时的阻塞锁 */ private Object full = new Object(); /** * 队列空时的阻塞锁 */ private Object empty = new Object();
因此这里声明了两个对象用于队列满、空状况下的互相通知做用。
在写入数据成功后须要使用 empty.notify()
,这样的目的是当获取队列为空时,一旦写入数据成功就能够把消费队列的线程唤醒。
这里的 wait 和 notify 操做都须要对各自的对象使用
synchronized
方法块,这是由于 wait 和 notify 都须要获取到各自的锁。
上文也提到了:当队列为空时,获取队列的线程须要被阻塞,直到队列中有数据时才被唤醒。
代码和写入的很是相似,也很好理解;只是这里的等待、唤醒刚好是相反的,经过下面这张图能够很好理解:
总的来讲就是:
先来一个基本的测试:单线程的写入和消费。
3 123 1234 12345
经过结果来看没什么问题。
当写入的数据超过队列的大小时,就只能消费以后才能接着写入。
2019-04-09 16:24:41.040 [Thread-0] INFO c.c.concurrent.ArrayQueueTest - [Thread-0]123 2019-04-09 16:24:41.040 [main] INFO c.c.concurrent.ArrayQueueTest - size=3 2019-04-09 16:24:41.047 [main] INFO c.c.concurrent.ArrayQueueTest - 1234 2019-04-09 16:24:41.048 [main] INFO c.c.concurrent.ArrayQueueTest - 12345 2019-04-09 16:24:41.048 [main] INFO c.c.concurrent.ArrayQueueTest - 123456
从运行结果也能看出只有当消费数据后才能接着往队列里写入数据。
而当没有消费时,再往队列里写数据则会致使写入线程被阻塞。
三个线程并发写入300条数据,其中一个线程消费一条。
=====0 299
最终的队列大小为 299,可见线程也是安全的。
因为不论是写入仍是获取方法里的操做都须要获取锁才能操做,因此整个队列是线程安全的。
下面来看看 JDK 标准的 ArrayBlockingQueue
的实现,有了上面的基础会更好理解。
看似要复杂些,但其实逐步拆分后也很好理解:
第一步其实和咱们本身写的同样,初始化一个队列大小的数组。
第二步初始化了一个重入锁,这里其实就和咱们以前使用的 synchronized
做用一致的;
只是这里在初始化重入锁的时候默认是非公平锁
,固然也能够指定为 true
使用公平锁;这样就会按照队列的顺序进行写入和消费。
更多关于
ReentrantLock
的使用和原理请参考这里:ReentrantLock 实现原理
三四两步则是建立了 notEmpty notFull
这两个条件,他的做用于用法和以前使用的 object.wait/notify
相似。
这就是整个初始化的内容,其实和咱们本身实现的很是相似。
其实会发现阻塞写入的原理都是差很少的,只是这里使用的是 Lock 来显式获取和释放锁。
同时其中的 notFull.await();notEmpty.signal();
和咱们以前使用的 object.wait/notify
的用法和做用也是同样的。
固然它仍是实现了超时阻塞的 API
。
也是比较简单,使用了一个具备超时时间的等待方法。
再看消费队列:
也是差很少的,一看就懂。
而其中的超时 API 也是使用了 notEmpty.awaitNanos(nanos)
来实现超时返回的,就不具体说了。
说了这么多,来看一个队列的实际案例吧。
背景是这样的:
有一个定时任务会按照必定的间隔时间从数据库中读取一批数据,须要对这些数据作校验同时调用一个远程接口。
简单的作法就是由这个定时任务的线程去完成读取数据、消息校验、调用接口等整个全流程;但这样会有一个问题:
假设调用外部接口出现了异常、网络不稳致使耗时增长就会形成整个任务的效率下降,由于他都是串行会互相影响。
因此咱们改进了方案:
其实就是一个典型的生产者消费者模型:
这样两个线程就能够经过这个队列来进行解耦,互相不影响,同时这个队列也能起到缓冲的做用。
但在使用过程当中也有一些小细节值得注意。
由于这个外部接口是支持批量执行的,因此在消费线程取出数据后会在内存中作一个累加,一旦达到阈值或者是累计了一个时间段便将这批累计的数据处理掉。
但因为开发者的大意,在消费的时候使用的是 queue.take()
这个阻塞的 API;正常运行没啥问题。
可一旦原始的数据源,也就是 DB 中没数据了,致使队列里的数据也被消费完后这个消费线程便会被阻塞。
这样上一轮积累在内存中的数据便一直没机会使用,直到数据源又有数据了,一旦中间间隔较长时即可能会致使严重的业务异常。
因此咱们最好是使用 queue.poll(timeout)
这样带超时时间的 api,除非业务上有明确的要求须要阻塞。
这个习惯一样适用于其余场景,好比调用 http、rpc 接口等都须要设置合理的超时时间。
关于 ArrayBlockingQueue
的相关分享便到此结束,接着会继续更新其余并发容器及并发工具。
对本文有任何相关问题均可以留言讨论。
本文涉及到的全部源码:
你的点赞与分享是对我最大的支持