Java多线程编程(5)--线程间通讯

一.等待与通知

  某些状况下,程序要执行的操做须要知足必定的条件(下文统一将其称之为保护条件)才能执行。在单线程编程中,咱们可使用轮询的方式来实现,即频繁地判断是否知足保护条件,若不知足则继续判断,若知足则开始执行。但在多线程编程中,这种方式无疑是很是低效的。若是一个线程持续进行无心义的判断而不释放CPU,这就会形成资源的浪费;而若是定时去判断,不知足保护条件就释放CPU,又会形成频繁的上下文切换。总之,不推荐在多线程编程中使用轮询的方式。
  等待与通知是这样一种机制:当保护条件不知足时,能够将当前线程暂停;而当保护条件成立时,再将这个线程唤醒。一个线程因其保护条件未知足而被暂停的过程就被称为等待,一个线程使得其余线程的保护条件得以知足的时候唤醒那些被暂停的线程的过程就被称为通知。java

1.wait

  在Java平台中,Object.wait方法能够用来实现等待,下面是wait方法的三个重载方法:编程

  • void wait(long timeoutMillis)
    调用该方法会使线程进入TIMED_WAITING状态,当等待时间结束或其余线程调用了该对象的notify或notifyAll方法时会将该线程唤醒。
  • void wait(long timeoutMillis, int nanos)
    这个方法看上去能够精确到纳秒级别,但实际上并非。若是nanos的值在0~999999之间,就给timeoutMillis加1,而后调用wait(timeoutMillis)。
  • void wait()
    该方法至关于wait(0),即永不超时。调用后当前线程会进入WAITING状态,直到其余线程调用了该对象的notify或notifyAll方法。

  先经过一张图来介绍wait的实现机制:

  在上一篇文章中咱们了解到JVM会为每一个对象维护一个入口集(Entry Set)用于存储申请该对象内部锁的线程。此外,JVM还会为每一个对象维护一个被称为等待集(Wait Set)的队列,该队列用于存储该对象上的等待线程。当在线程中调用某个对象(这里咱们称之为对象A)的wait方法后,当前线程会释放内部锁并进入WAITING或TIMED_WAITING状态,而后进入等待集中。当其余线程调用对象A的notify方法后,等待集中的某个线程会被唤醒并被移出等待集。这个线程可能会立刻得到内部锁,也有可能因竞争内部锁失败而进入入口集,直到得到内部锁。当从新获取到内部锁后,wait方法才会返回,当前线程继续执行后面的代码。
  因为wait方法会释放内部锁,所以在wait方法中会判断当前线程是否持有被调用wait方法的对象的内部锁。若是当前线程没有持有该对象的内部锁,JVM会抛出一个IllegalMonitorStateException异常。所以,wait方法在调用时当前线程必须持有该对象的内部锁,即wait方法的调用必需要放在由该对象引导的synchronized同步块中。综上所述,使用wait方法实现等待的代码模板以下伪代码所示:安全

synchronized(someObject) {
    while(!someCondition) {
        someObject.wait();
    }
    doSomething();
}

  这里使用while而不是if的缘由是,通知线程可能只是更新了保护条件中的共享变量,但并不必定会使保护条件成立;即便通知线程能够保证保护条件成立,可是在线程从等待集进入入口集再到获取到内部锁的这段时间内,其余线程仍然可能更新共享变量而致使保护条件不成立。线程虽然由于保护条件不成立而进入wait方法,但wait方法的返回并不能说明保护条件已经成立。所以,在wait方法返回后须要再次进行判断,若保护条件成立则执行接下来的操做,不然应该继续进入wait方法。正是基于这种考虑,咱们应该将wait方法的调用放在while循环而不是if判断中。多线程

2.notify/notifyAll

  下图是notify的实现机制:

  和wait方法同样,notify方法在执行时也必须持有对象的内部锁,不然会抛出IllegalMonitorStateException异常,所以notify方法也必须放在由该对象引导的synchronized同步块中。notify方法会将等待集中的任意一个线程移出队列。和wait方法不一样的是,notify方法自己不会释放内部锁,而是在临界区代码执行完成后自动释放。所以,为了使等待线程在其被唤醒以后可以尽快得到内部锁,应该尽量地将notify调用放在靠近临界区结束的地方。
  调用notify方法所唤醒的线程是相应对象上的一个任意等待线程,可是这个被唤醒的线程可能不是咱们真正想要唤醒的那个线程。所以,有时候咱们须要借助notifyAll,它和notify方法的惟一不一样之处在于它能够唤醒相应对象上的全部等待线程。dom

