浅谈Java中的Condition条件队列,手摸手带你实现一个阻塞队列!

条件队列是什么?可能不少人和我同样答不出来,不过今天终于搞清楚了!java

什么是条件队列

条件队列:当某个线程调用了wait方法,或者经过Condition对象调用了await相关方法,线程就会进入阻塞状态,并加入到对应条件队列中。node

等待唤醒机制相关文章中咱们提到了条件队列,即当对象获取到同步锁以后,若是调用了wait方法,当前线程会进入到条件队列中,并释放锁。并发

synchronized(对象){ // 获取锁失败,线程会加入到同步队列中 
    while(条件不知足){
        对象.wait();// 调用wait方法当前线程加入到条件队列中
    }
}

基于synchcronized的内置条件队列存在一些缺陷。每一个内置锁都只能有一个相关联的条件队列,于是存在多个线程可能在同一个条件队列上等待不一样的条件谓词,而且在最多见的加锁模式下公开条件队列对象。框架

Java中的锁的实现能够分为两种,一种是基于synchronized的隐式锁,它是基于JVM层面实现的;而另外一种则是基于AQS框架在代码层面实现的锁,如ReentrantLock等,在进行并发控制过程当中,不少状况下他们均可以相互替代。ide

其中同步队列和条件队列是AQS中两个比较核心的概念,它们是代码层面实现锁的关键。关于同步队列的内容,咱们已经在图解AQS的设计与实现,手摸手带你实现一把互斥锁!中进行了详细的介绍。ui

与Object配合synchronized相比,基于AQS的Lock&Condition实现的等待唤醒模式更加灵活,支持多个条件队列,支持等待状态中不响应中断以及超时等待功能; 其次就是基于AQS实现的条件队列是"肉眼可见"的,咱们能够经过源代码进行debug,而synchronized则是彻底隐式的。this

同步队列和条件队列

与条件队列密不可分的类则是ConditionObject, 是AQS中实现了Condition接口的内部类,一般配合基于AQS实现的锁一同使用。当线程获取到锁以后,能够调用await方法进入条件队列并释放锁,或者调用singinal方法唤醒对应条件队列中等待时间最久的线程并加入到等待队列中。线程

在AQS中,线程会被封装成Node对象加入队列中,而条件队列中则复用了同步队列中的Node对象debug

Condition相关方法和描述

Condition接口一共定义了如下几个方法:设计

await(): 当前线程进入等待状态,直到被通知(siginal)或中断【和wait方法语义相同】。

awaitUninterruptibly(): 当前线程进入等待状态,直到被通知,对中断不敏感。

awaitNanos(long timeout): 当前线程进入等待状态直到被通知(siginal),中断或超时。

awaitUnitil(Date deadTime): 当前线程进入等待状态直到被通知(siginal),中断或到达某个时间。

signal(): 唤醒一个等待在Condition上的线程,该线程从等待方法返回前必须得到与Condition关联的锁【和notify方法语义相同】

signalAll(): 唤醒全部等待在Condition上的线程,可以从等待方法返回的线程必须得到与Condition关联的锁【和notifyAll方法语义相同】。

条件队列入队操做

当线程获取到锁以后,Condition对象调用await相关的方法,线程会进入到对应的条件队列中。

/**
  * 若是当前线程被终端,抛出 InterruptedException 异常
  */
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 添加当前线程到【条件队列】
    Node node = addConditionWaiter();
    // 释放已经获取的锁资源,并返回释放前的同步状态
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 若是当前节点不在【同步队列】中, 线程进入阻塞状态,等待被唤醒
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

条件队出队操做

Condition对象调用signal或者signalAll方法时,

/**
 * 将【条件队列】中第一个有效的元素移除而且添加到【同步队列】中
 * 所谓有效指的是非null,而且状态吗
 * @param first 条件队列中第一个非空的元素
 */
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    // 将条件队列中等待最久的那个有效元素添加到同步队列中
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

/**
 * 将条件队列中的节点转换到同步队列中
 */
