AbstractQueuedSynchronizer

1 简介

AbstractQueuedSynchronizer简称AQS是一个抽象同步框架,能够用来实现一个依赖状态的同步器。
JDK1.5中提供的java.util.concurrent包中的大多数的同步器(Synchronizer)如Lock, Semaphore, Latch, Barrier等,这些类之间大多能够互相实现,如使用Lock实现一个Semaphore或者反过来,可是它们都是基于java.util.concurrent.locks.AbstractQueuedSynchronizer这个类的框架实现的,理解了这个稍微复杂抽象的类再去理解其余的同步器就很轻松了。java

2 原理介绍

AQS的核心是一个线程等待队列,采用的是一个先进先出FIFO队列。用来实现一个非阻塞的同步器队列有主要有两个选择Mellor-Crummey and Scott (MCS) locks和Craig, Landin, and Hagersten (CLH) locks的变种。CLH锁更适合处理取消和超时,因此AQS基于CLH进行修改做为线程等待队列。
CLH队列使用pred引用前节点造成一个队列,入队enqueue和出队dequeue操做均可以经过原子操做完成。node

 

在 AQS 内部,经过维护一个FIFO 队列来管理多线程的排队工做。在公平竞争的状况下,没法获取同步状态的线程将会被封装成一个节点,置于队列尾部。入队的线程将会经过自旋的方式获取同步状态,若在有限次的尝试后,仍未获取成功,线程则会被阻塞住。大体示意图以下:算法

当头结点释放同步状态后,且后继节点对应的线程被阻塞,此时头结点线程将会去唤醒后继节点线程。后继节点线程恢复运行并获取同步状态后,会将旧的头结点从队列中移除,并将本身设为头结点。大体示意图以下:多线程

其中每一个节点包含以下状态:app

/** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
CANCELED表示线程等待已经取消,是惟一一个大于0的状态。
SINALG表示须要唤醒next节点
CONDITION代表线程正在等待一个条件
PROPAGATE用于acquireShared中向后传播

3 acquire

/**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

一、若是尝试获取锁成功整个获取操做就结束,不然转到2. 尝试获取锁是经过方法tryAcquire来实现的,AQS中并无该方法的具体实现,只是简单地抛出一个不支持操做异常,在AQS简介中谈到tryAcquire有不少实现方法,这里再也不细化,只须要知道若是获取锁成功该方法返回true便可;框架

二、若是获取锁失败,那么就建立一个表明当前线程的结点加入到等待队列的尾部,是经过addWaiter方法实现的,来看该方法的具体实现:oop

/**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

该方法建立了一个独占式结点,而后判断队列中是否有元素,若是有(pred!=null)就设置当前结点为队尾结点,即将当前节点插入到尾节点的后面,而后返回;ui

若是没有元素(pred==null),表示队列为空,走的是入队操做this

    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

enq方法采用的是变种CLH算法,先看头结点是否为空,若是为空就建立一个傀儡结点,头尾指针都指向这个傀儡结点,这一步只会在队列初始化时会执行;atom

若是头结点非空,就采用CAS操做将当前结点插入到头结点后面,若是在插入的时候尾结点有变化,就将尾结点向后移动直到移动到最后一个结点为止,而后再把当前结点插入到尾结点后面,尾指针指向当前结点,入队成功。

三、将新加入的结点放入队列以后,这个结点有两种状态,要么获取锁,要么就挂起,若是这个结点不是头结点的后继节点,就看看这个结点是否应该挂起,若是应该挂起,就挂起当前结点,是否应该挂起是经过shouldParkAfterFailedAcquire方法来判断的  

    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //判断前驱节点是不是头节点,若是是,则循环获取同步
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //若是获取同步状态失败或者前驱节点不是头节点,则判断是否将当前节点挂起
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }    

  

    /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
/**
 * 该方法主要用途是,当线程在获取同步状态失败时,根据前驱节点的等待状态,决定后续的动做。好比前驱
 * 节点等待状态为 SIGNAL,代表当前节点线程应该被阻塞住了。不能总是尝试,避免 CPU 忙等。
 *    —————————————————————————————————————————————————————————————————
 *    | 前驱节点等待状态 |                   相应动做                     |
 *    —————————————————————————————————————————————————————————————————
 *    | SIGNAL         | 阻塞                                          |
 *    | CANCELLED      | 向前遍历, 移除前面全部为该状态的节点               |
 *    | waitStatus < 0 | 将前驱节点状态设为 SIGNAL, 并再次尝试获取同步状态   |
 *    —————————————————————————————————————————————————————————————————
 */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //首先检查前趋结点的waitStatus位,若是为SIGNAL,表示前趋结点会通知它,那么它能够放心大胆地挂起了
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            //若是前趋结点是一个被取消的结点怎么办呢?那么就向前遍历跳过被取消的结点,直到找到一个没有被取消的结点为止,将找到的这个结点做为它的前趋结点,
       //将找到的这个结点的waitStatus位设置为SIGNAL,返回false表示线程不该该被挂起,继续尝试获取锁 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ /* * 等待状态为 0 或 PROPAGATE,设置前驱节点等待状态为 SIGNAL, * 并再次尝试获取同步状态。 */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
private final boolean parkAndCheckInterrupt() {
    // 调用 LockSupport.park 阻塞本身
    LockSupport.park(this);
    return Thread.interrupted();
}

 4 release

    /**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

  

一、release过程比acquire要简单,首先调用tryRelease释放锁,若是释放失败,直接返回;

二、释放锁成功后须要唤醒继任结点,是经过方法unparkSuccessor实现的

    /**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

一、node参数传进来的是头结点,首先检查头结点的waitStatus位,若是为负,表示头结点还须要通知后继结点,这里不须要头结点去通知后继,所以将该该标志位清0.

二、而后查看头结点的下一个结点,若是下一个结点不为空且它的waitStatus<=0,表示后继结点没有被取消,是一个能够唤醒的结点,因而唤醒后继结点返回;若是后继结点为空或者被取消了怎么办?寻找下一个可唤醒的结点,而后唤醒它返回。

这里并无从头向尾寻找,从队列尾部开始向前查找,找到队列最前面没有被取消的节点,而后,将其唤醒。

为何须要从队列尾部开始向前查找呢?

由于在CLH队列中的结点随时有可能被中断,被中断的结点的waitStatus设置为CANCEL,并且它会被踢出CLH队列,如何个踢出法,就是它的前趋结点的next并不会指向它,而是指向下一个非CANCEL的结点,而它本身的next指针指向它本身。一旦这种状况发生,如何从头向尾方向寻找继任结点会出现问题,由于一个CANCEL结点的next为本身,那么就找不到正确的继任接点。

有的人又会问了,CANCEL结点的next指针为何要指向它本身,为何不指向真正的next结点?为何不为NULL?

第一个问题的答案是这种被CANCEL的结点最终会被GC回收,若是指向next结点,GC没法回收。

相关文章
相关标签/搜索