SynchronousQueue原理详解-非公平模式

开篇

说明:本文分析采用的是jdk1.8

约定:下面内容中Ref-xxx表明的是引用地址,引用对应的节点java

前面已经讲解了公平模式的内容,今天来说解下关于非公平模式下的SynchronousQueue是如何进行工做的,在源码分析的时候,先来简单看一下非公平模式的简单原理,它采用的栈这种FILO先进后出的方式进行非公平处理,它内部有三种状态,分别是REQUEST,DATA,FULFILLING,其中REQUEST表明的数据请求的操做也就是take操做,而DATA表示的是数据也就是Put操做将数据存放到栈中,用于消费者进行获取操做,而FULFILLING表明的是能够进行互补操做的状态,其实和前面讲的公平模式也很相似。node

当有相同模式状况下进行入栈操做,相同操做指的是REQUEST和DATA两种类型中任意一种进行操做时,模式相同则进行入栈操做,以下图所示:app

同REQUEST进行获取数据时的入栈状况:oop

clipboard.png

一样的put的操做,进行数据操做时为DATA类型的操做,此时队列状况为:
clipboard.png
不一样模式下又是如何进行操做的?当有不一样模式进来的时候,他不是将当前的模式压入栈顶,而是将FullFill模式和当前模式进行按位或以后压入栈顶,也就是压入一个进行FullFill请求的模式进入栈顶,请求配对操做,以下图所示:
clipboard.png
经过上图可见,原本栈中有一个DATA模式的数据等待消费者进行消费,这时候来了一个REQUEST模式的请求操做来进行消费数据,这时候并无将REQUEST模式直接压入栈顶,而是将其转换为FULLFILLING模式,而且保留了原有的类型,这是进行FULLFILLING的请求,请求和栈顶下方元素进行匹配,当匹配成功后将栈顶和匹配元素同时进行出栈操做,详细请见下文分析:源码分析

TransferStack

字段信息

/** 消费者模式 */
static final int REQUEST    = 0;
/** 提供者模式 */
static final int DATA       = 1;
/** 互补模式 */
static final int FULFILLING = 2;
/** 栈顶指针 */
volatile SNode head;

方法

方法名 描述
isFulfilling 判断指定类型是不是互补模式
casHead 替换当前头结点
snode 生成SNode节点对象
transfer 主要处理逻辑
awaitFulfill 等待fulfill操做
shouldSpin 判断节点s是头结点或是fulfill节点则返回true

SNode内容

字段信息

volatile SNode next;        // 栈下一个元素
volatile SNode match;       // 匹配的节点
volatile Thread waiter;     // 控制park/unpark的线程
Object item;                // 数据或请求
int mode;                                        // 模式,上面介绍的三种模式

方法

方法名 描述
casNext 判断指定类型是不是互补模式
tryMatch 尝试匹配节点,若是存在匹配节点则判断是不是当前节点,直接返回判断结果,若是没有则替换match内容而且唤醒线程
tryCancel 取消当前节点,将当前节点的match节点设置为当前节点(this)
isCancelled 判断match节点是否是等于当前节点

通过上面内容的分析,接下来就进入正题,让咱们总体先看一下下transfer都为咱们作了些什么内容,下面是transfer源码内容:this

