Java中的线程协做之Condition

1、Condition接口

一、Condition接口的经常使用方法介绍

 1 /**
 2  * 已经获取到锁的线程调用该方法会进入等待状态,知道其余持有锁的线程通知(signal)等待队列中的线程或者被中断退出等待队列;  3  * 若是该线程已经从该方法中返回,表名线程已经获取到了Condition对象对应的锁  4  */
 5 public final void await() throws InterruptedException {...}  6 /**
 7  * 仍是进入等待状态的方法,只是该方法对中断不敏感:当前调用该方法的线程只有被通知(signal)才能从等待队列中退出  8  */
 9 public final void awaitUninterruptibly() {...} 10 /**
11  * 当前线程进入等待状态,被通知、中断或者超时以后被唤醒。返回值就是表示剩余的时间,即 12  * 若是在nanosTimeout纳秒以前被唤醒,返回值就是实际耗时;若是返回值是0或者负数,就认为是超时了 13  */
14 public final long awaitNanos(long nanosTimeout) {...} 15 /**
16  * 调用该方法的线程会进入等待状态直到被通知、中断或者到达某个超时时间。 17  * 意味着没有到达指定的某个时间被通知,就会返回true;若是到达指定时间,返回false 18  */
19 public final boolean awaitUntil(Date deadline){} 20 /**
21  * 当前持有Condition对象对应锁的线程,调用该方法以后会唤醒一个等待在Condition上的线程 22  */
23 public final void signal() {} 24 /**
25  * 当前持有Condition对象对应锁的线程,调用该方法以后会唤醒等待在Condition上的全部线程 26  */
27 public final void signalAll() {}

  Condition的使用模板:Condition的获取必须经过Lock的newCondition方法,表示Condition对象与该锁关联,通常讲Condition对象做为成员变量,调用上面的await方法以后当前线程才会释放锁并在等待队列中进行等待;当其余的线程(在没有中断的状况下)调用该condition对象的signal方法的时候就会通知等待队列中的等待线程从await方法返回(返回以前已经获取锁)。java

 1 Lock lock = new ReentrantLock();  2 Condition  con = lock.newCondition();  3 public void conWait() {  4  lock.lock();  5     try {  6  con.await();  7     } catch(InterruptedException e) {  8  ...  9     }finally { 10  lock.unlock(); 11  } 12 } 13 
14 public void conSignal() { 15  lock.lock(); 16     try { 17  con.signal(); 18     } catch(InterruptedException e) { 19  ... 20     }finally { 21  lock.unlock(); 22  } 23 }

二、Condition的实现分析

a)源码流程分析

  咱们经过跟踪源码能够看出来,首先建立锁对象(new ReentrantLock()),而后根据锁对象关联响应的Condition对象,而后经过Condition对象中维护的等待队列实现等待(await)通知(signal)机制。node

 1 public Condition newCondition() { //ReentrantLock类中的方法
 2     return sync.newCondition();  3 }  4 //ConditionObject类实现Condition接口,除此室外ConditionObject也是AQS的一个内部类,Condition的操做须要与锁关联起来
 5 final ConditionObject newCondition() {  6     return new ConditionObject();  7 }  8 //AQS的内部类ConditionObject,其中维护了一个等待队列,经过该队列实现等待通知机制
 9 public class ConditionObject{ 10     /**
11  * 返回等待队列中的线程集合 12  * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 13  * returns {@code false} 14      */
15     protected final Collection<Thread> getWaitingThreads() { 16         if (!isHeldExclusively()) 17             throw new IllegalMonitorStateException(); 18         ArrayList<Thread> list = new ArrayList<Thread>(); 19         for (Node w = firstWaiter; w != null; w = w.nextWaiter) { 20             if (w.waitStatus == Node.CONDITION) { 21                 Thread t = w.thread; 22                 if (t != null) 23  list.add(t); 24  } 25  } 26         return list; 27  } 28 }

