《Java7并发编程实战手册》学习笔记(九)——测试并发应用程序

此篇博客为我的学习笔记,也是Java并发编程学习第一部分的最后一篇,若有错误欢迎你们指正

本次内容

  1. 监控Lock接口
  2. 监控Phaser类
  3. 监控执行器框架
  4. 监控Fork/Join池
  5. 输出高效的日志信息

1.监控Lock接口

咱们在并发程序中使用锁进行同步时,能够直接调用锁对象的一些方法来判断锁当前的状态。咱们也能够经过继承ReentrantLock类并在定制类的公开方法中调用父类中被protected关键字修饰的方法以此来达到某些需求。下面是ReentrantLock类实现的用于监控锁的状态的经常使用方法:java

  • getOwner():返回当前持有锁的线程对象,此方法被protected关键字修饰
  • getQueuedThreads():返回等待获取锁的线程集合,此方法一样被protected关键字修饰
  • hasQueuedThreads():返回一个布尔值表示当前是否有线程在等待获取锁,true表示有
  • getQueueLength():返回当前等待获取锁的线程的数量
  • isLocked():返回一个布尔值表示这个锁当前是否被一个线程持有,true表示被持有
  • isFair():返回一个布尔值表示这个锁是否时公平锁,true表示公平
  • getHoldCount():返回当前线程获取该锁的次数,这里的次数实际是指锁内部计数器的当前值
  • isHeldByCurrentThread():返回一个布尔值表示当前线程是否正持有该锁

范例实现

在这个范例中,咱们调用了一些方法去监控锁的状态
MyLock(定制锁,用于方便咱们调用ReentrantLock中的部分被保护方法):编程

package day09.code_1;

import java.util.Collection;
import java.util.concurrent.locks.ReentrantLock;

public class MyLock extends ReentrantLock {

    //获取当前持有锁的线程的名称
    public String getOwnerName() {
        //判断当前是否有线程持有锁
        if (getOwner() == null) {
            return "None";
        }
        //得到当前持有锁的线程的名称
        return getOwner().getName();
    }

    public Collection<Thread> getThreads() {
        //返回当前等待获取锁的线程集合
        return getQueuedThreads();
    }
}
复制代码

Task(任务类,在此范例中非重点):数组

package day09.code_1;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Task implements Runnable {

    private Lock lock;

    //经过构造方法初始化锁
    public Task(Lock lock) {
        this.lock = lock;
    }

    @Override
    public void run() {
        //循环5次
        for (int i = 0; i < 5; i++) {
            //获取锁
            lock.lock();
            //打印当前线程持有锁的信息
            System.out.printf("%s: Get the Lock\n",
                    Thread.currentThread().getName());
            try {
                //休眠500毫秒
                Thread.sleep(500);
                //打印当前线程释放锁的信息
                System.out.printf("%s: Free the Lock\n",
                        Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                //释放锁
                lock.unlock();
            }
        }
    }
}
复制代码

main方法:缓存

package day09.code_1;

import java.util.Collection;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws InterruptedException {
        //建立锁对象
        MyLock lock = new MyLock();
        //建立线程数组
        Thread[] threads = new Thread[5];
        for (int i = 0; i < 5; i++) {
            //建立任务并将锁传入
            Task task = new Task(lock);
            //建立线程
            threads[i] = new Thread(task);
            //开启线程
            threads[i].start();
        }
        //循环15次
        for (int i = 0; i < 15; i++) {
            //打印提示信息
            System.out.printf("Main: Logging the Lock\n");
            System.out.printf("**************************\n");
            //打印当前持有锁的线程名称
            System.out.printf("Lock: Owner : %s\n", lock.getOwnerName());
            //打印当前是否有线程正等待获取锁
            System.out.printf("Lock: Queued Threads: %s\n",
                    lock.hasQueuedThreads());
            //若是存在线程等待获取锁
            if (lock.hasQueuedThreads()) {
                //打印等待获取锁的线程的数量
                System.out.printf("Lock: Queue Length: %d\n",
                        lock.getQueueLength());
                //提示信息前缀
                System.out.printf("Lock: Queued Threads: ");
                //获取等待获取锁的线程集合
                Collection<Thread> lockedThreads = lock.getThreads();
                //遍历集合打印线程的名称
                for (Thread lockedThread : lockedThreads) {
                    System.out.printf("%s ", lockedThread.getName());
                }
                //换行
                System.out.printf("\n");
            }
            //打印锁的公平性
            System.out.printf("Lock: Fairness: %s\n", lock.isFair());
            //打印锁是否被某个线程持有
            System.out.printf("Lock: Locked: %s\n", lock.isLocked());
            System.out.printf("**************************\n");
            //休眠1秒
            TimeUnit.SECONDS.sleep(1);
        }
    }
}
复制代码

