从0到1实现本身的阻塞队列(下)

在上一篇文章《从0到1实现本身的阻塞队列(上)》中,咱们已经实现了一个可使用的阻塞队列版本。在这篇文章中,咱们能够继续咱们的冒险之旅,将咱们的阻塞队列提高到接近JDK版本的水平上。java

更进一步优化效率

咱们一直使用的都是Object.notifyAll()或者condition.signalAll()这样会唤醒全部线程的方法,那么若是只有一个线程可以顺利执行,可是其余线程都要再次回到等待状态继续休眠,那不是很是的浪费吗?好比若是有N个消费者线程在等待队列中出现元素,那么当一个元素被插入之后全部N个消费者线程都被所有唤醒,最后也只会有一个消费者线程可以真正拿到元素并执行完成,其余线程不是都被白白唤醒了吗?咱们为何不用只会唤醒一个线程的Object.notify()condition.signal()方法呢?编程

拆分条件变量

在阻塞队列中,咱们可使用Object.notify()或者condition.signal()这样只唤醒一个线程的方法,可是会有一些前提条件:数组

  1. 首先,在一个条件变量上等待的线程必须是同一类型的。好比一个条件变量上只有消费者线程在等待,另外一个条件变量上只有生产者线程在等待。这么作的目的就是防止发生咱们在插入时想唤醒的是消费者线程,可是唤醒了一个生产者线程,这个生产者线程又由于队列已满又进入了等待状态,这样咱们须要唤醒的消费者线程就永远不会被唤醒了。
  2. 另外还有一点就是这个条件变量上等待的线程只能互斥执行,若是N个生产者线程能够同时执行,咱们也就不须要一个一个唤醒了,这样反而会让效率下降。固然,在咱们的阻塞队列当中,无论是插入仍是弹出操做同一时间都只能有一个线程在执行,因此天然就知足这个要求了。

因此,咱们只须要知足第一个要求让不一样类型的线程在不一样的条件变量上等待就能够了。那么具体要怎么作呢?安全

首先,咱们天然是要把原来的一个条件变量condition给拆分红两个实例变量notFullnotEmpty,这两个条件变量虽然对应于同一互斥锁,可是两个条件变量的等待和唤醒操做是彻底隔离的。这两个条件变量分别表明队列未满和队列非空两个条件,消费者线程由于是被队列为空的状况所阻塞的,因此就应该等待队列非空条件获得知足;而生产者线程由于是被队列已满的状况所阻塞的,天然就要等待队列未满条件的成立。bash

/** 队列未满的条件变量 */
    private final Condition notFull = lock.newCondition();

    /** 队列非空的条件变量 */
    private final Condition notEmpty = lock.newCondition();
复制代码

因此在put()take()方法中,咱们就须要把take()方法中原来的condition.await()修改成等待队列非空条件,即notEmpty.await();而put()方法中的condition.await()天然是要修改成等待队列未满条件成立,即notFull.await()。既然咱们把等待条件变量的语句都改了,那么唤醒的语句也要作一样的修改,put()操做要唤醒等待的消费者线程,因此是notEmpty.signal()take()操做要唤醒的生产者线程,因此是notFull.signal()。修改完成后的代码以下,你们能够参考一下:多线程

/**
     * 将指定元素插入队列
     *
     * @param e 待插入的对象
     */
    public void put(Object e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                // 队列已满时进入休眠
                // 等待队列未满条件获得知足
                notFull.await();
            }

            // 执行入队操做,将对象e实际放入队列中
            enqueue(e);

            // 插入元素后唤醒一个等待队列非空条件成立的线程
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * 从队列中弹出一个元素
     *
     * @return  被弹出的元素
     */
    public Object take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                // 队列为空时进入休眠
                // 等待队列非空条件获得知足
                notEmpty.await();
            }

            // 执行出队操做,将队列中的第一个元素弹出
            Object e = dequeue();

            // 弹出元素后唤醒一个等待队列未满条件成立的线程
            notFull.signal();

            return e;
        } finally {
            lock.unlock();
        }
    }
复制代码

验证程序的效率

既然咱们对阻塞队列作了效率上的改进,那么就让咱们来实际检验一下吧。咱们仍是以前已经提供的检验程序,可是不一样的是,为了明显地看出效率上的变化,咱们须要修改一下程序中的两个变量。首先,咱们须要把检验程序中运行的线程数threads增长到400,而后咱们须要把每一个线程执行的次数改成100次,就像下面这样:并发