b)具体实现

  上面说到了Condition是经过等待队列来实现等待通知功能的,那么就分析等待队列和等待通知机制的实现ide

①等待队列实现

  等待队列是一个FIFO的队列,其中每一个结点都包含一个处于Condition对象上等待的线程引用(当一个获取到锁的线程调用await方法,就会释放锁资源,被包装成一个Node而后添加到等待队列中进入等待状态;这里面的Node结点仍是和AQS中的实现机理同样,Node是AQS中的静态内部类)。ui

  ConditionObject类中有下面两个属性,分别表明一个Condition对应的等待队列的首节点和尾结点。当前线程调用await方法以后就会被构形成一个Node结点而后加入到等待队列的尾部。this

1 /** Condition等待队列头结点 */
2 private transient Node firstWaiter; 3 /** Condition等待队列尾结点 */
4 private transient Node lastWaiter;

   下面是等待队列的基本结构,Condition对象中有首尾结点的引用。新增长的结点须要将原有的尾结点的下一节点指向它,而后更新lastWaiter便可。spa

  上面的状况是一个Condition对象对应一个等待队列和一个同步队列(上面新添加的Node3就是从同步队列中移除而后添加过来的),在同步器组件实现中,会拥有一个同步队列和多个等待队列。线程

②等待操做的实现

  持有锁的线程调Condition的await方法以后会释放锁,而后进入等待状态。既然是持有锁的线程,那么该线程应该位于同步队列的首节点位置,其调用await方法以后就会从同步队列首节点移到等待队列的尾结点等待。具体将其移到等待队列是addConditionWaiter方法实现。下面是await方法和addConditionWaiter方法的实现分析。code

public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); //将当前线程加入等待队列
    int savedState = fullyRelease(node); //释放当前线程持有的锁
    int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled
 unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } private Node addConditionWaiter() { Node t = lastWaiter; /** * waitStatus值表示线程正在等待条件(本来结点在等待队列中,结点线程等待在Condition上,当其余线程对 * Condition调用了signal()方法以后)该结点会从等待队列中转移到同步队列中,进行同步状态的获取 * static final int CONDITION = -2; */
    if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); //构形成Condition的等待队列中的对应的结点 //增长的结点须要将原有的尾结点的下一节点指向它,而后更新lastWaiter
    if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }

③通知操做的实现

   通知操做的实现机制就是将当前等待队列中的首节点中的线程唤醒,将其加入同步队列中。对象

