从0到1实现本身的阻塞队列(上)

阻塞队列不止是一道热门的面试题,同时也是许多并发处理模型的基础,好比经常使用的线程池类ThreadPoolExecutor内部就使用了阻塞队列来保存等待被处理的任务。并且在大多数经典的多线程编程资料中,阻塞队列都是其中很是重要的一个实践案例。甚至能够说只有本身动手实现了一个阻塞队列才能真正掌握多线程相关的API。面试

在这篇文章中,咱们会从一个最简单的原型开始一步一步完善为一个相似于JDK中阻塞队列实现的真正实用的阻塞队列。在这个过程当中,咱们会一路涉及synchronized关键字、条件变量、显式锁ReentrantLock等等多线程编程的关键技术,最终掌握Java多线程编程的完整理论和实践知识。编程

阅读本文须要了解基本的多线程编程概念与互斥锁的使用,还不了解的读者能够参考一下这篇文章多线程中那些看不见的陷阱中到ReentrantLock部分为止的内容。数组

什么是阻塞队列?

阻塞队列是这样的一种数据结构,它是一个队列(相似于一个List),能够存放0到N个元素。咱们能够对这个队列执行插入或弹出元素操做,弹出元素操做就是获取队列中的第一个元素,而且将其从队列中移除;而插入操做就是将元素添加到队列的末尾。当队列中没有元素时,对这个队列的弹出操做将会被阻塞,直到有元素被插入时才会被唤醒;当队列已满时,对这个队列的插入操做就会被阻塞,直到有元素被弹出后才会被唤醒。安全

在线程池中,每每就会用阻塞队列来保存那些暂时没有空闲线程能够直接执行的任务,等到线程空闲以后再从阻塞队列中弹出任务来执行。一旦队列为空,那么线程就会被阻塞,直到有新任务被插入为止。bash

一个最简单的版本

代码实现

咱们先来实现一个最简单的队列,在这个队列中咱们不会添加任何线程同步措施,而只是实现了最基本的队列与阻塞特性。 那么首先,一个队列能够存放必定量的元素,并且能够执行插入元素和弹出元素的操做。而后由于这个队列仍是一个阻塞队列,那么在队列为空时,弹出元素的操做将会被阻塞,直到队列中被插入新的元素可供弹出为止;而在队列已满的状况下,插入元素的操做将会被阻塞,直到队列中有元素被弹出为止。数据结构

下面咱们会将这个最初的阻塞队列实现类拆解为独立的几块分别讲解和实现,到最后就能拼装出一个完整的阻塞队列类了。为了在阻塞队列中保存元素,咱们首先要定义一个数组来保存元素,也就是下面代码中的items字段了,这是一个Object数组,因此能够保存任意类型的对象。在最后的构造器中,会传入一个capacity参数来指定items数组的大小,这个值也就是咱们的阻塞队列的大小了。多线程

takeIndexputIndex就是咱们插入和弹出元素的下标位置了,为何要分别用两个整型来保存这样的位置呢?由于阻塞队列在使用的过程当中会不断地被插入和弹出元素,因此能够认为元素在数组中是像贪吃蛇同样一步一步往前移动的,每次弹出的都是队列中的第一个元素,而插入的元素则会被添加到队列的末尾。当下标到达末尾时会被设置为0,从数组的第一个下标位置从新开始向后增加,造成一个不断循环的过程。并发

那么若是队列中存储的个数超过items数组的长度时,新插入的元素岂不是会覆盖队列开头尚未被弹出的元素了吗?这时咱们的最后一个字段count就能派上用场了,当count等于items.length时,插入操做就会被阻塞,直到队列中有元素被弹出时为止。那么这种阻塞是如何实现的呢?咱们接下来来看一下put()方法如何实现。工具

/** 存放元素的数组 */
    private final Object[] items;
    
    /** 弹出元素的位置 */
    private int takeIndex;

    /** 插入元素的位置 */
    private int putIndex;
    
    /** 队列中的元素总数 */
    private int count;
    
    /**
     * 指定队列大小的构造器
     *
     * @param capacity  队列大小
     */
    public BlockingQueue(int capacity) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        // putIndex, takeIndex和count都会被默认初始化为0
        items = new Object[capacity];
    }
