Java多线程框架源码阅读之---ReentrantLock非公平锁

部分段落来自于http://javadoop.com/post/Abst...,他的文章至关不错。java

ReentrantLock基于Sync内部类来完成锁。Sync继承于AbstractQueuedSynchronizer。Sync有两个不一样的子类NonfairSync和FairSync。node

ReentrantLock的大部分方法都是基于AbstractQueuedSynchronizer实现,大部分仅仅是对AbstractQueuedSynchronizer的转发。所以,了解AbstractQueuedSynchronizer就很是重要。app

做为AbstractQueuedSynchronizer的实现者须要实现isHeldExclusively,tryAcquire,tryRelease,(可选tryAcquireShared,tryReleaseShared)less


那么咱们看看对于一个经常使用的套路,ReentrantLock是如何实现同步的oop

lock.lock();
try{
   i++;
}finally {
   lock.unlock();
}

lock.lock()内部实现为调用了sync.lock(),以后又会调用NonfairSync或FairSync的lock(),你看果真重度使用了AQS吧,这里咱们先记住这个位置,一会咱们还会回来分析。post

public void lock() {
    sync.lock();
}

先介绍一下AQS里面的属性,不复杂就4个主要的属性:AQS里面阻塞的节点是做为队列出现的,维护了一个head节点和tail节点,同时维护了一个阻塞状态,若是state=0表示没有锁,若是state>0表示锁被重入了几回。
注意head是一个假节点,阻塞的节点是做为head后面的节点出现的。
aqs-0.pngui

// 头结点,你直接把它当作 当前持有锁的线程 多是最好理解的
private transient volatile Node head;
// 阻塞的尾节点,每一个新的节点进来,都插入到最后,也就造成了一个隐视的链表
private transient volatile Node tail;
// 这个是最重要的,不过也是最简单的,表明当前锁的状态,0表明没有被占用,大于0表明有线程持有当前锁
// 之因此说大于0,而不是等于1,是由于锁能够重入嘛,每次重入都加上1
private volatile int state;
// 表明当前持有独占锁的线程,举个最重要的使用例子,由于锁能够重入
// reentrantLock.lock()能够嵌套调用屡次,因此每次用这个来判断当前线程是否已经拥有了锁
// if (currentThread == getExclusiveOwnerThread()) {state++}
private transient Thread exclusiveOwnerThread; //继承自AbstractOwnableSynchronizer

