死磕java concurrent包系列(三)基于ReentrantLock理解AQS的条件队列

基于Codition分析AQS的条件队列

前言

上一篇咱们讲了AQS中的同步队列队列,如今咱们研究一下条件队列。java

在java中最多见的加锁方式就是synchorinzed和Reentrantlock,咱们都说Reentrantlock比synchorinzed更加灵活,其实就灵活在Reentrantlock中的条件队列的用法上。node

Condition接口

它是在java1.5中引入的一个接口,主要是为了替代object类中的wait、notify方法,以一种更灵活的方式解决线程之间的通讯问题:spring

public interface Condition {

 //使当前线程进入等待状态直到被通知(signal)
 void await() throws InterruptedException;

 //当前线程进入等待状态,直到被唤醒,该方法不响应中断要求
 void awaitUninterruptibly();

 //调用该方法,当前线程进入等待状态,直到被唤醒或被中断或超时
 //其中nanosTimeout指的等待超时时间,单位纳秒
 long awaitNanos(long nanosTimeout) throws InterruptedException;

  //同awaitNanos,但能够指明时间单位
  boolean await(long time, TimeUnit unit) throws InterruptedException;

 //调用该方法当前线程进入等待状态,直到被唤醒、中断或到达某个时
 //间期限(deadline),若是没到指定时间就被唤醒,返回true,其余状况返回false
  boolean awaitUntil(Date deadline) throws InterruptedException;

 //唤醒一个等待在Condition上的线程,该线程从等待方法返回前必须
 //获取与Condition相关联的锁,功能与notify()相同
  void signal();

 //唤醒全部等待在Condition上的线程,该线程从等待方法返回前必须
 //获取与Condition相关联的锁,功能与notifyAll()相同
  void signalAll();
}
复制代码

最重要的是await方法使线程进入等待状态,再经过signal方法唤醒。接下来咱们结合实际例子分析。bash

Condition能够解决什么问题

假设有一个生产者-消费者的场景:数据结构

一、生产者有两个线程产生烤鸡;消费者有两个线程消费烤鸡并发

二、四个线程一块儿执行,但同时只能有一个生产者线程生成烤鸡,一个消费者线程消费烤鸡。ui

三、只有产生了烤鸡,才能通知消费线程去消费,不然只能等着;this

四、只有消费了烤鸡,才能通知生产者线程去生产,不然只能等着spa

因而乎,咱们使用ReentrantLock控制并发,并使用它生成两组Condition对象,productCondition和consumeCondition:前者控制生产者线程,后者控制消费者线程。当isHaveChicken为true时,表明烤鸡生成完毕,生产线程必须进入等待状态同时唤醒消费线程进行消费,消费线程消费完毕后将flag设置为false,表明烤鸡消费完成,进入等待状态,同时唤醒生产线程生产烤鸡。。。。。。线程

