Java多线程之Executor框架和手写简易的线程池

目录

线程池

什么是线程池

线程池一种线程使用模式,线程池会维护多个线程,等待着分配可并发执行的任务,当有任务须要线程执行时,从线程池中分配线程给该任务而不用主动的建立线程。java

线程池的好处

若是在咱们平时若是须要用到线程时,咱们通常是这样作的:建立线程(T1),使用建立的线程来执行任务(T2),任务执行完成后销毁当前线程(T3),这三个阶段是必需要有的。api

而若是使用线程池呢?数组

线程池会预先建立好必定数量的线程,须要的时候申请使用,在一个任务执行完后也不须要将该线程销毁,很明显的节省了T1和T3这两阶段的时间。安全

同时咱们的线程由线程池来统一进行管理,这样也提升了线程的可管理性。bash

手写一个本身的线程池

如今咱们能够简单的理解为线程池实际上就是存放多个线程的数组,在程序启动是预先实例化必定得线程实例,当有任务须要时分配出去。如今咱们先来写一个本身的线程池来理解一下线程池基本的工做过程。服务器

线程池须要些什么?

首先线程池确定须要必定数量的线程,因此首先须要一个线程数组,固然也能够是一个集合。网络

线程数组是用来进行存放线程实例的,要使用这些线程就须要有任务提交过来。当任务量过大时,咱们是不可能在同一时刻给全部的任务分配一个线程的,因此咱们还须要一个用于存听任务的容器多线程

这里的预先初始化线程实例的数量也须要咱们来根据业务肯定。并发

同时线程实例的数量也不能随意的定义,因此咱们还须要设置一个最大线程数框架

//线程池中容许的最大线程数
    private static int MAXTHREDNUM = Integer.MAX_VALUE;
    //当用户没有指定时默认的线程数
    private  int threadNum = 6;
    //线程队列,存放线程任务
    private List<Runnable> queue;

    private WorkerThread[] workerThreads;
复制代码

线程池工做

线程池的线程通常须要预先进行实例化,这里咱们经过构造函数来模拟这个过程。

public MyThreadPool(int threadNum) {
        this.threadNum = threadNum;
        if(threadNum > MAXTHREDNUM)
            threadNum = MAXTHREDNUM;
        this.queue = new LinkedList<>();
        this.workerThreads = new WorkerThread[threadNum];
        init();
    }

    //初始化线程池中的线程
 private void init(){
    for(int i=0;i<threadNum;i++){
        workerThreads[i] = new WorkerThread();
        workerThreads[i].start();
    }
  }
复制代码

在线程池准备好了后,咱们须要像线程池中提交工做任务,任务统一提交到队列中,当有任务时,自动分发线程。

//提交任务
    public void execute(Runnable task){
        synchronized (queue){
            queue.add(task);
            //提交任务后唤醒等待在队列的线程
            queue.notifyAll();
        }
    }
复制代码

咱们的工做线程为了获取任务,须要一直监放任务队列,当队列中有任务时就由一个线程去执行,这里咱们用到了前面提到的安全中断。

private class WorkerThread extends Thread {