// 建立400个线程
        final int threads = 400;
        // 每一个线程执行100次
        final int times = 100;
复制代码

最后咱们分别使用改进前和改进后的版原本执行这个这个阻塞队列,在个人电脑上,改进前的版本耗时为7.80s,改进后的版本耗时为1.35s。看起来咱们对阻塞队列的效率作了一个很是大的提高,很是好,那咱们还有没有办法再加快一点呢?工具

还能不能更快?

在上面的阻塞队列实现中,咱们主要使用的就是put()take()两个操做。而由于有互斥锁ReentrantLock的保护,因此这两个方法在同一时间只能有一个线程调用。也就是说,生产者线程在操做队列时一样会阻塞消费者线程。不过从咱们的代码中看,实际上put()方法和take()方法之间须要有互斥锁保护的共享数据访问只发生在入队操做enqueue方法和出队操做dequeue方法之中。在这两个方法里,对于putIndextakeIndex的访问是彻底隔离的,enqueue只使用putIndex,而dequeue只使用takeIndex,那么线程间的竞争性数据就只剩下count了。这样的话,若是咱们能解决count的更新问题是否是就能够把锁lock拆分为两个互斥锁,分别让生产者线程和消费者线程使用了呢?这样的话生产者线程在操做时就只会阻塞生产者线程而不会阻塞消费者线程了,消费者线程也是同样的道理。post

拆分锁

这时候就要请出咱们很熟悉的一种同步工具CAS了,CAS是一个原子操做,它会接收两个参数,一个是当前值,一个是目标值,若是当前值已经发生了改变,那么就会返回失败,而若是当前值没有变化,就会将这个变量修改成目标值。在Java中,咱们通常会经过java.util.concurrent中的AtomicInteger来执行CAS操做。在AtomicInteger类上有原子性的增长与减小方法,每次调用均可以保证对指定的对象进行增长或减小,而且即便有多个线程同时执行这些操做,它们的结果也仍然是正确的。优化

首先,为了保证入队和出队操做之间的互斥特性移除后两个方法可以并发执行,那么咱们就要保证对count的更新是线程安全的。所以,咱们首先须要把实例变量count的类型从int修改成AtomicInteger,而AtomicInteger类就提供了咱们须要的原子性的增长与减小接口。

/** 队列中的元素总数 */
    private AtomicInteger count = new AtomicInteger(0);
复制代码

而后对应地,咱们须要将入队方法中的count++和出队方法中的count--分别改成Atomic原子性的加1方法getAndIncrement与减1方法getAndDecrement

/**
     * 入队操做
     *
     * @param e 待插入的对象
     */
    private void enqueue(Object e) {
        // 将对象e放入putIndex指向的位置
        items[putIndex] = e;

        // putIndex向后移一位,若是已到末尾则返回队列开头(位置0)
        if (++putIndex == items.length)
            putIndex = 0;

        // 增长元素总数
        count.getAndIncrement();
    }

    /**
     * 出队操做
     *
     * @return  被弹出的元素
     */
    private Object dequeue() {
        // 取出takeIndex指向位置中的元素
        // 并将该位置清空
        Object e = items[takeIndex];
        items[takeIndex] = null;

        // takeIndex向后移一位,若是已到末尾则返回队列开头(位置0)
        if (++takeIndex == items.length)
            takeIndex = 0;

        // 减小元素总数
        count.getAndDecrement();

        // 返回以前代码中取出的元素e
        return e;
    }
复制代码

到这里,咱们就已经解决了put()take()方法之间的数据竞争问题,两个方法如今就能够分别用两个锁来控制了。虽然相同类型的线程仍然是互斥的,例如生产者和生产者之间同一时间只能有一个生产者线程在操做队列。可是在生产者线程和消费者线程之间将不用再继续互斥,一个生产者线程和一个消费者线程能够在同一时间操做同一阻塞队列了。因此,咱们在这里能够将互斥锁lock拆为两个,分别保证生产者线程和消费者线程的互斥性,咱们将它们命名为插入锁putLock和弹出锁takeLock。同时,原来的条件变量也要分别对应于不一样的互斥锁了,notFull要对应于putLock,由于插入元素的生产者线程须要等待队列未满条件,那么notEmpyt天然就要对应于takeLock了。

