多线程设计模式——Read-Write Lock模式和Future模式分析

本文内全部实现的代码均附在文末,有须要能够参考。(好奇宝宝们能够粘贴下来跑一下java

多线程程序评价标准

  • 安全性:编程

    ​ 安全性就是不损坏对象。也就是保证对象内部的字段的值与预期相同。设计模式

  • 生存性:安全

    ​ 生存性是指不管何时,必要的处理都必定可以执行。失去生存性最典型的例子就是“死锁”。网络

  • 可复用性:多线程

    ​ 指类可以重复利用。若类可以做为组件从正常运行的软件里分割出来,说明这个类有很高的复用性。框架

  • 性能:dom

    ​ 指可以快速、大批量地执行处理。主要影响因素有:吞吐量、响应性、容量等。异步

这里还要再区分一下这四条。前两条是程序正常运行的必要条件;后两条是程序提升质量的必要条件。性能

任何模式都有一个相同的“中心思想”

安全性和生存性是基础,是全部模式都必须保证的;可复用性和性能是目的,是全部模式诞生的意义。”

上面这句话是咱们每个使用设计模式的人,甚至是本身编写代码的人所应该牢记在心的。

在接下来分析的两个模式中,我会用实际的设计模式的例子来帮助你们理解上面这句话的含义。

Read-Write Lock 模式

RW-Lock模式特色

  • 在执行读取操做以前,线程必须获取用于读取的锁。
  • 在执行写入操做以前,线程必须获取用于写入的锁。
  • 多个线程能够同时读取,可是读取时,不能够写入。
  • 至多有一个线程正在写入,此时其余线程不能够读取或写入。

通常来讲,执行互斥处理(也是必要的)会下降程序性能(这里的互斥处理指使用synchronized关键字)。可是经过这个模式,将针对写入的互斥处理和读取的互斥处理分开考虑,则能够提升程序性能。(具体性能提高效果请见下文“性能对比”一节)

冲突总结

多线程读写时总共有4种状况,会发生冲突的有三种。下面给出冲突表格:

读取 写入
读取 无冲突 读和写的冲突 RW Conflict
写入 读和写的冲突 RW Conflict 写和写的冲突 WW Conflict

手搓RW Lock模式代码

这部份内容是为了帮助你们更好地理解RW Lock的实现原理和过程,实际操做中咱们没必要编写这么多代码来实现读写锁。可是在这里强烈建议认真阅读此部分,了解原理后对使用JAVA自带包或是实现特殊需求都会大有裨益!

类图

Data类中的buffer字段是读写的信息。ReaderThread类是读取的线程,WriterThread是写入的线程。Data类中还保有一个ReadWriteLock类的实例,它是这个模式的主角,起到保护读写的做用。

Data类


第一行红线处,lock是一个ReadWriteLock类的实例,起到保护读写的做用。

第2、三行红线处,分别是readLock方法和readUnlock方法,夹在中间的是doRead方法(进行读取的方法)。

第4、五行红线处,分别是writeLock方法和writeUnlock方法,夹在中间的是doWrite方法(进行写入的方法)。

Data类中还有用于模拟耗时的方法,即假定写入操做耗时比读取长(符合一般程序的状况)。

这里提到的”夹在中间“的说法,实际上是另外一种设计模式——“Before/After模式”。因为它的使用有一些坑点,我这里先“中断”一下,简单讲一下“Before/After模式”。

P.S. Before/After模式

前置处理(此模式中为获取锁)
    try{
        实际的操做(有return也会执行finally语句块中的内容)
    } finally {
        后置处理(此模式中为释放锁)
    }

以上代码为Before/After模式的基本框架。

此模式使用有两点要特别注意!!!

  • try语句后面必定要跟着finally语句块!finally语句块的含义是:只要进入了try语句块,就必定会在最后执行一次finally语句块内的代码,即便try语句块内有return语句也会执行。在这个模式中,使用finally语句就保证了,获取的锁在最后必定会被释放掉,避免"死锁"发生。

  • 前置处理的语句必定要放在try语句块外面!这一点可能会有不少人不理解,放在里面仍是外面有什么区别?回答是:在绝大多数状况下,确实没有区别。 可是当线程被interrupt时,程序就有可能出现过多调用readUnlock和writeUnlock方法的风险。假如如今程序正在lock.readLock()中进行wait,此时该线程被interrupt,那么程序会抛出InterruptedException异常,并退出readLock方法。这时readingReaders字段并不会递增。

    从readLock方法退出的线程回跳到finally语句块,执行lock.readUnlock()。在这个方法中,,以前未递增的readingReaders字段会执行递减操做,该字段的值会与咱们预期不一样(变得比正常要小)。这就颇有可能引起难以察觉的bug。

    (上面两段中出现的方法名和字段不知道不要紧,它们都在下面即将介绍的ReadWriteLock类中,建议你们看完下面的ReadWriteLock类的介绍再回来理解一下这部分,很重要!!很容易出bug!!!

ReadWriteLock类

该类中保存有四个私有字段,前三个字段的含义很好理解,见图片中的代码注释。

在这里,特别强调preferWriter字段!这是保证程序运行结果达到预期的重要一环,其含义和用法须要你们好好理解。这个preferWriter表明的含义是读取和写入二者之间的优先级关系。当preferWriter字段为true时,表明写入优先;为false时,表明读取优先。那么这个读取或写入的优先又是如何经过这一个布尔值实现的呢?这里就体现出了ReadWriteLock类的设计巧妙之处。

咱们看readLock方法中的守护模式(while+wait)的守护条件(while成立的条件)。(见上图中第二行红线)这行代码的含义是若是有正在写入的线程(数据正在被写入)或是写入优先而且有正在等待写入的线程,那么读取的线程就要wait。这里,preferWriter字段发挥了它关键的做用。

再看readUnlock方法中对preferWriter字段的操做(第三行红线)。这里的含义是,在读取锁释放时,就把preferWriter字段置为true。由于读取锁释放时,必定表示已经进行完一次读取操做了,此时应该把优先权让给写入操做,因此将preferWriter置为true。

同理,writeUnlock方法中对preferWriter字段的操做(第四行红线)也即表明进行完一次写入操做后,要把优先权交给读取操做,即把preferWriter字段置为false。

这就像两我的却只有一个水瓶,一我的喝完一口水以后就要把水瓶交给对方,否则就会出现渴死的现象。

那么若是把ReadWriteLock类中的preferWriter字段去掉,程序运行起来会是什么样子呢?以下:

读取线程比写入线程多,并且读取操做耗时短,因此读取线程会一直抢占锁,致使写入线程没法写入。这就是程序“渴死”的样子了。(你们有兴趣能够把文末代码粘贴下来,把preferWriter字段去掉本身跑一下

正确运行结果

正确的运行结果应该是读取一段时间就写入一次,这样不断循环。因此读取的内容应该不断变化。结果见下图:

适用场合

  • 读取操做繁重时

    ​ 即read操做很耗费时间。这种状况下,使用这种模式比Single Thread Execution模式(使用synchrnized关键字)更适合。反之,Single Thread Execution模式性能更好。

  • 读取频率比写入频率高时

    ​ 该模式的优势在于Reader角色之间不会发生冲突,这样能够避免阻塞而耗费时间。但若写入频率很高,则Writer角色会频繁打断Reader角色的读取工做,致使性能提高不会很明显。

“逻辑锁”vs“物理锁”

你们确定都很熟悉经过synchronized关键字来进行线程同步控制,由于synchronized关键字能够获取实例的锁。可是这里synchronized关键字所获取的锁是JVM为每个实例提供的一个物理锁。每一个实例只有一个物理锁,不管如何编写程序,也没法改变这个物理锁的运行。

咱们这个Read Write Lock模式中所提供的“用于写入的锁”和“用于读取的锁”都是逻辑锁。这个锁不是JVM所规定的结构,而是编程人员本身实现的一种逻辑结构。这就是所谓的逻辑锁。咱们能够经过控制ReadWriteLock类来控制逻辑锁的运行。

那么这两者的关系是什么呢?其实,ReadWriteLock类提供的两个逻辑锁的实现,都是依靠ReadWriteLock实例持有的物理锁完成的。

而此处咱们也来解释一下上节中所说的,读取不繁重时,使用咱们本身所构建的逻辑锁就会致使比使用synchronized关键字(物理锁)多不少逻辑操做,这样多出来的逻辑操做所耗费的时间也许会大于线程被阻塞的时间。这样就会致使本模式反而会比Single Thread Execution性能差。

性能对比

示例代码中一共有6个读取线程,两个写入线程。在本节性能对比中,我让每一个读取线程进行20次读取后就输出运行时间而后终止。如下两张图分别为使用Read-Write Lock模式耗时和使用synchronized关键字耗时。

Read-Write Lock模式:

synchronized关键字:

从以上两图输出的时间能够看出,在每一个线程读取20次的状况下,使用Read-Write Lock模式能够比synchronized关键字节省三分之二(7秒钟左右)的时间。这在大量读取的程序中,会给程序性能带来极大的提高!!!(固然对于OO第二单元电梯做业来讲,因为读写频率差别不大并且读取并不繁琐,因此在电梯程序中使用Read-Write Lock模式性能提高并不明显。不过谁又能说得准之后会不会用到呢?)

“中心思想”分析

  • 正常运行的必要条件

    ​ 本模式中,经过ReadWriteLock类中的两个获取锁和两个释放锁的方法来模拟了synchronized关键字获取实例的锁和释放实例的锁这两个过程,从而在逻辑上保证了本模式在线程安全方面与synchronized关键字保护的方法彻底相同。所以在安全性和生存性两方面,本模式很好地完成了。

  • 提高性能的必要条件

    ​ 本模式中,经过找到读取和写入交汇的四种状况中的读读无冲突的状况,而且实现读取锁和写入锁的分离,实现了多线程同时读取的效果,以此来提升频繁读取或是“重读取”的程序的性能。
    ​ 同时,咱们不难发现,关于多线程同步控制的代码都封装在ReadWriteLock类中,其余部分直接调用便可,无需进行同步控制,提升了可复用性。

Future 模式

Future模式特色

我从本模式中先提取出两个最关键的核心代码展现一下。

Data data = host.request(10, ‘A’);

host.request方法是启动一个新线程来执行请求。可是在这行代码中该方法的返回值,不是新线程执行获得的最后结果,这个data只是一张“提货单”、“预定券”

先返回“提货单”的意义在于这个返回值能够当即获得,不用等待请求处理线程返回最后结果。在“作蛋糕”的期间,咱们能够作一些别的和“蛋糕”无关的事情,等到“蛋糕作好了”咱们再回去取“蛋糕”。

data.getContent();

上面这句代码就是线程“取蛋糕”的动做。这个方法的返回值是真正的“能吃的蛋糕”。

手搓Future模式代码

类图

Main类发出请求给Host类,Host类接收到请求后马上制造一个FutureData类的实例看成提货券返回给Main类,同时Host类马上启动一个新线程来处理请求(假设此处请求处理须要花费至关长时间),最后处理结果获得RealData类(蛋糕)。

Main类

Main类中,向Host类发出了三个请求。以后Main线程就去作别的工做了,咱们这里用sleep(2000)来模拟。作完别的工做以后,Main线程输出请求的结果。

Host类

第一个红线处,经过Future这个FutureData类的实例(共享对象),将Main线程(买蛋糕的人)和realdata(蛋糕)创建起了,超越“时空”的联系。

为何说“时空”呢?我本身的理解这个模式,就是在主线程获得提货券后,主线程无论在什么时候何地(这里的空间是抽象空间,也即主线程不在处理请求线程的”线程空间"内)均可以在结果计算出来后即时获取结果。

第二个红线处,使用了一个不太经常使用的语法模式——匿名内部类。读者没必要对这个语法熟练掌握,只须要知道在示例程序里,这个类新建了一个处理请求的线程实例并让新的线程运行起来去处理请求便可。(count和c变量前面都加上final关键字是匿名内部类的要求,了解便可)

说到这里,对于每一个新的请求都启动一个新的线程来处理是另外一个多线程设计模式——Thread-Per-Message模式。这个模式较为简单,感兴趣的读者能够自行学习了解一下,这里再也不赘述了。

FutureData类

第一个红线处,这里设计的又是一个新的多线程设计模式——Balk模式。Balk模式的“中心思想”是不要我就走了。即当有多个线程时,其中一个线程已经完成了请求,那么别的线程来要完成请求时,这个模式就经过if条件告诉线程:“我已经完成个人请求了,不用你再来工做了,你能够走了。”,经过return将线程返回回去。

第2、三个红线处,即在请求处理线程完成“蛋糕”的交付以后(this.realdata = realdata;),将ready字段置true,代表“蛋糕”已经随时能够取走了。而后通知全部等待线程。

第4、五个红线处,使用守护模式,以没有ready做为守护条件,即若是“蛋糕”尚未作好,“取蛋糕”的线程就要wait。不然经过getContent方法返回回去。

RealData类

第一个红线处,这个String字段在本示例程序中表明“蛋糕”。

第二个红线处,示例程序中用sleep来模拟耗时很长的请求处理过程。

运行结果

经过结果输出来看,在主线程执行其余工做的时候,与此同时请求正在被处理,这样极大地提升了处理效率。

模式分析

  • 提升吞吐量

    ​ 单核CPU中,纯计算过程是没法提升吞吐量的。其余状况都可。

  • 异步方法调用

    ​ 经过Thread-Per-Message模式经过新建线程,模拟实现了异步。可是Thread-Per-Message模式没法接收返回值。

  • “准备”和“使用”返回值的分离

    ​ 为了解决Thread-Per-Message模式没法接收返回值的尴尬局面,Future模式横空出世。Future模式经过将准备返回值(返回提货券)和使用返回值(调用getContent方法)分离,即解决了异步调用没法接收返回值的问题,又提升了性能。

与生产者-消费者模式有区别吗?

答案是有。

生产者-消费者模式你们都很熟悉,经过一个tray来将生产者生产产品(有的人将其对应为本模式的请求处理过程)和消费者使用产品(有的人将其对应为本模式的使用返回值过程)分离开来。目前来看,没有什么区别。

可是,咱们仔细想想,Future模式经过一张提货券将“生产者“和”消费者“创建起来一对一的独一无二的联系。也就是说我有这个”蛋糕”的提货券,我只能取我这个本身的“蛋糕”,而不能取“蛋糕店”里作好的别人的“蛋糕”。说到这里,相信你们都已经发现本模式与生产者-消费者模式最大的区别了吧。

模式拓展

  • 不让主线程久等的Future角色

    ​ 在示例程序中,若是FutureData的getContent方法被调用时,RealData类的实例尚未建立完成,则要主线程wait建立完成,有时这也会对主线程的效率形成损失。

    ​ 因此,为了不这种状况的发生,咱们能够将守护模式换成Balk模式,即主线程来“取蛋糕”时,若“蛋糕”还没作好,就让主线程返回,再等一下子。这样主线程能够继续进行其余工做,过必定时间后再回来“取蛋糕”。

  • 会发生变化的Future角色

    ​ 一般状况下,返回值只会被设置到Future角色中一次。可是在有时须要不断反复设置返回值时,能够考虑给Future角色赋予“当前返回值”,即这个返回值会不断随时间而改变。

    ​ 例如:在经过网络获取图像数据时,能够在最开始获取图像的长和宽,接着获取模糊图像数据,在获取清晰图像数据。此时,这个不断变化的Future角色可能会大有用处。

模式思考

在课上,老师提示我,是否能够用简单的方法实现主动返回值的Future模式。

目前,我只想到使用回调模式,在Future模式返回值设置好后,经过Host类回调主线程。不过,使用这种方式会致使Main类里多出不少与多线程同步处理相关的代码,致使Main类变的臃肿,并且整个模式可复用性也会下降。

我在想出好的解决办法以后会及时更新本文,向你们展现。同时也欢迎各位读者有好的解决办法在评论区留言。

Future模式“中心思想”

  • 正常运行必要条件

    本模式相似生产者-消费者的逻辑,将处理与请求分离,分离的同时创建起超越“时空”的联系,保证了最后结果传输的准确性。

  • 提升性能必要条件

    经过将“准备”返回值和“使用”返回值分离,将主线程从漫长的请求处理过程解放出来,让主线程在请求处理期间,能够作别的工做,提升性能。

伟大的Concurrent包!

RW Lock模式

JAVA提供了java.util.concurrent.locks包来提供读写锁的实现。这个包里的ReentrantReadWriteLock类实现了ReadWriteLock接口。这个包的实现原理即为上述手搓RW-Lock模式代码所讲解的原理和实现。具体使用方法很简单,在理解原理以后使用很简单,就很少赘述了。

Future模式

JAVA提供了java.util.concurrent.Future接口至关于本模式中的Future角色。其中java.util.concurrent.FutureTask类是实现了Future接口的标准类,主要有get(获取返回值)、set(设置返回值)、cancel(中断请求处理运行)和setException(设置异常)四个方法。

其原理和上述Future模式的手搓代码原理彻底一致,相信你们彻底理解上述讲解后,对这些concurrent包的使用必定会更加驾轻就熟!!

示例程序代码

  • RW Lock模式
public class Main {
    public static void main(String[] args) {
        Data data = new Data(10);
        Thread Reader1 = new ReaderThread(data);
        Reader1.start();
        Thread Reader2 = new ReaderThread(data);
        Reader2.start();
        Thread Reader3 = new ReaderThread(data);
        Reader3.start();
        Thread Reader4 = new ReaderThread(data);
        Reader4.start();
        Thread Reader5 = new ReaderThread(data);
        Reader5.start();
        Thread Reader6 = new ReaderThread(data);
        Reader6.start();
        Thread Writer1 = new WriterThread(data, "ABCDEFGHIJKLMNOPQTSTUVWXYZ");
        Writer1.start();
        Thread Writer2 = new WriterThread(data, "abcdefghijklmnopqrstuvwxyz");
        Writer2.start();
        Scanner input = new Scanner(System.in);
        String end  = input.nextLine();
        while (end.equals("")) { end  = input.nextLine(); }
        Reader1.interrupt();
        Reader2.interrupt();
        Reader3.interrupt();
        Reader4.interrupt();
        Reader5.interrupt();
        Reader6.interrupt();
        Writer1.interrupt();
        Writer2.interrupt();
    }
}


public class Data {
    private final char[] buffer;
    private ReadWriteLock lock = new ReadWriteLock();
    public Data(int size) {
        this.buffer = new char[size];
        for (int i = 0; i < buffer.length; i++) {
            buffer[i] = '*';
        }
    }
    public synchronized char[] read() throws InterruptedException {
        lock.readLock();
        try {
            return doRead();
        } finally {
            lock.readUnlock();
        }
    }
    public synchronized void write(char c) throws InterruptedException {
        lock.writeLock();
        try {
            doWrite(c);
        } finally {
            lock.writeUnlock();
        }
    }
    private char[] doRead() {
        char[] newbuf = new char[buffer.length];
        for (int i = 0; i < buffer.length; i++) {
            newbuf[i] = buffer[i];
        }
        slowly();
        return newbuf;
    }
    private void doWrite(char c) {
        for (int i = 0; i < buffer.length; i++) {
            buffer[i] = c;
            slowly();
        }
    }
    private void slowly() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
        }
    }
}

//此处为性能测试代码(即执行20次读取,并统计时间)
public class ReaderThread extends Thread {
    private final Data data;
    public ReaderThread(Data data) {
        this.data = data;
    }
    public void run() {
        try {
            long begin = System.currentTimeMillis();
            for (int i = 0; i < 20; i++) {
                char[] readbuf = data.read();
                System.out.println(Thread.currentThread().getName() + " reads " + String.valueOf(readbuf));
            }
            long time = System.currentTimeMillis() - begin;
            System.out.println(Thread.currentThread().getName() + ":time = " + time);
        } catch (InterruptedException e) {
        }
    }
}


import java.util.Random;

public class WriterThread extends Thread {
    private static final Random random = new Random();
    private final Data data;
    private final String filler;
    private int index = 0;
    public WriterThread(Data data, String filler) {
        this.data = data;
        this.filler = filler;
    }
    public void run() {
        try {
            while (true) {
                char c = nextchar();
                data.write(c);
                Thread.sleep(random.nextInt(3000));
            }
        } catch (InterruptedException e) {
        }
    }
    private char nextchar() {
        char c = filler.charAt(index);
        index++;
        if (index >= filler.length()) {
            index = 0;
        }
        return c;
    }
}


public final class ReadWriteLock {
    private int readingReaders = 0; // (A)…实际正在读取中的线程个数
    private int waitingWriters = 0; // (B)…正在等待写入的线程个数
    private int writingWriters = 0; // (C)…实际正在写入中的线程个数
    private boolean preferWriter = true; // 若写入优先,则为true

    public synchronized void readLock() throws InterruptedException {
        while (writingWriters > 0 || (preferWriter && waitingWriters > 0)) {
            wait();
        }
        readingReaders++;                       // (A) 实际正在读取的线程个数加1
    }

    public synchronized void readUnlock() {
        readingReaders--;                       // (A) 实际正在读取的线程个数减1
        preferWriter = true;
        notifyAll();
    }

    public synchronized void writeLock() throws InterruptedException {
        waitingWriters++;                       // (B) 正在等待写入的线程个数加1
        try {
            while (readingReaders > 0 || writingWriters > 0) {
                wait();
            }
        } finally {
            waitingWriters--;                   // (B) 正在等待写入的线程个数减1
        }
        writingWriters++;                       // (C) 实际正在写入的线程个数加1
    }

    public synchronized void writeUnlock() {
        writingWriters--;                       // (C) 实际正在写入的线程个数减1
        preferWriter = false;
        notifyAll();
    }
}
  • Future模式
public class Main {
    public static void main(String[] args) {
        System.out.println("main BEGIN");
        Host host = new Host();
        Data data1 = host.request(10, 'A');
        Data data2 = host.request(20, 'B');
        Data data3 = host.request(30, 'C');

        System.out.println("main otherJob BEGIN");
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
        }
        System.out.println("main otherJob END");

        System.out.println("data1 = " + data1.getContent());
        System.out.println("data2 = " + data2.getContent());
        System.out.println("data3 = " + data3.getContent());
        System.out.println("main END");
    }
}


public class Host {
    public Data request(final int count, final char c) {
        System.out.println("    request(" + count + ", " + c + ") BEGIN");

        // (1) 建立FutureData的实例
        final FutureData future = new FutureData();

        // (2) 启动一个新线程,用于建立RealData的实例
        new Thread() {
            public void run() {
                RealData realdata = new RealData(count, c);
                future.setRealData(realdata);
            }
        }.start();

        System.out.println("    request(" + count + ", " + c + ") END");

        // (3) 返回FutureData的实例
        return future;
    }
}


public interface Data {
    public abstract String getContent();
}


public class RealData implements Data {
    private final String content;
    public RealData(int count, char c) {
        System.out.println("        making RealData(" + count + ", " + c + ") BEGIN");
        char[] buffer = new char[count];
        for (int i = 0; i < count; i++) {
            buffer[i] = c;
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
            }
        }
        System.out.println("        making RealData(" + count + ", " + c + ") END");
        this.content = new String(buffer);
    }
    public String getContent() {
        return content;
    }
}


public class FutureData implements Data {
    private RealData realdata = null;
    private boolean ready = false;
    public synchronized void setRealData(RealData realdata) {
        if (ready) {
            return;     // balk
        }
        this.realdata = realdata;
        this.ready = true;
        notifyAll();
    }
    public synchronized String getContent() {
        while (!ready) {
            try {
                wait();
            } catch (InterruptedException e) {
            }
        }
        return realdata.getContent();
    }
}

参考资料:《图解JAVA多线程设计模式》

相关文章
相关标签/搜索