1 public final void signal() { 2     if (!isHeldExclusively()) //检查当前线程是否获取锁
3         throw new IllegalMonitorStateException(); 4     Node first = firstWaiter; 5     if (first != null) 6  doSignal(first); 7 }

   唤醒线程使其进入同步队列以后,咱们再来看await方法中那些没有执行的代码。blog

 1 public final void await() throws InterruptedException {  2     if (Thread.interrupted())  3         throw new InterruptedException();  4     Node node = addConditionWaiter(); //将当前线程加入等待队列
 5     int savedState = fullyRelease(node); //释放当前线程持有的锁
 6     int interruptMode = 0;  7     //根据下面的源码能够看出,当前线程若是掉用await方法以后会进入等待队列,那么在退出等待队列以前会一直执行这个循环
 8     while (!isOnSyncQueue(node)) {  9         LockSupport.park(this); //唤醒节点中的线程
10         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 11             break; 12  } 13     //经过acquireQueued源码能够发现,获取锁的流程和ReentrantLock这种独占式获取同步状态的流程基本一致
14     if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 15         interruptMode = REINTERRUPT; 16     if (node.nextWaiter != null) // clean up if cancelled
17  unlinkCancelledWaiters(); 18     if (interruptMode != 0) 19  reportInterruptAfterWait(interruptMode); 20 } 21 final boolean isOnSyncQueue(Node node) { 22     if (node.waitStatus == Node.CONDITION || node.prev == null) //判断当前队列是否在等待队列中
23         return false; 24     if (node.next != null) // If has successor, it must be on queue
25         return true; 26     return findNodeFromTail(node); 27 } 28 //竞争锁资源的同步队列
29 final boolean acquireQueued(final Node node, int arg) { 30     boolean failed = true; 31     try { 32         boolean interrupted = false; 33         for (;;) { 34             final Node p = node.predecessor(); //获得当前结点的前驱结点
35             if (p == head && tryAcquire(arg)) { //当前结点的前驱结点为头结点,而且尝试获取锁成功
36                 setHead(node); //将当前获取到锁的结点设置为头结点
37                 p.next = null; // help GC
38                 failed = false; 39                 return interrupted; 40  } 41              //若是获取同步状态失败,应该自旋等待继续获取而且校验本身的中断标志位信息
42             if (shouldParkAfterFailedAcquire(p, node) &&
43  parkAndCheckInterrupt()) 44                 interrupted = true; 45  } 46     } finally { 47         if (failed) 48  cancelAcquire(node); 49  } 50 }

  从上面的代码中咱们能够看出,当调用await方法的线程在没有回到同步队列以前,都会一直在while (!isOnSyncQueue(node)){...}循环中,只有被唤醒退出等待队列进入同步队列才会从循环中退出;以后调用acquireQueued()开始自旋等待锁的获取,这个自旋的过程和前面介绍的AQS中独占式锁的获取流程同样;最后,若是线程从这个自旋的过程退出了,就表明当前线程再次获取了锁资源,最后也从await方法返回。因此,一个线程调用await方法以后,只有最终获取到锁才会从该方法返回。而对于signalAll而言就是对等待队列中的每一个线程通知(signal)一次,这样就能够将等待队列中的全部线程移到同步队列中进行锁资源的获取。

2、Condition接口使用

一、Condition接口搭配ReentrantLock实现生产者消费者模式

 1 package cn.source.condition;  2 
 3 import java.util.LinkedList;  4 import java.util.concurrent.TimeUnit;  5 import java.util.concurrent.locks.Condition;  6 import java.util.concurrent.locks.Lock;  7 import java.util.concurrent.locks.ReentrantLock;  8 
 9 public class ConditionProducerAndConsumer<E> { 10     
11     private LinkedList<E> list = new LinkedList<E>(); 12     private static final int MAX_NUM = 10; //容器的最大数量
13     private int count = 0; //容器中实际数量
14     
15     private Lock lock = new ReentrantLock(); 16     private Condition producer = lock.newCondition(); 17     private Condition consumer = lock.newCondition(); 18     
19     private int getCount() { 20         return count; 21  } 22     
23     private void put(E e) { 24         lock.lock(); //首先须要获取锁
25         try { 26             //这里是判断容器是否已满,注意须要使用while:若是使用if的话可能致使全部的消费线程都处于等待状态
27             while(list.size() == MAX_NUM) { 28                 System.out.println(Thread.currentThread().getName() + "正在等待中"); 29                 producer.await(); //生产者线程进入等待状态
30  } 31             //添加元素
32  list.add(e); 33             count ++; 34             consumer.signalAll();//将消费者线程唤醒
35         } catch (InterruptedException e1) { 36  e1.printStackTrace(); 37         } finally { 38  lock.unlock(); 39  } 40  } 41     
42     private E get() { 43         E e = null; 44  lock.lock(); 45         try { 46             while(list.size() == 0) { 47                 System.out.println(Thread.currentThread().getName() + "正在等待"); 48                 consumer.await(); //消费者线程进入等待状态
49  } 50             e = list.removeFirst(); 51             count --; 52             producer.signalAll(); //消费元素以后,将生产者线程唤醒
53         } catch (InterruptedException e1) { 54  e1.printStackTrace(); 55         } finally { 56  lock.unlock(); 57  } 58         return e; 59  } 60     
61     public static void main(String[] args) { 62         SyncProducerAndConsumer<String> syncProducerAndConsumer = new SyncProducerAndConsumer<>(); 63         for (int i = 0; i < 10; i++) { //开启10个线程
64             new Thread(new Runnable() { 65  @Override 66                 public void run() { 67                     for (int j = 0; j < 5; j++) { //每一个线程从容器中获取5次数据
68  System.out.println(syncProducerAndConsumer.get()); 69  } 70  } 71                 
72             }, "消费者线程" + i).start();; 73  } 74         //休眠2秒,全部的消费者线程都已经启动而且处于等待状态
75         try { 76             TimeUnit.SECONDS.sleep(2); 77         } catch (InterruptedException e) { 78  e.printStackTrace(); 79  } 80         
81         for (int i = 0; i < 2; i++) { //开启两个生产者线程
82             new Thread(new Runnable() { 83  @Override 84                 public void run() { 85                     for (int j = 0; j < 25; j++) { //每一个生产者线程想容器中添加25个数据,当容器中数据到达10个的时候生产者线程会阻塞
86                         syncProducerAndConsumer.put("add value " + j); 87  } 88  } 89             }, "生产者线程"+i).start(); 90  } 91  } 92 
93 }