    private volatile boolean on = true;
    @Override
    public void run() {
        Runnable task = null;
        //判断是否能够取任务
        try {
            while(on&&!isInterrupted()){
                synchronized (queue){
                    while (on && !isInterrupted() && queue.isEmpty()) {
                        //这里若是使用阻塞队列来获取在执行时就不会报错
                        //报错是由于退出时销毁了全部的线程资源,不影响使用
                        queue.wait(1000);
                    }
                    if (on && !isInterrupted() && !queue.isEmpty()) {
                        task = queue.remove(0);
                    }

                    if(task !=null){
                        //取到任务后执行
                        task.run();
                    }
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        task = null;//任务结束后手动置空,加速回收
    }

    public void cancel(){
        on = false;
        interrupt();
    }
}
复制代码

固然退出时还须要对线程池中的线程等进行销毁。

//销毁线程池
    public void shutdown(){
        for(int i=0;i<threadNum;i++){
            workerThreads[i].cancel();
            workerThreads[i] = null;
        }
        queue.clear();
    }
复制代码

好了,到这里咱们的一个简易版的线程池就完成了,功能虽然很少可是线程池运行的基本原理差很少实现了,实际上很是简单,咱们来写个程序测试一下:

public class ThreadPoolTest {
    public static void main(String[] args) throws InterruptedException {
        // 建立3个线程的线程池
        MyThreadPool t = new MyThreadPool(3);
        CountDownLatch countDownLatch = new CountDownLatch(5);
        t.execute(new MyTask(countDownLatch, "testA"));
        t.execute(new MyTask(countDownLatch, "testB"));
        t.execute(new MyTask(countDownLatch, "testC"));
        t.execute(new MyTask(countDownLatch, "testD"));
        t.execute(new MyTask(countDownLatch, "testE"));
        countDownLatch.await();
        Thread.sleep(500);
        t.shutdown();// 全部线程都执行完成才destory
        System.out.println("finished...");
    }

    // 任务类
    static class MyTask implements Runnable {

        private CountDownLatch countDownLatch;
        private String name;
        private Random r = new Random();

        public MyTask(CountDownLatch countDownLatch, String name) {
            this.countDownLatch = countDownLatch;
            this.name = name;
        }

        public String getName() {
            return name;
        }

        @Override
        public void run() {// 执行任务
            try {
                countDownLatch.countDown();
                Thread.sleep(r.nextInt(1000));
                System.out.println("任务 " + name + " 完成");
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getId()+" sleep InterruptedException:"
                        +Thread.currentThread().isInterrupted());
            }
        }
    }
}

result:
任务 testA 完成
任务 testB 完成
任务 testC 完成
任务 testD 完成
任务 testE 完成
finished...
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at com.learn.threadpool.MyThreadPool$WorkerThread.run(MyThreadPool.java:75)
...
复制代码

从结果能够看到咱们提交的任务都被执行了,当全部任务执行完成后,咱们强制销毁了全部线程,因此会抛出异常。

JDK中的线程池

上面咱们实现了一个简易的线程池,稍微理解线程池的基本运做原理。如今咱们来认识一些JDK中提供了线程池吧。

ThreadPoolExecutor

public class ThreadPoolExecutor extends AbstractExecutorService
复制代码

ThreadPoolExecutor是一个ExecutorService ,使用可能的几个合并的线程执行每一个提交的任务,一般使用Executors工厂方法配置,经过Executors能够配置多种适合不一样场景的线程池。

ThreadPoolExecutor中的主要参数
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) 
复制代码
corePoolSize

线程池中的核心线程数,当外部提交一个任务时,线程池就建立一个新线程执行任务,直到当前线程数等于corePoolSize时再也不建立新线程; 若是当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行; 若是执行了线程池的prestartAllCoreThreads()方法,线程池会提早建立并启动全部核心线程。

maximumPoolSize

线程池中容许的最大线程数。若是当前阻塞队列已满,还在继续提交任务,则建立新的线程执行任务,前提是当前线程数小于maximumPoolSize。

keepAliveTime

线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间。默认状况下,线程通常不会被销毁,该参数只在线程数大于corePoolSize时才有用。

workQueue

workQueue必须是阻塞队列。当线程池中的线程数超过corePoolSize的时候,线程会进入阻塞队列进行等待。阻塞队列可使有界的也能够是无界的。

threadFactory

建立线程的工厂,经过自定义的线程工厂能够给每一个新建的线程设置一个线程名。Executors静态工厂里默认的threadFactory,线程的命名规则是“pool-{数字}-thread-{数字}”。

RejectedExecutionHandler

线程池的饱和处理策略,当阻塞队列满了,且没有空闲的工做线程,若是继续提交任务,必须采起一种策略处理该任务,线程池提供了4种策略:

  • AbortPolicy:直接抛出异常,默认的处理策略
  • CallerRunsPolicy:使用调用者所属的线程来执行当前任务
  • DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务
  • DiscardPolicy:直接丢弃该任务 若是上述提供的处理策略没法知足业务需求,也能够根据场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
ThreadPoolExecutor中的主要执行流程

线程池

//图片来自网络