复制代码

下面是put()take()方法的实现,put()方法向队列末尾添加新元素,而take()方法从队列中弹出最前面的一个元素,咱们首先来看一下咱们目前最关心的put()方法。在put()方法的开头,咱们能够看到有一个判断count是否达到了items.length(队列大小)的if语句,若是count不等于items.length,那么就表示队列尚未满,随后就直接调用了enqueue方法对元素进行了入队。enqueue方法的实现会在稍后介绍,这里咱们只须要知道这个入队方法会将元素放入到队列中并对count加1就能够了。在成功插入元素以后咱们就会经过break语句跳出最外层的无限while循环,从方法中返回。post

可是若是这时候队列已满,那么count的值就会等于items.length,这将会致使咱们调用Thread.sleep(200L)使当前线程休眠200毫秒。当线程从休眠中恢复时,又会进入下一次循环,从新判断条件count != items.length。也就是说,若是队列没有弹出元素使咱们能够完成插入操做,那么线程就会一直处于“判断 -> 休眠”的循环而没法从put()方法中返回,也就是进入了“阻塞”状态。

随后的take()方法也是同样的道理,只有在队列不为空的状况下才能顺利弹出元素完成任务并返回,若是队列一直为空,调用线程就会在循环中一直等待,直到队列中有元素插入为止。

/**
     * 将指定元素插入队列
     *
     * @param e 待插入的对象
     */
    public void put(Object e) throws InterruptedException {
        while (true) {
            // 直到队列未满时才执行入队操做并跳出循环
            if (count != items.length) {
                // 执行入队操做,将对象e实际放入队列中
                enqueue(e);
                break;
            }

            // 队列已满的状况下休眠200ms
            Thread.sleep(200L);
        }
    }

    /**
     * 从队列中弹出一个元素
     *
     * @return  被弹出的元素
     */
    public Object take() throws InterruptedException {
        while (true) {
            // 直到队列非空时才继续执行后续的出队操做并返回弹出的元素
            if (count != 0) {
                // 执行出队操做,将队列中的第一个元素弹出
                return dequeue();
            }

            // 队列为空的状况下休眠200ms
            Thread.sleep(200L);
        }
    }
复制代码

在上面的put()take()方法中分别调用了入队方法enqueue和出队方法dequeue,那么这两个方法到底须要如何实现呢?下面是这两个方法的源代码,咱们能够看到,在入队方法enqueue()中,总共有三步操做:

  1. 首先把指定的对象e保存到items[putIndex]中,putIndex指示的就是咱们插入元素的位置。
  2. 以后,咱们会将putIndex向后移一位,来肯定下一次插入元素的下标位置,若是已经到了队列末尾咱们就会把putIndex设置为0,回到队列的开头。
  3. 最后,入队操做会将count值加1,让count值和队列中的元素个数一致。

而出队方法dequeue中执行的操做则与入队方法enqueue相反。

/**
     * 入队操做
     *
     * @param e 待插入的对象
     */
    private void enqueue(Object e) {
        // 将对象e放入putIndex指向的位置
        items[putIndex] = e;

        // putIndex向后移一位,若是已到末尾则返回队列开头(位置0)
        if (++putIndex == items.length)
            putIndex = 0;

        // 增长元素总数
        count++;
    }

    /**
     * 出队操做
     *
     * @return  被弹出的元素
     */
    private Object dequeue() {
        // 取出takeIndex指向位置中的元素
        // 并将该位置清空
        Object e = items[takeIndex];
        items[takeIndex] = null;

        // takeIndex向后移一位,若是已到末尾则返回队列开头(位置0)
        if (++takeIndex == items.length)
            takeIndex = 0;

        // 减小元素总数
        count--;

        // 返回以前代码中取出的元素e
        return e;
    }
复制代码

到这里咱们就能够将这个三个模块拼接为一个完整的阻塞队列类BlockingQueue了。完整的代码以下,你们能够拷贝到IDE中,或者本身从新实现一遍,而后咱们就能够开始上手用一用咱们刚刚完成的阻塞队列了。

