JDK核心源码之ReenrantLocak源码注释

 

        AQS的java的同窗面试常常被问到的一个问题。不少同窗被面到这个问题的时候,一脸蒙圈。可是说实话,这个AQS对于java的同窗来讲应该是一个比较重要的知识,由于咱们不少并发的对象都是基于这个实现的,因此考察java同窗的并发知识的功底,问这个AQS也是一个质量比较好的问题。java

         AQS的全称是AbstractQueuedSynchronizer(文中就叫:抽象队列),说白了其实就是一个抽象类。node

是咱们并发包里面的基石,重入锁,读写锁,不少并发的工具都是基于它实现的。因此理解好AQS是什么东西对于掌握好并发知识是有帮助的。面试

     不过,咱们若是分析AQS的时候,直接就读AQS的代码有点无聊,咱们就分析ReentantLock的源码,由于咱们平时真正使用的时候咱们使用的是ReentantLock,而不是AQS。可是ReentantLock就是基于AQS实现的。并发

     在分析AQS以前,我先给你们解析一下AQS的原理。以下图所示:工具

图展现的一个ReentantLocak(基于非公平锁分析)加锁的过程。ui

  1. 首先是线程一过来了,state的初始值是0,当前线程为null。由于历来没有人加过锁,因此线程一很容易就加锁成功了。加锁成功了就是把state0变为1,当前线程设置为本身的名字,好比就是线程1.
  2. 接下来,线程一再次进来进行重入加锁,发现当前的state1,可是当前线程是本身,那么直接就让state1,由1变为2,这样就完成了可重入加锁。
  3. 接着线程二进来了,线程二进来发现state2,可是当前线程不是本身,因此加锁就失败了。

加锁失败了之后线程二就加入到队列里面去(AQS内部实现了一个双向队列)this

  1. 再接着线程三就进来了,遇到的情况跟线程二同样,因此跟着也就到了队列里面了。

 

AQS重入加锁大概就是这么个原理。咱们接下来分析一下它的底层源码。我就以ReentantLocak为例写个例子。spa

public class ReentrantLockDemo {
    ReentrantLock reentrantLock = new ReentrantLock();
    int sum=0;
    public void count(){
//这个是加锁的代码
//咱们这次主要基于非公平锁分析源码,等到合适的时机再给你们
//解释公平锁和非公平锁的区别。
//首先咱们开始分析lock的方法。
        reentrantLock.lock();
        for (int i = 0; i < 10; i++) {
            sum++;
            System.out.println(sum);
        }
//这个是咱们释放锁的代码
        reentrantLock.unlock();
    }
    }
}
final void lock() {
//线程一第一次进来之间使用cas操做修改state的值
//这句代码的语义就是当前state的值是否为0,若是是0,那么就把修改成1。
    if (compareAndSetState(0, 1))
//设置当前线程为本身,其实线程一第一次进来加锁的时候,到这儿就加锁成功了!!
        setExclusiveOwnerThread(Thread.currentThread());
    else
//若是线程1第二次进来
//那么由于state不是0了,因此会cas操做失败
//因此会走这个方法 
        acquire(1);
}

接下来咱们分析一下,线程一第二次进来是如何加锁的:
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
咱们慢慢分析这段代码
首先应该分析的是:tryAcquire(arg)方法

因此咱们要想分析acquire方法,那么先分析里面的tryAcouire方法

//执行的是这个方法
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
这个方法调用的是以下的方法:
final boolean nonfairTryAcquire(int acquires) {
//获取当前线程,当前线程固然是线程一喽
    final Thread current = Thread.currentThread();
    //获取当前的state 那么state 是1
int c = getState();
//若是c == 0
//其实咱们知道,代码之因此走到这儿就是由于前面c != 0
//可是jdk的源码为了健壮性,因此这儿仍是再次判断了一下
//意思就是若是当前的state是0,那么直接加锁就能够了。
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
//若是发现上一个加锁也是本身
//那么直接进行重入加锁就能够了
    else if (current == getExclusiveOwnerThread()) {
// 1+1
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        //修改状态值为2,可重入加锁成功,返回true.
setState(nextc);
        return true;
    }
    return false;
}




若是上个结果返回true以后。咱们再回过头来看这个方法:
public final void acquire(int arg) {
//(!true) 结果就是false
//那么代码就不继续执行了
//也就是说若是是重入加锁,那么这儿加锁成功之后就退出去了。
//换句话说,若是是重入加锁,代码执行到这儿重入加锁也就成功了!!
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

如今咱们的的 线程是1,state=2
接下来咱们在分析一个状况,线程2进来了。咱们的代码又是如何走的。
首先state这个时候不是0了,那么直接走的是acquire方法。
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
//直接走这个方法
        acquire(1);
}

//咱们再次分析这段代码
//咱们分析一下这个方法。tryAcquire
//经过咱们前面的分析咱们知道,若是重入加锁成功了,那么这儿直接返回的是true
//可是若是发现当前的线程 和 里面加锁的线程不是同一个线程
//那么重入加锁失败。这儿就会返回来false
//若是tryAcquire返回的是false。那么  (!false) = true
//代码就会运行到acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//接下来咱们分析一下acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 这个方法
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}