/** 插入锁 */
    private final ReentrantLock putLock = new ReentrantLock();

    /** 队列未满的条件变量 */
    private final Condition notFull = putLock.newCondition();

    /** 弹出锁 */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** 队列非空的条件变量 */
    private final Condition notEmpty = takeLock.newCondition();
复制代码

最后咱们要对put()take()方法中的signal()调用作出一些调整。由于在上文中提到的,在使用条件变量时必定要先持有条件变量所对应的互斥锁,而在put()take()方法中,使用signal()方法唤醒的都是另外一种类型的线程,例如生产者线程唤醒消费者,消费者线程唤醒生产者。这样咱们调用signal()方法的条件变量就和try语句中持有的锁不一致了,因此咱们必须将直接的xxx.signal()调用替换为一个私有方法调用。而在私有方法中,咱们会先获取与条件变量对应的锁,而后再调用条件变量的signal()方法。好比在下面的signalNotEmpty()方法中,咱们就要先获取takeLock才能调用notEmpty.signal();而在signalNotFull()方法中,咱们就要先获取putLock才能调用notFull.signal()

/**
     * 唤醒等待队列非空条件的线程
     */
    private void signalNotEmpty() {
        // 为了唤醒等待队列非空条件的线程,须要先获取对应的takeLock
        takeLock.lock();
        try {
            // 唤醒一个等待非空条件的线程
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }
    
    /**
     * 唤醒等待队列未满条件的线程
     */
    private void signalNotFull() {
        // 为了唤醒等待队列未满条件的线程,须要先获取对应的putLock
        putLock.lock();
        try {
            // 唤醒一个等待队列未满条件的线程
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }
复制代码

解决死锁问题

但直接把notFull.signal()换成signalNotFull(),把notEmpty.signal()换成signalNotEmpty()还不够,由于在咱们的代码中,原来的notFull.signal()notEmpty.signal()都是在持有锁的try语句块当中的。一旦咱们作了调用私有方法的替换,那么put()take()方法就会以相反的顺序同时获取putLocktakeLock两个锁。有一些读者可能已经意识到这样会产生死锁问题了,那么咱们应该怎么解决它呢?

最好的方法就是不要同时加两个锁,咱们彻底能够在释放前一个以后再使用signal()方法来唤醒另外一种类型的线程。就像下面的put()take()方法中所作的同样,咱们能够在执行完入队操做以后就释放插入锁putLock,而后才运行signalNotEmpty()方法去获取takeLock并调用与其对应的条件变量notEmptysignal()方法,在take()方法中也是同样的道理。

/**
     * 将指定元素插入队列
     *
     * @param e 待插入的对象
     */
    public void put(Object e) throws InterruptedException {
        putLock.lockInterruptibly();
        try {
            while (count.get() == items.length) {
                // 队列已满时进入休眠
                // 等待队列未满条件获得知足
                notFull.await();
            }

            // 执行入队操做,将对象e实际放入队列中
            enqueue(e);
        } finally {
            putLock.unlock();
        }

        // 唤醒等待队列非空条件的线程
        // 为了防止死锁,不能在释放putLock以前获取takeLock
        signalNotEmpty();
    }
    
    /**
     * 从队列中弹出一个元素
     *
     * @return  被弹出的元素
     */
    public Object take() throws InterruptedException {
        Object e;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                // 队列为空时进入休眠
                // 等待队列非空条件获得知足
                notEmpty.await();
            }

            // 执行出队操做,将队列中的第一个元素弹出
            e = dequeue();
        } finally {
            takeLock.unlock();
        }

        // 唤醒等待队列未满条件的线程
        // 为了防止死锁,不能在释放takeLock以前获取putLock
        signalNotFull();

        return e;
    }
复制代码

到了这里咱们就顺利地把原来单一的一个lock锁拆分为了插入锁putLocktakeLock,这样生产者线程和消费者线程就能够同时运行了。

最后的细节优化——优化唤醒其余线程的效率

啊?咱们的阻塞队列到了这里还能再继续优化吗?其实咱们作的优化已经足够多了,基本上影响比较大的优化咱们都作了,可是还有一些细节是能够最后完善一下的。好比说若是队列并无为空或者已满时,咱们插入或者弹出了元素其实都是不须要唤醒任何线程的,多余的唤醒操做须要先获取ReentrantLock锁才能调用对应的条件变量的signal()方法,而获取锁是一个成本比较大的操做。因此咱们最好是能在队列真的为空或者已满之后,成功插入或弹出元素时,再去获取锁并唤醒等待的线程。

