从ReentrantLock分析AbstractQueuedSynchronized源码

1.示例代码

ReentrantLock lock = new ReentrantLock();
lock.lock();
lock.unlock();

2.ReentrantLock构造方法

private final Sync sync;

//空构造的状况建立一个非公平锁
public ReentrantLock() {
    sync = new NonfairSync();
}

//传boolean值,true的状况建立一个公平锁,false建立一个非公平锁
public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
}

3.Sync,FairSync,AbstractQueuedSynchronizer继承关系

AbstractQueuedSynchronizer提供了通用方法,但其tryAcquire方法须要子类FairSync本身实现node

4.Node节点

AbstractQueuedSynchronized内部维护了一个双向链表,经过waitStatus表示当前线程等待状态多线程

具体结构以下图:测试

阻塞队列不包含 head 节点,这里要先有一个概念,为何呢?由于首节点是一个虚节点的概念,不存储数据,从addWaiter方法中的enq方法内能够看出来。在添加一个节点进队列的时候,第一次添加的时候会构建一个空节点。ui

private Node enq(final Node node) {
    for (;;) {
        //获取当前尾节点
        Node t = tail;
        //若是当前尾节点为空
        if (t == null) { 
            //构造一个空节点并赋值给head节点
            if (compareAndSetHead(new Node()))
                //将尾节点指向当前空的首节点
                tail = head;
        } else {
            //循环第二次进来,尾节点一点不为空
            //设置当前节点的前置节点为以前的尾节点
            node.prev = t;
            //设置当前节点为尾节点
            if (compareAndSetTail(t, node)) {
                //设置初始化为空的尾节点的后继节点为当前节点
                t.next = node;
                //返回空节点
                return t;
            }
        }
    }
}
static final class Node {
 //表明节点当前在共享模式下
    static final Node SHARED = new Node();
 //表明当前节点在独占模式下
    static final Node EXCLUSIVE = null;

    //当前节点线程取消争抢锁
    static final int CANCELLED =  1;
    //当前node节点的后继节点的线程须要被唤醒
    static final int SIGNAL    = -1;
    //codition队列节点属性
    static final int CONDITION = -2;
    //这里不讨论
    static final int PROPAGATE = -3;

    //取值为上面的1,-1,-2,-3或者默认值0
    volatile int waitStatus;

    //前驱节点
    volatile Node prev;
    
    //后继节点
    volatile Node next;
    
    //当前node节点对应的线程
    volatile Thread thread;
    
    //
    Node nextWaiter;
    
