java多线程系列之synchronousQueue

synchronousQueue详解

在使用cachedThreadPool的时候,没有对原理去很好的理解,因此致使使用起来有些不放心,主要是对synchronousQueue的原理不太了解,因此有此文的分析java

本文从两个方面分析synchronousQueue:node

  1. synchronousQueue的使用,主要是经过Executors框架提供的线程池cachedThreadPool来说,由于synchronousQueue是它的workQueue
  2. synchronousQueue的原理,主要是从实现角度,分析一下数据结构

synchronousQueue基本使用

首先说说api吧,关于队列有几套api,核心是下面的两套:算法

take() & put() //这是阻塞的,会阻塞操做线程
poll() & offer() //这是非阻塞的(在不设置超时时间的前提下),当操做不能达成的时候会立马返回boolean复制代码

synchronousQueue是一个没有数据缓冲的阻塞队列,生产者线程对其的插入操做put()必须等待消费者的移除操做take(),反过来也同样。api

可是poll()和offer()就不会阻塞,举例来讲就是offer的时候若是有消费者在等待那么就会立马知足返回true,若是没有就会返回false,不会等待消费者到来。数据结构

下面咱们分析一下cachedThreadPool的使用流程,经过这个过程咱们来了解synchronousQueue的使用方式:先看代码并发

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {//1
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }复制代码
  1. 对于使用synchronousQueue的线程池,在第一次execute任务的时候会在1处返回false,由于线程池中尚未线程,因此没有消费者在等待,因此就会直接建立线程进行执行任务
  2. 在上篇线程池的分析中咱们提到:建立的线程在执行完毕任务后会去循环的getTask,在getTask的过程当中会调用take去获取任务。因此当咱们再次调用execute提交任务的时候1就会返回成功(前提是先前建立的线程已经执行完毕,正在执行gettask方法进行等待),由于这个时候已经有一个线程在等待task了,因此offer直接返回成功!
  3. 这就达到了cachedThreadPool线程复用的目的,也就是说:在提交任务的时候,若是全部工做线程都处于忙碌的状态就会新建线程来执行,若是有工做线程处于空闲状态则把任务交给空闲线程来执行!而这其中的黑科技就是经过synchronousQueue来进行的。

synchronousQueue内部数据结构

根据上面咱们介绍的synchronousQueue的队列语义,咱们其实能够很容易的经过锁或者信号量等一系列的同步机制来实现一个synchronousQueue的结构,可是咱们知道有锁的通常效率都不会过高,因此java为咱们提供了下面一种无锁的算法。框架

无锁在java里面通常就是cas和spin来实现的!具体会在以后介绍java并发包的时候来分析TODO。下面的分析的核心在于:cas和spin,通常把cas和spin能够组合起来使用,spin就是不断循环重试cas操做,确保操做可以成功。这些就不详细介绍,网上有不少相关文章!这也是java并发的基础!ide

稍微跟踪一下代码,就会发现synchronousQueue内部是经过Transferer来实现的,具体分为两个Transferer,分别是TransferStack和TransferQueue,二者差异在因而否公平:下面咱们只分析TransferQueue的实现。this

/** Node class for TransferQueue. */
        static final class QNode { //1
            volatile QNode next;          // next node in queue
            volatile Object item;         // CAS'ed to or from null
            volatile Thread waiter;       // to control park/unpark
            final boolean isData;

            QNode(Object item, boolean isData) {
                this.item = item;
                this.isData = isData;
            }

            boolean casNext(QNode cmp, QNode val) {//2
                return next == cmp &&
                    UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
            }

            boolean casItem(Object cmp, Object val) {//2
                return item == cmp &&
                    UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
            }
         }复制代码
  1. 既然是队列,确定有个node是表明队列的节点的。
  2. 2处表明了典型的两个cas赋值操做,表明了如何设置next和item的值,用于进行并发更新

以后是transfer操做idea

