Java线程池了解一下

前言

立刻就要过年了,还在岗位上坚守“swimming”的小伙伴们顶住。博主给你们带来一篇线程池的基本使用解解闷。java

为何须要使用线程池

一、减小线程建立与切换的开销编程

  • 在没有使用线程池的时候,来了一个任务,就建立一个线程,咱们知道系统建立和销毁工做线程的开销很大,并且频繁的建立线程也就意味着须要进行频繁的线程切换,这都是一笔很大的开销。

二、控制线程的数量设计模式

  • 使用线程池咱们能够有效地控制线程的数量,当系统中存在大量并发线程时,会致使系统性能剧烈降低。

线程池作了什么

重复利用有限的线程缓存

  • 线程池中会预先建立一些空闲的线程,他们不断的从工做队列中取出任务,而后执行,执行完以后,会继续执行工做队列中的下一个任务,减小了建立和销毁线程的次数,每一个线程均可以一直被重用,变了建立和销毁的开销。

线程池的使用

其实经常使用Java线程池本质上都是由ThreadPoolExecutor或者ForkJoinPool生成的,只是其根据构造函数传入不一样的实参来实例化相应线程池而已。并发

Executors

Executors是一个线程池工厂类,该工厂类包含以下集合静态工厂方法来建立线程池:dom

  • newFixedThreadPool():建立一个可重用的、具备固定线程数的线程池
  • newSingleThreadExecutor():建立只有一个线程的线程池
  • newCachedThreadPool():建立一个具备缓存功能的线程池
  • newWorkStealingPool():建立持有足够线程的线程池来支持给定的并行级别的线程池
  • newScheduledThreadPool():建立具备指定线程数的线程池,它能够在指定延迟后执行任务线程

ExecutorService接口

对设计模式有了解过的同窗都会知道,咱们尽可能面向接口编程,这样对程序的灵活性是很是友好的。Java线程池也采用了面向接口编程的思想,能够看到ThreadPoolExecutorForkJoinPool全部都是ExecutorService接口的实现类。在ExecutorService接口中定义了一些经常使用的方法,而后再各类线程池中均可以使用ExecutorService接口中定义的方法,经常使用的方法有以下几个:ide

  • 向线程池提交线程
    • Future<?> submit():将一个Runnable对象交给指定的线程池,线程池将在有空闲线程时执行Runnable对象表明的任务,该方法既能接收Runnable对象也能接收Callable对象,这就意味着sumbit()方法能够有返回值。
    • void execute(Runnable command):只能接收Runnable对象,意味着该方法没有返回值。
  • 关闭线程池
    • void shutdown():阻止新来的任务提交,对已经提交了的任务不会产生任何影响。(等待全部的线程执行完毕才关闭)
    • List<Runnable> shutdownNow(): 阻止新来的任务提交,同时会中断当前正在运行的线程,另外它还将workQueue中的任务给移除,并将这些任务添加到列表中进行返回。(立马关闭)
  • 检查线程池的状态
    • boolean isShutdown():调用shutdown()或shutdownNow()方法后返回为true。
    • boolean isTerminated():当调用shutdown()方法后,而且全部提交的任务完成后返回为true;当调用shutdownNow()方法后,成功中止后返回为true。

常见线程池使用示例

1、newFixedThreadPool

线程池中的线程数目是固定的,无论你来了多少的任务。函数

示例代码性能

public class MyFixThreadPool {

    public static void main(String[] args) throws InterruptedException {
        // 建立一个线程数固定为5的线程池
        ExecutorService service = Executors.newFixedThreadPool(5);

        System.out.println("初始线程池状态:" + service);

        for (int i = 0; i < 6; i++) {
            service.execute(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            });
        }
        System.out.println("线程提交完毕以后线程池状态:" + service);

        service.shutdown();//会等待全部的线程执行完毕才关闭,shutdownNow:立马关闭
        System.out.println("是否所有线程已经执行完毕:" + service.isTerminated());//全部的任务执行完了,就会返回true
        System.out.println("是否已经执行shutdown()" + service.isShutdown());
        System.out.println("执行完shutdown()以后线程池的状态:" + service);