    /**
     * Returns true if node is waiting in shared mode.
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    
    //获取队列的首节点
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    
    Node() {    // Used to establish initial head or SHARED marker
    }
    
    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }
    
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }

}

5.state属性

state值表明当前线程是否获取锁this

//AbstractQueuedSynchronizer的核心属性,大于0说明线程持有锁,每重入一次该值+1
//所谓的重入能够理解为lock.lock()调用屡次
private volatile int state;

//父类AbstractQueuedSynchronizer的方法
//经过UNSAFE类设置内存地址中state的属性值,当state>=1的时候,说明当前线程持有锁
//线程每进行一次重入,state值会+1
protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

6.从ReentrantLock公平锁分析AbstractQueuedSynchronized

公平锁和非公平锁的区别在于,非公平锁在入队以前,进行了2次CAS操做进行state值的设置,而公平锁只在tryAcquire中首先进行了hasQueuedPredecessors的判断以后,才进行了CAS操做,以后二者的逻辑同样,竞争激烈的时候线程都会被构形成Node节点进入阻塞队列。spa

加锁过程方法调用图,方便分析代码调用过程线程

6.1.加锁过程分析

6.1.1.AbstractQueuedSynchronized.acquire方法分析

//加锁核心方法入口
public final void acquire(int arg) {
    //条件1:FairSync.tryAcquire尝试获取锁,获取成功返回true,失败返回false
    //条件2:addWaiter将当前线程封装成Node加入到双向队列中
    //acquireQueued为核心方法,包含阻塞当前线程,清除队列中CANCELED状态线程
    //返回true表示
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();//设置当前线程的中断状态,这里涉及interrupt()和interrupted()方法的概念
}

interrupt()只设置线程的中断状态为true,并不能中断线程;blog

interrupted()测试当前线程是否已经被中断,并清除线程的中断状态;继承

6.1.2.FairSync.tryAcquire方法分析

aqs将tryAcquire交给子类去实现,这里示例为FairSync队列

//尝试获取锁
protected final boolean tryAcquire(int acquires) {
    //获取当前线程
    final Thread current = Thread.currentThread();
    //获取当前线程状态
    int c = getState();
    //c==0说明当前线程还未获取锁
    if (c == 0) {
        //条件1:判断当前队列中是否有其余线程在等待,没有则返回false
        //条件2:条件1中队列中没有其余线程在等待,尝试修改state值,修改为功则表明加锁成功
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            //设置当前线程为独占线程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //c>0说当前线程已经获取锁,判断当前线程和独占线程是否相等
    else if (current == getExclusiveOwnerThread()) {
        //将state的值递增
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        //更新state的值
        setState(nextc);
        return true;
    }
    return false;
}

6.1.3.AbstractQueuedSynchronized.hasQueuedPredecessors方法分析

FairSync.tryAcquire里面面有一个hasQueuedPredecessors方法,这是和非公平锁的区别,公平锁这里多了这个方法。用于判断队列中是否已经有其余线程在等待,若是没有,则返回false。由于前面也已经说过head节点不存储数据,只是一个虚节点,因此判断队列中是否有处于等待的节点有如下判断:

1.h != t,若是head和tail相同,也就不用判断了,确定没有处于等待的节点。

2.(s = h.next) == null说明只有首节点,确定没有处于等待的节点。

3.s.thread != Thread.currentThread(),首节点的next节点的线程不和当前线程同样。若是相同,说明当前首节点只有一个next节点,也就是只有一个线程竞争锁资源,能够直接经过CAS竞争锁资源。

这里有个细节,tail的声明在head以前,由于根据tail你必定能够获取head,可是反过来有head就不必定有tail了。由于head确定是在tail以前初始化的。这样在多线程竞争状况下,若是head先声明,tail后声明,就会出现head初始化了但tail还未初始化的过程,使得h!=t等式成立。

public final boolean hasQueuedPredecessors() {
    Node t = tail;
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

6.1.4.AbstractQueuedSynchronized.addWaiter方法分析

//将当前线程封装成Node节点并返回,指定为独占模式,则节点的nextWaiter为NULL
//若是当前节点存在尾节点,则将当前节点加入到队列中,并设置当前节点为尾节点
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    //将节点加入到队尾当中
    Node pred = tail;
    //若是尾节点不为空
    if (pred != null) {
        //设置当前节点的前驱节点为尾节点
        node.prev = pred;
        //经过cas方式设置当前节点为尾节点
        if (compareAndSetTail(pred, node)) {
            //以前尾节点的后续节点为当前节点
            pred.next = node;
            return node;
        }
    }
    //两种状况走到这里:1.尾节点为空,则队列为空;2.经过CAS设置尾节点失败,存在竞争的时候可能出现
    //自旋方式设置当前节点为尾节点,若是尾节点为空,则初始化一个空节点做为首节点,并将当前节点和空的首
    //节点构成双向队列,返回该空节点
    enq(node);
    return node;
}

6.1.5.AbstractQueuedSynchronized.enq方法分析

自旋方式设置当前节点为尾节点,若是尾节点为空,则初始化一个空节点做为首节点,并将当前节点和空的首节点构成双向队列,返回该空节点

private Node enq(final Node node) {
    for (;;) {
        //获取当前尾节点
        Node t = tail;
        //若是当前尾节点为空
        if (t == null) { 
            //构造一个空节点并赋值给head节点
            if (compareAndSetHead(new Node()))
                //将尾节点指向当前空的首节点
                tail = head;
        } else {
            //循环第二次进来,尾节点一点不为空
            //设置当前节点的前置节点为以前的尾节点
            node.prev = t;
            //设置当前节点为尾节点
            if (compareAndSetTail(t, node)) {
                //设置初始化为空的尾节点的后继节点为当前节点
                t.next = node;
                //返回空节点
                return t;
            }
        }
    }
}

6.1.6.AbstractQueuedSynchronized.acquireQueued方法分析


final boolean acquireQueued(final Node node, int arg) {
    //失败标识
    boolean failed = true;
    try {
        //中断标识
        boolean interrupted = false;
        //自旋
        for (;;) {
            //获取当前节点前驱节点
            final Node p = node.predecessor();
            //若是当前节点为前驱节点为首节点,尝试加锁
            if (p == head && tryAcquire(arg)) {
                //加锁成功则设置当前节点为首节点
                setHead(node);
                //以前头节点的后续节点设置为空,帮助gc
                p.next = null; // help GC
                //失败标识设置为false
                failed = false;
                return interrupted;
            }
            //若是节点的前驱节点不是头节点或者尝试加锁失败
            //shouldParkAfterFailedAcquire判断当前节点线程是否须要阻塞
            //当能阻塞当前线程时,调用parkAndCheckInterrupt方法阻塞线程
            //若是被阻塞,当前自旋操做也走不下去了
            //线程被阻塞在自旋的这里等待唤醒,这里是很是须要注意的,释放锁后,唤醒的下一个节点的
            //代码会当即运行到这一段,而后尝试加锁并把本身设置为首节点,这样就完成了首节点的变动
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        //若是阻塞过程发生异常或其余缘由致使失败,将当前线程状态设置为CANCELED,同时从队列中剔除
        if (failed)
            cancelAcquire(node);
    }
}

6.1.7.AbstractQueuedSynchronized.shouldParkAfterFailedAcquire方法

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //获取当前节点前驱节点的状态
    int ws = pred.waitStatus;
    //若是前驱节点状态为SIGNAL,当前节点线程能够直接阻塞
    if (ws == Node.SIGNAL)
        return true;
    //若是前驱节点的状态为CANCELLED,从当前节点向前循环获取第一个waitStatus不为CANCELLED的节点,
    //并将找到的前驱节点和当前节点构成双向队列,剔除CANCELLED状态的节点
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {//若是ws=0,设置当前节点的前驱节点的状态为SIGNAL
        //经过cas设置前驱节点的状态为SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

6.1.8.AbstractQueuedSynchronized.parkAndCheckInterrupt

private final boolean parkAndCheckInterrupt() {
    //阻塞当前线程,当前线程已被中断
    LockSupport.park(this);
    //清除当前线程的中断状态并测试线程是否已被中断,这里返回true表示线程已经被中断
    //虽然这里清除了线程的中断状态,但acquire方法里面调用了selfInterrupt()设置了线程的中断状态
    return Thread.interrupted();
}

6.1.9.AbstractQueuedSynchronized.cancelAcquire方法

取消加锁操做,将当前Node从队列中移除,构建新的队列,移除构建新的队列过程当中遍历到的CANCELED状态的节点,根据当前节点所处位置分析是否须要唤醒当前节点的后续节点。

这段代码首先干了这几件事:

首先将当前节点的线程设置为空,状态设置为CANCELED,从后向前找到当前节点第一个状态不为CANCELED的前驱节点,并把当前节点的前驱节点设置为找到的这个节点。

再来就是构建新的双向队列,根据当前节点所处的位置有三种状况:

(1).当前节点是尾节点。这种状况咱们要作什么呢?前面咱们已经从后向前获取到了当前节点的前驱节点中第一个不为CANCELED状态的节点,那么是否就须要将全部的CANCELED状态的节点和当前节点剔除呢?没错,这里面就作了这件事。把找到的前驱节点设置为尾节点,把找到的前驱节点的后续节点设置为空,这样就造成了新的双向队列。

(2).当前节点不是尾节点,也不是head的后继节点,对应的就是这个条件判断pred != head。这种状况节点就处于中间了。

相似结构:head<->node1<->node2<->...<->pred(searchNode)<->...(canceled状态节点)<->currentNode<->next<->...<->tail,...(canceled状态节点)<->currentNode这之间的节点都须要剔除,而后将pred(searchNode)<->next构成新的链表。

那么这里是否还须要将找到的pred节点的状态设置为SIGNAL呢,表明后续节点须要被唤醒。

(3).当前节点是head节点的后继节点,须要唤醒当前节点的后继节点,具体方法在unparkSuccessor中

private void cancelAcquire(Node node) {
    if (node == null)
        return;
 //将当前节点线程设置为空
    node.thread = null;
 //获取当前节点的前驱节点
    Node pred = node.prev;
    //从后向前循环,找到当前节点的前驱节点中第一个不为CANCELED状态的节点
    //并设置当前节点的前驱节点为找到的节点
    while (pred.waitStatus > 0)
        //node.prev = pred->设置当前节点的前驱节点
        //pred = pred.prev
        node.prev = pred = pred.prev;

    //获取找到的前驱节点的后续节点
    //多是CANCELLED状态的节点,也多是当前节点本身
    Node predNext = pred.next;

    //设置当前节点的状态为CANCELLED
    node.waitStatus = Node.CANCELLED;

    //(1)
    //1.若是当前节点是尾节点
    //则设置找到的当前节点不为CANCELLED状态的节点为尾节点,为CANCELLED状态的节点能够从队列中剔除了
    if (node == tail && compareAndSetTail(node, pred)) {
        //同时设置找到的当前节点不为CANCELLED状态的前驱节点的尾节点为空,剔除队列中CANCELLED状态的节点
        compareAndSetNext(pred, predNext, null);
    } else {
        //(2)
        int ws;
        //2.当前节点不是head的next节点也不是尾节点
        //条件1:当前节点不是head的next节点也不是尾节点
        //条件2.1:(ws = pred.waitStatus) == Node.SIGNAL,说明当前节点找到的前驱节点状态是SIGNAL
        //若是不是SIGNAL,也可能为0
        //条件2.2:ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)
        //若是前驱节点的状态<=0,则设置找到的前驱节点的状态为SIGNAL,表示须要唤醒后继节点
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            //获取当前节点的后续节点
            Node next = node.next;
            //若是当前节点后续节点不为空且状态<=0
            if (next != null && next.waitStatus <= 0)
                //设置当前节点找到的前驱节点的后继节点为当前节点的next节点,剔除CANCELLED状态的节点
                compareAndSetNext(pred, predNext, next);
        } else {
            //(3)
            //当前节点是head的next节点,须要唤醒当前节点的后继节点,具体方法在unparkSuccessor中
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

6.1.10.AbstractQueuedSynchronized.unparkSuccessor方法

private void unparkSuccessor(Node node) {
 //获取当前节点状态
    int ws = node.waitStatus;
    //若是状态是<0,则设置当前节点状态为0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
 //若是ws > 0
    //获取当前节点的后续节点
    Node s = node.next;
    //若是后续节点空或者状态为CANCELED
    if (s == null || s.waitStatus > 0) {
        s = null;
        //从后向前找到最靠近当前节点的状态<1的节点
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //若是找到了这个节点,解除线程的阻塞
    if (s != null)
        LockSupport.unpark(s.thread);
}

6.2.释放锁过程分析

lock.unlock();

这段代码主要调了AbstractQueuedSynchronized的release方法。

6.2.1.分析AbstractQueuedSynchronized的release方法

public final boolean release(int arg) {
    //尝试释放锁,当state=0的时候返回true
    if (tryRelease(arg)) {
        Node h = head;
        //当首节点不为空且首节点状态不为初始化的状态的状况,唤醒后续节点
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
//递减c的值,该方法也是aqs交给子类实现的,这里的具体实现是在Sync当中
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    //当c的值递减为0的时候
    if (c == 0) {
        free = true;
        //设置独占线程为空
        setExclusiveOwnerThread(null);
    }
    //设置state的值为0
    setState(c);
    return free;
}

6.2.2.释放锁变动首节点的逻辑和next节点加锁逻辑

acquireQueued方法里面,全部处于阻塞过程当中的线程的代码都卡在了自旋的这里,若是当前首节点释放锁,当前首节点的next节点当即从自旋的代码这里恢复,并尝试加锁并把本身变成首节点,这样就完成了已执行完的节点的出队操做。并且本身成为首节点执行完成释放锁时,也把本身的线程设置为空了,而后继续唤醒下一个节点,下一个节点的线程卡在自旋代码块中,而后解除阻塞继续执行。

//acquireQueued方法的自旋逻辑,全部被阻塞的节点的线程都会被卡在这里等待唤醒
for (;;) {
    //获取当前节点前驱节点
    final Node p = node.predecessor();
    //若是当前节点为前驱节点为首节点,尝试加锁
    if (p == head && tryAcquire(arg)) {
        //加锁成功则设置当前节点为首节点
        setHead(node);
        //以前头节点的后续节点设置为空,帮助gc
        p.next = null; // help GC
        //失败标识设置为false
        failed = false;
        return interrupted;
    }
    //若是节点的前驱节点不是头节点或者尝试加锁失败
    //shouldParkAfterFailedAcquire判断当前节点线程是否须要阻塞
    //当能阻塞当前线程时,调用parkAndCheckInterrupt方法阻塞线程
    //若是被阻塞,当前自旋操做也走不下去了
    //线程被阻塞在自旋的这里等待唤醒,这里是很是须要注意的,释放锁后,唤醒的下一个节点的
    //代码会当即运行到这一段,而后尝试加锁并把本身设置为首节点,这样就完成了首节点的变动
    if (shouldParkAfterFailedAcquire(p, node) &&
        parkAndCheckInterrupt())
        interrupted = true;
}
相关文章
相关标签/搜索