Java并发编程之线程间通信(下)-生产者与消费者

前文回顾

上一篇文章重点唠叨了java中协调线程间通讯的wait/notify机制,它有力的保证了线程间通讯的安全性以及便利性。本篇将介绍wait/notify机制的一个应用以及更多线程间通讯的内容。java

生产者-消费者模式

目光从厕所转到饭馆,一个饭馆里一般都有好多厨师以及好多服务员,这里咱们把厨师称为生产者,把服务员称为消费者,厨师和服务员是不直接打交道的,而是在厨师作好菜以后放到窗口,服务员从窗口直接把菜端走给客人就行了,这样会极大的提高工做效率,由于省去了生产者和消费者之间的沟通成本。从java的角度看这个事情,每个厨师就至关于一个生产者线程,每个服务员都至关于一个消费者线程,而放菜的窗口就至关于一个缓冲队列生产者线程不断把生产好的东西放到缓冲队列里,消费者线程不断从缓冲队列里取东西,画个图就像是这样:程序员

图片描述

现实中放菜的窗口能放的菜数量是有限的,咱们假设这个窗口只能放5个菜。那么厨师在作完菜以后须要看一下窗口是否是满了,若是窗口已经满了的话,就在一旁抽根烟等待,直到有服务员来取菜的时候通知一下厨师窗口有了空闲,能够放菜了,这时厨师再把本身作的菜放到窗口上去炒下一个菜。从服务员的角度来讲,若是窗口是空的,那么也去一旁抽根烟等待,直到有厨师把菜作好了放到窗口上,而且通知他们一下,而后再把菜端走。数组

咱们先用java抽象一下菜:安全

public class Food {

    private static int counter = 0;

    private int i;  //表明生产的第几个菜

    public Food() {
        i = ++counter;
    }

    @Override
    public String toString() {
        return "第" + i + "个菜";
    }
}

每次建立Food对象,字段i的值都会加1,表明这是建立的第几道菜。多线程

为了故事的顺利进行,咱们首先定义一个工具类:并发

class SleepUtil {

    private static Random random = new Random();