也就是说咱们会将signalNotEmpty();修改成if (c == 0) signalNotEmpty();,而把signalNotFull();修改成if (c == items.length) signalNotFull();,也就是只有在必要的时候才去唤醒另外一种类型的线程。可是这种修改又会引入另一种问题,例若有N个消费者线程在等待队列非空,这时有两个生产者线程插入了两个元素,可是这两个插入操做是连续发生的,也就是说只有第一个生产者线程在插入元素以后调用了signalNotEmpty(),第二个线程看到队列本来是非空的就不会调用唤醒方法。在这种状况下,实际就只有一个消费者线程被唤醒了,而实际上队列中还有一个元素可供消费。那么咱们如何解决这个问题呢?

比较简单的一种方法就是,生产者线程和消费者线程不止会唤醒另外一种类型的线程,并且也会唤醒同类型的线程。好比在生产者线程中若是插入元素以后发现队列还未满,那么就能够调用notFull.signal()方法来唤醒其余可能存在的等待状态的生产者线程,对于消费者线程所使用的take()方法也是相似的处理方式。相对来讲signal方法较低,而互斥锁的lock方法成本较高,并且会影响到另外一种类型线程的运行。因此经过这种方式尽量地少调用signalNotEmpty()signalNotFull()方法会是一种还不错的优化手段。

优化后的put()take()方法以下:

/**
     * 将指定元素插入队列
     *
     * @param e 待插入的对象
     */
    public void put(Object e) throws InterruptedException {
        int c = -1;
        putLock.lockInterruptibly();
        try {
            while (count.get() == items.length) {
                // 队列已满时进入休眠
                // 等待队列未满条件获得知足
                notFull.await();
            }

            // 执行入队操做,将对象e实际放入队列中
            enqueue(e);

            // 增长元素总数
            c = count.getAndIncrement();

            // 若是在插入后队列仍然没满,则唤醒其余等待插入的线程
            if (c + 1 < items.length)
                notFull.signal();

        } finally {
            putLock.unlock();
        }

        // 若是插入以前队列为空,才唤醒等待弹出元素的线程
        // 为了防止死锁,不能在释放putLock以前获取takeLock
        if (c == 0)
            signalNotEmpty();
    }
    
    /**
     * 从队列中弹出一个元素
     *
     * @return  被弹出的元素
     */
    public Object take() throws InterruptedException {
        Object e;
        int c = -1;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                // 队列为空时进入休眠
                // 等待队列非空条件获得知足
                notEmpty.await();
            }

            // 执行出队操做,将队列中的第一个元素弹出
            e = dequeue();

            // 减小元素总数
            c = count.getAndDecrement();

            // 若是队列在弹出一个元素后仍然非空,则唤醒其余等待队列非空的线程
            if (c - 1 > 0)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }

        // 只有在弹出以前队列已满的状况下才唤醒等待插入元素的线程
        // 为了防止死锁,不能在释放takeLock以前获取putLock
        if (c == items.length)
            signalNotFull();

        return e;
    }
复制代码

成品出炉!

恭喜你们,通过一番漫长的探索,咱们终于完全完成了咱们的阻塞队列实现之旅。若是你能坚持到这里,我相信你已经对多线程编程的实践方法有了很是深入的理解。最后让咱们来看一看咱们最终完成的成品代码——一个完整的阻塞队列实现吧。

完整的阻塞队列代码

public class BlockingQueue {

    /** 存放元素的数组 */
    private final Object[] items;

    /** 弹出元素的位置 */
    private int takeIndex;

    /** 插入元素的位置 */
    private int putIndex;

    /** 队列中的元素总数 */
    private AtomicInteger count = new AtomicInteger(0);

    /** 插入锁 */
    private final ReentrantLock putLock = new ReentrantLock();

    /** 队列未满的条件变量 */
    private final Condition notFull = putLock.newCondition();

    /** 弹出锁 */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** 队列非空的条件变量 */
    private final Condition notEmpty = takeLock.newCondition();