二、synchronized组合wait/notify实现生产者消费者模式

 1 package cn.source.condition;  2 
 3 import java.util.LinkedList;  4 import java.util.concurrent.TimeUnit;  5 
 6 public class SyncProducerAndConsumer<E> {  7 
 8     private LinkedList<E> list = new LinkedList<E>();  9     private static final int MAX_NUM = 10; //容器的最大数量
10     private int count = 0; //容器中实际数量
11     
12     public synchronized int getCount() { 13         return count; 14  } 15     
16     public synchronized void put(E e) { 17         while(list.size() == MAX_NUM) { //这里是判断容器是否已满,注意须要使用while:若是使用if的话可能致使全部的消费线程都处于等待状态
18             try { 19                 this.wait(); //容器满了以后,生产者线程进入等待状态
20             } catch (InterruptedException e1) { 21  e1.printStackTrace(); 22  } 23  } 24         //容器未满,生产者线程就想容器中添加数据
25  list.add(e); 26         count ++; 27         this.notifyAll(); //此时容器中已经存在数据,唤醒等待的消费者线程
28  } 29     
30     public synchronized E get() { 31         E e = null; 32         while(list.size() == 0) { //判断容器是否为空,若是为空就进入等待状态,这里也使用while
33             try { 34                 this.wait(); 35             } catch (InterruptedException e1) { 36  e1.printStackTrace(); 37  } 38  } 39         e = list.removeFirst(); 40         count --; 41         this.notifyAll(); 42         return e; 43  } 44     
45     public static void main(String[] args) { 46         SyncProducerAndConsumer<String> syncProducerAndConsumer = new SyncProducerAndConsumer<>(); 47         for (int i = 0; i < 10; i++) { //开启10个线程
48             new Thread(new Runnable() { 49  @Override 50                 public void run() { 51                     for (int j = 0; j < 5; j++) { //每一个线程从容器中获取5次数据
52  System.out.println(syncProducerAndConsumer.get()); 53  } 54  } 55                 
56             }, "消费者线程" + i).start();; 57  } 58         //休眠2秒,全部的消费者线程都已经启动而且处于等待状态
59         try { 60             TimeUnit.SECONDS.sleep(2); 61         } catch (InterruptedException e) { 62  e.printStackTrace(); 63  } 64         
65         for (int i = 0; i < 2; i++) { //开启两个生产者线程
66             new Thread(new Runnable() { 67  @Override 68                 public void run() { 69                     for (int j = 0; j < 25; j++) { //每一个生产者线程想容器中添加25个数据,当容器中数据到达10个的时候生产者线程会阻塞
70                         syncProducerAndConsumer.put("add value " + j); 71  } 72  } 73             }, "生产者线程"+i).start(); 74  } 75  } 76     
77     
78 }

三、Object中的等待唤醒机制和Condition的等待通知机制对比

相关文章
相关标签/搜索