E transfer(E e, boolean timed, long nanos) {
    /*
     * Basic algorithm is to loop trying one of three actions:
     *
     * 1. If apparently empty or already containing nodes of same
     *    mode, try to push node on stack and wait for a match,
     *    returning it, or null if cancelled.
     *
     * 2. If apparently containing node of complementary mode,
     *    try to push a fulfilling node on to stack, match
     *    with corresponding waiting node, pop both from
     *    stack, and return matched item. The matching or
     *    unlinking might not actually be necessary because of
     *    other threads performing action 3:
     *
     * 3. If top of stack already holds another fulfilling node,
     *    help it out by doing its match and/or pop
     *    operations, and then continue. The code for helping
     *    is essentially the same as for fulfilling, except
     *    that it doesn't return the item.
     */

    SNode s = null; // constructed/reused as needed
    int mode = (e == null) ? REQUEST : DATA;

    for (;;) {
        SNode h = head;
        if (h == null || h.mode == mode) {  // 栈顶指针为空或者是模式相同
            if (timed && nanos <= 0) {      // 制定了timed而且时间小于等于0则取消操做。
                if (h != null && h.isCancelled())
                    casHead(h, h.next);     // 判断头结点是否被取消了取消了就弹出队列,将头结点指向下一个节点
                else
                    return null;
            } else if (casHead(h, s = snode(s, e, h, mode))) {// 初始化新节点而且修改栈顶指针
                SNode m = awaitFulfill(s, timed, nanos);            // 进行等待操做
                if (m == s) {               // 返回内容是自己则进行清理操做
                    clean(s);
                    return null;
                }
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // help s's fulfiller
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
        } else if (!isFulfilling(h.mode)) { // 尝试去匹配
            if (h.isCancelled())            // 判断是否已经被取消了
                casHead(h, h.next);         // 弹出取消的节点而且重新进入主循环
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {//新建一个Full节点压入栈顶
                for (;;) { // 循环直到匹配
                    SNode m = s.next;       // s的下一个节点为匹配节点
                    if (m == null) {        // 表明没有等待内容了
                        casHead(s, null);   // 弹出full节点
                        s = null;           // 设置为null用于下次生成新的节点
                        break;              // 退回到主循环中
                    }
                    SNode mn = m.next;
                    if (m.tryMatch(s)) {
                        casHead(s, mn);     // 弹出s节点和m节点两个节点
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else                  // 若是失去了匹配
                        s.casNext(m, mn);   // 帮助取消链接
                }
            }
        } else {                            // 这里是帮助进行fillull
            SNode m = h.next;               // m是头结点的匹配节点
            if (m == null)                  // 若是m不存在则直接将头节点赋值为nll
                casHead(h, null);           // 弹出fulfill节点
            else {
                SNode mn = m.next;
                if (m.tryMatch(h))          // h节点尝试匹配m节点
                    casHead(h, mn);         // 弹出h和m节点
                else                        // 丢失匹配则直接将头结点的下一个节点赋值为头结点的下下节点
                    h.casNext(m, mn);       
            }
        }
    }
}
  1. 模式相同的时候则进行等待操做,入队等待操做
  2. 当模式不相同时,首先判断头结点是不是fulfill节点若是不是则进行匹配操做,若是是fulfill节点先帮助头结点的fulfill节点进行匹配操做

接下来再来看一下awaitFulfill方法内容spa

SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
      // 等待线程
    Thread w = Thread.currentThread();
      // 等待时间设置
    int spins = (shouldSpin(s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        if (w.isInterrupted())            // 判断当前线程是否被中断 
            s.tryCancel();                    // 尝试取消操做 
        SNode m = s.match;                    // 获取当前节点的匹配节点,若是节点不为null表明匹配或取消操做,则返回
        if (m != null)
            return m;
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel();
                continue;
            }
        }
        if (spins > 0)
            spins = shouldSpin(s) ? (spins-1) : 0;
        else if (s.waiter == null)
            s.waiter = w; // establish waiter so can park next iter
        else if (!timed)
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

经过上面的源码,其实咱们以前分析同步模式的时候差不太多,变化的地方其中包括返回内容判断这里判断的是match节点是否为null,还有就是spins时间设置这里发现了shoudSpin用来判断是否进行轮训,来看一下shouldSpin方法:线程

/**
 * 判断节点是不是fulfill节点,或者是头结点为空再或者是头结点和当前节点相等时则不须要进行轮训操做
 */
boolean shouldSpin(SNode s) {
    SNode h = head;
    return (h == s || h == null || isFulfilling(h.mode));
}

实际上就是判断节点是不是fulfill节点,或者是头结点为空再或者是头结点和当前节点相等时则不须要进行轮训操做,若是知足上述条件就不小进行轮训等到操做了直接进行等待就好了。3d

接下来咱们来用例子一点点解析原理:指针

首先先进行一个put操做,这样能够简单分析下内部信息。

/**
 * SynchronousQueue原理内容
 *
 * @author battleheart
 */
public class SynchronousQueueDemo1 {
    public static void main(String[] args) throws Exception {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>();

        Thread thread1 = new Thread(() -> {
            try {
                queue.put(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        thread1.start();
    }
}

首先它会进入到transfer方法中,进行第一步的判断他的类型信息,以下所示:

SNode s = null; // constructed/reused as needed
int mode = (e == null) ? REQUEST : DATA;

经过上面代码能够看到e=1因此是DATA类型,接下来进行判断是如何进行操做,当前堆栈是空的,如何判断堆栈为空呢?上面也讲到了head节点为空时则表明堆栈为空,接下来就要判断若是head节点为空或head指向的节点和当前操做内容模式相同,则进行等待操做,以下代码所示:

SNode h = head;
if (h == null || h.mode == mode) {  // empty or same-mode
    if (timed && nanos <= 0) {      // can't wait
        if (h != null && h.isCancelled())
            casHead(h, h.next);     // pop cancelled node
        else
            return null;
    } else if (casHead(h, s = snode(s, e, h, mode))) {
        SNode m = awaitFulfill(s, timed, nanos);
        if (m == s) {               // wait was cancelled
            clean(s);
            return null;
        }
        if ((h = head) != null && h.next == s)
            casHead(h, s.next);     // help s's fulfiller
        return (E) ((mode == REQUEST) ? m.item : s.item);
    }
}

显然头结点是空的,因此进入到第一个fi语句中执行等待操做,若是指定了timed则判断时间是否小于0,若是小于0则直接null,反之判断当前节点是否不是头结点以及头结点是否取消,潘祖条件弹出头结点,并将下一个节点设置为头结点,上述条件在当前例子中都不知足,因此要进入到下面这段代码中,首先进行对s进行初始化值,而且进行入栈操做,casHead(h, s = snode(s, e, h, mode)),下面看一下栈中的状况以下图所示:
clipboard.png
当执行完了入栈操做以后接下来要执行awaitFulfill这里的操做就是轮训以及将当前节点的线程赋值,而且挂起当前线程。此时的栈的状况以下图所示:

clipboard.png

当有一样的模式进行操做时候也是重复上述的操做内容,咱们这里模拟两次put操做,让让咱们看一下栈中的状况以下图所示:

clipboard.png

经过上图能够看到,其实就是将头结点移动到了新的节点上,而后新节点的next节点维护这下一个节点的引用,好了,上述内容分析是同模式的操做,接下来咱们试着进行take操做时,这时候会发什么内容呢?

/**
 * SynchronousQueue例子二进行两次put操做和一次take操做
 *
 * @author battleheart
 */
public class SynchronousQueueDemo1 {
    public static void main(String[] args) throws Exception {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>();

        Thread thread1 = new Thread(() -> {
            try {
                queue.put(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        thread1.start();
        Thread.sleep(2000);
        Thread thread2 = new Thread(() -> {
            try {
                queue.put(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        thread2.start();

        Thread.sleep(2000);
        Thread thread6 = new Thread(() -> {
            try {
                queue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
          thread6.start();
    }
}

上面例子正好符合上面例子两次put操做的截图,进行两次put操做事后再进行take操做,接下来咱们来看一下take操做是如何进行操做的,换句话说当有不一样模式的操做时又是如何进行处理呢?上面分析的内容是同种操做模式下的,当有不一样操做则会走下面内容:

else if (!isFulfilling(h.mode)) { // try to fulfill
    if (h.isCancelled())            // already cancelled
        casHead(h, h.next);         // pop and retry
    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
        for (;;) { // loop until matched or waiters disappear
            SNode m = s.next;       // m is s's match
            if (m == null) {        // all waiters are gone
                casHead(s, null);   // pop fulfill node
                s = null;           // use new node next time
                break;              // restart main loop
            }
            SNode mn = m.next;
            if (m.tryMatch(s)) {
                casHead(s, mn);     // pop both s and m
                return (E) ((mode == REQUEST) ? m.item : s.item);
            } else                  // lost match
                s.casNext(m, mn);   // help unlink
        }
    }
} else {                            // help a fulfiller
    SNode m = h.next;               // m is h's match
    if (m == null)                  // waiter is gone
        casHead(h, null);           // pop fulfilling node
    else {
        SNode mn = m.next;
        if (m.tryMatch(h))          // help match
            casHead(h, mn);         // pop both h and m
        else                        // lost match
            h.casNext(m, mn);       // help unlink
    }
}

最下面的else咱们等会来进行分析,咱们看到若是不是同模式的话,则会先判断是不是fulfill模式,若是不是fulfill模式,则进入到第一个if语句中,显然经过图示6能够得出,头结点head模式并非fillfull模式,则进入到该if语句中,上来首先判断当前头结点是否被取消了,若是被取消则将头结点移动到栈顶下一个节点,反之则将s节点赋值为fulfill模式按位或当前节点模式,我的认为目的是既保留了原有模式也变成了fulfill模式,咱们开篇就讲到了,REQUEST=0,二进制则是00,而DATA=1,其二进制为01,而FULFILLING=2,其二进制表示10,也就是说若是当前节点是REQUEST的话那么节点的内容值时00|10=10,若是节点是DATA模式则s节点的模式时01|10=11,这样的话11既保留了原有模式也是FULFILLING模式,而后将头节点移动到当前s节点,也就是将FULFILLING模式节点入栈操做,目前分析到这里时casHead(h, s=snode(s, e, h, FULFILLING|mode),栈的状况以下图所示:

clipboard.png

接下来运行for循环里面内容,先运行以下内容:

SNode m = s.next;       // m is s's match
if (m == null) {        // all waiters are gone
    casHead(s, null);   // pop fulfill node
    s = null;           // use new node next time
    break;              // restart main loop
}

先判断当前节点也就是头结点s的下一个节点上图中head=s节点,因此s.next节点表明的是Ref-750,判断当前节点是否为空,若是为空的话表明没有可匹配的节点,先对head进行替换为null表明堆栈为空,而后将当前s节点设置为null,退出fulfill匹配模式进入到主循环中,会从新进行对当前节点进行操做,是消费仍是匹配,显然本例子中m节点是不为空的,因此这里不会运行,跳过以后运行下面内容:

SNode mn = m.next;
if (m.tryMatch(s)) {
    casHead(s, mn);     // pop both s and m
    return (E) ((mode == REQUEST) ? m.item : s.item);
} else                  // lost match
    s.casNext(m, mn);   // help unlink

mn节点在上图中对应的是Ref-681,这里是重点,m.tryMatch(s),m节点尝试匹配s节点,进入到方法里,到这一步是咱们再来看一下头结点的元素的内容:

clipboard.png

而且唤醒m节点的,告诉m节点,你如今有匹配的对象了你能够被唤醒了,这里唤醒以后就会进入到awaitFulfill下面的操做

SNode m = awaitFulfill(s, timed, nanos);
if (m == s) {               // wait was cancelled
    clean(s);
    return null;
}
if ((h = head) != null && h.next == s)
    casHead(h, s.next);     // help s's fulfiller
return (E) ((mode == REQUEST) ? m.item : s.item);

运行这里的线程显然是上图中的m节点,由于m节点被唤醒了,m==s表明的是取消了节点,显然没有进行该操做,而后就是帮助头结点进行fulfill操做,这里重点说一下这段代码:

if ((h = head) != null && h.next == s)
    casHead(h, s.next);

获取当前头结点,也就是上图中的头结点若是不为空并且h.next节点为m节点正好是m节点进行操做时的s节点,也就是说这个语句是成立的,直接将头节点指向了上图的mn节点,这里的操做和take中的下面操做是同样的,也就是帮助fulfill操做弹出栈顶和栈顶匹配的节点内容,下面代码:

SNode mn = m.next;
if (m.tryMatch(s)) {
    casHead(s, mn);     // pop both s and m
    return (E) ((mode == REQUEST) ? m.item : s.item);
} else                  // lost match
    s.casNext(m, mn);   // help unlink

重点是casHead的代码,弹出s和m两个节点,此时栈中内容以下图所示:

clipboard.png

主要的流程分析完毕了,可是细心的朋友会发现,最后面还有一个帮助fulfill的操做,(transfer中)代码以下所示:

else {                            // help a fulfiller
    SNode m = h.next;               // m is h's match
    if (m == null)                  // waiter is gone
        casHead(h, null);           // pop fulfilling node
    else {
        SNode mn = m.next;
        if (m.tryMatch(h))          // help match
            casHead(h, mn);         // pop both h and m
        else                        // lost match
            h.casNext(m, mn);       // help unlink
    }
}

我的理解是这样的,咱们上面也分析到了若是模式是相同模式状况和若是是不一样模式且模式不为匹配模式的状况,可是还会有另一种状况就是若是是不一样模式而且头结点是匹配模式的就会进入到帮助去fullfill的状况,我来画图说明一下该状况:

clipboard.png

如上图所示,上一个匹配操做没有进行完而后又来了一个请求操做,他就会帮助head进行匹配操做,也就是运行上面的代码逻辑,逻辑和匹配内容是同样的。

接下来让咱们看一下取消的clean方法内容:

void clean(SNode s) {
    s.item = null;   // 将item值设置为null
    s.waiter = null; // 将线程设置为null

    SNode past = s.next;   // s节点下一个节点若是不为空,而且节点是取消节点则指向下下个节点,这里是结束的标识,表明没有了。
    if (past != null && past.isCancelled())
        past = past.next;

    // 若是取消的是头节点则运行下面的清理操做,操做逻辑很简单就是判断头结点是否是取消节点,若是是则将节点必定到下一个节点
    SNode p;
    while ((p = head) != null && p != past && p.isCancelled())
        casHead(p, p.next);

    // 取消不是头结点的嵌套节点。
    while (p != null && p != past) {
        SNode n = p.next;
        if (n != null && n.isCancelled())
            p.casNext(n, n.next);
        else
            p = n;
    }
}

经过源码能够看到首先是先找到一个能够结束的标识past,也就说到这里就结束了,判断是否不是头节点被取消了,若是是头节点被取消了则进行第一个while语句,操做也很简单就是将头节点替换头结点的下一个节点,若是不是头节点被取消了则进行下面的while语句操做,其实就是将取消的上一个节点的下一个节点指定为被取消节点的下一个节点,到此分析完毕了。

结束语

若是有分析不正确的请各位指正,我这边改正~

相关文章
相关标签/搜索