java多线程——并发测试

这是多线程系列第六篇,其余请关注如下:java

java 多线程—线程怎么来的?编程

java多线程-内存模型数组

java多线程——volatile缓存

java多线程——锁安全

java多线程——CAS多线程

 

编写并发程序时候,能够采起和串行程序相同的编程方式。惟一的难点在于,并发程序存在不肯定性,这种不肯定性会令程序出错的地方远比串行程序多,出现的方式也没有固定规则。这对程序的应用会形成一些困难,那么如何在测试中,尽量的暴露出这些问题,而且了解其性能瓶颈,这也是对开发者带来新的挑战。并发

本篇基于多线程知识,梳理一些多线程测试须要掌握的方法和原则,以指望可能的在开发阶段,就暴露出并发程序的安全性和性能问题,为多线程可以高效安全的运行提供帮助。性能

本篇主要包含如下内容:测试

1. 并发测试分类优化

2. 正确性测试

3. 安全性测试

4. 性能测试

 

并发测试分类

 

测试流程

并发测试和串行测试有相同的部分,好比都须要线测试其在串行状况下的正确性,这个是保证后续测试的基础,固然了,正确性测试和咱们的串行测试方没有什么不一样,都是在保证其程序在单线程状况下执行和串行执行有相同的结果,这个咱们再也不陈述。 

通常的并发测试,咱们按照如下流程来进行。

 

分类

并发测试大体能够分为两类:安全性测试与活跃性测试。

安全性测试咱们能够定义为“不发生任何错误的行为”,也能够理解为保持一致性。好比i++操做,但单线程状况下,循环20次,i=20,但是在多线程状况下,若是总共循环20次,结果不为20,那么这个结果就是错误的,说明出现了错误的线程安全问题。咱们在测试这种问题的时候,必需要增长一个”test point”保证其原子性同时又不影响程序的正确性。以此为判断条件执行测试代码,关于“test point”如何作,咱们后续再讨论。

活跃性测试定义为“某个良好的行为终究会发生”,也能够为理解为程序运行有必然的结果,不会出现因某个方法阻塞,而运行缓慢,或者是发生了线程死锁,致使一直等待的状态等。

与活跃性测试相关的是性能测试。主要有如下几个方面进行衡量:吞吐量,响应性,可伸缩性。

  • 吞吐量:一组并发任务中已完成任务所占的比例。或者说是必定时间内完成任务的数量。
  • 响应性:请求从发出到完成之间的时间
  • 可伸缩性:在增长更多资源(CPU,IO,内存),吞吐量的提高状况。

 

安全性测试

 

安全性测试,如前面所说是“不发生任何错误的行为”,也是要对其数据竞争可能引起的错误进行测试。这也是咱们须要找到一个功能中并发的的“test point”,并对其额外的构造一些测试。并且这些测试最好不须要任何同步机制。

咱们经过一个例子来进行说明。

好比ArrayBlockingQueue,咱们知道这个class是采用一个有界的阻塞队列来实现的生产-消费模式的。若是对其测试并发问题的,重要的就是对put和take方法进行测试,一种有效的方法就是检查被放入队列中和出队列中的各个元素是否相等。若是出现数据安全性的问题,那么必然入队列的值和出队列的值没有发生对应,结果也不尽相同。好比多线程状况下,咱们把全部入列元素和出列元素的校检和进行比较,若是两者相等,那么代表测试成功。

为了保证其可以测试到全部要点,须要对入队的值进行随机生成,令每次测试获得的结果不尽相同。另外为了保证其公平性,要保证全部的线程一块儿开始运算,防止先进行的程序进行串行运算。

public class PutTakeTest {

    protected static final ExecutorService pool = Executors.newCachedThreadPool();



    //栅栏,经过它能够实现让一组线程等待至某个状态以后再所有同时执行

    protected CyclicBarrier barrier;

    protected final ArrayBlockingQueue<Integer> bb;

    protected final int nTrials, nPairs;

    //入列总和

    protected final AtomicInteger putSum = new AtomicInteger(0);

    //出列总和

    protected final AtomicInteger takeSum = new AtomicInteger(0);



    public static void main(String[] args) throws Exception {

        new PutTakeTest(10, 10, 100000).test(); // 10个承载因子,10个线程,运行100000

        pool.shutdown();

    }



