多线程程序加速指南

虽然对于一个计算机程序来讲最重要的是正确性,若是一个程序没办法产出正确的结果,那么这个程序的价值就大打折扣了。但程序性能也是很重要的一个方面,若是程序运行得太慢,那也会影响到程序的适用范围和硬件配置的成本。java

在以前的文章《4.多线程中那些看不到的陷阱》中,咱们了解了线程间的同步机制,这主要是为了保证程序在多线程环境下的正确性。在这篇文章中咱们将会深刻探究多线程程序的性能瓶颈和多种不一样的优化方式,那么咱们首先就从对程序性能的测量与分析开始吧。编程

分析多线程程序的性能

咱们先来看一个使用AtomicLong进行多线程计数的程序,下面的程序中会启动两个线程,每一个线程会对静态变量count进行一亿次(10的8次方)的累加操做,这段代码在开始和结束的时候都获取了当前时间,而后经过这两个时间值计算程序的运行耗时。数组

public class AtomicIntegerTest {

    private static AtomicLong count = new AtomicLong(0);

    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();
        Runnable task = new Runnable() {
            public void run() {
                for (int i = 0; i < 1e8; ++i) {
                    count.incrementAndGet();
                }
            }
        };

        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        System.out.println("count = " + count);

        long endTime = System.currentTimeMillis();
        System.out.println(String.format("总耗时:%.2fs", (endTime - startTime) / 1e3));
    }

}
复制代码

在个人电脑上运行这段程序最后输出的结果是2.44s,看起来有点长了,那么咱们再看一下若是直接在单个线程中对一个整型变量累加两亿次会是什么结果。下面是一个在单个线程中累加两亿次的程序代码:缓存

public class SingleThreadTest {

    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();

        long count = 0;
        for (int i = 0; i < 2e8; ++i) {
            count += 1;
        }
        System.out.println("count = " + count);

        long endTime = System.currentTimeMillis();
        System.out.println(String.format("总耗时:%.2fs", (endTime - startTime) / 1e3));
    }

}
复制代码

这段代码运行耗时只有0.33s,咱们经过两个线程进行累加的代码居然比单线程的代码还要慢得多,这不是就和咱们使用多线程加快程序运行的初衷相违背了吗?安全

多线程程序的线程间同步是影响多线程程序性能的关键所在,一方面程序中必须串行化的部分会使系统总体的耗时显著增长,另外一方面同步行为自己的开销也比较大,特别是在发生冲突的状况下。在上文的代码中,多线程累加的程序之因此会比单线程还慢得多就是由于在AtomicLong类型的静态变量count上有两个线程同时调用incrementAndGet方法进行累加,这就会致使在这个静态变量上存在很严重的冲突。性能优化

当一个线程成功修改了变量count的值后,另一个正在修改的线程就会修改失败而且会再次重试累加操做。而且由于AtomicLong类型的对象中是用一个volatile变量来保存实际的整型值的,而咱们在以前的文章《多线程中那些看不到的陷阱》中能够了解到,对volatile变量的修改操做必定要把修改后的数据从高速缓存写回内存当中,这也是用AtomicLong进行累加的耗时比单线程累加版本还要多这么多的主要缘由。bash

那么咱们有没有更好的方法能够解决这个问题呢?数据结构

使用任务拆分进行优化

在上面的例子中,咱们须要的只是最终累加的结果,因此为了减少线程间同步的开销,咱们能够将累加任务拆分到不一样的线程中执行,到最后再把每一个线程的结果加在一块儿就能够获得最终的结果了。在下面的代码中咱们就使用了这种方法,t1在count1上累加一亿次,t2在count2上累加一亿次,最后把count1和count2相加获得最终的结果,咱们来一块儿运行一下,看看效果如何。多线程

public class TwoThreadTest {

    private static long count1 = 0;
    private static long count2 = 0;

    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();

        Thread t1 = new Thread(new Runnable() {
            public void run() {
                for (int i = 0; i < 1e8; ++i) {
                    count1 += 1;
                }
            }
        });
        Thread t2 = new Thread(new Runnable() {
            public void run() {
                for (int i = 0; i < 1e8; ++i) {
                    count2 += 1;
                }
            }
        });

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        long count = count1 + count2;
        System.out.println("count = " + count);

