Condition 接口

Condition 接口

任意一个java对象,都拥有一组监视器方法,主要包括 wait(),wait(long timeout)、notify 以及 notifyAll 方法,这些方法与synchronized 同步关键字配合,能够实现等待/通知模式。Condition 接口也提供了类型Object的监视器方法,与Lock配合能够实现等待/通知模式。java

condition 接口与示例

Condition 定义了等待/通知两种类型的方法,当前线程调用这些方法时,须要提早获取到Condition对象关联的锁。Condition对象由Lock对象(调用Lock对象的 newCondition方法)建立出来的。Condition是依赖Lock对象的。node

package com.lock;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Created by cxx on 2018/1/18.
 */
public class ConditionOption {
    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();
    
    public void conditonWait() throws InterruptedException{
        lock.lock();
        
        try {
            condition.await();
        }finally {
            lock.unlock();
        }
        
    }
    
    public void conditonSignal() throws InterruptedException{
        lock.lock();
        try {
            condition.signal();
        }finally {
            lock.unlock();
        }
        
        
        
    }
            
}

获取一个Condition 必须经过Lock的newCondition() 方法。有界队列是一种特殊的队列,当队列为空时,队列的获取操做将会阻塞获取线程,直到队列中有新增元素,当队列已满时,队列的插入操做将会阻塞插入线程,知道队列出现“空位。”数组

package com.lock;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Created by cxx on 2018/1/18.
 */
public class BoundedQueue <T> {
    private Object[] items;

    private int addIndex,removeIndex,count;

    private Lock lock = new ReentrantLock();
    private Condition notEmpty = lock.newCondition();
    private Condition notFull = lock.newCondition();

    public BoundedQueue(int size){
        items = new Object[size];
    }

    //添加一个元素,若是数组满,则添加线程进入等待状态,直到有空位
    public void add(T t) throws InterruptedException{
        lock.lock();
        try {
            while (count == items.length){
                notFull.await();
            }
            items[addIndex] = t;

            if (++addIndex == items.length){
                addIndex = 0;
            }
            ++count;
            notEmpty.signal();

        }finally {
            lock.unlock();
        }

    }

    //由头部删除一个元素,若是数组空,则删除线程进入等待状态,直到有添加元素。
    public T remove() throws InterruptedException{
        lock.lock();

        try {
            while (count == 0){
                notEmpty.await();
            }
            Object x = items[removeIndex];
            if (++removeIndex == items.length){
                removeIndex = 0;
            }
            --count;
            notFull.signal();
            return (T) x;

        }finally {
            lock.unlock();
        }
    }
}

在添加和删除方法中使用 while 循环而非 if判断,目的是防止过早或意外的通知,只有条件符合才可以退出循环。安全

Condition 的实现分析

ConditionObject 是同步器 AbstractQueuedSynchronizer的内部类,由于Condition的操做须要获取相关联的锁。每一个Condition对象都包含着一个队列,该队列是Condition对象实现 等待/通知功能的关键。并发

等待队列

等待队列是一个FIFO的队列,在队列中的每一个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,若是一个线程调用了Condition.await()方法,那么该线程将会释放锁、构形成节点加入等待队列并进入等待状态。ui

一个Condition包含一个等待队列,Condition拥有首节点(firstWaiter)和尾节点(lastWaiter)。当前线程调用Condition.await()方法,将会以当前线程构造节点,并将节点从尾部加入等待队列。 Condition拥有首尾节点的引用,而新增节点只须要将原有的尾节点 nextWaiter 指向它,而且更新尾节点便可。上述节点应用更新的过程并无使用CAS保证,缘由在于调用 await()方法的线程一定是获取了锁的线程,这个过程是由锁来保证线程安全的。this

在Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的Lock拥有一个同步队列和多个等待队列。线程

等待

调用Condition的await()方法,会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await()方法返回时,当前线程必定获取了Condition相关联的锁。code

/**
         * Implements interruptible condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         *      throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled or interrupted.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * </ol>
         */
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

调用该方法的线程成功获取了锁的线程,也就是同步队列中的首节点,该方法将会当前线程构形成节点并加入等待队列中,而后释放同步状态,唤醒同步队列中的后继节点,而后当前线程会进入等待状态。对象

通知

调用 Condition的signal() 方法,将会唤醒在等待队列中等待时间最长的节点,在唤醒节点以前,会将节点移到同步队列中。

/**
         * Moves the longest-waiting thread, if one exists, from the
         * wait queue for this condition to the wait queue for the
         * owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

调用该方法的前置条件是当前线程必须获取了锁,能够看到signal()方法进行了IsHeldExclusively()检查,也就是当前线程必须是获取了锁的线程,接着获取等待队列的首节点,将其移动到同步队列并使用LockSupport唤醒节点中的线程。

调用同步器的enq(Node node)方法,等待队列中的头节点线程安全地移动到同步队列。当节点移动到同步队列后,当前线程在使用LockSupport唤醒该节点的线程。

被唤醒后的线程,将从await()方法中的while循环中退出,进而调用同步器的acquireQueued()方法加入到获取同步状态的竞争中。

成功获取同步状态以后,被唤醒的线程就爱哪根葱先前调用的await()方法返回,此时该线程已经成功地获取了锁。

Condition的signalAll()方法,至关于对等待队列中的每一个节点均执行一次signal()方法,效果就是将等待队列中全部节点所有移动到同步队列中,并唤醒每一个节点的线程。

相关文章
相关标签/搜索