final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     * 若是节点的等待状态不能被修改,说明当前线程已经被取消等待【多个线程执行siginal时会出现的状况】
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * 加入到【同步队列】中,而且尝试将前驱节点设置为可唤醒状态
     */
    Node p = enq(node); // 将node添加到同步队列中,并返回它的前驱节点
    int ws = p.waitStatus;
    // 若是前驱节点不须要唤醒,或者设置状态为‘唤醒’失败,则唤醒线程时期从新争夺同步状态
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

实现阻塞队列

  1. 自定义互斥锁升级,增长获取Condition对象接口。
/**
 * 自定义互斥锁
 *
 * @author cruder
 * @time 2019/11/29 9:43
 */
public class MutexLock {

    private static final Sync STATE_HOLDER = new Sync();

    /**
     * 经过Sync内部类来持有同步状态, 当状态为1表示锁被持有,0表示锁处于空闲状态
     */
    private static class Sync extends AbstractQueuedSynchronizer {

        /**
         * 是否被独占, 有两种表示方式
         *  1. 能够根据状态,state=1表示锁被占用,0表示空闲
         *  2. 能够根据当前独占锁的线程来判断,即getExclusiveOwnerThread()!=null 表示被独占
         */
        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() != null;
        }

        /**
         * 尝试获取锁,将状态从0修改成1,操做成功则将当前线程设置为当前独占锁的线程
         */
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        /**
         * 释放锁,将状态修改成0
         */
        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                throw new UnsupportedOperationException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        //【新增代码】
        final ConditionObject newCondition() {
            return new ConditionObject();
        }
    }

    /**
     * 下面的实现Lock接口须要重写的方法,基本是就是调用内部内Sync的方法
     */
    public void lock() {
        STATE_HOLDER.acquire(1);
    }

    public void unlock() {
        STATE_HOLDER.release(1);
    }

    // 【新增代码】 获取条件队列
    public Condition newCondition(){
        return STATE_HOLDER.newCondition();
    }

}
  1. 基于自定义互斥锁,实现阻塞队列。阻塞队列具备两个特色:
  • 添加元素到队列中, 若是队列已满会使得当前线程阻塞【加入到条件队列-队列不满】,直到队列不满为止
  • 移除队列中的元素,当队列为空时会使当前线程阻塞【加入到条件队列-队列不空】,直到队列不为空为止
/**
 *  有界阻塞阻塞队列
 *
 * @author Jann Lee
 * @date 2019-12-11 22:20
 **/
public class BoundedBlockingQueue<T> {
    /**
     * list做为底层存储结构
     */
    private List<T> dataList;
    /**
     * 队列的大小
     */
    private int size;

    /**
     * 锁,和条件变量
     */
    private MutexLock lock;
    /**
     * 队列非空 条件变量
     */
    private Condition notEmpty;
    /**
     * 队列未满 条件变量
     */
    private Condition notFull;

    public BoundedBlockingQueue(int size) {
        dataList = new ArrayList<>();
        lock = new MutexLock();
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
        this.size = size;
    }


    /**
     * 队列中添加元素 [只有队列未满时才能够添加,不然须要等待队列变成未满状态]
     */
    public void add(T data) throws InterruptedException {
        lock.lock();
        try {
            // 若是队列已经满了, 须要在等待“队列未满”条件知足
            while (dataList.size() == size) {
                notFull.await();
            }
            dataList.add(data);
            Thread.sleep(2000);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * 移除队列的第一个元素[只有队列非空才能够移除,不然须要等待变成队列非空状态]
     */
    public T remove() throws InterruptedException {
        lock.lock();
        try {
            // 若是为空, 须要在等待“队列非空”条件知足
            while (dataList.isEmpty()) {
                notEmpty.await();
            }
            T result = dataList.remove(0);
            notFull.signal();
            return result;
        } finally {
            lock.unlock();
        }
    }

}

总结

  1. 条件队列和同步队列在Java中有两种实现,synchronized关键字以及基于AQS
  2. 每一个(基于synchronized的)内置锁都只能有一个相关联的条件队列,会存在多个线程可能在同一个条件队列上等待不一样的条件谓词;而(基于AQS实现的)显式锁支持多个条件队列
  3. 与wait,notify,notifyAll 对应的方法时Conditoin接口中的await,signal,signalAll,他们具备相同的语义

最后敬上一个关注了也没有错的公众号~

相关文章
相关标签/搜索