  1. 线程池判断核心线程池里的线程(corePoolSize)是否都在执行任务。若是不是,则建立一个新的工做线程来执行任务。若是核心线程池里的线程都在执行任务,则进入2。
  2. 线程池判断工做队列(workQueue)是否已满。若是工做队列没有满,则将新提交的任务存储在该队列里。若是工做队列满了,则进入3。
  3. 线程池判断线程池的线程(maximumPoolSize)是否都处于工做状态。若是没有,则建立一个新的工做线程来执行任务。若是已经满了,则交给饱和策略来处理这个任务。

这里须要注意的是核心线程池大小指得是corePoolSize参数,而线程池工做线程数指的是maximumPoolSize。

Executor

实际上咱们在使用线程池时,并不必定须要本身来定义上面介绍的参数的值,JDK为咱们提供了一个调度框架。经过这个调度框架咱们能够轻松的建立好线程池以及异步的获取任务的执行结果。

调度框架的组成

任务

通常是指须要被执行的任务,多为使用者提供。被提交的任务须要实现Runnable接口或Callable接口。

任务的执行

Executor是任务执行机制的核心接口,其将任务的提交和执行分离开来。ExecutorService继承了Executor并作了一些扩展,能够产生Future为跟踪一个或多个异步任务执行。任务的执行主要是经过实现了Executor和ExecutorService接口的类来进行实现。例如:ThreadPoolExecutor和ScheduledThreadPoolExecutor。

结果获取

对结果的获取能够经过Future接口以及其子类接口来实现。Future接口提供了一系列诸如检查是否就绪,是否执行完成,阻塞以及获取结果等方法。

Executors工厂中的线程池

FixedThreadPool
new ThreadPoolExecutor(nThreads, nThreads, 0L, 
                        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
复制代码

该线程池中corePoolSize和maximumPoolSize参数一致。同时使用无界阻塞队列,将会致使maximumPoolSize和keepAliveTime已经饱和策略无效,由于队列会一直接收任务,直到OOM。

SingleThreadExecutor
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>())
复制代码

该线程池中corePoolSize和maximumPoolSize都为1,表示始终只有一个线程在工做,适用于须要保证顺序地执行各个任务;而且在任意时间点,不会有多个线程是活动的应用场景。同时使用无界阻塞队列,当任务多时极有可能OOM。

CachedThreadPool
new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>()
复制代码

CachedThreadPool类型的线程池corePoolSize为0,表示任务将会提交给队列,可是SynchronousQueue又是一个不包含任何容量的队列。因此每个任务提交过来都会建立一个新的线程来执行,该类型的线程池适用于执行不少的短时间异步任务的程序,或者是负载较轻的服务器。若是当任务的提交速度一旦超过任务的执行速度,在极端状况下可能会由于建立过多线程而耗尽CPU和内存资源。

ScheduledThreadPool

对于定时任务类型的线程池,Executor能够建立两种不一样类型的线程池:ScheduledThreadPoolExecutor和SingleThreadScheduledExecutor,前者是包含若干个线程的ScheduledThreadPoolExecutor,后者是只包含一个的ScheduledThreadPoolExecutor。

ScheduledThreadPoolExecutor适用于须要多个后台线程执行周期任务,同时为了知足资源管理的需求而须要限制后台线程的数量的应用场景。

SingleThreadScheduledExecutor适用于须要单个后台线程执行周期任务,同时须要保证顺序地执行各个任务的应用场景。

new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
复制代码

在对该类型线程池进行实例化时,咱们能够看到maximumPoolSize设置为了Integer的最大值,因此很明显在极端状况下和CachedThreadPool类型同样可能会由于建立过多线程而耗尽CPU和内存资源。

DelayedWorkQueue是一种延时阻塞队列,此队列的特色为其中元素只能在其延迟到期时才被使用。ScheduledThreadPool类型在执行任务时和其余线程池有些不一样。