3.过早唤醒问题

  假设通知线程N和等待线程W1和W2同步在对象obj上,W1和W2的保护条件C1和C2均依赖于obj的实例变量state,但C1和C2判断的内容并不相同。初始状态下C1和C2均不成立。某一时刻,当线程N更新了共享变量state使得保护条件C1得以成立,此时为了唤醒W1而执行了obj.notifyAll()方法(调用obj.notify()并不必定会唤醒W1)。因为notifyAll唤醒的是obj上的全部等待线程,所以W2也会被唤醒,即便W2的保护条件并未成立。这就使得W2在被唤醒以后仍然须要继续等待。这种等待线程在保护条件并未成立的状况下被唤醒的现象被称为过早唤醒。过早唤醒使得那些无需被唤醒的等待线程也被唤醒了,形成了资源的浪费。过早唤醒问题能够利用下一节中介绍的Condition接口来解决。ide

二.条件变量Condition

  总的来讲,Object.wait()/notify()过于底层,且Object.wait(long timeout)还存在过早唤醒和没法区分其返回是因为等待超时仍是被通知线程唤醒的问题。不过,了解wait/notify有助于咱们阅读部分源码,以及学习和使用Condition接口。
  Condition接口能够做为wait/notify的替代品来实现等待/通知,它为解决过早唤醒问题提供了支持,并解决了Object.wait(long timeout)没法区分其返回是因为等待超时仍是被通知线程唤醒的问题。Condition接口中定义了如下方法:

  在上一篇文章中,咱们在介绍Lock接口时曾经提到过它的newCondition方法,它返回的就是一个Condition实例。相似于Object.wait()/notify()要求其执行线程必须持有这些方法所属对象的内部锁,Condition.await()/signal()也要求其执行线程持有建立该Condition实例的显式锁。每一个Condition实例内部都维护了一个用于存储等待线程的队列。设condition1和condition2是从一个显式锁上获取的两个不一样的Condition实例,一个线程执行condition1.await()会致使其被暂停并进入condition1的等待队列。condition1.signal()会使condition1的等待队列中的一个任意线程被唤醒,而condition1.signaAll()则会使condition1的等待队列中的全部线程被唤醒,而condition2的等待队列中的线程则不受影响。
  和wait/notify相似,await/signal的使用方法以下:工具

public class ConditionUsage {
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();

    public void waitMethod() throws InterruptedException {
        lock.lock();
        try {
            while (保护条件不成立) {
                condition.await();
            }
            // 业务逻辑
        } finally {
            lock.unlock();
        }
    }
    
    public void notifyMethod() {
        lock.unlock();
        try {
            // 更新共享变量
            condition.signal();
        } finally {
            lock.unlock();
        }
    }
}

  最后,以一个例子来结束本小节。这里咱们以经典的生产者-消费者模型来举例。假设有一个生产整数的生产者,一个消费奇数的消费者和一个消费偶数的消费者。当生产奇数时,生产者会通知奇数消费者,偶数同理。下面是完整代码:
学习


展开查看