    /**

     *

     * @param capacity 承载因子(缓存)

     * @param npairs 线程数量

     * @param ntrials 单个线程执行数量(吞吐量)

     */

    public PutTakeTest(int capacity, int npairs, int ntrials) {

        this.bb = new ArrayBlockingQueue<Integer>(capacity);

        this.nTrials = ntrials;

        this.nPairs = npairs;

        this.barrier = new CyclicBarrier(npairs * 2 + 1);

    }



    void test() {

        try {

            for (int i = 0; i < nPairs; i++) {

                pool.execute(new Producer());

                pool.execute(new Consumer());

            }

            barrier.await(); // 等待全部的线程就绪

            barrier.await(); // 等待全部的线程执行完成

            System.out.println("result,put==take :"+(putSum.get()==takeSum.get()));

        } catch (Exception e) {

            throw new RuntimeException(e);

        }

    }



    static int xorShift(int y) {

        y ^= (y << 6);

        y ^= (y >>> 21);

        y ^= (y << 7);

        return y;

    }



    //生产者

    class Producer implements Runnable {

        public void run() {

            try {

                int seed = (this.hashCode() ^ (int) System.nanoTime());

                int sum = 0;

                barrier.await();

                for (int i = nTrials; i > 0; --i) {

                    bb.put(seed);

                    sum += seed;

                    seed = xorShift(seed);

                }

                putSum.getAndAdd(sum);

                barrier.await();

            } catch (Exception e) {

                throw new RuntimeException(e);

            }

        }

    }



    //消费者

    class Consumer implements Runnable {

        public void run() {

            try {

                barrier.await();

                int sum = 0;

                for (int i = nTrials; i > 0; --i) {

                    sum += bb.take();

                }

                takeSum.getAndAdd(sum);

                barrier.await();

            } catch (Exception e) {

                throw new RuntimeException(e);

            }

        }

    }

}

 

以上程序中,咱们增长putSum和takeSum变量,用来统计put和take数据的校检和。同时采用CyclicBarrier(回环栅栏)令全部的线程同一时间从相同的位置开始执行。每一个线程的入列数据,为了保证其惟一性,都生成一个惟一的seed,在下列代码执行出,必然是多线程竞争的地方,

    for (int i = nTrials; i > 0; --i) {

         bb.put(seed);

                    sum += seed;

                    seed = xorShift(seed);

       }

若是此处出现线程安全问题,那么最终take出来的数据和put的数据必然是不相同的,最终putSum和takeSum的值必然不一样,相反则相同。

因为并发代码中大多数错误都是一些低几率的事件,所以在测试的时候,仍是须要反复测试屡次,以提升发现错误的几率。

 

性能测试

 

性能测试一般是功能测试的延伸。虽然性能测试与功能测试之间会有重叠之处,但它们的目标是不一样的。

首先性能测试须要反映出被测试对象在应用程序中的实际用法以及它的吞吐量。另外须要根据经验值来调整各类不一样的限值,好比线程数,并发数等,从而令程序更好的在系统上运行。

咱们对上述的PutTakeTest进行扩展,增长如下功能:

 

一、增长一个记录运行一次分组的运行时间,为了保证时间精确性。

采用BarrierTimer来维护,它implements Runnable,在计数达到栅栏(CyclicBarrier)指定的数量以后,会调用一次该回调,设置结束时间。

咱们用它来记录,单个测试运行的时间。有总时间了,单次操做的时间就能够计算出来了。如此咱们就能够计算出单个测试的吞吐量。

吞吐量=1ms/单次操做的时间=每秒能够执行的次数。

如下是基于栅栏的计时器。

public class BarrierTimer implements Runnable{

    private boolean started;

    private long startTime, endTime;



    public synchronized void run() {

        long t = System.nanoTime();

        if (!started) {

            started = true;

            startTime = t;

        } else

            endTime = t;

    }



    public synchronized void clear() {

        started = false;

    }



    public synchronized long getTime() {

        return endTime - startTime;

    }


}

 

二、性能测试须要针对不一样参数组合进行测试。

 

经过不一样参数来进行组合测试,以此来得到在不一样参数下的吞吐率,以及不一样线程数量下的可伸缩性。在putTakeTest里面,咱们只只针对安全性测试。