2.监控Phaser类

Phaser类是一个功能十分强大的线程同步辅助类,它不只能够将并发任务统一地分阶段执行,还能够在程序中动态的更改phaser对象的任务注册数。Phaser类也提供了许多方法来帮助开发者监控phaser对象的状态,以下:并发

  • getPhase():返回phaser对象的当前阶段(阶段是从0开始的)
  • getRegisteredParties():返回phaser对象上所同步(注册)的任务数
  • getArrivedParties():返回已结束当前阶段并进行等待的任务数
  • getUnarrivedParties():返回未结束当前阶段的任务数

范例实现

Task(任务类):app

package day09.code_2;

import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

public class Task implements Runnable {

    //休眠的时间
    private int time;

    //phaser对象
    private Phaser phaser;

    //经过构造函数赋值
    public Task(int time, Phaser phaser) {
        this.time = time;
        this.phaser = phaser;
    }

    @Override
    public void run() {
        //通知phaser对象线程以完成当前阶段并直接向下执行
        phaser.arrive();
        //打印进入第一阶段提示信息
        System.out.printf("%s: Entering phase 1\n",
                Thread.currentThread().getName());
        //休眠
        try {
            TimeUnit.SECONDS.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //打印线程完成第一阶段提示信息
        System.out.printf("%s: Finishing phase 1\n",
                Thread.currentThread().getName());
        //通知phaser对象线程以完成当前阶段并等待
        phaser.arriveAndAwaitAdvance();
        //打印进入第二阶段提示信息
        System.out.printf("%s: Entering phase 2\n",
                Thread.currentThread().getName());
        //休眠
        try {
            TimeUnit.SECONDS.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //打印线程完成第二阶段提示信息
        System.out.printf("%s: Finishing phase 2\n",
                Thread.currentThread().getName());
        //通知phaser对象线程以完成当前阶段并等待
        phaser.arriveAndAwaitAdvance();
        //打印进入第三阶段提示信息
        System.out.printf("%s: Entering phase 3\n",
                Thread.currentThread().getName());
        //休眠
        try {
            TimeUnit.SECONDS.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //打印完成第三阶段提示信息
        System.out.printf("%s: Finishing phase 3\n",
                Thread.currentThread().getName());
        //通知phaser对象线程以完成当前阶段并取消注册
        phaser.arriveAndDeregister();
    }
}
复制代码

main方法:框架

package day09.code_2;

import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws InterruptedException {
        //建立phaser对象
        Phaser phaser = new Phaser(3);
        //循环建立三个任务并开启线程运行它们
        for (int i = 0; i < 3; i++) {
            //经过循环次数为其设置不一样的休眠时间
            Task task = new Task(i + 1, phaser);
            Thread thread = new Thread(task);
            thread.start();
        }
        //循环十次
        for (int i = 0; i < 10; i++) {
            System.out.printf("********************\n");
            //打印提示信息
            System.out.printf("Main: Phaser Log\n");
            //打印phaser的当前阶段
            System.out.printf("Main: Phaser: Phase: %d\n",
                    phaser.getPhase());
            //打印在phaser对象上注册的任务数
            System.out.printf("Main: Phaser: Registered Parties: %d\n",
                    phaser.getRegisteredParties());
            //打印已结束当前阶段的任务数
            System.out.printf("Main: Phaser: Arrived Parties: %d\n",
                    phaser.getArrivedParties());
            //打印未结束当前阶段的任务数
            System.out.printf("Main: Phaser: Unarrived Parties: %d\n",
                    phaser.getUnarrivedParties());
            //休眠1秒
            TimeUnit.SECONDS.sleep(1);
        }
    }
}
复制代码

3.监控执行器框架

ThreadPoolExecutor类为咱们提供了以下方法来获取执行器的状态信息:dom

  • getCorePoolSize():获取线程池的核心线程数
  • getPoolSize():获取线程池中的实际线程数
  • getActiveCount():获取线程池中正在执行任务的线程数
  • getTaskCount():获取计划执行的任务数,一般来讲等于提交的任务数
  • getCompletedTaskCount():获取已经执行完成的任务数
  • isShutdown():当调用执行器的shutdown()方法后,此方法的返回值会变为true
  • isTerminating():当执行器正在关闭但还未完成时,次方法返回true
  • isTerminated():当执行器已经关闭时,此方法返回true

范例实现

Task(任务类):异步

package day09.code_3;

import java.util.concurrent.TimeUnit;

public class Task implements Runnable {

    //休眠时间
    private long milliseconds;

    public Task(long milliseconds) {
        //经过构造函数为休眠时间赋值
        this.milliseconds = milliseconds;
    }

    @Override
    public void run() {
        //打印任务开始执行的提示信息
        System.out.printf("%s: Begin\n",
                Thread.currentThread().getName());
        //休眠
        try {
            TimeUnit.MILLISECONDS.sleep(milliseconds);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //打印任务执行结束的提示信息
        System.out.printf("%s: End\n",
                Thread.currentThread().getName());
    }
}
复制代码

main方法:ide

package day09.code_3;

import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws InterruptedException {
        //建立一个缓存线程池
        ThreadPoolExecutor executor = (ThreadPoolExecutor)
                Executors.newCachedThreadPool();
        //建立随机数生成器
        Random random = new Random();
        //循环十次
        for (int i = 0; i < 10; i++) {
            //建立任务并将随机数做为其休眠时间
            Task task = new Task(random.nextInt(10000));
            //向执行器发送任务
            executor.submit(task);
        }
        //循环5次
        for (int i = 0; i < 5; i++) {
            //调用showLog方法打印线程池的信息
            showLog(executor);
            //休眠1秒
            TimeUnit.SECONDS.sleep(1);
        }
        //关闭线程池
        executor.shutdown();
        //循环5次
        for (int i = 0; i < 5; i++) {
            //调用showLog方法打印线程池的信息
            showLog(executor);
            //休眠1秒
            TimeUnit.SECONDS.sleep(1);
        }
        //等待线程池执行完全部任务后关闭
        executor.awaitTermination(1, TimeUnit.DAYS);
        //打印程序结束提示信息
        System.out.printf("Main: End of the program\n");
    }

    private static void showLog(ThreadPoolExecutor executor) {
        //准备打印线程池相关信息
        System.out.printf("***********************\n");
        System.out.printf("Main: Executor Log\n");
        //打印线程池的核心线程数
        System.out.printf("Main: Executor: Core Pool Size: %d\n",
                executor.getCorePoolSize());
        //打印线程池的实际线程数
        System.out.printf("Main: Executor: Pool Size: %d\n",
                executor.getPoolSize());
        //打印线程池中正在执行任务的线程数
        System.out.printf("Main: Executor: Active Count: %d\n",
                executor.getActiveCount());
        //打印提交的任务数(计划执行的任务数)
        System.out.printf("Main: Executor: Task Count: %d\n",
                executor.getTaskCount());
        //打印执行器已执行完成的线程数
        System.out.printf("Main: Executor: Completed Task Count: %d\n",
                executor.getCompletedTaskCount());
        //打印执行器是否关闭
        System.out.printf("Main: Executor: Shutdown: %s\n",
                executor.isShutdown());
        //打印执行器是否正在终止
        System.out.printf("Main: Executor: Terminating: %s\n",
                executor.isTerminating());
        //打印执行器是否已经终止
        System.out.printf("Main: Executor: Terminated: %s\n",
                executor.isTerminated());
        System.out.printf("***********************\n");
    }
}
复制代码

4.监控Fork/Join池

ForkJoinPool类为咱们提供了以下方法来获取线程池的状态信息:

  • getParallelism():获取线程池的并行级数
  • getPoolSize():获取线程池内工做者线程的数量
  • getActiveThreadCount():获取正在执行任务的线程数
  • getRunningThreadCount():获取当前正在工做且未被任何机制阻塞(包括等待子任务执行完毕)的线程数
  • getQueuedSubmissionCount():获取已提交但未被执行过的任务数
  • getQueuedTaskCount():获取已提交且已开始执行的任务数
  • hasQueuedSubmissions():返回一个布尔值表示是否有未开始执行的任务等待
  • getStealCount():获取窃取的工做数
  • isTerminated():返回一个布尔值表示线程池是否已经关闭

范例实现

Task(任务类):

package day09.code_4;

import java.util.concurrent.RecursiveAction;

public class Task extends RecursiveAction {

    //数组
    private int[] array;

    //任务搜索的起始、终止 位置
    private int start, end;

    public Task(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        //若是任务过大
        if (end - start > 100) {
            //进行拆解
            int mid = (start + end) / 2;
            //建立新任务
            Task task1 = new Task(array, start, mid);
            Task task2 = new Task(array, mid, end);
            //异步执行任务
            task1.fork();
            task2.fork();
            //等待任务执行结束
            task1.join();
            task2.join();
        } else {
            //在指定范围内遍历数组
            for (int i = start; i < end; i++) {
                //自增
                array[i]++;
                //休眠5毫秒
                try {
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
复制代码

main方法:

package day09.code_4;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws InterruptedException {
        //建立线程池
        ForkJoinPool pool = new ForkJoinPool();
        //建立数组
        int[] array = new int[10000];
        //建立任务
        Task task = new Task(array, 0, 10000);
        //异步执行
        pool.execute(task);
        //在任务执行结束前不断循环
        while (!task.isDone()) {
            //打印线程池的信息
            showLog(pool);
            //休眠1秒
            TimeUnit.SECONDS.sleep(1);
        }
        //关闭线程池
        pool.shutdown();
        //等待线程池执行完全部任务
        pool.awaitTermination(1, TimeUnit.DAYS);
        //打印线程池信息
        showLog(pool);
        //打印程序结束提示信息
        System.out.printf("Main: End of the program\n");
    }

    private static void showLog(ForkJoinPool pool) {
        //打印线程池提示信息
        System.out.printf("***********************\n");
        System.out.printf("Main: Fork/Join Pool log\n");
        //打印线程池并行级数
        System.out.printf("Main: Fork/Join Pool: Parallelism: %d\n",
                pool.getParallelism());
        //打印线程池内部实际线程数量
        System.out.printf("Main: Fork/Join Pool: Pool Size: %d\n",
                pool.getPoolSize());
        //打印正在执行任务的线程数
        System.out.printf("Main: Fork/Join Pool: Active Thread Count: %d\n",
                pool.getActiveThreadCount());
        //打印正在工做且未被阻塞的线程数
        System.out.printf("Main: Fork/Join Pool: Running Thread Count: %d\n",
                pool.getRunningThreadCount());
        //打印已提交但未被执行过的任务数
        System.out.printf("Main: Fork/Join Pool: Queued Submission: %d\n",
                pool.getQueuedSubmissionCount());
        //打印已提交且已开始执行的任务数
        System.out.printf("Main: Fork/Join Pool: Queued Tasks: %d\n",
                pool.getQueuedTaskCount());
        //打印一个布尔值表示是否有未开始执行的等待任务
        System.out.printf("Main: Fork/Join Pool: Queued Submissions: %s\n",
                pool.hasQueuedSubmissions());
        //打印任务窃取次数
        System.out.printf("Main: Fork/Join Pool: Steal Count: %d\n",
                pool.getStealCount());
        //打印线程池是否已经关闭
        System.out.printf("Main: Fork/Join Pool: Terminated: %s\n",
                pool.isTerminated());
        System.out.printf("***********************\n");
    }
}
复制代码

5.输出高效的日志信息

在程序中,咱们应当输出高效的日志信息,而不仅是将信息打印到控制台。Java为咱们提供了Logger类来方便的输出日志信息。一个日志器(Logger)主要有下面几个组件:

  1. Handler:处理器,处理器能够决定日志的写入目标和格式
  2. Name:名称,一般状况下日志记录器的名称是其余类的包名加类名
  3. Level:日志记录器和日志信息均关联着一个级别,记录器不会输出级别更低的日志信息

范例实现

MyFormatter(格式化类):

package day09.code_5;

import java.util.Date;
import java.util.logging.Formatter;
import java.util.logging.LogRecord;

public class MyFormatter extends Formatter {
    @Override
    public String format(LogRecord record) {
        //建立字符串构造器
        StringBuilder sb = new StringBuilder();
        //拼接日志级别
        sb.append("[" + record.getLevel() + "] - ");
        //拼接日志生成时间
        sb.append(new Date(record.getMillis()) + " ");
        //拼接产生日志的类名和方法名
        sb.append(record.getSourceClassName() + " . " + record.getSourceMethodName());
        //拼接日志信息和换行符
        sb.append(" " + record.getMessage() + "\n");
        //返回相应的字符串
        return sb.toString();
    }
}
复制代码

MyLogger(自定义日志生成器):

package day09.code_5;

import java.io.IOException;
import java.util.logging.FileHandler;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.Logger;

public class MyLogger {

    //处理器
    private static Handler handler;

    //用于获取日志生成器的静态方法
    public static Logger getLogger(String name) {
        //调用Logger类的静态方法获取日志生成器
        Logger logger = Logger.getLogger(name);
        //设置日志级别未ALL,输出一切等级的日志
        logger.setLevel(Level.ALL);
        try {
            //若是没有处理器
            if (handler == null) {
                //建立文档处理器关联相关文件
                handler = new FileHandler("src/day09/code_5/recipe8.log");
                //建立格式化工具并赋值给处理器
                MyFormatter format = new MyFormatter();
                handler.setFormatter(format);
            }
            //若是日志生成器没有处理器,则添加
            if (logger.getHandlers().length == 0) {
                logger.addHandler(handler);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        //返回日志生成器
        return logger;
    }

}
复制代码

Task(任务类):

package day09.code_5;

import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

public class Task implements Runnable {

    @Override
    public void run() {
        //调用静态方法获取得日志生成器,并将当前类名做为参数传入
        Logger logger = MyLogger.getLogger(this.getClass().getName());
        //输出FINER级别的消息表示方法开始执行
        logger.entering(Thread.currentThread().getName(), "run()");
        //休眠两秒
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //输出FINER级别的消息表示方法执行结束
        logger.exiting(Thread.currentThread().getName(), "run()",
                Thread.currentThread());
    }
}
复制代码

main方法:

package day09.code_5;

import java.util.logging.Level;
import java.util.logging.Logger;

public class Main {

    public static void main(String[] args) {
        //经过静态方法获取日志生成器
        Logger logger = MyLogger.getLogger("Core");
        //输出FINER级别的消息表示方法开始执行
        logger.entering("Core", "main()", args);
        //建立数组
        Thread[] threads = new Thread[5];
        //遍历数组
        for (int i = 0; i < threads.length; i++) {
            //输出INFO级别的日志
            logger.log(Level.INFO, "Launching thread: " + i);
            //建立任务和线程对象
            Task task = new Task();
            threads[i] = new Thread(task);
            //输出INFO级别的日志
            logger.log(Level.INFO, "Thread created: " + threads[i].getName());
            //开启线程
            threads[i].start();
        }
        //输出日志
        logger.log(Level.INFO, "Five Threads created.");
        logger.log(Level.INFO, "Waiting for its finalization");
        //等待任务运行结束
        for (int i = 0; i < threads.length; i++) {
            try {
                threads[i].join();
                //打印日志
                logger.log(Level.INFO, "Thread has finished its execution", threads[i]);
            } catch (InterruptedException e) {
                //打印出现异常的日志
                logger.log(Level.SEVERE, "Exeception", e);
            }

        }
        //打印方法结束的日志
        logger.exiting("Core", "main()");
    }

}
复制代码
相关文章
相关标签/搜索