并发容器学习—LinkedTransferQueue

1、LinkedTransferQueue并发容器java

1.LinkedTransferQueue的底层实现node

    LinkedTransferQueue是一个底层数据结构由链表实现的无界阻塞队列,它与SynchronousQueue中公平模式的实现TransferQueue及其类似,LinkedTransferQueue中存储的也是操做而不是数据元素。能够对比着学习,更容易理解。先来看看LinkedTransferQueue结点的定义:数据结构

static final class Node {

    //用于标识结点的操做类型,true表示put操做,false表示take操做
    final boolean isData;  

    //结点的数据域,take类型的操做,该值为null,配对后则为put中的数据
    //put类型的操做,该值为要转移的数据
    volatile Object item;   

    //当前结点的后继结点
    volatile Node next;

    //等待的线程
    volatile Thread waiter; // null until waiting


    //CAS方式更新后继结点
    final boolean casNext(Node cmp, Node val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    //CAS方式更新数据域
    final boolean casItem(Object cmp, Object val) {
        // assert cmp == null || cmp.getClass() != Node.class;
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }


    //构造方法
    Node(Object item, boolean isData) {
        UNSAFE.putObject(this, itemOffset, item); // relaxed write
        this.isData = isData;
    }


    //移除出队列,方便GC
    final void forgetNext() {
        UNSAFE.putObject(this, nextOffset, this);
    }


    //取消结点,就是本次取消操做
    final void forgetContents() {
        UNSAFE.putObject(this, itemOffset, this);
        UNSAFE.putObject(this, waiterOffset, null);
    }


    //判断结点的操做是否已经被匹配了,结点的操做被取消也包含在匹配当中
    //也就是说这个操做如果被取消了,也认为是匹配过的
    final boolean isMatched() {
        Object x = item;
        return (x == this) || ((x == null) == isData);
    }

    //判断当前结点的操做是否是未匹配过的REQUEST类型(take)的结点,true表明是
    final boolean isUnmatchedRequest() {
        return !isData && item == null;
    }

    //判断结点的操做类型与其数据(item的值)是否相符合,
    //例如take操做item值应该是null,put操做item则应该是数据
    final boolean cannotPrecede(boolean haveData) {
        boolean d = isData;
        Object x;
        return d != haveData && (x = item) != this && (x != null) == d;
    }

    //尝试匹配一个数据结点,用在移除结点的方法中
    final boolean tryMatchData() {
        // assert isData;
        Object x = item;
        if (x != null && x != this && casItem(x, null)) {
            LockSupport.unpark(waiter);
            return true;
        }
        return false;
    }


    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;
    private static final long waiterOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
            waiterOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("waiter"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

2.LinkedTransferQueue的继承关系并发

    LinkedTransferQueue的继承关系以下图所示,这么多的父类及接口中,只有一个TransferQueue接口未接触过,下面咱们先来看看这个接口是干什么的。app

public interface TransferQueue<E> extends BlockingQueue<E> {
    //尝试转移一个数据给一个正在等待消费者,若是没有等待的消费者当即返回false
    //转移失败的话,这个操做是不会入队等待被匹配的
    boolean tryTransfer(E e);

    //转移一个数据给一个消费者,若没有正在等待的消费者,那么该转移操做会阻塞
    //等待,或发生异常
    void transfer(E e) throws InterruptedException;

    //在必定时间内尝试转移数据给一个消费者,若是没有正在等待的消费者,那就
    //一直尝试到超时为止
    boolean tryTransfer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    //是否有消费者在等待
    boolean hasWaitingConsumer();

    //获取等待的消费者数量
    int getWaitingConsumerCount();
}

3.LinkedTransferQueue中重要的属性及构造方法less

public class LinkedTransferQueue<E> extends AbstractQueue<E>
    implements TransferQueue<E>, java.io.Serializable {
    //当前计算机CPU核心数是否大于1
    private static final boolean MP =
        Runtime.getRuntime().availableProcessors() > 1;

    //结点为队列中第一个waiter时的自旋次数
    private static final int FRONT_SPINS   = 1 << 7;

    //前驱结点正在处理,当前结点须要自旋的次数
    private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;

    //队列进行清理的阈值
    static final int SWEEP_THRESHOLD = 32;

    //队列头结点
    transient volatile Node head;

    //队列尾结点
    private transient volatile Node tail;

    //移除结点连接失败(修改结点的next失败)的次数,当该值大于SWEEP_THRESHOLD
    //时,会对队里进行一次清理,清理掉哪些无效的结点
    private transient volatile int sweepVotes;

    //下面四个值,用于标识xfer方法的类型
    /**
    * NOW:表明不等待消费者,直接返回结果的类型。poll方法和tryTransfer方法中使用
    * ASYNC:表示异步操做,直接添加数据元素到队尾,不等待匹配,用于offer,add,put方法中
    * SYNC:同步操做,等待数据元素被消费者接受,用于take,transfer方法中
    * TIMED:延时操做,等待必定时间后在返回匹配的结果,用于待超时时间的poll和tryTransfer方法中
    */
    private static final int NOW   = 0; // for untimed poll, tryTransfer
    private static final int ASYNC = 1; // for offer, put, add
    private static final int SYNC  = 2; // for transfer, take
    private static final int TIMED = 3; // for timed poll, tryTransfer

    public LinkedTransferQueue() {
    }


    public LinkedTransferQueue(Collection<? extends E> c) {
        this();
        addAll(c);
    }
}

4.LinkedTransferQueue入队操做dom

    LinkedTransferQueue中的入队方法包含有offer,put及add三种,这三个方法本质是都是同样的,都是调用的同一个方法且参数也都同样,而且由于LinkedTransferQueue是个无界阻塞队列,容量没有限制,所以不会出现入队等待的现象。异步

public void put(E e) {
    xfer(e, true, ASYNC, 0);
}

public boolean offer(E e, long timeout, TimeUnit unit) {
    xfer(e, true, ASYNC, 0);
    return true;
}

public boolean offer(E e) {
    xfer(e, true, ASYNC, 0);
    return true;
}

public boolean add(E e) {
    xfer(e, true, ASYNC, 0);
    return true;
}

5.xfer方法分析学习

    xfer方法是LinkedTransferQueue种最核心的一个方法,将其理解清楚,那么LinkedTransferQueue队列也就明白了。LinkedTransferQueue与SynchronousQueue中公平模式的实现TransferQueue是同样的,队列中存放的不是数据,而是操做(取出数据的操做take和放入数据的操做put)。队列中既能够存放take操做也能够存放put操做,可是队列中不能同时存在两种不一样的操做,由于不一样的操做会触发队列进行配对(操做出队)。this

    知道了这些咱们再来看xfer方法的大体流程(超时等待部分和操做取消部分暂不分析,等分析源码时在说):当队列为空时,若是有一个线程执行take操做,此时对列中是没有对应的put操做与之匹配的,那么这个take操做就会入队,同时阻塞(也多是自旋)执行这个操做的线程以等待匹配操做的到来;同理,空队列时来的是一个put操做,那么这个put操做也要入队阻塞等待匹配的take操做到来。而当队列不为空时(假设队列中都是take操做),某一线程执行put操做,此时队列检测到来了一个与队列中存放的操做相匹配的操做,那么就会将队首操做与到来的操做进行匹配,匹配成功,就会唤醒队首操做所在的线程,同时将已经匹配度额操做移除出队;而如果某一线程执行的是与队里中相同的操做,那么就将该操做直接添加到队尾。

//1.当e!=null 且haveData为true,how为ASYNC,nanos==0,表示没有超时设置的当即返回的放入数据的操做(put,add,offer)
//2.当e==null 且haveData为false,how为SYNC,nanos==0,表示没有超时设置的等待匹配到放入数据的操做(take)
//3.当e!=null 且haveData为true,how为SYNC,nanos==0,表示没有超时设置的等待匹配到取出数据的操做(transfer)
//4.当e!=null 且haveData为true,how为TIMED,nanos>0,表示设置在超时等待时间内匹配取出数据的
//操做(tryTransfer(E e, long timeout, TimeUnit unit))
//5.当e==null 且haveData为false,how为TIMED,nanos>0,表示设置在超时等待时间内匹配放入数据的
//操做(poll(long timeout, TimeUnit unit))
//6.当e!=null 且haveData为true,how为NOW,nanos==0,表示当即匹配取出数据的操做(tryTransfer)
//7.当e==null 且haveData为false,how为NOW,nanos==0,表示当即匹配放入数据的操做(poll)
private E xfer(E e, boolean haveData, int how, long nanos) {

    //判断本次操做是否是放入数据的操做类型,如果则e不能为null
    if (haveData && (e == null))
        throw new NullPointerException();
    Node s = null;                        // the node to append, if needed

    retry:
    for (;;) {                            // restart on append race

        //从head开始匹配
        for (Node h = head, p = h; p != null;) { // find & match first node
            boolean isData = p.isData;    //获取队首结点的操做类型
            Object item = p.item;    //获取队首的数据

            //判断队首结点是否已经被取消或匹配过了且队首结点的操做类型与其数据内容是否一致
            //isData为true对应put操做,则item不能为null,反之item必须为null。
            //所以,如果不一致说明p已经不是队首了,须要从新查找队首
            if (item != p && (item != null) == isData) { // unmatched

                //此处判断本次操做应该是入队操做仍是匹配操做,即判断与队首的操做类型是否一致
                //若本次操做与队列中的操做类型(都是put或都是take)相同,那么须要将本次操做入队
                //如果不一样,那么须要将队首结点的操做与本次操做匹配
                if (isData == haveData)   // can't match
                    break;    //操做类型相同,退出当前循环,去执行入队步骤

                //到此,说明队首操做与本次操做时相互匹配的,那么接下来须要作配对以后的工做
                //尝试修改p中数据item为e,若修改为功,说明操做匹配成功
                //如果修改失败,说明别其余线程抢先匹配了,那么就往队列后继续查找匹配
                if (p.casItem(item, e)) { // match

                    //本次操做已经与p匹配成功,那么p以前的结点要么是被匹配过,要么已经被取消
                    //都不能再作为head了,所以,这里须要将head更新
                    for (Node q = p; q != h;) {
                        //获取后继结点
                        Node n = q.next;  // update by 2 unless singleton

                        //判断head是否仍是h,如果,说明head还没被其余线程更新过,那当前线程能够尝试更新
                        //如果更新成功,说明h结点已经被移除出队了,那么就须要将其后继指针指向自身表明
                        //这个结点已被移除出队,方便GC回收
                        if (head == h && casHead(h, n == null ? q : n)) {
                            h.forgetNext();
                            break;
                        }                 // advance and retry

                        //到这说明head已经被更新过,或是当前线程要更新head失败,那么就从新获取head并判断
                        //head是否为null(是不是空队列),如果则直接结束循环;若不是,再继续判断其后继结点
                        //是否为null(head是不是队列中最后一个结点),如果也直接结束循环(不须要再继续尝试
                        //更新head);若不是,在判断这个后继结点是否已经匹配过,若未匹配,那么也放弃更新head
                        //这里能够总结出,head更新的要求:head不会随着队首被匹配就当即更新,head的更新会滞后
                        //只有当head及其后继都被匹配后,才会对head进行匹配;也就是说队列中要有至少两个结点匹配过
                        //会触发head的更新(即松弛度>2才更新head)
                        if ((h = head)   == null ||
                            (q = h.next) == null || !q.isMatched())
                            break;        // unless slack < 2
                    }
                    LockSupport.unpark(p.waiter);    //p结点匹配成功,唤醒等待该结点的线程
                    return LinkedTransferQueue.<E>cast(item);
                }
            }
            //往p的后继继续查找未匹配的结点
            Node n = p.next;
            p = (p != n) ? n : (h = head); // Use head if p offlist
        }

        //到这说明队列中的操做与本次操做相同,只能将操做入队
        //判断本次操做的模式,NOw为不等待,当即返回的模式
        if (how != NOW) {                 // No matches available

            //s未初始化的话,进行初始化
            if (s == null)
                s = new Node(e, haveData);
            Node pred = tryAppend(s, haveData);    //尝试添加到队尾,并返回其前驱
            if (pred == null)    //返回前驱为null,说明添加失败,从新开始
                continue retry;           // lost race vs opposite mode

            //添加的结点不为异步模式,说明是同步或超时模式,那么要等待匹配
            //若为异步模式,则不须要等待匹配,由于异步模式必然是add,offer,put
            //三个方法,不须要等待
            if (how != ASYNC)    
                return awaitMatch(s, pred, e, (how == TIMED), nanos);
        }
        return e; // not waiting
    }
}

//结点自环
final void forgetNext() {
    UNSAFE.putObject(this, nextOffset, this);
}

//尝试添加结点到队尾
private Node tryAppend(Node s, boolean haveData) {

    for (Node t = tail, p = t;;) {        // move p to last node and append
        Node n, u;                        // temps for reads of next & tail

        //判断队列是否为空,为空的话,直接更新head结点为s后结束,即空队列直接入队
        if (p == null && (p = head) == null) {
            if (casHead(null, s))
                return s;                 // initialize
        }

        //判断结点是否符合入队要求,即验证s结点的操做类型与p是否相同
        else if (p.cannotPrecede(haveData))
            return null;                  //不一样,直接返回null

        //判断p是否有后继,新增结点是要添加到队尾的,而tail是可能滞后于队尾的,
        //且其余线程也可能抢先更新队尾,所以若p的后继不为null,说明当前p不是真正的
        //队尾,须要推动查找队尾。
        else if ((n = p.next) != null)
            p = p != t && t != (u = tail) ? (t = u) : (p != n) ? n : null;  

        //n为null,说明找到队尾了,此时须要将p的后继更新成s,如果更新失败说明有其
        //它线程抢先了,那么就从新获取队尾,再尝试    
        else if (!p.casNext(null, s))
            p = p.next;                   // 继续查找队尾

        //成功将s入队
        else {

            //此时,若p不等t,说明t不是队尾,能够看看tail需不须要更新
            if (p != t) {                 // update if slack now >= 2

                /**
                * 判断是否须要更新tail,如果当前的tail离
                * 真正的队尾不超过2个结点,那就暂时不更新tail
                * 如果超过的话,就更新tail
                while ((tail != t || !casTail(t, s)) &&
                       (t = tail)   != null &&
                       (s = t.next) != null && // advance and retry
                       (s = s.next) != null && s != t);
            }
            return p;
        }
    }
}

//阻塞或自旋等待匹配
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {

    //计算截止时间
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();    //获取当前线程
    int spins = -1; // initialized after first item and cancel checks
    ThreadLocalRandom randomYields = null; // bound if needed

    //死循环
    for (;;) {
        Object item = s.item;    //s结点的数据

        //判断s是否被匹配过,未匹配时item==e,匹配过或取消后item就会改变
        if (item != e) {                  // matched
            // assert item != s;

            //s被匹配过,那么须要将item设为s自己,且waiter要恢复成null
            s.forgetContents();           // avoid garbage
            return LinkedTransferQueue.<E>cast(item);    //返回数据
        }

        //判断s所在的线程是否被中断过或者超时时间是否到了,那么就须要取消本次s
        //结点对应的操做了(将s.item设为s)
        if ((w.isInterrupted() || (timed && nanos <= 0)) &&
                s.casItem(e, s)) {        // cancel
            unsplice(pred, s);    //将s移除出队列
            return e;
        }

        //下面都是进行的超时阻塞或者自旋操做
        //判断自旋次数是否初始化过    
        if (spins < 0) {                  // establish spins at/near front

            //初始化自旋次数,即计算自旋次数
            if ((spins = spinsFor(pred, s.isData)) > 0)
                randomYields = ThreadLocalRandom.current();
        }
        else if (spins > 0) {             // spin
            --spins;    //自旋次数递减
            // 生成随机数来让出CPU时间
            if (randomYields.nextInt(CHAINED_SPINS) == 0)
                Thread.yield();           // occasionally yield
        }
        else if (s.waiter == null) {
            s.waiter = w;                 //设置等待线程
        }
        else if (timed) {
            //计算超时等待时间
            nanos = deadline - System.nanoTime();
            if (nanos > 0L)
                LockSupport.parkNanos(this, nanos);    //超时阻塞
        }
        else {
            LockSupport.park(this);    //阻塞
        }
    }
}

//将s结点移除出队列,即解除和前驱结点的连接
final void unsplice(Node pred, Node s) {

    //将s.item设为s自己,且waiter要恢复成null
    s.forgetContents(); // forget unneeded fields

    //当s的前驱不为null,且前驱与s不相同的条件下才能进行解除连接
    if (pred != null && pred != s && pred.next == s) {
        Node n = s.next;    //获取s结点的后继

        //判断s是否有后继(s是否为队尾),若s有后继那么后继是否是s自己(s是否已匹配
        //或取消了),若后继不是s自身,那么就尝试将pred的后继结点更新成s的后继n,若
        //是更新成功,再判断pred是否已经被匹配过或取消了
        if (n == null || (n != s && pred.casNext(s, n) && pred.isMatched())) {

            //更新head
            for (;;) {               // check if at, or could be, head
                Node h = head;    //获取head

                //h为pred或s或队列已空,那就不须要更新了,直接返回
                if (h == pred || h == s || h == null)
                    return;          // at head or list empty

                //如果h未被匹配过,说明不须要更新,退出当前循环
                if (!h.isMatched())
                    break;
                //获取h的后继
                Node hn = h.next;
                if (hn == null)
                    return;          // now empty
                //只要h未被匹配或取消,就尝试更新head
                if (hn != h && casHead(h, hn))
                    //将h结点移除出队,h.next==h
                    h.forgetNext();  // advance head
            }

            //s节点被移除后,须要记录删除的操做次数,若是超过阀值,则须要清理队列
            if (pred.next != pred && s.next != s) {     // 从新检查移除是否成功
                for (;;) {           // sweep now if enough votes
                    int v = sweepVotes;    //返回当前删除次数的记录
                
                    //判断是否超过阈值,没超过就更新记录,超过就将记录恢复为0
                    //而且清理队列
                    if (v < SWEEP_THRESHOLD) {
                        if (casSweepVotes(v, v + 1))
                            break;
                    }
                    else if (casSweepVotes(v, 0)) {
                        sweep();    //清理队列
                        break;
                    }
                }
            }
        }
    }
}

//清理队列
private void sweep() {

    //遍历队列
    for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
        //判断s是否被匹配过,未被匹配过就继续向后遍历
        if (!s.isMatched())
            // Unmatched nodes are never self-linked
            p = s;

        //s节点被匹配,可是是尾节点,则退出循环,队尾就算被匹配了也不能直接
        //移除
        else if ((n = s.next) == null) // trailing node is pinned
            break;

        //判断s是否已经被移除了,如果,则从新从head开始清理
        else if (s == n)    // stale
            // No need to also check for p == s, since that implies s == n
            p = head;
        else
            p.casNext(s, n);    //移除s出队列
    }
}

    以上就是xfer的所有过程了,一个xfer方法直接包含了LinkedTransferQueue的全部功能,不只add,put,offer方法是由其实现的,其余的如poll,take,transfer,tryTransfer方法也均是由其实现的,只不过参数不一样:

public boolean tryTransfer(E e) {
    return xfer(e, true, NOW, 0) == null;
}


public void transfer(E e) throws InterruptedException {
    if (xfer(e, true, SYNC, 0) != null) {
        Thread.interrupted(); // failure possible only due to interrupt
        throw new InterruptedException();
    }
}


public boolean tryTransfer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
        return true;
    if (!Thread.interrupted())
        return false;
    throw new InterruptedException();
}


public E take() throws InterruptedException {
    E e = xfer(null, false, SYNC, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}


public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E e = xfer(null, false, TIMED, unit.toNanos(timeout));
    if (e != null || !Thread.interrupted())
        return e;
    throw new InterruptedException();
}


public E poll() {
    return xfer(null, false, NOW, 0);
}

6.LinkedTransferQueue中主要方法流程

    1.offer,add,put三个异步放入数据的操做的大体过程以下:

 

    2.take同步取出数据的大体流程以下:

 

 

    3.transfer同步放入数据的流程大体以下:

 

    4.tryTransfer带超时设置的放入数据的流程大体以下:

 

    5.poll带超时设置的取出数据的流程大体以下:

 

    6.tryTransfer不作任何等待放入数据(只作一次放入数据的尝试,失败直接结束)的流程大体以下:

 

    7.poll只进行一次取出数据的操做,失败直接返回null,大体过程以下:

 

7.其余方法

//查找队列中第一个item不为null或结点自己的结点,将其item返回,不移除出队列
public E peek() {
    return firstDataItem();
}

//遍历队列查找item不为null也不指向结点自身的结点,返回其item
private E firstDataItem() {

    //遍历队列
    for (Node p = head; p != null; p = succ(p)) {
        Object item = p.item;
        
        if (p.isData) {
            if (item != null && item != p)
                return LinkedTransferQueue.<E>cast(item);
        }
        else if (item == null)
            return null;
    }
    return null;
}

//获取结点的后继,若后继为自身(即已被移除出队),那么返回head
final Node succ(Node p) {
    Node next = p.next;
    return (p == next) ? head : next;
}

//返回队列中的结点数
public int size() {
    return countOfMode(true);
}

//计算结点数
private int countOfMode(boolean data) {
    int count = 0;

    //遍历队列计算有效的结点数
    for (Node p = head; p != null; ) {
        if (!p.isMatched()) {
            if (p.isData != data)
                return 0;
            if (++count == Integer.MAX_VALUE) // saturated
                break;
        }
        Node n = p.next;
        if (n != p)
            p = n;
        else {
            count = 0;
            p = head;
        }
    }
    return count;
}

//删除队列查询到的第一个item为o的结点
public boolean remove(Object o) {
    return findAndRemove(o);
}

private boolean findAndRemove(Object e) {
    if (e != null) {

        //遍历队列查找删除
        for (Node pred = null, p = head; p != null; ) {
            Object item = p.item;
            if (p.isData) {

                //判断结点是不是须要删除的数据
                if (item != null && item != p && e.equals(item) &&
                    p.tryMatchData()) {
                    unsplice(pred, p);    //将结点移除出队列
                    return true;
                }
            }
            else if (item == null)    //item为null,说明队列中都是取出数据的操做,不可能有e了
                break;
            pred = p;
            if ((p = p.next) == pred) { // stale
                pred = null;
                p = head;
            }
        }
    }
    return false;
}
相关文章
相关标签/搜索