  1. ScheduledThreadPool类型线程池中的线程(假设如今线程A开始取任务)从DelayedWorkQueue中取已经到期的任务。
  2. 线程A获取到任务后开始执行。
  3. 任务执行完成后设置该任务下一次执行的时间。
  4. 将该任务从新放入到线程池中。

ScheduledThreadPool中存在着定时任务和延时任务两种。

延时任务经过schedule(...)方法以及重载方法和scheduleWithFixedDelay实现,延时任务经过设置某个时间间隔后执行,schedule(...)仅执行一次。

定时任务由scheduleAtFixedRate实现。该方法建立并执行在给定的初始延迟以后,随后以给定的时间段进行周期性动做,即固定时间间隔的任务。

特殊的scheduleWithFixedDelay方法是建立并执行在给定的初始延迟以后首先启用的按期动做,随后在一个执行的终止和下一个执行的开始之间给定的延迟,即固定延时间隔的任务。

固定时间间隔的任务不论每次任务花费多少时间,下次任务开始执行时间是肯定的。对于scheduleAtFixedRate方法中,若任务处理时长超出设置的定时频率时长,本次任务执行完才开始下次任务,下次任务已经处于超时状态,会立刻开始执行。若任务处理时长小于定时频率时长,任务执行完后,定时器等待,下次任务会在定时器等待频率时长后执行。

固定延时间隔的任务是指每次执行完任务之后都等待一个固定的时间。因为操做系统调度以及每次任务执行的语句可能不一样,因此每次任务执行所花费的时间是不肯定的,也就致使了每次任务的执行周期存在必定的波动。

须要注意的是定时或延时任务中所涉及到时间、周期不能保证明时性及准确性,实际运行中会有必定的偏差。

Callable/Future

在介绍实现多线程的时候咱们有简单介绍过Runnable和Callable的,这二者基本相同,不一样在于Callable能够返回一个结果,而Runnable不返回结果。对于Callable接口的使用方法和Runnable基本相同,同时咱们也能够选择是否对结果进行接收处理。在Executors中提供了将Runnable转换为Callable的api:Callable<Object> callable(Runnable task)

Future是一个用于接收Runnable和Callable计算结果的接口,固然它还提供了查询任务状态,中断或者阻塞任务以及查询结果的能力。

boolean cancel(boolean mayInterruptIfRunning)  //尝试取消执行此任务。  
V get()  //等待计算完成,而后检索其结果。  
V get(long timeout, TimeUnit unit) //等待最多在给定的时间,而后检索其结果(若是可用)。  
boolean isCancelled() //若是此任务在正常完成以前被取消,则返回 true 。  
boolean isDone() //若是任务已完成返回true复制代码

FutureTask是对Future的基本实现,具备启动和取消计算的方法,查询计算是否完整,并检索计算结果。FutureTask对Future作了必定得扩展:

void run() //将此future设置为其计算结果,除非已被取消。  
protected boolean runAndReset()  //执行计算而不设置其结果,而后重置为初始状态,若是计算遇到异常或被取消,则不执行此操做。  
protected void set(V v) //将此Future的结果设置为给定值,除非Future已被设置或已被取消。  
protected void setException(Throwable t) //除非已经设置了此 Future 或已将其取消,不然它将报告一个 ExecutionException,并将给定的 throwable 做为其缘由。  
复制代码

FutureTask除了实现Future接口外,还实现了Runnable接口。因此FutureTask能够由Executor执行,也能够由调用线程直接执行futureTask.run()。

当FutureTask处于未启动或已启动状态时,执行FutureTask.get()方法将致使调用线程阻塞;

当FutureTask处于已完成状态时,执行FutureTask.get()方法将致使调用线程当即返回结果或抛出异常。

当FutureTask处于未启动状态时,执行FutureTask.cancel()方法将致使此任务永远不会被执行;

当FutureTask处于已启动状态时,执行FutureTask.cancel(true)方法将以中断执行此任务线程的方式来尝试中止该任务;

当FutureTask处于已启动状态时,执行FutureTask.cancel(false)方法将不会对正在执行此任务的线程产生影响(让正在执行的任务运行完成)。

关因而否使用Executors

在以前阿里巴巴出的java开发手册中,有明确提出禁止使用Executors:

【强制】线程池不容许使用 Executors 去建立,而是经过 ThreadPoolExecutor 的方式, 这样的处理方式让写的同窗更加明确线程池的运行规则,规避资源耗尽的风险。

在上面咱们分析过使用Executors建立的几种线程池的使用场景和缺点,大多数状况下出问题在于可能致使OOM,在我实际使用中基本没有遇到过这样的状况。可是考虑到阿里巴巴这样体量的并发请求,可能遇到这种状况的概率较大。因此咱们仍是应该根据实际状况考虑是否使用,固然实际遵循阿里巴巴开发手册来可能会更好一点,毕竟这是国类顶尖公司常年在生产中积累下的经验。

最后,在本节中只是简单介绍线程池及其基本原理,帮助更好的理解线程池。并不涉及具体如何使用。

相关文章
相关标签/搜索