基于AQS的Condition接口

  • 简介

Condition接口实现的功能与Object类中的wait/notify(等待/通知机制)相似,Object的wait和notify/notify是与对象监视器(java线程状态操做和锁与监视器)配合完成线程间的等待/通知机制,而Condition与Lock配合完成等待通知机制,前者是java底层级别的,后者是语言级别的,具备更高的可控制性和扩展性java


  • 区别
  1. Condition可以支持不响应中断,而经过使用Object方式不支持;
  2. Condition可以支持多个等待队列(new 多个Condition对象),而Object方式只能支持一个;
  3. Condition可以支持超时时间的设置,而Object不支持

  • 主要方法
  1. void await() 形成当前线程在接到信号或被中断以前一直处于等待状态
  2. boolean await(long time, TimeUnit unit) 形成当前线程在接到信号、被中断或到达指定等待时间以前一直处于等待状态
  3. void signal() 唤醒一个线程
  4. void signalAll() 唤醒所有线程
  • 线程必须在获取到锁的前提下调用await,图中展现了线程获取到锁后的操做
  1. AQS同步队列的头结点(head)获取到锁后,从AQS同步队列中被剔除,而且切断了与后续节点的联系
  2. 由于Condition基于AQS,因此AQS同步队列与Condition等待队列用的Node类是同一个
  3. 将获取到锁的线程包装成Node对象后加入到Condition等待队列的尾部(lastWaiter)

// 与Object wait()同样,调用await()方法的线程必须先获取锁
public final void await() throws InterruptedException {
	if (Thread.interrupted())
		throw new InterruptedException();
	// 将线程封装成Node,加入到Condition等待队列的尾部
	Node node = addConditionWaiter();
	// 释放当前线程所占用的lock,在释放的过程当中会唤醒同步队列中的下一个节点
	// 与Object wait()同样,await也会释放当前获取的锁
	int savedState = fullyRelease(node);
	int interruptMode = 0;
	// 判断当前node是否在AQS同步队列中,若是不在就阻塞等待加入AQS同步队列后唤醒
	while (!isOnSyncQueue(node)) {
		LockSupport.park(this);
		if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
			break;
	}
	// 从AQS中获取锁后继续执行
	// acquireQueued方法参考这篇博客
	// [AQS](https://my.oschina.net/kdy1994/blog/3022593 "AQS")
	if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
		interruptMode = REINTERRUPT;
	if (node.nextWaiter != null) // clean up if cancelled
		unlinkCancelledWaiters();
	if (interruptMode != 0)
		reportInterruptAfterWait(interruptMode);
}

// 将线程封装成Node对象,加入到Condition等待队列的尾部
// firstWaiter 表示Conditon等待队列的头结点,lastWaiter是尾结点
private Node addConditionWaiter() {
	Node t = lastWaiter;
	// If lastWaiter is cancelled, clean out.
	if (t != null && t.waitStatus != Node.CONDITION) {
		unlinkCancelledWaiters();
		t = lastWaiter;
	}
	// Node.CONDITION = -2
	Node node = new Node(Thread.currentThread(), Node.CONDITION);
	if (t == null)
		firstWaiter = node;
	else
		t.nextWaiter = node;
	lastWaiter = node;
	return node;
}

public final void signal() {
	// 判断是否获取了锁
	if (!isHeldExclusively())
		throw new IllegalMonitorStateException();
	// firstWaiter Condition等待队列的头结点
	Node first = firstWaiter;
	if (first != null)
		doSignal(first);
}