咱们看增强版本的TimedPutTakeTest,这里咱们把ArrayBlockingQueue的容量分别设置为一、十、100、1000,令其在线程数量分别为一、二、四、八、1六、3二、6四、128的状况下,看其链表的吞吐率。

public class TimedPutTakeTest extends PutTakeTest {

    private BarrierTimer timer = new BarrierTimer();



    public TimedPutTakeTest(int cap, int pairs, int trials) {

        super(cap, pairs, trials);

        barrier = new CyclicBarrier(nPairs * 2 + 1, timer);

    }



    public void test() {

        try {

            timer.clear();

            for (int i = 0; i < nPairs; i++) {

                pool.execute(new PutTakeTest.Producer());

                pool.execute(new PutTakeTest.Consumer());

            }

            barrier.await();

            barrier.await();

            long nsPerItem = timer.getTime() / (nPairs * (long) nTrials);

            System.out.print("Throughput: " + nsPerItem + " ns/item");

            System.out.println("result:"+(putSum.get()==takeSum.get()));

        } catch (Exception e) {

            throw new RuntimeException(e);

        }

    }



    public static void main(String[] args) throws Exception {

        int tpt = 100000; // trials per thread

        for (int cap = 1; cap <= 1000; cap *= 10) {

            System.out.println("Capacity: " + cap);

            for (int pairs = 1; pairs <= 128; pairs *= 2) {

                TimedPutTakeTest t = new TimedPutTakeTest(cap, pairs, tpt);

                System.out.print("Pairs: " + pairs + "\t");

                t.test();

                System.out.print("\t");

                Thread.sleep(1000);

                t.test();

                System.out.println();

                Thread.sleep(1000);

            }

        }

        PutTakeTest.pool.shutdown();

    }

}

 

如下是咱们针对ArrayBlockingQueue的性能测试结果,个人电脑硬件环境是:

cpu i7 4核8线程

memory  16G

硬盘 SSD110G

 

jdk 环境

java version “1.8.0_45"

Java HotSpot(TM) 64-Bit Server VM (build 25.45-b02, mixed mode)

 

从上面能够看到如下几点状况

一、在ArrayBlockingQueue的缓存容量在1的状况下,不管线性并发数为多少,都不能显著的提高其吞吐率。这是由于每一个线程在阻塞等待另外线程执行任务。

二、当尝试把缓存容量提高至十、100、1000的时候,吞吐率都获得了极大的提升,特别是在1000的时候,最高可达到900w次/s。

三、当线程增长到16个的时候,吞吐率会达到顶峰,而后再增长线程吞吐率不生反而降低,固然没有很大的降低,这是由于,当线程增多的时候,大部分时间耗费在阻塞和解除阻塞上面了。

其余阻塞队列的比较

 

如下是针对ArrayBlockingQueue、LinkedBlockingQueue、LinkedBlockingDeQue、PriorityBlockingQueue几种阻塞队列进行的横向测评。硬件环境仍是和上述相同。jdk仍是采用1.8的API。

每一个队列的缓存容量是1000。而后分别在一、二、四、八、1六、3二、6四、128的线程并发下,查看其吞吐率。

从上述数据中,咱们能够看到:

一、ArrayBlockingQueue在jdk1.8的优化下性能高于LinkedBlockingQueue,虽然二者差异不是太大,这个是1.6以前,LinkedBlockingQueue是要优于ArrayBlockingQueue的。

二、PriorityBlockingQueue在达到290w的吞吐高峰以后,性能开始持续的降低,这是由于优先队列须要不断的优化优先列表,而须要必定的排序时间。

以上测试的主要目的是,测试生产者和消费者在经过有界put和take传送数据时,那些约束条件将对整个吞吐量产生影响。因此会忽略了许多实际的因素。另外因为jit的动态编译,会直接将编译后的代码直接编译为机器代码。因此以上测试须要通过预热处理,运行更多的次数,以保证全部的代码都是编译完成以后,才统计测试的运行时间。

 

最后

测试并发程序的正确性可能会特别困难的,由于并发程序的许多故障都是一些低几率的事情,而且它们对执行时序、负载状况以及其余难以重现的条件比较敏感。要想尽量的发现这些错误,就须要咱们作更多的工做来进行分析测试,期待今天的介绍可以帮助你们开阔一些思路。

 

引用:

以上测试代码,引用自《java Concurrency in Practice》

相关文章
相关标签/搜索