package com.springsingleton.demo.Chicken;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ChikenStore {

  ReentrantLock reentrantLock = new ReentrantLock();

  Condition productCondition = reentrantLock.newCondition();

  Condition consumeCondition = reentrantLock.newCondition();

  private int count = 0;

  private volatile boolean isHaveChicken = false;

  //生产
  public void ProductChicken() {
    reentrantLock.lock();
    while (isHaveChicken) {
      try {
        System.out.println("有烤鸡了" + Thread.currentThread().getName() + "不生产了");
        productCondition.await();
      } catch (Exception e) {
        System.out.println("error" + e.getMessage());
      }
    }
    count++;
    System.out.println(Thread.currentThread().getName() + "产生了第" + count + "个烤鸡,赶忙开始卖");
    isHaveChicken = true;
    consumeCondition.signal();
    reentrantLock.unlock();
  }

  public void SellChicken() {
    reentrantLock.lock();
    while (!isHaveChicken) {
      try {
        System.out.println("没有烤鸡了" + Thread.currentThread().getName() + "不卖了");
        consumeCondition.await();
      } catch (Exception e) {
        System.out.println("error" + e.getMessage());
      }
    }
    count--;
    isHaveChicken = false;
    System.out.println(Thread.currentThread().getName() + "卖掉了第" + count + 1 + "个烤鸡,赶忙开始生产");
    productCondition.signal();
    reentrantLock.unlock();
  }

  public static void main(String[] args) {
    ChikenStore chikenStore = new ChikenStore();
    new Thread(() -> {
      Thread.currentThread().setName("生产者1号");
      while (true) {
        chikenStore.ProductChicken();
      }
    }).start();
    new Thread(() -> {
      Thread.currentThread().setName("生产者2号");
      for (; ; ) {
        chikenStore.ProductChicken();
      }
    }).start();
    new Thread(() -> {
      Thread.currentThread().setName("消费者1号");
      while (true) {
        chikenStore.SellChicken();
      }
    }).start();
    new Thread(() -> {
      Thread.currentThread().setName("消费者2号");
      while (true) {
        chikenStore.SellChicken();
      }
    }).start();

  }
}

复制代码

输出:

生产者1号产生了第1个烤鸡,赶忙开始卖
有烤鸡了生产者1号不生产了
有烤鸡了生产者2号不生产了
消费者1号卖掉了第01个烤鸡,赶忙开始生产
没有烤鸡了消费者1号不卖了
生产者1号产生了第1个烤鸡,赶忙开始卖
有烤鸡了生产者1号不生产了
消费者1号卖掉了第01个烤鸡,赶忙开始生产
没有烤鸡了消费者1号不卖了
没有烤鸡了消费者2号不卖了
生产者2号产生了第1个烤鸡,赶忙开始卖
有烤鸡了生产者2号不生产了
消费者1号卖掉了第01个烤鸡,赶忙开始生产
没有烤鸡了消费者1号不卖了
生产者1号产生了第1个烤鸡,赶忙开始卖
有烤鸡了生产者1号不生产了
消费者2号卖掉了第01个烤鸡,赶忙开始生产
没有烤鸡了消费者2号不卖了
复制代码

若是用synchorinzed的话:

package com.springsingleton.demo.Chicken;

public class ChickenStoreSync {

  private int count = 0;

  private volatile boolean isHaveChicken = false;

  public synchronized void ProductChicken() {
    while (isHaveChicken) {
      try {
        System.out.println("有烤鸡了" + Thread.currentThread().getName() + "不生产了");
        this.wait();
      } catch (Exception e) {
        System.out.println("error" + e.getMessage());
      }
    }
    count++;
    System.out.println(Thread.currentThread().getName() + "产生了第" + count + "个烤鸡,赶忙开始卖");
    isHaveChicken = true;
    notifyAll();
  }

  public synchronized void SellChicken() {
    while (!isHaveChicken) {
      try {
        System.out.println("没有烤鸡了" + Thread.currentThread().getName() + "不卖了");
        this.wait();
      } catch (Exception e) {
        System.out.println("error" + e.getMessage());
      }
    }
    count--;
    isHaveChicken = false;
    System.out.println(Thread.currentThread().getName() + "卖掉了第" + count + 1 + "个烤鸡,赶忙开始生产");
    notifyAll();
  }

  public static void main(String[] args) {
    ChickenStoreSync chikenStore = new ChickenStoreSync();
    new Thread(() -> {
      Thread.currentThread().setName("生产者1号");
      while (true) {
        chikenStore.ProductChicken();
      }
    }).start();
    new Thread(() -> {
      Thread.currentThread().setName("生产者2号");
      for (; ; ) {
        chikenStore.ProductChicken();
      }
    }).start();
    new Thread(() -> {
      Thread.currentThread().setName("消费者1号");
      while (true) {
        chikenStore.SellChicken();
      }
    }).start();
    new Thread(() -> {
      Thread.currentThread().setName("消费者2号");
      while (true) {
        chikenStore.SellChicken();
      }
    }).start();

  }
}
复制代码