import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionDemo {
    private final Lock lock = new ReentrantLock();
    private final Condition oddCondition = lock.newCondition();
    private final Condition evenCondition = lock.newCondition();
    private final Random random = new Random();
    private volatile Integer message;
    private AtomicInteger count = new AtomicInteger(0);

    public static void main(String[] args) {
        ConditionDemo demo = new ConditionDemo();
        Thread producer = new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                demo.produce();
            }
        });
        producer.start();
        Thread oddConsumer = new Thread(() -> {
            while (true) {
                try {
                    demo.consumeOdd();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        Thread evenConsumer = new Thread(() -> {
            while (true) {
                try {
                    demo.consumeEven();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        oddConsumer.start();
        evenConsumer.start();
    }

    public void produce() {
        lock.lock();
        if (message == null) {
            message = random.nextInt(100) + 1;
            count.incrementAndGet();
            if (message % 2 == 0) {
                evenCondition.signal();
                System.out.println("Produce even : " + message);
            } else {
                oddCondition.signal();
                System.out.println("Produce odd : " + message);
            }
        }
        lock.unlock();
    }

    public void consumeOdd() throws InterruptedException {
        lock.lock();
        while (message == null) {
            oddCondition.await();
        }
        System.out.println("Consume odd : " + message);
        message = null;
        lock.unlock();
    }

    public void consumeEven() throws InterruptedException {
        lock.lock();
        while (message == null) {
            evenCondition.await();
        }
        System.out.println("Consume even : " + message);
        message = null;
        lock.unlock();
    }
}

  该程序的输出以下:this

Produce even : 34
Consume even : 34
Produce odd : 43
Consume odd : 43
Produce even : 28
Consume even : 28
Produce odd : 27
Consume odd : 27
Produce even : 92
Consume even : 92
...

三.倒数计数器CountDownLatch

  有时候,咱们但愿一个线程在另外一个或多个线程结束以后再继续执行,这时候咱们最早想到的确定是Thread.join()。有时咱们又但愿一个线程不必定须要其余线程结束,而只是等其余线程执行完特定的操做就继续执行。这种状况下没法使用Thread.join(),由于它会致使当前线程等待其余线程彻底结束。固然,此时能够用共享变量来实现。不过,Java为咱们提供了更加方便的工具类来解决上面说的这些状况,那就是CountDownLatch。
  能够将CountDownLatch理解为一个能够在多个线程之间使用的计数器。这个类提供了如下方法:

  CountDownLatch内部也维护了一个用于存放等待线程的队列。当计数器不为0时,调用await方法的线程会被暂停并进入该队列。当某个线程调用countDown方法的时候,计数器会减1。当计数器到0的时候,等待队列中的全部线程都会被唤醒。计数器的初始值是在CountDownLatch的构造方法中指定的:atom

public CountDownLatch(int count)

  当计数器的值达到0以后就不会再变化。此时,调用countDown方法并不会致使异常的抛出,而且后续执行await方法的线程也不会被暂停。所以,CountDownLatch的使用是一次性的。此外,因为CountDownLatch是线程安全的,所以在调用await、countDown方法时无需加锁。
  下面的例子中,主线程等待两个子线程结束以后再继续执行。这里使用了CountDownLatch来实现:

import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {
    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(2);
        Runnable task = () -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " finished.");
            latch.countDown();
        };
        new Thread(task, "Thread 1").start();
        new Thread(task, "Thread 2").start();
        try {
            latch.await();
        } catch (InterruptedException e) {
            return;
        }
        System.out.println("Main thread continued.");
    }
}

  该程序输出以下:

Thread 2 finished.
Thread 1 finished.
Main thread continued.

  能够看到,当线程1和线程2执行完成后,主线程才开始继续执行。
  若是CountDownLatch内部计数器因为程序的错误而永远没法达到0,那么相应实例上的等待线程会一直处于WAITING状态。避免该问题的出现有两种方法:一是确保全部对countDown方法的调用都位于代码中正确的位置,例如放在finally块中。二是使用带有时间限制的await方法。若是在规定时间内计时器值未达到0,该CountDownLatch实例上的等待线程也会被唤醒。该方法的返回值能够用于区分其返回是不是因为等待超时。

四.循环屏障CyclicBarrier

  有时候多个线程可能须要互相等待对方执行到代码中的某个地方才能继续执行。这就相似于咱们在开会的时候必须等待全部与会人员都到场以后才能开始。Java中为咱们提供了一个工具类CyclicBarrier,该类能够用来实现这种等待。
  使用CyclicBarrier实现等待的线程被称为参与方(Party)。参与方只须要CyclicBarrier.await()就能够实现等待。和CountDownLatch相似,CyclicBarrier也有一个计数器。当最后一个线程调用CyclicBarrier.await()时,以前的等待线程都会被唤醒,而最后一个线程自己并不会被暂停。和CountDownLatch不一样的是,CyclicBarrier是能够重复使用的,这也是为何它的类名中含有Cyclic。当全部参与方被唤醒的时候,任何线程再次执行await方法又会致使该线程被暂停。
  CyclicBarrier提供了两个构造器:

public CyclicBarrier​(int parties)
public CyclicBarrier​(int parties, Runnable barrierAction)

  能够看到,在构造CyclicBarrier​时,必须提供参与方的数量。第二个构造器还容许咱们指定一个被称为barrierAction的任务(Runnable接口实例),该任务会被最后一个执行await方法的线程执行。所以,若是有须要在唤醒全部线程前执行的操做,可使用这个构造器。
  CyclicBarrier提供了如下6个方法:

1.public int await() throws InterruptedException,BrokenBarrierException

  若是当前线程不是最后一个参与方,那么该线程在调用await()后将持续等待直到如下状况发生:

  • 最后一个线程到达;
  • 当前线程被中断;
  • 其余正在等待的线程被中断;
  • 其余线程等待超时;
  • 其余线程调用了当前屏障的reset()。

  若是当前线程在进入await()方法使已经被标记中断状态或在等待时被中断,那么await()将会抛出InterruptedException并清除当前线程的中断状态。
  若是屏障在参与方等待时被重置或被破坏,或者在调用await()时屏障已经被破坏,那么await()将会抛出BrokenBarrierException。
  若是某个线程在等待时被中断,那么其余等待线程将会抛出BrokenBarrierException而且屏障也会被标记为broken状态。
  该方法的返回值表示当前线程的到达索引,getParties()-1表示第一个到达,0表示最后一个到达。

2.public int await​(long timeout,TimeUnit unit) throws InterruptedException,BrokenBarrierException,TimeoutException

  该方法与至关于有时间限制的await(),等待时间结束以后该线程将会抛出TimeOutException,屏障会被标记为broken状态,其余正在等待的线程则会抛出BrokenBarrierException。

3.public int getNumberWaiting()

  返回当前正在等待的参与方的数量。

4.public int getParties()

  返回总的参与方的数量。

5.public boolean isBroken()

  若是该屏障已经被破坏则返回true,不然返回false。当等待线程超时或被中断,或者在执行barrierAction时出现异常,屏障将会被破坏。

6.public void reset()

  将屏障恢复到初始状态,若是有正在等待的线程,这些线程会抛出BrokenBarrierException异常。

  下面咱们经过一个例子来学习如何使用CyclicBarrier。假设如今正在举行短跑比赛,共有8名参赛选手,而场地上只有4条赛道,所以须要分为两场比赛。每场比赛必须等4名选手全都就绪才能够开始,而上一场比赛结束以后即所有选手离开赛道以后才能进行下一场比赛。该示例代码以下所示:


展开查看

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class CyclicBarrierDemo {
    private CyclicBarrier startBarrier = new CyclicBarrier(4, () -> System.out.println("比赛开始!"));
    private CyclicBarrier shiftBarrier = new CyclicBarrier(4, () -> System.out.println("比赛结束!"));
    private Runner[] runners = new Runner[8];
    private AtomicInteger next = new AtomicInteger(0);

    CyclicBarrierDemo() {
        for (int i = 0; i < 8; i++) {
            runners[i] = new Runner(i / 4 + 1, i % 4 + 1);
        }
    }

    public static void main(String[] args) {
        CyclicBarrierDemo demo = new CyclicBarrierDemo();
        for (int i = 0; i < 4; i++) {
            demo.new Track().start();
        }
    }

    private class Track extends Thread {
        private Random random = new Random();

        @Override
        public void run() {
            for (int i = 0; i < 2; i++) {
                try {
                    Runner runner = runners[next.getAndIncrement()];
                    System.out.println(runner.getGroup() + "组" + runner.getNumber() + "号准备就绪!");
                    startBarrier.await();
                    System.out.println(runner.getGroup() + "组" + runner.getNumber() + "号出发!");
                    Thread.sleep((random.nextInt(5) + 1) * 1000);
                    System.out.println(runner.getGroup() + "组" + runner.getNumber() + "号到达终点!");
                    shiftBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private static class Runner {
        private int group;
        private int number;

        Runner(int group, int number) {
            this.group = group;
            this.number = number;
        }

        int getGroup() {
            return group;
        }

        int getNumber() {
            return number;
        }
    }
}

  该程序输出以下:


展开查看

1组4号准备就绪!
1组2号准备就绪!
1组3号准备就绪!
1组1号准备就绪!
比赛开始!
1组4号出发!
1组2号出发!
1组1号出发!
1组3号出发!
1组3号到达终点!
1组2号到达终点!
1组4号到达终点!
1组1号到达终点!
比赛结束!
2组1号准备就绪!
2组2号准备就绪!
2组3号准备就绪!
2组4号准备就绪!
比赛开始!
2组4号出发!
2组1号出发!
2组3号出发!
2组2号出发!
2组1号到达终点!
2组4号到达终点!
2组3号到达终点!
2组2号到达终点!
比赛结束!

五.总结

  实际上,线程间的通讯方式远不止上面介绍的这些,还有不少手段能够在线程间传递信息,例如阻塞队列、信号量、线程中断机制等,咱们将会在以后的文章中进一步学习这部份内容。

相关文章
相关标签/搜索