接着看一下FairSync和NonfairSync的实现,FairSync和NonfairSync都继承了Sync,并且Sync又继承了AbstractQueuedSynchronizer。能够看到FairSync和NonfairSync直接或间接的实现了isHeldExclusively,tryAcquire,tryRelease这三个方法。this

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;

    /**
     * Performs {@link Lock#lock}. The main reason for subclassing
     * is to allow fast path for nonfair version.
     */
    abstract void lock();

    /**
     * Performs non-fair tryLock.  tryAcquire is implemented in
     * subclasses, but both need nonfair try for trylock method.
     */
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
     //若是没有锁上,则设置为锁上并设置本身为独占线程
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
     //若是锁上了,并且独占线程是本身,那么从新设置state+1,而且返回true
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
     //不然返回false
        return false;
    }

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

    protected final boolean isHeldExclusively() {
        // While we must in general read state before owner,
        // we don't need to do so to check if current thread is owner
        return getExclusiveOwnerThread() == Thread.currentThread();
    }

    final ConditionObject newCondition() {
        return new ConditionObject();
    }

    // Methods relayed from outer class

    final Thread getOwner() {
        return getState() == 0 ? null : getExclusiveOwnerThread();
    }

    final int getHoldCount() {
        return isHeldExclusively() ? getState() : 0;
    }

    final boolean isLocked() {
        return getState() != 0;
    }

    /**
     * Reconstitutes the instance from a stream (that is, deserializes it).
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        setState(0); // reset to unlocked state
    }
}
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        //若是没有人锁上,那么就设置我本身为独占线程,不然再acquire一次
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            //调用到了AQS的acquire里面
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

以前咱们说到回到ReentrantLock的lock()调用了sync.lock();如今咱们回来看看非公平锁的逻辑是:若是抢到锁了,则设置本身的线程为占有锁的线程,不然调用acquire(1),这个是AQS的方法。spa

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

acquire会调用tryAcquire,而这个是对于不一样的实现是不同的,非公平锁NonfairSync里面的tryAcquire,而tryAcquire又会调用到Sync的nonfairTryAcquire。总之tryAcquire在非公平锁场景下尝试去获取锁,若是获取上了,则置一下AQS状态state,并设置本身为独占线程,并支持重入锁功能。线程

addWaiter方法用于建立一个节点(值为当前线程)并维护一个双向链表。

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

如今说一下Node的结构,主要有用的field为waitStatus,prev,next,thread。waitStatus目前仅要了解1,0,-1就够了。 0是默认状态,1表明争取锁取消,-1表示它的后继节点对应的线程须要被唤醒。也就是说这个waitStatus其实表明的不是本身的状态,而是后继节点的状态。能够看见默认进队的节点的waitStatus都是0

static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    // 标识节点当前在共享模式下
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    // 标识节点当前在独占模式下
    static final Node EXCLUSIVE = null;

    // ======== 下面的几个int常量是给waitStatus用的 ===========
    /** waitStatus value to indicate thread has cancelled */
    // 代码此线程取消了争抢这个锁
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    // 官方的描述是,其表示当前node的后继节点对应的线程须要被唤醒
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    // 本文不分析condition,因此略过吧
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    // 一样的不分析,略过吧
    static final int PROPAGATE = -3;
    // =====================================================

    // 取值为上面的一、-一、-二、-3,或者0(之后会讲到)
    // 这么理解,暂时只须要知道若是这个值 大于0 表明此线程取消了等待,
    // 也许就是说半天抢不到锁,不抢了,ReentrantLock是能够指定timeouot的。。。
    volatile int waitStatus;
    // 前驱节点的引用
    volatile Node prev;
    // 后继节点的引用
    volatile Node next;
    // 这个就是线程本尊
    volatile Thread thread;
}

acquireQueued的做用是从等待队列中尝试去把入队的那个节点去作park。另外当节点unpark之后,也会在循环中将本身设置成头结点,而后本身拿到锁

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //对于队首节点,刚才也许没有抢到锁,如今也许能抢到了,再试一次
                if (p == head && tryAcquire(arg)) {
                    //若是抢到了锁,这个入队的节点根本不须要park,直接能够执行
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //若是不是队首节点,或者是队首可是没有抢过其余节点
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

shouldParkAfterFailedAcquire。这个方法说的是:"当前线程没有抢到锁,是否须要挂起当前线程?第一个参数是前驱节点,第二个参数才是表明当前线程的节点。注意由于默认加入的节点的状态都是0,这个方法会进来两次,第一次进来走到else分支里面修改前置节点的waitStatus为-1.第二次进来直接返回true。对于刚加入队列的节点,修改head节点的waitStatus为-1,对于后来加入的节点,修改它前一个节点的waitStatus为-1。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

parkAndCheckInterrupt的代码很简单,这个this就是ReentrantLock类的实例。阻塞了当前线程。

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

再来看看怎么解锁。

public void unlock() {
    sync.release(1);
}

调用到AQS里面,若是锁被彻底释放了,那么就unpark head的下一个

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

tryRelease是由Sync覆盖的。重置AQS里面的state,返回锁是否被彻底释放了的判断。

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        //下面的代码就是唤醒后继节点,可是有可能后继节点取消了等待(waitStatus==1)
        // 从队尾往前找,找到waitStatus<=0的全部节点中排在最前面的 
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

等到unpark之后,parkAndCheckInterrupt的阻塞解除,将继续for无限循环,由于是队列里是一个一个阻塞的,此时阻塞节点的前置依次都是head,所以if (p == head && tryAcquire(arg)) 这句话若是它醒来抢锁成功了将执行成功,阻塞的线程获取锁并执行,将本身设置成head,同时也将本身从队列中清除出去。 注意这里是非公平锁,所以在tryAcquire有可能尚未抢过其余线程,那么抢到的那个将会直接执行,而没有抢到的,又在循环里锁住了。

相关文章
相关标签/搜索