        TimeUnit.SECONDS.sleep(5);
        System.out.println("5秒钟事后,是否所有线程已经执行完毕:" + service.isTerminated());
        System.out.println("5秒钟事后,是否已经执行shutdown()" + service.isShutdown());
        System.out.println("5秒钟事后,线程池状态:" + service);
    }

}
复制代码

运行结果:this

初始线程池状态:[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
线程提交完毕以后线程池状态:[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
是否所有线程已经执行完毕:false
是否已经执行shutdown():true
执行完shutdown()以后线程池的状态:[Shutting down, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
pool-1-thread-2
pool-1-thread-1
pool-1-thread-4
pool-1-thread-5
pool-1-thread-3
pool-1-thread-2
5秒钟事后,是否所有线程已经执行完毕:true
5秒钟事后,是否已经执行shutdown():true
5秒钟事后,线程池状态:[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]

程序分析

  • 当咱们建立好一个FixedThreadPool以后,该线程池就处于Running状态了,可是pool size(线程池线程的数量)、active threads(当前活跃线程) queued tasks(当前排队线程)、completed tasks(已完成的任务数)都是0
  • 当咱们把6个任务都提交给线程池以后,
    • pool size = 5:由于咱们建立的是一个固定线程数为5的线程池(注意:若是这个时候咱们只提交了3个任务,那么pool size = 3,说明线程池也是经过懒加载的方式去建立线程)。
    • active threads = 5:虽然咱们向线程池提交了6个任务,可是线程池的固定大小为5,因此活跃线程只有5个
    • queued tasks = 1:虽然咱们向线程池提交了6个任务,可是线程池的固定大小为5,只能有5个活跃线程同时工做,因此有一个任务在等待
  • 咱们第一次执行shutdown()的时候,因为任务尚未所有执行完毕,因此isTerminated()返回falseshutdown()返回true,而线程池的状态会由Running变为Shutting down
  • 从任务的运行结果咱们能够看出,名为pool-1-thread-2执行了两次任务,证实线程池中的线程确实是重复利用的。
  • 5秒钟后,isTerminated()返回trueshutdown()返回true,证实全部的任务都执行完了,线程池也关闭了,咱们再次检查线程池的状态[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6],状态已经处于Terminated了,而后已完成的任务显示为6
2、newSingleThreadExecutor

从头至尾整个线程池都只有一个线程在工做。

实例代码

public class SingleThreadPool {

    public static void main(String[] args) {
        ExecutorService service = Executors.newSingleThreadExecutor();

        for (int i = 0; i < 5; i++) {
            final int j = i;
            service.execute(() -> {
                System.out.println(j + " " + Thread.currentThread().getName());
            });
        }
    }

}
复制代码

运行结果

0 pool-1-thread-1 1 pool-1-thread-1 2 pool-1-thread-1 3 pool-1-thread-1 4 pool-1-thread-1

程序分析 能够看到只有pool-1-thread-1一个线程在工做。

3、newCachedThreadPool

来多少任务,就建立多少线程(前提是没有空闲的线程在等待执行任务,不然仍是会复用以前旧(缓存)的线程),直接你电脑能支撑的线程数的极限为止。

实例代码

public class CachePool {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newCachedThreadPool();
        System.out.println("初始线程池状态:" + service);

        for (int i = 0; i < 12; i++) {
            service.execute(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            });
        }
        System.out.println("线程提交完毕以后线程池状态:" + service);

        TimeUnit.SECONDS.sleep(50);
        System.out.println("50秒后线程池状态:" + service);

        TimeUnit.SECONDS.sleep(30);
        System.out.println("80秒后线程池状态:" + service);
    }

}
复制代码

运行结果

初始线程池状态:[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
线程提交完毕以后线程池状态:[Running, pool size = 12, active threads = 12, queued tasks = 0, completed tasks = 0]
pool-1-thread-3
pool-1-thread-4
pool-1-thread-1
pool-1-thread-2
pool-1-thread-5
pool-1-thread-8
pool-1-thread-9
pool-1-thread-12
pool-1-thread-7
pool-1-thread-6
pool-1-thread-11
pool-1-thread-10
50秒后线程池状态:[Running, pool size = 12, active threads = 0, queued tasks = 0, completed tasks = 12]
80秒后线程池状态:[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 12]

程序分析

  • 由于咱们每一个线程任务至少须要500毫秒的执行时间,因此当咱们往线程池中提交12个任务的过程当中,基本上没有空闲的线程供咱们重复使用,因此线程池会建立12个线程。
  • 缓存中的线程默认是60秒没有活跃就会被销毁掉,能够看到在50秒钟的时候回,全部的任务已经完成了,可是线程池线程的数量仍是12。
  • 80秒事后,能够看到线程池中的线程已经所有被销毁了。
4、newScheduledThreadPool

能够在指定延迟后或周期性地执行线程任务的线程池。

ScheduledThreadPoolExecutor

  • newScheduledThreadPool()方法返回的实际上是一个ScheduledThreadPoolExecutor对象,ScheduledThreadPoolExecutor定义以下:
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
复制代码
  • 能够看到,它仍是继承了ThreadPoolExecutor并实现了ScheduledExecutorService接口,而ScheduledExecutorService也是继承了ExecutorService接口,因此咱们也能够像使用以前的线程池对象同样使用,只不过是该对象会额外多了一些方法用于控制延迟与周期:
    • public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit):指定callable任务将在delay延迟后执行
    • public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit):指定的command任务将在delay延迟后执行,并且已设定频率重复执行。(一开始并不会执行)
    • public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,ong initialDelay,long delay,TimeUnit unit):建立并执行一个在给定初始延迟后首期启用的按期操做,随后在每个执行终止和下一次执行开始之间都存在给定的延迟。