分析acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 这个方法
咱们首先须要分析addWariter这个方法。注意Node.EXCLUSIVE这个值是null


//进入到这个方法的时候咱们进入到了 类 AbstractQueuedSynchonzied里面。
private Node addWaiter(Node mode) {
//根据当前线程建立了一个Node
	//这个node的nextWaiter = mode = null
    Node node = new Node(Thread.currentThread(), mode);
    //tail一开始就是等于mull  
Node pred = tail;
//因此第一次进来等式不成立
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
//第一次进来代码走的是这儿
    enq(node);
    return node;
}



//调的这个方法,建立来的参数的是当前线程的Node
private Node enq(final Node node) {
    for (;;) {//自旋
//第一次指针有变化
//第二次进来指针仍是有变化
        Node t = tail;
//注意咱们第一次进来,知足这个条件,因此t==null,是知足条件的
//第二次进来,那么这个时候t就不等于Null了,因此这儿的这个条件就不知足了
//去执行else语句
        if (t == null) { // Must initialize
//使用cas设置一个head头
            if (compareAndSetHead(new Node()))
//指针要发生变化
                tail = head;
//接着代码就会执行到这儿,你们必定要注意这儿。
//代码执行到这儿之后,由于这是一个死循环,因此
//执行到这儿之后再次运行。
        } else {
//当前的线程的prev指向t
            node.prev = t;
//使用cas操做设置队列的尾部
//这个cas的意思是当前的tail是否就是t,若是是t
//那么就把值修改成当前node,那很明显,当前的tail就是t
//因此这个就把当前node设置为tail
            if (compareAndSetTail(t, node)) {
                t.next = node;
//返回头结点结束这个死循环
                return t;
            }
        }
    }
}
到目前为止指针变化以下:

 

 

接下来假设线程三要进来了。
若是线程三进来,确定就会走到这段代码。
private Node addWaiter(Node mode) {
//建立线程三Node
    Node node = new Node(Thread.currentThread(), mode);
    //pred指向tail
Node pred = tail;
//此次pred就不等于null
    if (pred != null) {
//当前的node.pred指向 tail
        node.prev = pred;
//判断当前pred是否是tail,若是是
//就把当前node设置为tail,当前tail确定就是pred
        if (compareAndSetTail(pred, node)) {
//pred.netxt指向了线程三的node
            pred.next = node;
//返回当前node
            return node;
        }
    }
    enq(node);
    return node;
}

致使到此队列的指针变化以下:

 

 

到目前为止咱们分析清楚了addWaiter的方法,可是不要忘记了咱们的目标。咱们是分析
acquireQueued的这个方法。咱们只须要知道若是加锁失败了,那么就会调用addWaiter方法,addWaiter方法返回来的是当前的node
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
//须要注意一下咱们这儿是死循环
        for (;;) {
//获取当前节点的上一个节点,那么咱们线程三的
//上一个节点是线程二
            final Node p = node.predecessor();
//线程二不知足这儿条件
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
//若是上面一个条件不知足的话,接下来就会调用以下的方法
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}


//传进来的参数是上一个节点和当前节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//当一个节点刚床架你的时候 waitStatus默认值应该是0
    int ws = pred.waitStatus;
//这个条件不知足
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
//这个条件不知足
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
//那么就会直接执行这个方法。
//这个方法就会把上一个节点的waitStatus 设置为SINGAL
//也就是-1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
//而后返回false
    return false;
}

 

咱们再回过头来分析上一段代码:
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
//刚刚咱们知道这儿的返回值是false
//那么若是是false的话,if条件就不知足了。
//不知足了之后再次执行for循环。
//继续执行shouldParkAfterFailedAcquire方法
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
当前的队列指针状况以下:

 

 

//再次执行这个方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获取状态,如今ws=-1
    int ws = pred.waitStatus;
//符合这个提交
    if (ws == Node.SIGNAL)
//结果返回true
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
//这个shouldParkAfterFailedAcquire方法结果为true了之后
//那么接下来执行parkAndCheckInterrupt方法
//因此接下来咱们分析一下parkAndCheckInterrupt方法。
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private final boolean parkAndCheckInterrupt() {
//这儿操做就比较简单了,这儿就直接把线程挂起,线程就停在这儿不动了
//必需要等另一个线程去执行unpark操做代码才能往下执行。
    LockSupport.park(this);
    return Thread.interrupted();
}

由于代码执行到这儿就已经卡住了。因此咱们回到源头看到如下,最终会让哪段代码卡住。

//这个地方是调动acquireQueued方法致使代码卡住,因此这儿的代码也会卡住不动。
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

到目前为止咱们分析了一个加锁失败的线程进去到队列之后的状况。.net