E transfer(E e, boolean timed, long nanos) {//1
            QNode s = null; 
            boolean isData = (e != null);
            for (;;) {
                QNode t = tail;
                QNode h = head;
                if (t == null || h == null)       
                    continue;                       

                if (h == t || t.isData == isData) { //2
                    QNode tn = t.next;
                    if (t != tail)                 
                        continue;
                    if (tn != null) {               
                        advanceTail(t, tn);
                        continue;
                    }
                    if (timed && nanos <= 0)        
                        return null;
                    if (s == null)
                        s = new QNode(e, isData);//3
                    if (!t.casNext(null, s))       //4
                        continue;

                    advanceTail(t, s);              // 5
                    Object x = awaitFulfill(s, e, timed, nanos);//6
                    if (x == s) {                  
                        clean(t, s);
                        return null;
                    }

                    if (!s.isOffList()) {          
                        advanceHead(t, s);        
                        if (x != null)            
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null) ? (E)x : e;

                } else {                           //7
                    QNode m = h.next;            //8
                    if (t != tail || m == null || h != head)
                        continue;                   

                    Object x = m.item;
                    if (isData == (x != null) ||   
                        x == m ||                  
                        !m.casItem(x, e)) {         // 9
                        advanceHead(h, m);          
                        continue;
                    }

                    advanceHead(h, m);              // 10
                    LockSupport.unpark(m.waiter);//11
                    return (x != null) ? (E)x : e;
                }
            }
        }复制代码
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
            /* Same idea as TransferStack.awaitFulfill */
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Thread w = Thread.currentThread();
            int spins = ((head.next == s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {//6.1
                if (w.isInterrupted())
                    s.tryCancel(e);
                Object x = s.item;
                if (x != e)//6.2
                    return x;
                if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        s.tryCancel(e);
                        continue;
                    }
                }
                if (spins > 0)
                    --spins;
                else if (s.waiter == null)
                    s.waiter = w;
                else if (!timed)
                    LockSupport.park(this);//6.3
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }复制代码

在上面的代码中,我把重要的地方分了11步,分别进行解释:

首先说一下大体的操做。在transfer中,把操做分为两种,一种就是入队put,一种是出队take,入队的时候会建立data节点,值为data。出队的时候会建立一个request节点,值为null。

  1. put和take操做都会调用该方法,区别在于,put操做的时候e值为数据data,take操做的时候e值为null

  2. 若是h==t也就是队列为空,或者当前队列尾部的数据类型和调用该方法的数据类型一致:好比当前队列为空,第一次来了一个入队请求,这时候队列就会建立出一个data节点,若是第二次又来了一个入队请求(和第一次也就是队列尾部的数据类型一致,都是入队请求),这时候队列会建立出第二个data节点,并造成一个链表。同理,若是刚开始来了request请求,也会入队,以后若是继续来了一个reqeust请求,也会继续入队!

  3. 知足2的条件,就会进入3,中间会有一些一致性检查这也是必须的,避免产生并发冲突。3会建立出一个节点,根据e值的不一样,多是data节点或者request节点。

  4. 把3中建立的节点经过cas方式设置到队列尾部去。

  5. 把tail经过cas方式修改为3中新创建的s节点

  6. 调用方法awaitFulfill进行等待,若是3中建立的是data节点,那么就会等待来一个reqeust节点,反之亦然!

    1. 放入队列以后就开始进行循环判断
    2. 终止条件是节点的值被修改,具体若是是data节点,那么会被修改为null,若是是request节点,那么会被修改为data值。这个修改是在第9步中由相对的请求(若是建立的是data节点,那么就由reqeust请求来进行修改,反之亦然)来作的。若是一直没有相对的请求过来,那么节点的值就一直不会被修改,这样就跳不出循环体!
    3. 若是没有被修改,那么就须要进入park休眠,等待第9步进行修改后再经过unpark进行唤醒,唤醒以后就会判断节点值被修改从而返回。
  7. 若是在插入一个节点的时候,不知足2的条件,也就是队列不为空而且尾部节点和当前要插入节点的类型不同(这就表明来了一个相对请求),好比上图中的尾部是data节点,若是来了一个插入reqeust节点的请求,那么就会走到7这里
  8. 因为是队列,先进先出,因此会取队列里面的第一个节点,也就是h.nex
  9. 把8中取出的节点的值经过cas的方式设置成新来节点的e值,这样就成功的知足了6-2的终止条件
  10. 将head节点日后移动,这样就把第一个节点成功的出队。
  11. 每一个节点都保存了对应的操做线程,将8中节点对应的线程进行唤醒,这样6-3处于休眠的线程就醒来了,而后继续进行for循环,进而判断6-2终止条件知足,因而返回

整个过程就是这样,上文的分析只是分析了正常的工做流程,没有具体的分析操做中的竞态条件,好比两个线程同时进行入队的时候如何正确设置链表的状态,都讲的话篇幅过大。

相关文章
相关标签/搜索