示例代码

下面代码每500毫秒打印一次当前线程名称以及一个随机数字。

public class MyScheduledPool {

    public static void main(String[] args) {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
        service.scheduleAtFixedRate(() -> {
            System.out.println(Thread.currentThread().getName() + new Random().nextInt(1000));
        }, 0, 500, TimeUnit.MILLISECONDS);
    }
}
复制代码
5、newWorkStealingPool

每一个线程维护着本身的队列,执行完本身的任务以后,会去主动执行其余线程队列中的任务。

示例代码

public class MyWorkStealingPool {

    public static void main(String[] args) throws IOException {
        ExecutorService service = Executors.newWorkStealingPool(4);
        System.out.println("cpu核心:" + Runtime.getRuntime().availableProcessors());

        service.execute(new R(1000));
        service.execute(new R(2000));
        service.execute(new R(2000));
        service.execute(new R(2000));
        service.execute(new R(2000));

        //因为产生的是精灵线程(守护线程、后台线程),主线程不阻塞的话,看不到输出
        System.in.read();
    }

    static class R implements Runnable {

        int time;

        R(int time) {
            this.time = time;
        }

        @Override
        public void run() {
            try {
                TimeUnit.MILLISECONDS.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(time + " " + Thread.currentThread().getName());
        }
    }
}
复制代码

运行结果

cpu核心:4 1000 ForkJoinPool-1-worker-1 2000 ForkJoinPool-1-worker-0 2000 ForkJoinPool-1-worker-3 2000 ForkJoinPool-1-worker-2 2000 ForkJoinPool-1-worker-1

程序分析 ForkJoinPool-1-worker-1任务的执行时间是1秒,它会最早执行完毕,而后它会去主动执行其余线程队列中的任务。

6、ForkJoinPool
  • ForkJoinPool能够将一个任务拆分红多个“小任务”并行计算,再把多个“小任务”的结果合并成总的计算结果。ForkJoinPool提供了以下几个方法用于建立ForkJoinPool实例对象:

    • ForkJoinPool(int parallelism):建立一个包含parallelism个并行线程的ForkJoinPool,parallelism的默认值为Runtime.getRuntime().availableProcessors()方法的返回值
    • ForkJoinPool commonPool():该方法返回一个通用池,通用池的运行状态不会受shutdown()shutdownNow()方法的影响。
  • 建立了ForkJoinPool示例以后,就能够调用ForkJoinPoolsubmit(ForkJoinTask task)invoke(ForkJoinTask task)方法来执行指定任务了。其中ForkJoinTask(实现了Future接口)表明一个能够并行、合并的任务。ForkJoinTask是一个抽象类,他还有两个抽象子类:RecursiveActionRecursiveTask。其中RecursiveTask表明有返回值的任务,而RecursiveAction表明没有返回值的任务。

示例代码

下面代码演示了使用ForkJoinPool对1000000个随机整数进行求和。

public class MyForkJoinPool {