咱们如今能够解释一下公平锁和非公平锁的区别了。
咱们以前的全部的代码分析的都是非公平锁的,非公平锁最大的特色就是在这儿。线程

final void lock() {
//加锁的时候不分青红皂白,也无论队列里面是否有人在排着队
//上来就是直接加锁,因此咱们想一下,假设咱们虽然如今队列里面有线程在排队加锁
//可是恰好当前的独占锁释放锁了,新进来的这个线程就加锁成功了。也就是插队成功了。
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

若是是公平锁的话,加锁的时候走的是这个逻辑:
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;
    final void lock() {
//调用这个方法
        acquire(1);
    }

public final void acquire(int arg) {
//首先执行的是这儿的tryAcquire方法
//其实到这儿的代码跟咱们以前看到的代码是同样的。
//可是在往下实现就跟非公平锁那儿不同了
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

 

protected final boolean tryAcquire(int acquires) {
//获取当前线程
        final Thread current = Thread.currentThread();
        int c = getState();
//若是真的当前的state=0
        if (c == 0) {
//须要这儿进行判断,咱们先单独把hasQueuedPredecessors
//代码拿出来分析一下,分析后获得,这个方法是判断队列里面
//是否还有节点了。若是还有节点,那么这个方法就返回true
//!true 就是false,代码就不走这儿了。
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
//这儿返回false
        return false;
    }
}


public final boolean hasQueuedPredecessors() {
 
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
//若是队列里面还有节点
//若是还有节点那么就返回来true
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

 

这样的话,咱们再回过头来看这段代码:
public final void acquire(int arg) {
// tryAcquire方法返回来的是false,那么!flase的结果就是等于true
    if (!tryAcquire(arg) &&
//而后接下来就是走这个方法,那么这个方法,就是跟咱们分析的同样了。
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
因此综上所述,咱们的公平队列就是每次在加锁的时候,先判断队列里面是否有线程,若是有就加到队列后面,若是没有,那么就直接加锁成功。


接下来咱们再分析一个场景,就是重入锁释放锁的逻辑。
public void unlock() {
    sync.release(1);
}

接下来调用的是这段代码:
public final boolean release(int arg) {
//重点调用的是这个方法
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

 

protected final boolean tryRelease(int releases) {
//加锁如今是咱们的线程一进行重入锁的释放,一开始state的值2
//如今传进来的参数releases 是1
//那么c的值是1
    int c = getState() - releases;
//若是释放锁的线程不是当前独占锁的线程,那么就会报错。
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
//若是把整个重入锁都释放完了,那么其实c==0
//可是第一次释放重入锁的时候,这儿c是1
    if (c == 0) {
        free = true;
//若是整个重入锁都释放了,那么就放当前的独占锁置为null
        setExclusiveOwnerThread(null);
    }
//更改线程的重入锁的个数
    setState(c);
//若是整个锁都释放完了,那么返回的是true
//若是只是释放了一部分,那么返回的是false。
    return free;
}

 

 

public final boolean release(int arg) {
//若是线程一有两个锁重入,当前只是减小了一个
//锁重入,那么tryRelease返回值是false。那么这个条件就不知足
  if (tryRelease(arg)) {
      Node h = head;
      if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
//对两个锁重入,由于第一次进来,那么直接返回的是false。
    return false;
}

其实若是是只释放了线程一的第一个重入锁,那么这个返回值也没什么意义,咱们看到的是,就是把state的值减一了。
接下来咱们继续分析,若是线程一,再释放一个重入锁,也就是state由1变为0了。
咱们回过头来再分析以下代码:
public final boolean release(int arg) {
//条件知足
    if (tryRelease(arg)) {
        Node h = head;
//我这儿的分析是h.waitStatus就是等于0
//可是若是这个等于0的话,咱们的等于就走不下去了,可见这儿的值应该不等于零。
//这样咱们就执行里面的uparkSuccessor方法
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
//获取waitStatus状态
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
//获取到第一个节点
    Node s = node.next;
//在目前咱们虚拟的环境中,s!=null
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
      //这儿直接unpakr把线程唤醒
        LockSupport.unpark(s.thread);
}
接下来咱们分析一下unpank之后代码如何走:
其实咱们的代码以前卡住了,而后unpark之后会致使代码继续往下执行。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
           //以前咱们的代码卡在这个地方。
           //由于如今线程被唤醒了,因此这儿就能够继续往下执行。
           //咱们要注意这儿是一个死循环。
           //因此继续重复执行。
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}


final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
//重复执行到这儿,获取当前节点的上一个节点
//当前节点的上一个节点固然是head了
            final Node p = node.predecessor();
//p==head条件知足,因此接着就执行tryAcquire
//这个方法就 开始加锁了,修改当前加锁线程的名字
//把state 改成了1
            if (p == head && tryAcquire(arg)) {
//而后把当前线程的node设置为head
                setHead(node);
                //把当前线程的Node 置为null进行垃圾回收
                p.next = null; // help GC
                failed = false;
               //返回状态
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
相关文章
相关标签/搜索