[源码分析]ReentrantLock & AbstractQueuedSynchronizer & Condition

首先声明一点: 我在分析源码的时候, 把jdk源码复制出来进行中文的注释, 有时还进行编译调试什么的, 为了不和jdk原生的类混淆, 我在类前面加了"My". 好比把ReentrantLock更名为了MyReentrantLock, 在源码分析的章节里, 我基本不会对源码进行修改, 因此请忽视这个"My"便可.java


一. 简介

锁是什么? 锁是一种标志, 或者是一种资源, 持有锁的线程才能够继续往下执行相应的内容. 未持有锁的线程须要等待这个锁资源. 直到获取到了这个锁, 才能够继续向下执行.node

0. ReentrantLock的一个小demo

想本身运行这段代码的话, 把代码中的"MyReentrantLock" 改成 "ReentrantLock" 便可. (后续的代码也同样, 若是想本身运行, 还编译报错, 请把我修改的代码改回来. 也就是把"My"都去掉就行了)数据结构

public class Main {
    private static MyReentrantLock lock = new MyReentrantLock();

    public static void main(String[] args) throws Exception {
        // 场景以下: 线程1先得到锁, 释放后, 线程2 再得到锁.

        new Thread(() -> {
            System.out.println("线程1启动");
            lock.lock();
            System.out.println("线程1抢到锁");
            try {
                System.out.println("这里是业务逻辑1");
                quietSleep(2);// 两秒后释放锁
                System.out.println("两秒后");
            } finally {
                lock.unlock();
                System.out.println("线程1释放锁");
            }
        }).start();

        new Thread(() -> {
            System.out.println("线程2启动");
            quietSleep(1); // 在这里进行谦让. 确保上面的线程能先运行. 也就是让上面的线程先得到锁
            lock.lock();
            System.out.println("线程2抢到锁");
            try {
                System.out.println("这里是业务逻辑2");
            } finally {
                lock.unlock();
                System.out.println("线程2释放锁");
            }
        }).start();

    }