        long endTime = System.currentTimeMillis();
        System.out.println(String.format("总耗时:%.2fs", (endTime - startTime) / 1e3));
    }

}
复制代码

这段程序在个人电脑上的耗时是0.20s,比以前单线程的0.33s有了不小的提升,更是遥遥领先原先用两个线程累加AtomicLong类型变量版本的2.44s。这说明咱们以前的分析是正确的,CAS重试和volatile写回内存两个操做所引发的开销是AtomicLong版本程序性能低下的罪魁祸首。并发

可是这个版本的结构仍是略显原始了,在应付累加这种简单的需求时可能还比较容易,可是一旦面临复杂的并发任务,那可能就要写不少复杂的代码,而且很容易出现错误了。例如咱们若是想把任务拆分到10个线程中运行,那么咱们就要首先把两亿次的累加任务拆分为10份,而后还要建立一个包含10个Thread对象的数组让他们分别的对不一样范围进行累加,最后还要经过join方法等待这10个线程都执行完成,这个任务听起来就不太容易。不要紧,下面咱们将会介绍一种目前比较经常使用的任务拆分与运行框架来解决这个问题,经过这个框架咱们能够很容易地写出易于编写和扩展的任务拆分式程序。

使用ForkJoinPool进行任务

JDK 1.7中引入了一个新的多线程任务执行框架,被称为ForkJoinPoolForkJoinPool是一个Java类,它实现了表明线程池功能的ExecutorService接口,因此它在使用方法上和经常使用的线程池类ThreadPoolExecutor类似,但在本节中咱们并不须要了解线程池的详细用法,不过感兴趣的读者能够参考这篇文章从0到1玩转线程池来了解一下。

线程池就是一个线程的集合,其中的线程会一直等待执行任务,因此咱们能够把任务以任务对象的形式提交到线程池,而后线程池就会利用其中的线程来执行任务。在ForkJoinPool的使用中,线程池指的就是ForkJoinPool类型的对象,而任务对象指的就是继承自ForkJoinTask的类的对象。在下面的示例代码中,咱们使用了自定义的RecursiveTask的子类来做为任务类,RecursiveTask类就继承自ForkJoinTask类。

Recursive的意思是递归,也就是说咱们在这个任务类的执行过程当中可能会建立新的任务类对象来表明当前任务的子任务,而后经过结合多个子任务的结果来返回当前任务的结果。好比一开始的任务是累加两亿次,但那么咱们就能够把它分为两个分别累加一亿次的子任务的结果之和,一样的道理,累加一亿次的子任务也能够再被分为两个累加五千万次的子子任务。这样的拆分会一直持续到咱们认为任务规模已经足够小的时候,这时子任务的结果就会被计算,而后再返回给上层任务进行处理以后就获得上层任务的结果了。

若是前面这一段文字描述看不明白也不要紧,咱们在代码中找一找答案:

public class ForkJoinTest {

    private static class AccumulateTask extends RecursiveTask<Long> {

        private long start;
        private long end;
        private long threshold;

        /**
         * 任务的构造函数
         *
         * @param start         任务处理范围的起始点(包含)
         * @param end           任务处理范围的结束点(不包含)
         * @param threshold     任务拆分的阈值
         */
        public AccumulateTask(long start, long end, long threshold) {
            this.start = start;
            this.end = end;
            this.threshold = threshold;
        }