    static int[] nums = new int[1000000];
    static final int MAX_NUM = 50000;
    static Random random = new Random();

    static {
        for (int i = 0; i < nums.length; i++) {
            nums[i] = random.nextInt(1000);
        }
        System.out.println(Arrays.stream(nums).sum());
    }

// static class AddTask extends RecursiveAction {
//
// int start, end;
//
// AddTask(int start, int end) {
// this.start = start;
// this.end = end;
// }
//
// @Override
// protected void compute() {
// if (end - start <= MAX_NUM) {
// long sum = 0L;
// for (int i = 0; i < end; i++) sum += nums[i];
// System.out.println("from:" + start + " to:" + end + " = " + sum);
// } else {
// int middle = start + (end - start) / 2;
//
// AddTask subTask1 = new AddTask(start, middle);
// AddTask subTask2 = new AddTask(middle, end);
// subTask1.fork();
// subTask2.fork();
// }
// }
// }

    static class AddTask extends RecursiveTask<Long> {

        int start, end;

        AddTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            // 当end与start之间的差大于MAX_NUM,将大任务分解成两个“小任务”
            if (end - start <= MAX_NUM) {
                long sum = 0L;
                for (int i = start; i < end; i++) sum += nums[i];
                return sum;
            } else {
                int middle = start + (end - start) / 2;

                AddTask subTask1 = new AddTask(start, middle);
                AddTask subTask2 = new AddTask(middle, end);
                // 并行执行两个“小任务”
                subTask1.fork();
                subTask2.fork();
                // 把两个“小任务”累加的结果合并起来
                return subTask1.join() + subTask2.join();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        AddTask task = new AddTask(0, nums.length);
        forkJoinPool.execute(task);

        long result = task.join();
        System.out.println(result);

        forkJoinPool.shutdown();
    }
}
复制代码

额外补充

上面咱们说到过:其实经常使用Java线程池都是由ThreadPoolExecutor或者ForkJoinPool两个类生成的,只是其根据构造函数传入不一样的实参来生成相应线程池而已。那咱们如今一块儿来看看Executors中几个建立线程池对象的静态方法相关的源码:

ThreadPoolExecutor构造函数原型

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
复制代码

参数说明

  • corePoolSize:核心运行的poolSize,也就是当超过这个范围的时候,就须要将新的Runnable放入到等待队列workQueue中了。
  • maximumPoolSize:线程池维护线程的最大数量,当大于了这个值就会将任务由一个丢弃处理机制来处理(固然也存在永远不丢弃任务的线程池,具体得看策略)。
  • keepAliveTime:线程空闲时的存活时间(当线程数大于corePoolSize时该参数才有效)[java doc中的是这样写的 :when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.]
  • unit:keepAliveTime的单位。
  • workQueue:用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口。

执行任务的过程

  1. poolSize (当前实际须要使用的线程数) < corePoolSize,提交 Runnable 任务,会立马执行。
  2. 当提交的任务数超过了 corePoolSize ,会将当前的 Runnable 提交到一个 BlockingQueue 中。
  3. 有界队列满了以后,若是 poolSize < maximumPoolSize 时,会尝试new一个Thread进行急救处理,立马执行对应的Runnable任务。
  4. 若是第三步也没法处理了,就会走到第四步执行reject操做。

newFixedThreadPool

poolSize 和 maximumPoolSize 相等,使用无界队列存储,不管来多少任务,队列都能塞的下,因此线程池中的线程数老是 poolSize。

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
复制代码

newSingleThreadExecutor

poolSize 和 maximumPoolSize 都为1,使用无界队列存储,不管来多少任务,队列都能塞的下,因此线程池中的线程数老是 1。

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
复制代码

newCachedThreadPool

poolSize 为 0,来一个任务直接扔到队列中,使用SynchronousQueue存储(没有容量的队列),因此来来一个任务就得新建一个线程,maximumPoolSize 为 Integer.MAX_VALUE,能够当作是容许建立无限的线程。

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
复制代码

newScheduledThreadPool

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
复制代码

newWorkStealingPool

public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
复制代码

拉票环节

以为文章写得不错的朋友能够点赞、转发、加关注呀!大家的支持就是我最大的动力,笔芯!

相关文章
相关标签/搜索