private void doSignal(Node first) {
	do {
		if ( (firstWaiter = first.nextWaiter) == null)
			lastWaiter = null;
		// 将头结点从等待队列中移除
		first.nextWaiter = null;
	} while (!transferForSignal(first) && (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
	//将node waitStatus设置为0
	if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
		return false;
	// 将node加入AQS同步队列尾部,加入后await()的while循环条件不成立了
	Node p = enq(node);
	int ws = p.waitStatus;
	if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
		LockSupport.unpark(node.thread);
	return true;
}

  • 图中主要想表达的是AQS同步队列与Condition等待队列的头和尾的变化状况
  1. AQS同步队列的尾(tail)变成了Condition等待队列的头(firstWaiter)node

  2. Condition等待队列的firstWaiter变成了以前firstWaiter的nextWaiter(doSigna()方法的if判断)ui

    if ( (firstWaiter = first.nextWaiter) == null)this

  3. Condition等待队列的firstWaiter与next的联系被中断了(doSignal()方法中.net

    first.nextWaiter = null线程

  • Condition的await与signal流程
  • awaitThread
  1. awaitThread获取锁成功,则执行await()方法进入condition等待队列尾部,调用LockSupport.park(this)阻塞线程
  2. awaitThread获取锁失败,则进入AQS同步队列尾部,等待其余线程释放锁后,从新竞争锁成功后执行上一步
  3. signalThread执行signal()后,awaitThread从Condition等待队列中加入到了AQS同步队列尾部后,当await线程unPark后,while循环条件不成立跳出循环while (!isOnSyncQueue(node)) 接下来执行acquireQueued(node, savedState),从AQS同步队列中获取锁后结束退出
  • signalThread
  1. signalThread获取锁成功,则执行signal()将Condition等待队列头结点(firstWaiter )加入到AQS同步队列尾部(tail),结束退出
  2. signalThread获取锁失败,则进入AQS同步队列尾部,等待其余线程释放锁后,从新竞争锁成功后执行上一步

  • 基于Condition的写入与消费示例
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    final Object[] items = new Object[5];
    int putptr, takeptr, count;

    public void put(Object x) throws InterruptedException {
        lock.lock();    //获取锁
        try {
            // 若是“缓冲已满”,则等待;直到“缓冲”不是满的,才将x添加到缓冲中。
            while (count == items.length)
                notFull.await();
            // 将x添加到缓冲中
            items[putptr] = x;
            // 将“put统计数putptr+1”;若是“缓冲已满”,则设putptr为0。
            if (++putptr == items.length) putptr = 0;
            // 将“缓冲”数量+1
            ++count;
            // 唤醒take线程,由于take线程经过notEmpty.await()等待
            notEmpty.signal();

            // 打印写入的数据
            System.out.println(Thread.currentThread().getName() + " put  " + (Integer) x);
        } finally {
            lock.unlock();    // 释放锁
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();    //获取锁
        try {
            // 若是“缓冲为空”,则等待;直到“缓冲”不为空,才将x从缓冲中取出。
            while (count == 0)
                notEmpty.await();
            // 将x从缓冲中取出
            Object x = items[takeptr];
            // 将“take统计数takeptr+1”;若是“缓冲为空”,则设takeptr为0。
            if (++takeptr == items.length) takeptr = 0;
            // 将“缓冲”数量-1
            --count;
            // 唤醒put线程,由于put线程经过notFull.await()等待
            notFull.signal();

            // 打印取出的数据
            System.out.println(Thread.currentThread().getName() + " take " + (Integer) x);
            return x;
        } finally {
            lock.unlock();    // 释放锁
        }
    }
}

public class ConditionTest {
    private static BoundedBuffer bb = new BoundedBuffer();

    public static void main(String[] args) {
        // 启动10个“写线程”,向BoundedBuffer中不断的写数据(写入0-9);
        // 启动10个“读线程”,从BoundedBuffer中不断的读数据。
        for (int i = 0; i < 10; i++) {
            new PutThread("p" + i, i).start();
            new TakeThread("t" + i).start();
        }
    }

    static class PutThread extends Thread {
        private int num;

        public PutThread(String name, int num) {
            super(name);
            this.num = num;
        }

        public void run() {
            try {
                Thread.sleep(1);    // 线程休眠1ms
                bb.put(num);        // 向BoundedBuffer中写入数据
            } catch (InterruptedException e) {
            }
        }
    }

    static class TakeThread extends Thread {
        public TakeThread(String name) {
            super(name);
        }

        public void run() {
            try {
                Thread.sleep(10);                    // 线程休眠1ms
                Integer num = (Integer) bb.take();    // 从BoundedBuffer中取出数据
            } catch (InterruptedException e) {
            }
        }
    }
相关文章
相关标签/搜索