        @Override
        protected Long compute() {
            long left = start;
            long right = end;

            // 终止条件:若是当前处理的范围小于等于阈值(threshold),
            //                    那么就直接经过循环执行累加操做
            if (right - left <= (int) threshold) {
                long result = 0;
                for (long i = left; i < right; ++i) {
                    result += 1;
                }
                return result;
            }

            // 获取当前处理范围的中心点
            long mid = (start + end) / 2;

            // 拆分出两个子任务,一个从start到mid,一个从mid到end
            ForkJoinTask<Long> leftTask = new AccumulateTask(start, mid, threshold);
            ForkJoinTask<Long> rightTask = new AccumulateTask(mid, end, threshold);

            // 经过当前线程池运行两个子任务
            leftTask.fork();
            rightTask.fork();

            try {
                // 获取两个子任务的结果并返回
                return leftTask.get() + rightTask.get();
            } catch (Exception e) {
                return 0L;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();

        // 建立总任务,范围是从1到两亿(包含),阈值为10的7次方,因此最终至少会有10个任务进行for循环的累加
        AccumulateTask forkJoinTask = new AccumulateTask(1, (int) 2e8+1, (long) 1e7);
        // 使用一个新建立的ForkJoinPool任务池运行ForkJoin任务
        new ForkJoinPool().submit(forkJoinTask);

        // 打印任务结果
        System.out.println("count = " + forkJoinTask.get());

        // 计算程序耗时并打印
        long endTime = System.currentTimeMillis();
        System.out.println(String.format("总耗时:%.2fs", (endTime - startTime) / 1e3));
    }

}
复制代码

在上面的代码中,咱们会不断建立在指定范围内累加的子任务,直到任务范围小于阈值threshold(在代码中是10的7次方)时才再也不拆分子任务,而是经过循环来获得累加结果。以后子任务的返回结果在上层任务中相加并做为上层任务的结果返回。到最后咱们就能够获得累加两亿次的结果了。

在这个程序中,最重要的是对任务对象的三个操做:

  1. 建立任务对象,代码中使用的是new AccumulateTask(start, mid, threshold)new AccumulateTask(mid, end, threshold),这两段代码会建立除了范围不一样其余逻辑与父任务彻底一致的子任务,每一个子任务会负责执行父任务范围中的一半;
  2. 执行子任务,经过调用任务对象的fork()方法可让子任务被提交到当前ForkJoinPool中执行;
  3. 等待子任务返回结果,经过调用子任务任务对象的get()方法,父任务将会等待子任务执行完成并返回结果,而后将两个子任务的结果相加获得父任务的执行结果。

为何一样都是线程池,可是ThreadPoolExecutor类就很难实现这样的执行方式呢?细心的读者可能已经发现了,咱们在一个任务中会拆分出两个子任务,而且要等待这两个子任务都执行完成才能返回父任务的结果。若是是在ThreadPoolExecutor中,在等待子任务运行完成获得结果时,父任务会一直阻塞而且占用一个线程,这样的话若是父任务太多就会致使子任务没有线程可供使用了,这个运行流程就没办法继续执行下去了。而ForkJoinPool这个特殊的线程池就解决了这个问题,父任务在等待子任务执行时可让出线程给其余任务,这样就不会致使线程都被阻塞状态的父任务所阻塞了。

这种将任务拆分为互不依赖的子任务,而后分别在不一样的线程上执行,最后再将结果进行逐步合并的方法就被称为Map-Reduce。这种方法在离线大数据技术中被普遍应用,甚至能够说大数据相关技术就是在Map-Reduce思想基础上发展起来的也不为过。

线程内变量

经过上面的几个例子,咱们能够看到,对于多线程程序来讲,共享数据就是最大的问题。共享数据不但可能引发数据竞争问题,致使程序出现问题;并且随着引入的线程同步操做又会拉低程序的性能,甚至可能使多线程程序的执行时间比单线程程序还长得多。在上面的例子中,咱们经过使用ForkJoinPool拆分并执行了一个累加任务,各个子任务之间基本彻底独立,作到了最大程度的并行化。可是在一些状况下,咱们可能没办法作到如此理想的方案,在一些状况下仍是会留有必定的线程同步操做和对应的代码临界区。

那么在这些状况下咱们如何处理能让程序的性能尽量高呢?

假设咱们如今要统计一个方法的调用次数,若是可能有多个线程同时调用该方法,那么就须要对多个线程的调用同时计数。这种状况下咱们能够考虑在每一个线程里各自保留一个整型变量用于保存每一个线程内的调用次数,而后在获取总数时只须要把每一个线程中的数量加在一块儿就能计算出来了。而在累加计数时咱们只须要修改当前线程对应的变量就能够了,天然就没有了数据竞争问题。java.util.concurrent.atomic包中的累加器LongAdder类采用的就是这样的思路,这种思路也有本身专门的专业术语,被称为**“线程封闭”**,线程封闭指的就是这种经过线程内变量来避免线程间共享数据的优化方式。

Java中也有专门的ThreadLocal类能够处理线程内变量,只是由于性能和线程销毁时的数据保存之类的缘由通常不会用于多线程累加这样的数据聚合场景,可是在保存和获取数据方面很是的便利,有兴趣的读者能够了解一下。

ConcurrentHashMap

java.util.concurrent包为咱们提供了锁、原子类、线程池、ForkJoinPool等一大批并发编程工具。最后,咱们来了解一下java.util.concurrent包中为咱们提供的一种线程安全数据结构。

在Java中,咱们经常使用的Map类是HashMap,可是这个类并非线程安全的,若是咱们在多个线程中同时对HashMap对象进行读写,那么就有可能引起一些程序问题。还有一个从JDK 1.0起就已经存在的Hashtable类能够保证线程安全,可是咱们打开这个类的源代码能够看到,这个类中的大部分方法上都加上了synchronized标记,其中包括了最经常使用的getput方法,这意味着Hashtable类的对象同一时间几乎只能被一个线程所使用,这样的效率相对是比较低的。

可是其实熟悉HashMap结构的朋友可能会知道,HashMap内部的结构是分为不少的桶的,每一个键值对都会根据key值的hashCode值被放到不一样的桶中。其实在作修改操做时咱们只须要对对应的一个桶加锁就能够了,而在执行读操做时,在大多数状况下是不用加锁的。JDK 1.5中引入的ConcurrentHashMap基本能达到这两点。

在这个类中,咱们经过两种方式来优化了并发的性能:

  1. 经过限制锁保护的代码范围来减小了锁冲突发生的可能性,并且也减小了须要的锁数量,减小了同步产生的开销;
  2. 另外一方面由于读取的时候并不须要加锁,而只有写操做才须要加锁,在一些读操做较多可是写操做较少的状况下,咱们就能大大下降读操做的成本,从而提升了程序的性能。

并且ConcurrentHashMap中的大多数方法不只优化了同步机制的效率,并且提供了不少原子性的相似于CAS的操做方法,下面是ConcurrentHashMap类经常使用的操做:

  • V putIfAbsent(K key, V value),原子性操做,若是map中不包含key,则执行map.put(key, value)并将put方法的返回值返回,不然直接返回map.get(key)的值,即当前值;
  • boolean remove(Object key, Object value),原子性操做,若是知足条件 (map中包含key对应的键值对 && value参数等于键值对中当前的value值) 则移除key对应的键值对并返回true,不然返回false;
  • boolean replace(K key, V oldValue, V newValue),原子性操做,当知足 (map中包含key对应的键值对 && oldValue参数等于键值对中当前的value值)时,将key对应的值改成newValue并返回true,不然返回false。

总结

咱们在这篇文章中从对一个使用AtomicLong进行多线程累加的程序的性能测试开始,经过Map-Reduce思想大大优化了这个程序的性能,在此过程当中还涉及到了ForkJoinPool类的使用。以后咱们经过线程内变量正式提出了**“线程封闭”的概念,若是咱们能作到线程封闭**,那么由于减小了线程间同步的开销,因此线程的性能必定会有很大的提升。最后咱们介绍了java.util.concurrent包中为咱们提供的并发安全数据类ConcurrentHashMap。相信经过这篇文章你们可以了解到多种多线程性能优化方法,但最重要的仍是要找出多线程程序性能的瓶颈所在,这样才能在实际的实践场景中根据不一样的状况因时制宜,使用合适的方法解决不一样的瓶颈问题。本文中的观点是多线程程序的瓶颈主要在于共享数据引发的数据竞争问题,若是可以让不一样的线程间不存在或者尽量少地存在共享数据与临界区代码,就可以对多线程程序的性能起到正面的影响。