    public static void randomSleep() {
        try {
            Thread.sleep(random.nextInt(1000));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

SleepUtil的静态方法randomSleep表明当前线程随机休眠一秒内的时间。dom

而后咱们再用java定义一下厨师:ide

public class Cook extends Thread {

    private Queue<Food> queue;

    public Cook(Queue<Food> queue, String name) {
        super(name);
        this.queue = queue;
    }

    @Override
    public void run() {

        while (true) {
            SleepUtil.randomSleep();    //模拟厨师炒菜时间

            Food food = new Food();
            System.out.println(getName() + " 生产了" + food);

            synchronized (queue) {
                while (queue.size() > 4) {
                    try {
                        System.out.println("队列元素超过5个,为:" + queue.size() + " " + getName() + "抽根烟等待中");
                        queue.wait();

                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }

                queue.add(food);
                queue.notifyAll();
            }
        }
    }
}

咱们说每个厨师Cook都是一个线程,内部维护了一个名叫queue的队列。在run方法中是一个死循环,表明不断的生产Food。他每生产一个Food后,都要判断queue队列中元素的个数是否是大于4,若是大于4的话,就调用queue.wait()等待,若是不大于4的话,就把建立号的Food对象放到queue队列中,因为可能多个线程同时访问queue的各个方法,因此对这段代码用queue对象来加锁保护。当向队列添加完刚建立的Food对象以后,就能够通知queue这个锁对象关联的等待队列中的服务员线程们能够继续端菜了。工具

而后咱们再用java定义一下服务员:this

class Waiter extends Thread {

    private Queue<Food> queue;

    public Waiter(Queue<Food> queue, String name) {
        super(name);
        this.queue = queue;
    }

    @Override
    public void run() {

        while (true) {

            Food food;
            synchronized (queue) {
                while (queue.size() < 1) {
                    try {
                        System.out.println("队列元素个数为: " + queue.size() + "," + getName() + "抽根烟等待中");
                        queue.wait();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }

                food = queue.remove();
                System.out.println(getName() + " 获取到:" + food);
                queue.notifyAll();
            }

            SleepUtil.randomSleep();    //模拟服务员端菜时间
        }
    }
}

每一个服务员也是一个线程,和厨师同样,都在内部维护了一个名叫queue的队列。在run方法中是一个死循环,表明不断的从队列中取走Food。每次在从queue队列中取Food对象的时候,都须要判断一下队列中的元素是否小于1,若是小于1的话,就调用queue.wait()等待,若是不小于1的话,也就是队列里有元素,就从队列里取走一个Food对象,而且通知与queue这个锁对象关联的等待队列中的厨师线程们能够继续向队列里放入Food对象了。

在厨师和服务员线程类都定义好了以后,咱们再建立一个Restaurant类,来看看在餐馆里真实发生的事情:

public class Restaurant {

    public static void main(String[] args) {

        Queue<Food> queue = new LinkedList<>();
        new Cook(queue, "1号厨师").start();
        new Cook(queue, "2号厨师").start();
        new Cook(queue, "3号厨师").start();
        new Waiter(queue, "1号服务员").start();
        new Waiter(queue, "2号服务员").start();
        new Waiter(queue, "3号服务员").start();
    }
}

咱们在Restaurant中安排了3个厨师和3个服务员,你们执行一下这个程序,会发如今若是厨师生产的过快,厨师就会等待,若是服务员端菜速度过快,服务员就会等待。可是整个过程厨师和服务员是没有任何关系的,它们是经过队列queue实现了所谓的解耦。

这个过程虽然不是很复杂,可是使用中仍是须要注意一些问题:

  • 咱们这里的厨师和服务员使用同一个锁queue

使用同一个锁是由于对queue的操做只能用同一个锁来保护,假设使用不一样的锁,厨师线程调用queue.add方法,服务员线程调用queue.remove方法,这两个方法都不是原子操做,多线程并发执行的时候会出现不可预测的结果,因此咱们使用同一个锁来保护对queue这个变量的操做,这一点咱们在唠叨设计线程安全类的时候已经强调过了。

  • 厨师和服务员线程使用同一个锁queue的后果就是厨师线程和服务员线程使用的是同一个等待队列。

可是同一时刻厨师线程和服务员线程不会同时在等待队列中,由于当厨师线程在wait的时候,队列里的元素确定是5,此时服务员线程确定是不会wait的,可是消费的过程是被锁对象queue保护的,因此在一个服务员线程消费了一个Food以后,就会调用notifyAll来唤醒等待队列中的厨师线程们;当消费者线程在wait的时候,队列里的元素确定是0,此时厨师线程确定是不会wait的,生产的过程是被锁对象queue保护的,因此在一个厨师线程生产了一个Food对象以后,就会调用notifyAll来唤醒等待队列中的服务员线程们。因此同一时刻厨师线程服务员线程不会同时在等待队列中。

  • 在生产和消费过程,咱们都调用了SleepUtil.randomSleep();。

咱们这里的生产者-消费者模型是把实际使用的场景进行了简化,真正的实际场景中生产过程和消费过程通常都会很耗时,这些耗时的操做最好不要放在同步代码块中,这样会形成别的线程的长时间阻塞。若是把生产过程和消费过程都放在同步代码块中,也就是说在一个厨师炒菜的同时不容许别的厨师炒菜,在一个服务员端菜的同时不容许别的程序员端菜,这个显然是不合理的,你们须要注意这一点。

以上就是wait/notify机制的一个现实应用:生产者-消费者模式的一个简介。

管道输入/输出流

还记得在唠叨I/O的时候提到的管道流么,这些管道流就是用于在不一样线程之间的数据传输,一共有四种管道流:

  • PipedInputStream:管道输入字节流
  • PipedOutputStream:管道输出字节流
  • PipedReader:管道输入字符流
  • PipedWriter:管道输出字符流

字节流和字符流的用法是差很少的,咱们下边以字节流为例来唠叨一下管道流的用法。

一个线程能够持有一个PipedInputStream对象,这个PipedInputStream对象在内部维护了一个字节数组,默认大小为1024字节。它并不能单独使用,须要与另外一个线程持有的一个PipedOutputStream创建关联,PipedOutputStream往该字节数组中写数据,PipedInputStream从该字节数组中读数据,从而实现两个线程的通讯。

PipedInputStream

先看一下它的几个构造方法:

图片描述
它有一个特别重要的方法就是:
图片描述

PipedOutputStream

看一下它的构造方法:
图片描述
它也有一个链接到管道输入流的方法:
图片描述

使用示例

管道流的一般使用场景就是一个线程持有一个PipedInputStream对象,另外一个线程持有一个PipedOutputStream对象,而后把这两个输入输出管道流经过connect方法创建链接,此后从管道输出流写入的数据就能够经过管道输入流读出,从而实现了两个线程间的数据交换,也就是实现了线程间的通讯

public class PipedDemo {

    public static void main(String[] args){

        PipedInputStream in = new PipedInputStream();
        PipedOutputStream out = new PipedOutputStream();

        try {
            in.connect(out);    //将输入流和输出流创建关联
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        new ReadThread(in).start();
        new WriteThread(out).start();
    }
}

class ReadThread extends Thread {

    private PipedInputStream in;

    public ReadThread(PipedInputStream in) {
        this.in = in;
    }

    @Override
    public void run() {

        int i = 0;
        try {
            while ((i=in.read()) != -1) {   //从输入流读取数据
                System.out.println(i);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            try {
                in.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

class WriteThread extends Thread {

    private PipedOutputStream out;

    public WriteThread(PipedOutputStream out) {
        this.out = out;
    }

    @Override
    public void run() {
        byte[] bytes = {1, 2, 3, 4, 5};
        try {
            out.write(bytes);   //向输出流写入数据
            out.flush();
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            try {
                out.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

执行结果是:

1
2
3
4
5

join方法

咱们前边说过这个方法,好比有代码是这样:

public static void main(String[] args) {

    Thread t = new Thread(new Runnable() {

        @Override
        public void run() {
            // ... 线程t执行的具体任务
        }
    }, "t");

    t.start();

    t.join();
    System.out.println("t线程执行完了,继续执行main线程");
}

main线程中调用t.join(),表明main线程须要等待t线程执行完成后才能继续执行。也就是说,这个join方法能够协调各个线程之间的执行顺序。它的实现其实很简单:

public final synchronized void join() throws InterruptedException {
    while (isAlive()) {
        wait();
    }
}

须要注意的是,join方法Thread类的成员方法。上边例子中在main线程中调用t.join()的意思就是,使用Thread对象t做为锁对象,若是t线程还活着,就调用wait(),把main线程放到与t对象关联的等待队列里,直到t线程执行结束,系统会主动调用一下t.notifyAll(),把与t对象关联的等待队列中的线程所有移出,从而main线程能够继续执行~

固然它还有两个指定等待时间的重载方法:
图片描述

java线程的状态

java为了方便的管理线程,对底层的操做系统的线程状态作了一些抽象封装,定义了以下的线程状态:
图片描述

须要注意的是:

  • 对于在操做系统中线程的运行/就绪状态,java语言中统一用RUNNABLE状态来表示。
  • 对于在操做系统中线程的阻塞状态,java语言中用BLOCKEDWAITINGTIME_WAITING这三个状态分别表示。
  • 也就是对阻塞状态进行了进一步细分。对于由于获取不到锁而产生的阻塞称为BLOCKED状态,由于调用wait或者join方法而产生的阻塞称为WAITING状态,由于调用有超时时间的waitjoin或者sleep方法而产生的在有限时间内阻塞称为TIME_WAITING状态。

你们能够经过这个图来详细的看一下各个状态之间的转换过程:
图片描述

java这么划分线程的状态纯属于方便本身的管理,好比它会给在WAITINGTIMED_WAITING状态的线程分别创建不一样的队列,来方便实施不一样的恢复策略~因此你们也不用纠结为啥和操做系统中定义的不同,其实操做系统中对各个状态的线程仍然有各类细分来方便管理,若是是你去设计一个语言或者一个操做系统,你也能够为了本身的方便来定义一下线程的各类状态。咱们做为语言的使用者,首先仍是把这些状态记住了再说哈🙄~

获取线程执行状态

java中定义了一个State枚举类型,来表示线程的状态:

public class Thread implements Runnable {
    // ... 为节省篇幅,省略其它方法和字段

    public enum State { 
        NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED;
    }
}

而后又在Thread类里定义了一个成员方法:

public State getState() {
    //省略了具体的实现
}

咱们能够经过这个getState方法来获取到对应的线程状态,下边来举个例子获取上边列举的6种状态,为了故事的顺利发展,咱们先定义一个工具类:

public class LockUtil {

    public static void sleep(long mill) {
        try {
            Thread.sleep(mill);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void wait(Object obj) {
        try {
            obj.wait();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

由于每次调用sleepwait操做的时候都会有InterruptedException的异常说明,咱们都须要try...catch一下,会致使代码结构会很混乱,因此咱们写了个工具类来把InterruptedException的异常转为运行时异常。注意,咱们这里转为运行时异常只是为了代码结构清晰,真实状况须要认真处理InterruptedException的异常说明,具体怎么使用咱们后边会详细唠叨。

而后接着写具体的获取状态的代码:

public class ThreadStateDemo {

    private static Object lock = new Object();  //锁对象

    public static void main(String[] args) {

        Thread t = new Thread(new Runnable() {

            @Override
            public void run() {

                double d = 0.1;
                int i = 0;
                while (i++ < 100000) {  //模仿一个耗时操做
                    d = d*0.3d;
                }

                SleepUtil.sleep(2000L);    //休眠2秒钟
                synchronized (lock) {
                    LockUtil.wait(lock);
                }
                synchronized (lock) {   //尝试获取lock锁

                }
            }
        }, "t");

        System.out.println("初始状态:" + t.getState());
        t.start();
        System.out.println("运行一个耗时操做时的状态:" + t.getState());

        SleepUtil.sleep(1000L);

        System.out.println("休眠时的状态:" + t.getState());


        SleepUtil.sleep(2000L);

        System.out.println("wait的状态:" + t.getState());
        synchronized (lock) {
            lock.notifyAll();
        }
        System.out.println("被notify后的状态:" + t.getState());

        synchronized (lock) {
            SleepUtil.sleep(1000L); //调用sleep方法不会释放锁
            System.out.println("由于获取锁而阻塞的状态:" + t.getState());
        }
    }
}

咱们在程序里用了一系列的sleep方法来控制程序的执行顺序,这只是为了简单的说明线程的各个状态的产生缘由,在真实环境中是不容许使用sleep方法来控制线程间的执行顺序的,应该使用同步或者咱们上边介绍的一系列线程通讯的方式。这个程序的执行结果是:

初始状态:NEW
运行一个耗时操做时的状态:RUNNABLE
休眠时的状态:TIMED_WAITING
wait的状态:WAITING
被notify后的状态:BLOCKED
由于获取锁而阻塞的状态:TERMINATED
线程的各个状态都获取到了哈。

总结

  1. 基于wait/notify机制的生产者-消费者模式很重要,务必认真看几遍~
  2. 一个线程能够持有一个PipedInputStream对象,这个PipedInputStream对象在内部维护了一个字节数组,默认大小为1024字节。它并不能单独使用,须要与另外一个线程持有的一个PipedOutputStream创建关联,PipedOutputStream往该字节数组中写数据,PipedInputStream从该字节数组中读数据,从而实现两个线程的通讯。
  3. 使用join方法能够实现一个线程在另外一个线程执行完毕后才继续执行的功能。
  4. java为了方便的管理线程,对底层的操做系统的线程状态作了一些抽象封装,定义了NEWRUNNABLEBLOCKEDWAITINGTIME_WAITINGTERMINATED这些线程状态,与操做系统中的线程有一些区别:
  • 对于在操做系统中线程的运行/就绪状态,java语言中统一用RUNNABLE状态来表示。
  • 对于在操做系统中线程的阻塞状态,java语言中用BLOCKEDWAITINGTIME_WAITING这三个状态分别表示。

题外话

写文章挺累的,有时候你以为阅读挺流畅的,那实际上是背后无数次修改的结果。若是你以为不错请帮忙转发一下,万分感谢~

相关文章
相关标签/搜索