如上代码,在调用notify()或者 notifyAll()方法时,因为synchronized等待队列中同时存在生产者线程和消费者线程,因此咱们并不能保证被唤醒的究竟是消费者线程仍是生产者线程,而Codition则能够避免这种状况。

AQS中Condition的实现原理

Condition的具体实现类是AQS的内部类ConditionObject,以前咱们分析过AQS中存在两种队列,一种是同步队列,一种是条件队列,而条件队列是基于Condition实现的。注意在使用Condition前必须得到锁(由于condition通常是由lock构造出来的,它依赖于lock),同时在Condition的条件队列上的也有一个Node节点,其结点的waitStatus的值为CONDITION。在实现类ConditionObject中有两个结点分别是firstWaiter和lastWaiter,firstWaiter表明等待队列第一个等待结点,lastWaiter表明等待队列最后一个等待结点

public class ConditionObject implements Condition, java.io.Serializable {
    //等待队列第一个等待结点
    private transient Node firstWaiter;
    //等待队列最后一个等待结点
    private transient Node lastWaiter;
    //省略.......
}
复制代码

每一个Condition都对应着一个条件队列;一个锁上能够建立多个Condition对象,那么也就存在多个条件队列。条件队列一样是一个FIFO的队列,在队列中每个节点都包含了一个线程的引用,而该线程就是Condition对象上等待的线程。

当一个线程调用了await()相关的方法,那么该线程将会释放锁,并构建一个Node节点封装当前线程的相关信息加入到条件队列中进行等待,直到被唤醒、中断、超时才从队列中移出。Condition中的等待队列模型以下


正如图所示,Node节点的数据结构,和同步队列的node相比,Condtion中等待队列的是一个单向的,并且使用的变量是nextWaiter而不是next,这点咱们在前面分析结点Node的数据结构时讲过。firstWaiter指向条件队列的头结点,lastWaiter指向条件队列的尾结点,条件队列中结点的状态只有两种即CANCELLED和CONDITION,前者表示线程已结束须要从等待队列中移除,后者表示条件结点等待被唤醒。

每一个Codition对象对于一个条件队列,也就是说AQS中只能存在一个同步队列,但可拥有多个条件队列(以前烤鸡的例子就有两个new出来的condition的队列)。下面从代码层面看看被调用await()方法(其余await()实现原理相似)的线程是如何加入等待队列的,而又是如何从等待队列中被唤醒的。

public final void await() throws InterruptedException {
      //判断线程是否被中断
      if (Thread.interrupted())
          throw new InterruptedException();
      //建立新结点加入等待队列并返回
      Node node = addConditionWaiter();
      //释放当前线程锁即释放同步状态
      int savedState = fullyRelease(node);
      int interruptMode = 0;
      //判断结点是否同步队列(SyncQueue)中,便是否被唤醒
      while (!isOnSyncQueue(node)) {
          //挂起线程
          LockSupport.park(this);
          //判断是否被中断唤醒,若是是退出循环。
          if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
              break;
      }
      //被唤醒后 自旋操做争取得到锁,同时判断线程是否被中断
      if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
          interruptMode = REINTERRUPT;
       // clean up if cancelled
      if (node.nextWaiter != null) 
          //清理等待队列中不为CONDITION状态的结点
          unlinkCancelledWaiters();
      if (interruptMode != 0)
          reportInterruptAfterWait(interruptMode);
  }
复制代码

再看看addConditionWaiter方法,添加到等待队列:

private Node addConditionWaiter() {
    Node t = lastWaiter;
      // 判断是否为结束状态的结点并移除
      if (t != null && t.waitStatus != Node.CONDITION) {
          unlinkCancelledWaiters();
          t = lastWaiter;
      }
      //建立新结点状态为CONDITION
      Node node = new Node(Thread.currentThread(), Node.CONDITION);
      //加入等待队列
      if (t == null)
          firstWaiter = node;
      else
          t.nextWaiter = node;
      lastWaiter = node;
      return node;
}
复制代码