    public static void quietSleep(long sec) {
        try {
            Thread.sleep(sec * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

 输出的结果以下:并发

1. sync字段

首先来看一下ReentrantLock里惟一的一个字段函数

Sync继承自AQS(AbstractQueuedSynchronizer, 如下简称AQS) . 公平锁和非公平锁都继承了Sync. Sync是ReentrantLock类里锁的统一声明. 源码分析

2. lock/unlock依赖Sync

ReentraintLock的 lock()和unlock()方法实际上都是靠Sync来实现的:ui

3. 锁内部类定义

Sync 和 公平锁 和 非公平锁 都是ReentrantLock的内部类, 类的定义部分以下(细节先隐藏起来了, 后面会讲):this

4. ReentrantLock构造器

ReentrantLock有两个构造器..net

1. 默认构造器是直接使用了非公平锁. 非公平锁就是不必定按照"先来后到"的顺序来进行争抢.线程

2. 带参构造器能够传递一个bool类型. true的时候为公平锁. 公平锁就是按照"先来后到"的顺序来进行争抢.

二. 公平锁申请锁

使用锁的第一个步骤, 固然就是先申请锁了, 咱么来分析一下源码, 看看申请锁的流程吧. 

1. 公平锁获取锁的流程(单线程, 没有争抢) 

首先从最外层的调用lock()方法开始. 我们在Main方法里写下这两行代码:

MyReentrantLock就是ReentrantLock, 我复制了源代码, 而后改了个名字而已.

Reentraint类的lock()方法最终仍是调用的sync.lock()

因为咱们如今使用的是公平锁. 因此sync如今是FairSync. 因此sync.lockI()实际上就是FairSync类里的lock()方法

发现lock()调用的是acquire(1)这个方法, 这个方法是在AQS类里实现的.代码以下:

arg当时传进来的是1, 因此首先进行的是tryAcquire(1)来进行"尝试获取锁"的操做. 这时一种乐观的想法.

tryAcquire方法的具体实如今FairSync类里, 具体代码以下:

/**
     * @return 返回true: 获取到锁; 返回false: 未获取到锁
     * 何时返回true呢?  1.没有线程在等待锁;2.重入锁,线程原本就持有锁,也就能够理所固然能够直接获取
     * @implNote 尝试直接获取锁.
     */
    protected final boolean tryAcquire(int acquires) {
        // 获取当前线程的引用
        final Thread current = Thread.currentThread();

        // 当前锁的计数器. 用于计算锁被获取的次数.在重入锁中表示锁重入的次数.因为这个锁是第一次被获取, 因此c==0
        int c = getState();

        // c==0, 也就是 state == 0 ,重入次数是0, 表示此时没有线程持有锁.
        if (c == 0) {
            // 公平锁, 因此要讲究先来后到
            // 由于有多是上一个持有锁的线程刚刚释放锁, 队列里的线程还没来得及争抢, 本线程就乱入了
            // 因此每次公平锁抢锁以前, 都要判断一下等待队列里是否有其余线程
            if (!hasQueuedPredecessors() &&
                    // 执行到这里说明等待队列里没有其余线程在等待.
                    // 若是没有线程在等待,那就用CAS尝试一下,成功了就获取到锁了,
                    // 不成功的话,只能说明一个问题,就在刚刚几乎同一时刻有个线程抢先了 =_=
                    compareAndSetState(0, acquires)) {
                
                // 到这里就获取到锁了,标记一下,告诉你们,如今是我(当前线程)占用了锁
                setExclusiveOwnerThread(current);
                // 成功获取锁了, 因此返回true
                return true;
            }


            //-- 因为如今模拟的是单纯地获取一次锁, 没有重入和争抢的状况, 因此执行不到这里, 上面的cas确定会成功, 而后返回true


        } else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

争抢完锁以后会返回true, 而后回到上层方法acquire : 

if语句里 && 前面是false, 不会继续往下执行了. 当前线程获取到了锁, 并且执行了全部该执行的内容, 就完事儿了.

2. 公平锁进行重入的流程

重入就是一个线程获取到了锁, 而后这个线程又一次申请(进入)了这个锁.

重入用synchronized来举例就是这样:

用ReentrantLock来举例子就是这样:

同一个线程(main线程) 首先进行了lock.lock()申请并占有了锁, 随后又执行了一次lock.lock(). 还没释放锁的状况下, 又一次申请锁. 这样就是重入了.  

上面一小节已经分析了第一行的lock.lock()是如何获取到锁的, 因此咱们只分析 重入的部分, 也就是后面那句lock.lock()的执行流程.

前面的执行过程一直是如出一辙的, 直到这里:

/**
     * @return 返回true: 获取到锁; 返回false: 未获取到锁
     * 何时返回true呢?  1.没有线程在等待锁;2.重入锁,线程原本就持有锁,也就能够理所固然能够直接获取
     * @implNote 尝试直接获取锁.
     */
    protected final boolean tryAcquire(int acquires) {
        // 获取当前线程的引用
        final Thread current = Thread.currentThread();

        // 当前锁的计数器. 因为前面的那句lock已经获取到锁了, 因此这里是status==1, 也就是 c==1
        int c = getState();

        // c==1, 表示当前有线程持有锁, 因此这段if是进不去了
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }

        // 因为 c==1 , 没法进入if语句, 因此来看看满不知足这里的 else if
        // 这个锁被人占了, 但仍是不死心, 因而看一下是否是当前线程本身占的这个锁.
        // (人家女生说有喜欢的人, 为何不问问是否是本身呢 = =.)
        // 因为是同一个线程, 因此就是本身啦! 因此会进入这个else if分支,
        } else if (current == getExclusiveOwnerThread()) {
            // 代码执行到这里了, 就是所谓的 重入 了

            // 这里的acquires的值是1, 因此nextc =  1 + 1 , 也就是2了
            int nextc = c + acquires;
            // 小于0, 说明int溢出了
            if (nextc < 0) throw new Error("Maximum lock count exceeded");
            // 在这里把状态更新一下, 把state更新为2, 意思就是这个锁被同一个线程得到2次了. 
            // (你们就能够以此类推, 下次再重入的话, 那么就会再+1, 就会变为3....)
            setState(nextc);
            // 重入完成, 返回true
            return true;
        }
        
        return false;
    }

 还记得上小节讲的, 获取锁的时候进入的是这段代码的if语句, 而重入就不同了, 进入的是 else if语句. 但最终返回的仍是true, 表示成功. 

上面讲的是无争强的状况, 接下来说讲有争抢的状况.

3. 公平锁cas争抢失败

场景以下:

一开始锁是空闲状态, 而后两个线程同时争抢这把锁(在cas操做处发生了争抢).

一个线程cas操做成功, 抢到了锁; 另外一个线程cas失败. 

代码例子以下(代码的意思到位了, 可是这段代码最后不必定会在cas处进行争抢, 你们意会就行了):

cas操做成功的线程就和上面第1小节的同样, 就不用再重复描述了.

而cas争抢失败的线程会何去何从呢? 看我给你们分析: 

 /**
     * @return 返回true: 获取到锁; 返回false: 未获取到锁
     * 何时返回true呢?  1.没有线程在等待锁;2.重入锁,线程原本就持有锁,也就能够理所固然能够直接获取
     * @implNote 尝试直接获取锁.
     */
    protected final boolean tryAcquire(int acquires) {
        // 获取当前线程的引用
        final Thread current = Thread.currentThread();

        // 当前锁的计数器.
        int c = getState();

        // state == 0 表示此时没有线程持有锁
        if (c == 0) {
            // 本场景中, 一开始锁是空闲的, 因此队列里没有等待的线程
            if (!hasQueuedPredecessors() &&
                    // 两个线程在这里进行争抢
                    // cas抢成功的会进入到if代码块
                    // cas抢失败的, 就跳出整个if-else, 也就是直接到最后一行代码
                    compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }

        // cas 操做失败后, 会这直接执行到这里. 返回false.
        return false;
    }

 在这里返回了false, 回到上一层函数.

第一个条件是true, 因此会继续往下执行acquireQueued方法. 来准备让这个失败的线程进入队列等待.

下面继续来给你们讲解 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) .

先讲讲这个addWaiter(Node.EXCLUSIVE):

/**
     * 将当前线程封装为Node, 而后根据所给的模式, 进行入队操做
     *
     * @param mode 有两种模式 Node.EXCLUSIVE 独占模式, Node.SHARED 共享模式
     * @return 返回新节点, 这个新节点封装了当前线程.
     */
    private Node addWaiter(Node mode) { // 这个mode没用上.
        Node node = new Node(Thread.currentThread(), mode);
        // 我们刚才都没见到过tail被赋予了其余的值, 固然就是null了.
        Node pred = tail;
        // tail是null的话, pred就是null, 因此不会进入到这个if语句中.因此跳过这个if语句.
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }

        // 由于锁的等待队列是懒初始化, 直到有节点插入进来, 它才初始化.
        // 而如今这个挣钱失败的线程, 正好是锁创建以来, 第一个进入等待队列的线程. 因此如今才准备进行初始化.
        // 初始化完了后会把当前线程的相关信息和引用封装成Node节点, 而后插入到队列当中.而且制定head 和 tail.
        // tail就不等于null了, 因此下一次addWaiter方法被调用的时候, 就会执行上面的if语句了. 而不会跳过if语句, 来到这里进行初始化了.
        enq(node);
        // 返回这个Node节点.
        return node;
    }

 目的就是要将这个cas失败的线程封装成节点, 而后插入到队尾中. (等待队列是懒初始化,) 

若是队列已经初始化了, 那么tail就不会是null, 就会执行上面代码中的if语句, 调整一下指针的引用就行了.

可是若是队列还未初始化, 那么就应该先初始化, 再插入. 先初始化,再插入, 对应的代码是enq(node). 

接下来说解一下enq方法: 

   /**
     * 采用自旋的方式入队
     * CAS设置tail,直到争抢成功.
     */
    private Node enq(final Node node) {
        for (; ; ) {
            Node t = tail;
            //  最开始tail确定是null, 进入if进行初始化head和tail.
            if (t == null) { // Must initialize
                // 设置head 和tail. cas来防止并发.
                if (compareAndSetHead(new Node())) tail = head;
                
            // if 语句执行完了后, 以后的for循环就会走else了.
            } else {
                // 争抢入队, 没抢到就继续for循环迭代.抢成功了就能够return了,否则一直循环.
                // 为何是用cas来争抢呢? 由于怕是多个线程一块儿执行到这里啊 
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

 刚才的addWaiter(Node.EXCLUSIVE) 分析完了, 总之就是addWaiter以后, 队列确定是被建立完了, 并且还把node(当前线程的封装)插入到了队列的队尾. 而且返回了这个node.  acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 能够简化为 acquireQueued(node)

因此继续分析acquireQueued方法.

final boolean acquireQueued(final Node node, int arg) {
        // node是刚才addWaiter方法插入到队尾的节点
        // arg 是 1

        boolean failed = true;
        try {
            boolean interrupted = false;
            for (; ; ) {
                // 获取node节点的前驱.
                final Node p = node.predecessor();

                // 若是node节点的前驱是head
                if (p == head
                        // 那么能够再尝试着抢一下锁. 
                        // 等待队列里的第一个节点很乐观, 由于确实颇有可能会立刻轮到他
                        && tryAcquire(arg)) {
                    // 若是这个node就是那么巧合, 刚刚锁被释放了, 这回从新抢就真的抢到了
                    // 那么就把当前节点设为头结点.(头结点的含义就是当前持有锁的线程)
                    setHead(node);
                    // 上一个节点既然已经释放了锁, 也就该GC了. 置为null, 方便GC收集
                    p.next = null; // help GC
                    // 很明显是获取锁成功了啊, 因此failed = false
                    failed = false;

                    // 这么大一段代码, 只有这一处return
                    return interrupted;
                }

                //---- 若是不是队头,  那么就会执行到这里.
                //---- 或者虽然做为等待队列里的第一名, 单因为持有锁的线程仍是没有释放, 因此仍是没抢到锁. 那么也会执行到这里

                // 获取锁失败的时候是否该阻塞
                if (shouldParkAfterFailedAcquire(p, node)
                        // 在这里阻塞, 等待唤醒
                        && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // 上面那段, 若是中途异常了的话, 就会执行到这里. (通常不会到这里的)
            if (failed) cancelAcquire(node);
        }
    }

 上面这段代码中shouldParkAfterFailedAcquire方法 和 parkAndCheckInterrupt() 方法 还未解释. 一个一个来.

/**
     * 当前线程没有抢到锁,是否须要挂起当前线程
     *
     * @param pred 前驱结点
     * @param node 当前结点
     * @return 若是线程须要被阻塞, 那么就返回true
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        // 前驱节点的 waitStatus == -1 ,说明前驱节点状态正常,当前线程须要挂起,直接能够返回true
        if (ws == Node.SIGNAL)
            return true;
        // 大于0, 其实就是等于1, Node.CANCELLED 是 1, 由于状态中只有这个状态是大于0的...说明前驱节点取消了排队
        // 因此下面这块代码说的是, 在链表中从prev结点开始, 往前删掉CANCELLED状态的结点.
        // 只有CANCELLED状态值大于0
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
                
            // 删掉以后再往前看看, 看看前面是否是CANCELLED, 若是是, 那还得继续往前删
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 在前面的两个if语句中排除掉了waitStatus值为-1和1的状况,
            // 只剩下0,-2,-3这三个状态了
            // 然而在咱们前面的源码中,都没有看到有设置waitStatus的,
            // 因此只剩下等于0的状况了
            // 下面的操做就是, 若是waitStatus等于0, 那么就用cas将前驱结点的waitStatus设置为-1
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

shouldParkAfterFailedAcquire里的前两行是在判断前驱节点prev的状态. 可是以前我们分析代码, 并无发现哪里设置了waitState.

因此waitState是默认值0.

因此shouldParkAfterFailedAcquire会直接执行下面的else, 在这里吧pred的waitState设置为-1, 而后返回false.

回到刚才的acquireQueued方法. 因为外层是for循环, 会在下一次for循环在此执行到shouldParkAfterFailedAcquire方法. 

因为刚才已经把前驱节点prev的waitState改成1了, 因此此次在前两行判断prev的waitState时, 直接就知足条件, 而后return true了.

shouldParkAfterFailedAcquire方法return true了, 才会往下执行parkAndCheckInterrupt方法.

下面是parkAndCheckInterrupt()方法. 最终返回Thread.interrupted(). 返回线程是否被中断. (中断和挂起不是一回事 )

    /**
     * 在这里线程阻塞.
     * 被唤醒的时候会返回, 若是被中断过, 那么就返回true
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        // 挂起.
        MyLockSupport.park(this);
        return Thread.interrupted();
    }

 LockSupport.park(this)会挂起当前线程. 可是LockSupport.park还有一个隐藏功能. 就是, 若是先对一个线程unpark, 再对这个线程park, 那么此次的park是失效的. 下一次park才会挂起.

缘由就是, 对一个没有被park的线程进行unpark的时候, 会把标志位perm置为1. 而每次park的操做, 都是先去检查perm是否为1.

若是是1, 那么置为0, 而且此次不挂起.

若是perm为0, 那么就直接挂起这个线程.

4. 公平锁因为队列内有元素而失败

demo以下. 前两个线程, 其中一个获取锁成功, 另外一个失败, 而后进入等待队列.

稍后, 第三个线程来获取锁, 可是这时因为等待队列中已经有元素在等待了. 因此会直接失败, 而后会被插入到等待队列的尾部.

上面的main方法中总共有三个线程想要占有锁. 前两个锁的争抢在上小节就已经模拟过了.

咱么如今只分析第三个线程申请锁的流程. 这个场景下的tryAcquire方法以下(会直接返回false):

/**
         * @return 返回true: 获取到锁; 返回false: 未获取到锁
         * 何时返回true呢?  1.没有线程在等待锁;2.重入锁,线程原本就持有锁,也就能够理所固然能够直接获取
         * @implNote 尝试直接获取锁.
         */
        protected final boolean tryAcquire ( int acquires){
            // 获取当前线程的引用
            final Thread current = Thread.currentThread();

            // 当前锁的计数器.
            int c = getState();

            // 不会走这的if语句, 由于锁被其余线程占有, 确定不是0
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }

            } else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }

            // 因为队列内有元素, 因此if语句不执行, 
            // 因为不是重入, else if 也不执行.
            // 直接返回false
            return false;
        }

这段方法返回false, 说明须要执行这个. acquireQueued(addWaiter(Node.EXCLUSIVE), arg). 先看看addWaiter方法有什么区别.

/**
     * 将当前线程封装为Node, 而后根据所给的模式, 进行入队操做
     *
     * @param mode 有两种模式 Node.EXCLUSIVE 独占模式, Node.SHARED 共享模式
     * @return 返回新节点, 这个新节点封装了当前线程.
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // 如下几行代码想把当前node加到链表的最后面去,也就是进到阻塞队列的最后
        Node pred = tail;
        // 若是tail不是空, 说明有头结点.说明这个队列已经被初始化了.
        // 由于本小节讲的就是: 由于公平锁的等待队列中有其余线程才致使当前线程争锁失败, 因此说明等待队列不只被初始化了, 并且里面还有元素.
        if (pred != null) {
            // node设置本身的前驱为pred
            node.prev = pred;
            // 用CAS把当前节点node设置为队尾, 若是成功后,tail指针就指向了node
            if (compareAndSetTail(pred, node)) {
                // 若是cas争抢成功, 那么就会在这里返回.(而cas失败的, 会跳过这个if代码块, 会执行到下面的enq方法)
                // 剩下的就是整理一下链表数据结构的链接问题了
                // pred调整本身的后继为node
                pred.next = node;
                return node;
            }
        }

        // 若是在上面的cas中设置失败, 那么仍是会执行到这里.
        // 而后在enq方法里靠for循环+cas的形式, 不断尝试着插入到队尾.

        enq(node);
        return node;
    }

 后续执行的就和上小节的同样了.就不重复了...

固然, 场景是举不完的, 举完的话就跟笛卡尔积那样了. 我这里只是靠这四个例子来尽可能完整地分析了获取锁的流程.

三. 公平锁释放锁

刚才申请锁的流程. 可是争抢失败的那些线程, 最后都进入到了等待队列里, 而后就杳无音讯了.

那当前持有锁的线程释放锁后, 是如何唤醒等待队列里的线程, 让下一个线程获取锁的呢?

咱么接下来分析一下释放锁的过程吧.

1. 申请1次锁, 执行一些业务, 而后释放

 

我们只关注unlock, lock就跳过了, 前面讲过了.

ReentrantLock类的lock()方法 代码以下: 

  

而这个release是AQS里的方法. 源码以下:

其中arg变量值是1.  首先会执行tryRelease(1) 来尝试释放锁.

若是尝试成功了, 那么tryRelease(1)就会返回true, 就会继续执行if代码块里的内容. 

若是尝试失败了, 那么tryRelease(1)就会返回false. 而后就会跳过if语句, 最终本段方法(release方法)也会返回false.

我们先分析一下tryRelease方法吧(tryRelease方法的源码在Sync抽象类里):

protected final boolean tryRelease(int releases) {
        // releases == 1

        // c 就是重入次数 -1 , 因为本场景下模拟的是简单的获取一次锁, 而后释放, 不涉及到重入. 因此getState() == 1
        // 因此c = 1 - 1 , c如今等于0
        int c = getState() - releases;

        // 判断当前的线程是否是持有锁的线程, 否则抛异常.
        // 这是为了其余的线程捣乱.
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();

        // 用于标记是否能够彻底释放锁
        boolean free = false;

        // c等于0, 说明没有重入了, 能够彻底释放了.
        if (c == 0) {
            // 标记一下, 准备彻底释放锁
            free = true;
            // 把锁的持有者设置为空, 表示锁被释放.
            setExclusiveOwnerThread(null);
        }
        // 把刚才c==0 设置为state
        setState(c);

        // 表示是否彻底释放. 本场景下返回true
        return free;
    }

 因为返回的是true, 因此返回后还有if语句块要执行:

接下来分析一下其中的unparkSuccessor方法, 看看他是如何唤醒下一个节点的.(这个方法在AQS里)

unpark以后, 就会把以前park(挂起)的线程激活, 而后继续执行:

若是线程被中断了, 那么parkAndCheckInterrupt()方法会返回true, 而后就会执行interrupted = true 这句话. 

挂起和中断不是一回事, 通常不会被中断的. 因此通常不会执行interrupted=true这句话. 

外层是个for循环, 当前线程被激活后, 做为等待队列中的第一个线程, 来进行获取锁. 因为是公平锁, 因此能够放心拿到, 没有人会抢, 因此会正常获取到锁.

2. 重入锁的释放

 

释放重入的锁(同一个线程屡次获取的锁), 执行流程惟一不一样的就是tryRelease方法了, 其余的都同样, 能够直接参考上面一小节的.

咱么看看重入的时候, tryRelease是如何执行的吧.

 protected final boolean tryRelease(int releases) {
        // 其实就是重入计数器 -1
        // 而因为本线程获取了2次这个锁, 因此state字段的值为2
        // 因此c = 2 - 1
        // 因此如今c == 1
        int c = getState() - releases;

        // 判断当前的线程是否是持有锁的线程, 否则抛异常.
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();

        // 用来标记是否彻底释放锁
        boolean free = false;

        // c如今等于1, 不会进入这个if代码块
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }


        // 设置重入计数器, 也就是让 state =1 
        setState(c);

        // 返回false.
        return free;
    }

 本小节和上小节的区别就是这段代码了.既然这段方法返回false, 那么返回后, release方法的if代码块天然也就不执行了. 

四. 非公平锁的获取

刚才讲解了公平锁, 那么接下来说讲非公平锁, 究竟是怎么个不公平呢?

1. 非公平锁与公平锁获取的区别

因为非公平锁的获取与公平锁的获取, 只有一点点区别. 因此咱么只分析出区别就行了, 其余的部分都同样的.

而后会调用到NonfairSync类里的lock()方法.

这里就体现出了区别.

公平锁里的lock()方法里面, 只有acquire(1). 

而非公平锁在acquire(1)以前多了一次cas操做. 一上来就尝试着抢占锁, 看看有没有机会(万一真的这个时候持有锁的线程正好把锁释放了呢). 非公平锁根本无论是否有其余人在排队.上来就是一抢.

当此次cas失败了, 才会像公平锁同样进入acquire(1)方法:

 

这里和公平锁同样. 只是, 非公平锁的tryAcquire方法和公平锁的tryAcquire方法内部实现不同.

看看非公平锁的tryAcquire方法吧:

我们继续往下看看nonfairTryAcquire方法吧:

/**
         * 不公平地尝试获取锁.
         * 不公平的语义就是: 不用判断队列里是否有其余线程在等待, 直接抢.
         */
        final boolean nonfairTryAcquire(int acquires) {
            // 获取当前线程的引用
            final Thread current = Thread.currentThread();
            // 当前线程的重入次数
            int c = getState();
            // 若是是0, 表示此时此刻锁还被被任何一个线程所占用
            if (c == 0) {
                // 当c==0的时候, 公平锁锁是先判断队列里是否有其余线程在等待, 若是没有, 再去cas争抢.
                // 而非公平锁这里, 就是根本就不去理会等待队列, 本身抓到机会就赶忙抢
                // cas来争抢, 让重入次数变1.
                // 用cas是由于这个地方会发生并发.
                // 多个抢占固然只有一个成功了
                if (compareAndSetState(0, acquires)) {
                    // 设置锁的拥有者为当前线程.
                    setExclusiveOwnerThread(current);
                    return true;
                }

                // 若是不是0, 说明锁被某一个线程占用了
                // 既然被占用了, 那就有两种状况: 1. 被本身占用; 2. 被别的线程占用
                // 因此先看看是否是本身占用的, 若是是本身占用的, 那就重入.
            } else if (current == getExclusiveOwnerThread()) {
                // 其实就是+1
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                // 这里不会产生争抢, 没必要用cas
                // 由于只有占用锁的这一个线程才能进入到这个else if 里
                // 一个线程不可能发生争抢
                setState(nextc);
                return true;
            }
            // 1. 若是在if里的cas争抢失败
            // 2. 或者是不知足else if的条件
            // 那就会直接返回false
            // 无论是成功仍是失败, 都不会有线程的等待阻塞之类的. 都是当即返回.
            return false;
        }

 这里的非公平锁的nonfairTryAcquire方法 和 公平锁的tryAcquire方法很像. 区别就是:

非公平锁是, 当c==0. 也就是此时此刻, 锁是空闲状态的时候. 直接就尝试着用cas来争抢锁, 看看是否能成功, 而无论等待队列是否还有其余线程再等待.

而公平锁在c==0的时候, 也就是state==0 的时候, 先去看看队列里是否有其余线程再等待, 若是队列里没有其余线程在等待, 才会去cas争抢. 否则就会把机会让给队列里的第一个线程, 而本身会进入到等待队列的尾部.

为何c==0了, 队列里还有可能会有其余的元素在等待呢?

由于c==0只是说明当前锁的状态是空闲状态. 只是上一个线程刚刚把锁释放, 当前线程就来争抢锁了, 还没来得及唤醒等待队列里的第一个线程呢.  

其余地方就跟公平锁都同样的, 就是多了本小节讲的两处cas. 

五. 非公平锁的释放

1. 非公平锁会致使饥饿

也就是说, 上一个线程释放锁后, `等待队列` 里的第一个线程就会被激活, 而后会执行tryAcquire方法. 若是这个时候有新的线程来争抢, 

因为是非公平模式, 有可能新的线程会抢到这个锁. 若是新的线程抢到了锁, 那么刚刚被激活的线程(等待队列里的第一个线程)就是执行tryAcquire失败, 这个方法执行失败就意味着会被再次被挂起. 若是并发量严重, 极可能`等待队列`里的全部线程在必定时间内都没法被正常调度.也就是产生了线程饥饿的现象.

六. Condition简介

1. condition简介和demo

public class Main {
    private static MyReentrantLock lock = new MyReentrantLock();
    private static Condition condition = lock.newCondition();

    public static void main(String[] args) {
        new Thread(Main::funcA).start();
        new Thread(Main::funcB).start();
    }

    public static void funcA() {
        lock.lock();

        System.out.println("await以前");
        try {
            condition.await(); // 在这里等待被其余线程通知(signal)
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("await以后");

        lock.unlock();
    }

    public static void funcB() {
        lock.lock();

        System.out.println("signal以前");
        condition.signal(); // 这里会通知funcA的await().让funcA()继续执行下去
        System.out.println("signal以后");

        lock.unlock();
    }
}

 运行这段代码, 输出以下: 

若是仍是没有体会到区别, 那么把main方法里的第二行注释掉, 而后再执行一下:

输出结果以下: 

也就是说, await()会使当前线程挂起, 须要其余线程通知他, 他才能被激活(唤醒).

七. Condition实例化

1. 获取condition的例子

2. condition实例化的源码

newCondition方法在ReentrantLock类里的实现以下:

Sync类里的newCondition()方法以下:

ConditionObject是AQS里的一个内部类,实现自Condition,  类的声明以下(具体源码后面再解释):

八. condition的等待(await) 和 通知(signal)

1. 只执行一句await()后的流程

 

await()方法的具体实如今AQS里的内部类ConditionObject类里:

public final void await() throws InterruptedException {
        if (Thread.interrupted()) throw new InterruptedException();
        // 添加到 condition 的`条件队列`中
        Node node = addConditionWaiter();
        // 彻底释放锁,返回值是释放锁以前的 state 值
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        // 这里的isOnSyncQueue就是在判断node节点是否在锁的`等待队列`里
        while (!isOnSyncQueue(node)) {
            // 在这里线程挂起
            MyLockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        //---- 程序不会执行到下面, 由于在前面就已经挂起了.


        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

 这里面有几个方法以前没提到过, 在这里一一攻破.

先解决addConditionWaiter()方法:

   /**
     * 将当前线程对应的节点入队,插入队尾, 而且做为本方法的返回值.
     */
    private Node addConditionWaiter() {
        // 本例子中的场景下, 只执行过一次await()方法, 因此是第一个进入`条件队列`的元素.
        // 因此lastWaiter和firstWaiter确定都是null.
        Node t = lastWaiter;
        // 本例子中t==null, 因此这段if暂时不考虑吧
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        
        // 新建节点
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        
        // 由于t==null, 意思是队列目前仍是空的, 因此这个节点是第一个节点, 因此是firstWaiter.
        if (t == null) firstWaiter = node;
        else t.nextWaiter = node;
        
        // 但node同时也是最后一个节点, 也就是lastWaiter
        lastWaiter = node;
        
        // 最后会返回本方法
        return node;
    }

 接下来是fullRelease(node)方法, 来彻底释放锁:

final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            // 这里使用了当前的 state 做为 release 的参数,也就是彻底释放掉锁,将 state 置为 0
            if (release(savedState)) {
                failed = false;
                // 而且把释放锁以前的state值返回出去. (本例子中是1)
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

 最后就是isOnSyncQueue(node)方法, 来判断锁的`等待队列`中有没有当前这个node:

   /**
     * 这个方法就是判断 node 是否已经移动到sync queue了
     * (signal 的时候会将节点从条件队列移到sync queue)
     */
    final boolean isOnSyncQueue(Node node) {
        // 当进入Condition队列时,waitStatus确定为CONDITION,
        // 若是同时别的线程调用signal,Node会从Condition队列中移除,
        // 而且移除时会清除CONDITION状态。
        // 从移除到进入sync queue队列,中间这段时间prev必然为null,因此仍是返回false,即被park
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            // 本例子中, 会在这里返回
            return false;

        
        //--- 本例子中, 程序不会往下执行了. 可是下面的代码仍是分析一下吧. 这样待会儿就不用再从新讲个方法了.
        
        
        // 当别的线程进入sync queue队列时,会和前一个Node创建先后关系,因此若是next存在,说明必定在release队列中
        if (node.next != null) // If has successor, it must be on queue
            return true;

        // 到这里还没找到, 那只能去锁的`等待队列`里一个一个找了

        // 可能该Node刚刚最后一个进入release队列,因此是tail,其next必然是null,因此须要从队尾向前查找
        // 这个方法的源码就不讲了, 太简单了, 就是链表从后往前找node.找到了就true.没找到就false.
        return findNodeFromTail(node);
    }

 最终会执行到await()方法里的park()方法, 线程挂起. 等待被别的线程唤醒.

2. 只执行一句signal()后的流程

而后我们看看signal()的源码.

因为firstWaiter==null, 因此first==null, signal方法直接就退出了.

3.一个线程await等待, 另外一个线程用signal来唤醒

本场景的程序demo以下: 

public class Main {
    private static Scanner scanner = new Scanner(System.in);

    private static MyReentrantLock lock = new MyReentrantLock();
    private static Condition condition = lock.newCondition();

    public static void main(String[] args) throws Exception {
        new Thread(Main::funcA).start();
        new Thread(Main::funcB).start();
    }

    public static void funcA() {
        lock.lock();

        System.out.println("await以前");
        try {
            condition.await(); // 在这里等待被其余线程通知(signal)
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("await以后");

        lock.unlock();
    }

    public static void funcB() {
        lock.lock();

        System.out.println("signal以前");

        System.out.print("请输入任意内容并回车, 以执行signal方法: ");
        scanner.next(); // 在这里进行阻塞, 在控制台输入任意内容后回车, 就会接触阻塞, 就会执行signal方法, 也就是通知funcA()方法.
        condition.signal(); // 这里会通知funcA的await().让funcA()继续执行下去
        System.out.println("signal以后");

        lock.unlock();
    }

}

 首先, await()仍然执行到park这句, 而后挂起, 这点与本章第1小节的流程是同样的(看下图, 我选中的park那行代码, await就在这里挂起): 

 

而此时控制台以下:

此时尚未执行signal, 由于我用输入流给signal方法进行阻塞了, 须要输入内容后回车, 就能够调用到signal方法.signal通知后,await就会被唤醒.

以下:

我们分析一下signal是如何通知await, 而后让await线程被唤醒的:

由于刚才执行过await(), 因此firstWaiter不会是null. 因此会调用到doSignal方法:

 

上面这段代码也比较简单, 就是将firstWaiter为头的这个链表, 把第一个元素出队, 而后让第二个元素当新的头部. 而后让刚才出队的那个元素执行tansferForSignal方法.

    /**
     * 将节点从条件队列转移到锁的`等待队列`
     *
     * true 表明成功转移
     * false 表明在 signal 以前,节点已经取消了
     */
    final boolean transferForSignal(Node node) {
        /*
         * 在这里将 waitStatus 置为 0.
         * 若是成功设置为0, 那么继续往下面执行
         * 若是CAS 失败,说明此 node 的 waitStatus 已不是 Node.CONDITION,说明节点已经取消,那么直接return false.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * enq(node): 自旋进入阻塞队列的队尾.这个在将lock()方法的时候你们见到过.就是同一个方法.
         * 这里的返回值 p 是 node 在阻塞队列的前驱节点
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        // ws > 0 说明 node 在阻塞队列中的前驱节点取消了等待锁,直接唤醒 node 对应的线程。
        // 若是 ws <= 0, 那么 compareAndSetWaitStatus 将会被调用
        // 由于节点入队后,须要把前驱节点的状态设为 Node.SIGNAL(-1)
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            // 若是前驱节点取消或者 CAS 失败,会进到这里唤醒线程(可是本场景下不会执行到这里)
            MyLockSupport.unpark(node.thread);
        // 返回true
        return true;
    }

 将上面这段代码总结一下就是: 将本节点的waitState设置为了0. 而后让本节点插入到到锁的`等待队列`, 而后将前驱节点的waitState设置为了1. 而后返回了true.  

这一行的tansferForSignal返回了true, 取反了就是false了, 因此退出了 while循环. 至此signal方法就执行完毕了.

signal干的主要事情就是: 把`条件队列`里的第一个元素转移(尾插)到了锁的`等待队列`里. 

`条件队列`就是firstWaiter为头结点的一个链表.

`等待队列`就是我们上面将lock() unlock()的时候提到的锁的等待队列.

signal方法执行完了后, 接下来就该执行unlock()方法了. 以下图:

unlock()所作的事情就是, 释放当前的锁, 而后激活`等待队列`里的第一个线程. 

而在本场景下, 如今等待队列里有且仅有一个元素, 就是signal方法转移的那个元素.

unlock()以前分析过, unlock会调用release方法:

release方法所作的就是释放锁(第一个红色代码), 而后唤醒`等待队列`里的第一个线程(第二个红色代码).

unlock()方法执行完了后, 刚才await挂起的那个线程就又被激活了.

因此接下来执行的是acquireQueued方法, 这个方法在将锁的时候讲过, 因此这里简单讲解一下:

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (; ; ) {
                // 本场景下: node是队列里的第一个元素, 也就是await的线程对应的node.
                // 本场景下: p是node的前一个节点, 也就是head节点了
                final Node p = node.predecessor();
                // 本场景下: p==head. 锁如今空闲, tryAcquire也会成功.
                if (p == head && tryAcquire(arg)) {
                    // 将node设置为新的head. head节点隐含的意思就是: head节点对应的线程是当前锁的持有者
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;

                    // 返回false. 由于本场景下该线程没有被中断过.
                    return interrupted;
                }
                
                //--- 本场景下, 不会执行到下面的代码
                
                if (shouldParkAfterFailedAcquire(p, node)
                        && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed) cancelAcquire(node);
        }
    }

 最终这个方法返回了true. 接下来, await()方法继续执行剩下的几行代码就能够退出了:

这两行就是作了相应的维护操做, 和线程中断判断, 这里就不讲解了. 

随后,await方法执行完了, 退出方法栈.

而后就继续往下执行.  执行System.out.println, 而后是unlock.

至此本段程序就执行完了. 

相关文章
相关标签/搜索