在使用cachedThreadPool的时候,没有对原理去很好的理解,因此致使使用起来有些不放心,主要是对synchronousQueue的原理不太了解,因此有此文的分析java
本文从两个方面分析synchronousQueue:node
首先说说api吧,关于队列有几套api,核心是下面的两套:算法
take() & put() //这是阻塞的,会阻塞操做线程
poll() & offer() //这是非阻塞的(在不设置超时时间的前提下),当操做不能达成的时候会立马返回boolean复制代码
synchronousQueue是一个没有数据缓冲的阻塞队列,生产者线程对其的插入操做put()必须等待消费者的移除操做take(),反过来也同样。api
可是poll()和offer()就不会阻塞,举例来讲就是offer的时候若是有消费者在等待那么就会立马知足返回true,若是没有就会返回false,不会等待消费者到来。数据结构
下面咱们分析一下cachedThreadPool的使用流程,经过这个过程咱们来了解synchronousQueue的使用方式:先看代码并发
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {//1
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}复制代码
根据上面咱们介绍的synchronousQueue的队列语义,咱们其实能够很容易的经过锁或者信号量等一系列的同步机制来实现一个synchronousQueue的结构,可是咱们知道有锁的通常效率都不会过高,因此java为咱们提供了下面一种无锁的算法。框架
无锁在java里面通常就是cas和spin来实现的!具体会在以后介绍java并发包的时候来分析TODO。下面的分析的核心在于:cas和spin,通常把cas和spin能够组合起来使用,spin就是不断循环重试cas操做,确保操做可以成功。这些就不详细介绍,网上有不少相关文章!这也是java并发的基础!ide
稍微跟踪一下代码,就会发现synchronousQueue内部是经过Transferer来实现的,具体分为两个Transferer,分别是TransferStack和TransferQueue,二者差异在因而否公平:下面咱们只分析TransferQueue的实现。this
/** Node class for TransferQueue. */
static final class QNode { //1
volatile QNode next; // next node in queue
volatile Object item; // CAS'ed to or from null
volatile Thread waiter; // to control park/unpark
final boolean isData;
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
boolean casNext(QNode cmp, QNode val) {//2
return next == cmp &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
boolean casItem(Object cmp, Object val) {//2
return item == cmp &&
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
}复制代码
以后是transfer操做idea
E transfer(E e, boolean timed, long nanos) {//1
QNode s = null;
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null)
continue;
if (h == t || t.isData == isData) { //2
QNode tn = t.next;
if (t != tail)
continue;
if (tn != null) {
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0)
return null;
if (s == null)
s = new QNode(e, isData);//3
if (!t.casNext(null, s)) //4
continue;
advanceTail(t, s); // 5
Object x = awaitFulfill(s, e, timed, nanos);//6
if (x == s) {
clean(t, s);
return null;
}
if (!s.isOffList()) {
advanceHead(t, s);
if (x != null)
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
} else { //7
QNode m = h.next; //8
if (t != tail || m == null || h != head)
continue;
Object x = m.item;
if (isData == (x != null) ||
x == m ||
!m.casItem(x, e)) { // 9
advanceHead(h, m);
continue;
}
advanceHead(h, m); // 10
LockSupport.unpark(m.waiter);//11
return (x != null) ? (E)x : e;
}
}
}复制代码
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
/* Same idea as TransferStack.awaitFulfill */
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {//6.1
if (w.isInterrupted())
s.tryCancel(e);
Object x = s.item;
if (x != e)//6.2
return x;
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel(e);
continue;
}
}
if (spins > 0)
--spins;
else if (s.waiter == null)
s.waiter = w;
else if (!timed)
LockSupport.park(this);//6.3
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}复制代码
在上面的代码中,我把重要的地方分了11步,分别进行解释:
首先说一下大体的操做。在transfer中,把操做分为两种,一种就是入队put,一种是出队take,入队的时候会建立data节点,值为data。出队的时候会建立一个request节点,值为null。
put和take操做都会调用该方法,区别在于,put操做的时候e值为数据data,take操做的时候e值为null
若是h==t也就是队列为空,或者当前队列尾部的数据类型和调用该方法的数据类型一致:好比当前队列为空,第一次来了一个入队请求,这时候队列就会建立出一个data节点,若是第二次又来了一个入队请求(和第一次也就是队列尾部的数据类型一致,都是入队请求),这时候队列会建立出第二个data节点,并造成一个链表。同理,若是刚开始来了request请求,也会入队,以后若是继续来了一个reqeust请求,也会继续入队!
知足2的条件,就会进入3,中间会有一些一致性检查这也是必须的,避免产生并发冲突。3会建立出一个节点,根据e值的不一样,多是data节点或者request节点。
把3中建立的节点经过cas方式设置到队列尾部去。
把tail经过cas方式修改为3中新创建的s节点
调用方法awaitFulfill进行等待,若是3中建立的是data节点,那么就会等待来一个reqeust节点,反之亦然!