    /**
     * 指定队列大小的构造器
     *
     * @param capacity  队列大小
     */
    public BlockingQueue(int capacity) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        items = new Object[capacity];
    }

    /**
     * 入队操做
     *
     * @param e 待插入的对象
     */
    private void enqueue(Object e) {
        // 将对象e放入putIndex指向的位置
        items[putIndex] = e;

        // putIndex向后移一位,若是已到末尾则返回队列开头(位置0)
        if (++putIndex == items.length)
            putIndex = 0;
    }

    /**
     * 出队操做
     *
     * @return  被弹出的元素
     */
    private Object dequeue() {
        // 取出takeIndex指向位置中的元素
        // 并将该位置清空
        Object e = items[takeIndex];
        items[takeIndex] = null;

        // takeIndex向后移一位,若是已到末尾则返回队列开头(位置0)
        if (++takeIndex == items.length)
            takeIndex = 0;

        // 返回以前代码中取出的元素e
        return e;
    }

    /**
     * 将指定元素插入队列
     *
     * @param e 待插入的对象
     */
    public void put(Object e) throws InterruptedException {
        int c = -1;
        putLock.lockInterruptibly();
        try {
            while (count.get() == items.length) {
                // 队列已满时进入休眠
                // 等待队列未满条件获得知足
                notFull.await();
            }

            // 执行入队操做,将对象e实际放入队列中
            enqueue(e);

            // 增长元素总数
            c = count.getAndIncrement();

            // 若是在插入后队列仍然没满,则唤醒其余等待插入的线程
            if (c + 1 < items.length)
                notFull.signal();

        } finally {
            putLock.unlock();
        }

        // 若是插入以前队列为空,才唤醒等待弹出元素的线程
        // 为了防止死锁,不能在释放putLock以前获取takeLock
        if (c == 0)
            signalNotEmpty();
    }

    /**
     * 唤醒等待队列非空条件的线程
     */
    private void signalNotEmpty() {
        // 为了唤醒等待队列非空条件的线程,须要先获取对应的takeLock
        takeLock.lock();
        try {
            // 唤醒一个等待非空条件的线程
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * 从队列中弹出一个元素
     *
     * @return  被弹出的元素
     */
    public Object take() throws InterruptedException {
        Object e;
        int c = -1;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                // 队列为空时进入休眠
                // 等待队列非空条件获得知足
                notEmpty.await();
            }

            // 执行出队操做,将队列中的第一个元素弹出
            e = dequeue();

            // 减小元素总数
            c = count.getAndDecrement();

            // 若是队列在弹出一个元素后仍然非空,则唤醒其余等待队列非空的线程
            if (c - 1 > 0)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }

        // 只有在弹出以前队列已满的状况下才唤醒等待插入元素的线程
        // 为了防止死锁,不能在释放takeLock以前获取putLock
        if (c == items.length)
            signalNotFull();

        return e;
    }

    /**
     * 唤醒等待队列未满条件的线程
     */
    private void signalNotFull() {
        // 为了唤醒等待队列未满条件的线程,须要先获取对应的putLock
        putLock.lock();
        try {
            // 唤醒一个等待队列未满条件的线程
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

}
复制代码

有兴趣的读者能够把咱们完成的这个阻塞队列类和JDK中的java.util.concurrent.LinkedBlockingQueue类作一个比较,相信你们能够发现这两个类很是的类似,这足以说明咱们费劲千辛万苦实现的这个阻塞队列类已经很是接近JDK中的阻塞队列类的质量了。

总结

恭喜你们终于完整地读完了这篇文章!在这篇文章中,咱们从一个最简单的阻塞队列版本开始,一路解决了各类问题,最终获得了一个完整、高质量的阻塞队列实现。咱们一块儿来回忆一下咱们解决的问题吧。从最简单的阻塞队列开始,咱们首先用互斥锁synchronized关键字解决了并发控制问题,保证了队列在多线程访问状况下的正确性。而后咱们用条件变量Object.wati()Object.notifyAll()解决了休眠唤醒问题,使队列的效率获得了飞跃性地提升。为了保障队列的安全性,不让外部代码能够访问到咱们所使用的对象锁和条件变量,因此咱们使用了显式锁ReentrantLock,并经过锁对象locknewCondition()方法建立了与其相对应的条件变量对象。最后,咱们对队列中的条件变量和互斥锁分别作了拆分,使队列的效率获得了进一步的提升。固然,最后咱们还加上了一点对唤醒操做的有条件调用优化,使整个阻塞队列的实现变得更加完善。

相关文章
相关标签/搜索