public class BlockingQueue {

    /** 存放元素的数组 */
    private final Object[] items;

    /** 弹出元素的位置 */
    private int takeIndex;

    /** 插入元素的位置 */
    private int putIndex;

    /** 队列中的元素总数 */
    private int count;

    /**
     * 指定队列大小的构造器
     *
     * @param capacity  队列大小
     */
    public BlockingQueue(int capacity) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        items = new Object[capacity];
    }

    /**
     * 入队操做
     *
     * @param e 待插入的对象
     */
    private void enqueue(Object e) {
        // 将对象e放入putIndex指向的位置
        items[putIndex] = e;

        // putIndex向后移一位,若是已到末尾则返回队列开头(位置0)
        if (++putIndex == items.length)
            putIndex = 0;

        // 增长元素总数
        count++;
    }

    /**
     * 出队操做
     *
     * @return  被弹出的元素
     */
    private Object dequeue() {
        // 取出takeIndex指向位置中的元素
        // 并将该位置清空
        Object e = items[takeIndex];
        items[takeIndex] = null;

        // takeIndex向后移一位,若是已到末尾则返回队列开头(位置0)
        if (++takeIndex == items.length)
            takeIndex = 0;

        // 减小元素总数
        count--;

        // 返回以前代码中取出的元素e
        return e;
    }

    /**
     * 将指定元素插入队列
     *
     * @param e 待插入的对象
     */
    public void put(Object e) throws InterruptedException {
        while (true) {
            // 直到队列未满时才执行入队操做并跳出循环
            if (count != items.length) {
                // 执行入队操做,将对象e实际放入队列中
                enqueue(e);
                break;
            }

            // 队列已满的状况下休眠200ms
            Thread.sleep(200L);
        }
    }

    /**
     * 从队列中弹出一个元素
     *
     * @return  被弹出的元素
     */
    public Object take() throws InterruptedException {
        while (true) {
            // 直到队列非空时才继续执行后续的出队操做并返回弹出的元素
            if (count != 0) {
                // 执行出队操做,将队列中的第一个元素弹出
                return dequeue();
            }

            // 队列为空的状况下休眠200ms
            Thread.sleep(200L);
        }
    }

}
复制代码

测验阻塞队列实现

既然已经有了阻塞队列的实现,那么咱们就写一个测试程序来测试一下吧。下面是一个对阻塞队列进行并发的插入和弹出操做的测试程序,在这个程序中,会建立2个生产者线程向阻塞队列中插入数字0~19;同时也会建立2个消费者线程从阻塞队列中弹出20个数字,并打印这些数字。并且在程序中也统计了整个程序的耗时,会在全部子线程执行完成以后打印出程序的总耗时。

这里咱们指望这个测验程序可以以任意顺序输出0~19这20个数字,而后打印出程序的总耗时,那么实际执行状况会如何呢?

public class BlockingQueueTest {