await()方法主要作了3件事:

一是调用addConditionWaiter()方法将当前线程封装成node结点加入等待队列。

二是调用fullyRelease(node)方法释放同步状态并唤醒后继结点的线程。

三是调用isOnSyncQueue(node)方法判断结点是否在同步队列中,这里是个while循环,若是同步队列中没有该结点就直接挂起该线程,须要明白的是若是线程被唤醒后就调用acquireQueued(node, savedState)执行自旋操做争取锁,即当前线程结点从等待队列转移到同步队列并开始努力获取锁。

接下来看看Singnal

public final void signal() {
     //判断是否持有独占锁,若是不是抛出异常
   if (!isHeldExclusively())
          throw new IllegalMonitorStateException();
      Node first = firstWaiter;
      //唤醒等待队列第一个结点的线程
      if (first != null)
          doSignal(first);
 }

复制代码

这里signal()方法作了两件事:

一是判断当前线程是否持有独占锁,没有就抛异常。

二是唤醒等待队列的第一个结点,即执行doSignal(first)

private void doSignal(Node first) {
     do {
             //移除条件等待队列中的第一个结点,
             //若是后继结点为null,那么说明没有其余结点了,因此将尾结点也设置为null
            if ( (firstWaiter = first.nextWaiter) == null)
                 lastWaiter = null;
             first.nextWaiter = null;
          //若是被通知节点没有进入到同步队列而且条件等待队列还有不为空的节点,则继续循环通知后续结点
         } while (!transferForSignal(first) &&
                  (first = firstWaiter) != null);
        }

//transferForSignal方法
final boolean transferForSignal(Node node) {
    //尝试设置唤醒结点的waitStatus为0,即初始化状态
    //若是compareAndSetWaitStatus返回false,说明当期结点node的waitStatus已不为
    //CONDITION状态,那么只能是结束状态了,因此返回false
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)){
         return false;
    }
    //加入同步队列并返回前驱结点p
    Node p = enq(node);
    int ws = p.waitStatus;
    //判断前驱结点是否为结束结点(CANCELLED=1)或者在设置
    //前驱节点状态为Node.SIGNAL状态失败时,唤醒被通知节点表明的线程
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)){
        //唤醒node结点的线程
        LockSupport.unpark(node.thread);
        return true;
    }
}

复制代码

doSignal(first)方法中作了2件事:

一是从条件队列移除被唤醒的节点,而后从新维护条件条件队列的firstWaiter和lastWaiter的指向。

二是将从条件队列移除的结点加入同步队列(在transferForSignal()方法中完成的),若是进入到同步队列失败而且条件队列还有不为空的节点,则继续循环唤醒后续其余结点的线程。

总结:

signal()被调用后,先判断当前线程是否获取锁,若是有,那么唤醒当前Condition对象中条件队列的第一个结点的线程,并从条件队列中移除该结点,移动到同步队列中,若是加入同步队列失败(此时只有可能线程被取消),那么继续循环唤醒条件队列中的其余结点的线程,若是成功加入同步队列,那么若是其前驱结点是否已结束或者设置前驱节点状态为Node.SIGNAL状态失败,则经过LockSupport.unpark()唤醒被通知节点表明的线程,到此signal()任务完成,注意被唤醒后的线程,将从前面的await()方法中的while循环中退出,由于此时该线程的结点已在同步队列中,那么while (!isOnSyncQueue(node))将不在符合循环条件,进而调用AQS的acquireQueued()方法加入获取同步状态的竞争中,这就是等待唤醒机制的整个流程实现原理,流程以下图(注意不管是同步队列仍是条件队列使用的Node数据结构都是同一个,不过是使用的内部变量不一样罢了)

相关文章
相关标签/搜索