    public static void main(String[] args) throws Exception {

        // 建立一个大小为2的阻塞队列
        final BlockingQueue q = new BlockingQueue(2);

        // 建立2个线程
        final int threads = 2;
        // 每一个线程执行10次
        final int times = 10;

        // 线程列表,用于等待全部线程完成
        List<Thread> threadList = new ArrayList<>(threads * 2);
        long startTime = System.currentTimeMillis();

        // 建立2个生产者线程,向队列中并发放入数字0到19,每一个线程放入10个数字
        for (int i = 0; i < threads; ++i) {
            final int offset = i * times;
            Thread producer = new Thread(() -> {
                try {
                    for (int j = 0; j < times; ++j) {
                        q.put(new Integer(offset + j));
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });

            threadList.add(producer);
            producer.start();
        }

        // 建立2个消费者线程,从队列中弹出20次数字并打印弹出的数字
        for (int i = 0; i < threads; ++i) {
            Thread consumer = new Thread(() -> {
                try {
                    for (int j = 0; j < times; ++j) {
                        Integer element = (Integer) q.take();
                        System.out.println(element);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });

            threadList.add(consumer);
            consumer.start();
        }

        // 等待全部线程执行完成
        for (Thread thread : threadList) {
            thread.join();
        }

        // 打印运行耗时
        long endTime = System.currentTimeMillis();
        System.out.println(String.format("总耗时:%.2fs", (endTime - startTime) / 1e3));
    }
}
复制代码

在个人电脑上运行这段程序的输出为:

0
1
2
3
4
5
null
10
8
7
14
9
16
15
18
17
null
复制代码

不只是打印出了不少个null,并且打印出17行以后就再也不打印更多数据,并且程序也就一直没有打印总耗时并结束了。为何会发生这种状况呢?

缘由就是在咱们实现的这个阻塞队列中彻底没有线程同步机制,因此同时并发进行的4个线程(2个生产者和2个消费者)会同时执行阻塞队列的put()take()方法。这就可能会致使各类各样并发执行顺序致使的问题,好比两个生产者同时对阻塞队列进行插入操做,有可能就会在putIndex没更新的状况下对同一下标位置又插入了一次数据,致使了数据还没被消费就被覆盖了;而两个消费者也可能会在takeIndex没更新的状况下又获取了一次已经被清空的位置,致使打印出了null。最后由于这些缘由都有可能会致使消费者线程最后尚未弹出20个数字count就已经为0了,这时消费者线程就会一直处于阻塞状态没法退出了。

那么咱们应该如何给阻塞队列加上线程同步措施,使它的运行不会发生错误呢?

一个线程安全的版本

使用互斥锁来保护队列操做

以前碰到的并发问题的核心就是多个线程同时对阻塞队列进行插入或弹出操做,那么咱们有没有办法让同一时间只能有一个线程对阻塞队列进行操做呢?

也许不少读者已经想到了,咱们最经常使用的一种并发控制方式就是synchronized关键字。经过synchronized,咱们可让一段代码同一时间只能有一个线程进入;若是在同一个对象上经过synchronized加锁,那么put()take()两个方法能够作到同一时间只能有一个线程调用两个方法中的任意一个。好比若是有一个线程调用了put()方法插入元素,那么其余线程再调用put()方法或者take()就都会被阻塞直到前一个线程完成对put()方法的调用了。

在这里,咱们只修改put()take()方法,把这两个方法中对enqueuedequeue的调用都包装到一个synchronized (this) {...}的语句块中,保证了同一时间只能有一个线程进入这两个语句块中的任意一个。若是对synchronized之类的线程同步机制还不熟悉的读者,建议先看一下这篇介绍多线程同步机制的文章《多线程中那些看不见的陷阱》再继续阅读以后的内容,相信会有事半功倍的效果。

/**
     * 将指定元素插入队列
     *
     * @param e 待插入的对象
     */
    public void put(Object e) throws InterruptedException {
        while (true) {
            synchronized (this) {
                // 直到队列未满时才执行入队操做并跳出循环
                if (count != items.length) {
                    // 执行入队操做,将对象e实际放入队列中
                    enqueue(e);
                    break;
                }
            }

            // 队列已满的状况下休眠200ms
            Thread.sleep(200L);
        }
    }

    /**
     * 从队列中弹出一个元素
     *
     * @return  被弹出的元素
     */
    public Object take() throws InterruptedException {
        while (true) {
            synchronized (this) {
                // 直到队列非空时才继续执行后续的出队操做并返回弹出的元素
                if (count != 0) {
                    // 执行出队操做,将队列中的第一个元素弹出
                    return dequeue();
                }
            }

            // 队列为空的状况下休眠200ms
            Thread.sleep(200L);
        }
    }
复制代码

再次测试

咱们再来试一试这个新的阻塞队列实现,在个人电脑上测试程序的输出以下:

0
1
2
3
10
11
4
5
6
12
13
14
15
7
8
9
16
17
18
19
总耗时:1.81s
复制代码

这下看起来结果就对了,并且多跑了几回也都能稳定输出全部0~19的20个数字。看起来很是棒,咱们成功了,来给本身鼓个掌吧!

可是仔细那么一看,好像最后的耗时是否是有一些高了?虽然“1.81秒”也不是太长的时间,可是好像通常计算机程序作这么一点事情只要一眨眼的功夫就能完成才对呀。为何这个阻塞队列会这么慢呢?

一个更快的阻塞队列

让咱们先来诊断一下以前的阻塞队列中究竟是什么致使了效率的下降,由于put()take()方法是阻塞队列的核心,因此咱们天然从这两个方法看起。在这两个方法里,咱们都看到了同一段代码Thread.sleep(200L),这段代码会让put()take()方法分别在队列已满和队列为空的状况下进入一次固定的200毫秒的休眠,防止线程占用过多的CPU资源。可是若是队列在这200毫秒里发生了变化,那么线程也仍是在休眠状态没法立刻对变化作出响应。好比若是一个调用put()方法的线程由于队列已满而进入了200毫秒的休眠,那么即便队列已经被消费者线程清空了,它也仍然会忠实地等到200毫秒以后才会从新尝试向队列中插入元素,中间的这些时间就都被浪费了。

可是若是咱们去掉这段休眠的代码,又会致使CPU的使用率太高的问题。那么有没有一种方法能够平衡二者的利弊,同时获得两种状况的好处又没有各自的缺点呢?

使用条件变量优化阻塞唤醒

为了完成上面这个困难的任务,既要马儿跑又要马儿不吃草。那么咱们就须要有一种方法,既让线程进入休眠状态再也不占用CPU,可是在队列发生改变时又能及时地被唤醒来重试以前的操做了。既然用了对象锁synchronized,那么咱们就找找有没有与之相搭配的同步机制能够实现咱们的目标。

Object类,也就是全部Java类的基类里,咱们找到了三个有意思的方法Object.wait()Object.notify()Object.notifyAll()。这三个方法是须要搭配在一块儿使用的,其功能与操做系统层面的条件变量相似。条件变量是这样的一种线程同步工具:

  1. 每一个条件变量都会有一个对应的互斥锁,要调用条件变量的wait()方法,首先须要持有条件变量对应的这个互斥锁。以后,在调用条件变量的wait()方法时,首先会释放已持有的这个互斥锁,而后当前线程进入休眠状态,等待被Object.notify()或者Object.notifyAll()方法唤醒;
  2. 调用Object.notify()或者Object.notifyAll()方法能够唤醒由于Object.wait()进入休眠状态的线程,区别是Object.notify()方法只会唤醒一个线程,而Object.notifyAll()会唤醒全部线程。

由于咱们以前的代码中经过synchronized获取了对应于this引用的对象锁,因此天然也就要用this.wait()this.notify()this.notifyAll()方法来使用与这个对象锁对应的条件变量了。下面是使用条件变量改造后的put()take()方法。仍是和以前同样,咱们首先以put()方法为例分析具体的改动。首先,咱们去掉了最外层的while循环,而后咱们把Thread.sleep替换为了this.wait(),以此在队列已满时进入休眠状态,等待队列中的元素被弹出后再继续。在队列知足条件,入队操做成功后,咱们经过调用this.notifyAll()唤醒了可能在等待队列非空条件的调用take()的线程。take()方法的实现与put()也基本相似,只是操做相反。

/**
     * 将指定元素插入队列
     *
     * @param e 待插入的对象
     */
    public void put(Object e) throws InterruptedException {
        synchronized (this) {
            if (count == items.length) {
                // 队列已满时进入休眠
                this.wait();
            }

            // 执行入队操做,将对象e实际放入队列中
            enqueue(e);

            // 唤醒全部休眠等待的进程
            this.notifyAll();
        }
    }

    /**
     * 从队列中弹出一个元素
     *
     * @return  被弹出的元素
     */
    public Object take() throws InterruptedException {
        synchronized (this) {
            if (count == 0) {
                // 队列为空时进入休眠
                this.wait();
            }

            // 执行出队操做,将队列中的第一个元素弹出
            Object e = dequeue();

            // 唤醒全部休眠等待的进程
            this.notifyAll();

            return e;
        }
    }
复制代码

可是咱们在测试程序运行以后发现结果好像又出现了问题,在我电脑上的输出以下:

0
19
null
null
null
null
null
null
null
null
null
18
null
null
null
null
null
null
null
null
总耗时:0.10s
复制代码

虽然咱们解决了耗时问题,如今的耗时已经只有0.10s了,可是结果中又出现了大量的null,咱们的阻塞队列好像又出现了正确性问题。那么问题出在哪呢?建议读者能够先本身尝试分析一下,这样有助于你们积累解决多线程并发问题的能力。

while循环判断条件是否知足

通过分析,咱们看到,在调用this.wait()后,若是线程被this.notifyAll()方法唤醒,那么就会直接开始直接入队/出队操做,而不会再次检查count的值是否知足条件。而在咱们的程序中,当队列为空时,可能会有不少消费者线程在等待插入元素。此时,若是有一个生产者线程插入了一个元素并调用了this.notifyAll(),则全部消费者线程都会被唤醒,而后依次执行出队操做,那么第一个消费者线程以后的全部线程拿到的都将是null值。并且同时,在这种状况下,每个执行完出队操做的消费者线程也一样会调用this.notifyAll()方法,这样即便队列中已经没有元素了,后续进入等待的消费者线程仍然会被本身的同类所唤醒,消费根本不存在的元素,最终只能返回null

因此要解决这个问题,核心就是在线程从this.wait()中被唤醒时也仍然要从新检查一遍count值是否知足要求,若是count不知足要求,那么当前线程仍然调用this.wait()回到等待状态当中去继续休眠。而咱们是没办法预知程序在第几回判断条件时能够获得知足条件的count值从而继续执行的,因此咱们必须让程序循环执行“判断条件 -> 不知足条件继续休眠”这样的流程,直到count知足条件为止。那么咱们就可使用一个while循环来包裹this.wait()调用和对count的条件判断,以此达到这个目的。

下面是具体的实现代码,咱们在其中把count条件(队列未满/非空)做为while条件,而后在count值还不知足要求的状况下调用this.wait()方法使当前线程进入等待状态继续休眠。

/**
     * 将指定元素插入队列
     *
     * @param e 待插入的对象
     */
    public void put(Object e) throws InterruptedException {
        synchronized (this) {
            while (count == items.length) {
                // 队列已满时进入休眠
                this.wait();
            }

            // 执行入队操做,将对象e实际放入队列中
            enqueue(e);

            // 唤醒全部休眠等待的进程
            this.notifyAll();
        }
    }

    /**
     * 从队列中弹出一个元素
     *
     * @return  被弹出的元素
     */
    public Object take() throws InterruptedException {
        synchronized (this) {
            while (count == 0) {
                // 队列为空时进入休眠
                this.wait();
            }

            // 执行出队操做,将队列中的第一个元素弹出
            Object e = dequeue();

            // 唤醒全部休眠等待的进程
            this.notifyAll();

            return e;
        }
    }
复制代码

再次运行咱们的测试程序,在个人电脑上获得了以下的输出:

0
10
1
2
11
12
13
3
4
14
5
6
15
16
7
17
8
18
9
19
总耗时:0.11s
复制代码

耗时只有0.11s,并且结果也是正确的,看来咱们获得了一个又快又好的阻塞队列实现。这是一个里程碑式的版本,咱们实现了一个真正能够在程序代码中使用的阻塞队列,到这里能够说你已经学会了如何实现一个阻塞队列了,让咱们为本身鼓个掌吧。

当时进度条出卖了我,这篇文章还有很多内容。既然咱们已经学会如何实现一个真正可用的阻塞队列了,咱们为何还要继续看这么多内容呢?别慌,虽然咱们已经实现了一个真正可用的版本,可是若是咱们更进一步的话就能够实现一个JDK级别的高强度版本了,这听起来是否是很是的诱人?让咱们继续咱们的旅程吧。

一个更安全的版本

咱们以前的版本中使用这些同步机制:synchronized (this)this.wait()this.notifyAll(),这些同步机制都和当前对象this有关。由于synchronized (obj)可使用任意对象对应的对象锁,而Object.wati()Object.notifyAll()方法又都是public方法。也就是说不止在阻塞队列类内部可使用这个阻塞队列对象的对象锁及其对应的条件变量,在外部的代码中也能够任意地获取阻塞队列对象上的对象锁和对应的条件变量,那么就有可能发生外部代码滥用阻塞队列对象上的对象锁致使阻塞队列性能降低甚至是发生死锁的状况。那咱们有没有什么办法可让阻塞队列在这方面变得更安全呢?

使用显式锁

最直接的方式固然是请出JDK在1.5以后引入的代替synchronized关键字的显式锁ReentrantLock类了。ReentrantLock类是一个可重入互斥锁,互斥指的是和synchronized同样,同一时间只能有一个线程持有锁,其余获取锁的线程都必须等待持有锁的线程释放该锁。而可重入指的就是同一个线程能够重复获取同一个锁,若是在获取锁时这个锁已经被当前线程所持有了,那么这个获取锁的操做仍然会直接成功。

通常咱们使用ReentrantLock的方法以下:

lock.lock();
try {
    作一些操做
}
finally {
    lock.unlock();
}
复制代码

上面的lock变量就是一个ReentrantLock类型的对象。在这段代码中,释放锁的操做lock.unlock()被放在了finally块中,这是为了保证线程在获取到锁以后,不论出现异常或者什么特殊状况都能保证正确地释放互斥锁。若是不这么作就可能会致使持有锁的线程异常退出后仍然持有该锁,其余须要获取同一个锁的线程就永远运行不了。

那么在咱们的阻塞队列中应该如何用ReentrantLock类来改写呢?

首先,咱们显然要为咱们的阻塞队列类添加一个实例变量lock来保存用于在不一样线程间实现互斥访问的ReentrantLock锁。而后咱们要将原来的synchronized(this) {...}格式的代码修改成上面使用ReentrantLock进行互斥访问保护的实现形式,也就是lock.lock(); try {...} finally {lock.unlock();}这样的形式。

可是原来与synchronized所加的对象锁相对应的条件变量使用方法this.wait()this.notifyAll()应该如何修改呢?ReentrantLock已经为你作好了准备,咱们能够直接调用lock.newCondition()方法来建立一个与互斥锁lock相对应的条件变量。而后为了在不一样线程中都能访问到这个条件变量,咱们一样要新增一个实例变量condition来保存这个新建立的条件变量对象。而后咱们原来使用的this.wait()就须要修改成condition.await(),而this.notifyAll()就修改成了condition.signalAll()

/** 显式锁 */
    private final ReentrantLock lock = new ReentrantLock();

    /** 锁对应的条件变量 */
    private final Condition condition = lock.newCondition();
    
    /**
     * 将指定元素插入队列
     *
     * @param e 待插入的对象
     */
    public void put(Object e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                // 队列已满时进入休眠
                // 使用与显式锁对应的条件变量
                condition.await();
            }

            // 执行入队操做,将对象e实际放入队列中
            enqueue(e);

            // 经过条件变量唤醒休眠线程
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * 从队列中弹出一个元素
     *
     * @return  被弹出的元素
     */
    public Object take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                // 队列为空时进入休眠
                // 使用与显式锁对应的条件变量
                condition.await();
            }

            // 执行出队操做,将队列中的第一个元素弹出
            Object e = dequeue();

            // 经过条件变量唤醒休眠线程
            condition.signalAll();

            return e;
        } finally {
            lock.unlock();
        }
    }
复制代码

到这里,咱们就完成了使用显式锁ReentrantLock所须要作的全部改动了。整个过程当中并不涉及任何逻辑的变动,咱们只是把synchronized (this) {...}修改成了lock.lock() try {...} finally {lock.unlock();},把this.wait()修改成了condition.await(),把this.notifyAll()修改成了condition.signalAll()。就这样,咱们的锁和条件变量由于是private字段,因此外部的代码就彻底没法访问了,这让咱们的阻塞队列变得更加安全,是时候能够提供给其余人使用了。

可是这个版本的阻塞队列仍然还有很大的优化空间,继续阅读下一篇文章,相信你就能够实现出JDK级别的阻塞队列了。

相关文章
相关标签/搜索