「Java并发编程」线程池相关知识点整理

为何要用线程池?

池化技术:减小每次获取资源的消耗,提升对资源的利用率。

线程池提供了一种限制和管理资源(包括执行一个任务)。 每一个线程池还维护一些基本统计信息,例如已完成任务的数量。编程

使用线程池的好处:数组

  • 下降资源消耗。经过重复利用已建立的线程下降线程建立和销毁形成的消耗。
  • 提升响应速度。当任务到达时,任务能够不须要的等到线程建立就能当即执行。
  • 提升线程的可管理性。线程是稀缺资源,若是无限制的建立,不只会消耗系统资源,还会下降系统的稳定性,使用线程池能够进行统一的分配,调优和监控。

线程池的实现原理?

execute方法源码

public void execute(Runnable command) {
        // 若是任务为null,则抛出异常。        if (command == null)
            throw new NullPointerException();        // ctl 中保存的线程池当前的一些状态信息  AtomicInteger        int c = ctl.get();        //判断当前线程池中执行的任务数量是否小于corePoolSize        if (workerCountOf(c) < corePoolSize) {
            //若是小于,则经过addWorker新建一个线程,而后,启动该线程从而执行任务。            if (addWorker(command, true))
                return;
            c = ctl.get();        }        //经过 isRunning 方法判断线程池状态        //线程池处于 RUNNING 状态才会被而且队列能够加入任务        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();            // 再次获取线程池状态,若是线程池状态不是 RUNNING 状态就须要从任务队列中移除任务。            // 并尝试判断线程是否所有执行完毕。同时执行拒绝策略。            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 若是当前线程池为空就新建立一个线程并执行。            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }        //经过addWorker新建一个线程,并将任务(command)添加到该线程中;
        //而后,启动该线程从而执行任务。        //若是addWorker执行失败,则经过reject()执行相应的拒绝策略的内容。        else if (!addWorker(command, false))
            reject(command);
    }
线程池建立线程的时候,会将线程封装成工做线程Worker,Worker在执行完成任务以后,还会循环获取工做队列利的任务来执行。
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();        Runnable task = w.firstTask;        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {//循环执行任务
                w.lock();                //若是线程池正在中止,确保线程被中断
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&                      runStateAtLeast(ctl.get(), STOP))) &&                    !wt.isInterrupted())                    wt.interrupt();                try {
                    beforeExecute(wt, task);                    Throwable thrown = null;
                    try {
                        task.run();                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);                    }                } finally {
                    task = null;
                    w.completedTasks++;                    w.unlock();                }            }            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);        }    }

「Java并发编程」线程池相关知识点整理

  1. 线程池判断核心线程池【corePoolSize】里的线程是否都在执行任务。若是不是,则建立一个新的工做线程来执行任务。若是核心线程池里的线程都在执行任务,则进入下个流程。
  2. 线程池判断工做队列【BlockingQueue】是否已经满。若是工做队列没有满,则将新提交的任务存储在这个工做队列里。若是工做队列满了,则进入下个流程。
  3. 线程池判断线程池【maximumPoolSize】的线程是否都处于工做状态。若是没有,则建立一个新的工做线程来执行任务。若是已经满了,则交给饱和策略【RejectedExecutionHandler.rejectedExecution()】来处理这个任务。

「Java并发编程」线程池相关知识点整理

如何使用线程池?并发

ThreadPoolExecutor重要分析

构造方法的重要参数

ThreadPoolExecutor方法的构造参数有不少,咱们看看最长的那个就能够了:框架

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue,                              ThreadFactory threadFactory,                              RejectedExecutionHandler handler) {        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  • corePoolSize:核心线程数定义了最小能够同时运行的线程数量
  • maximumPoolSize:当队列中存放的任务达到队列容量的时候,当前能够同时运行的线程数量变为最大线程数。【若是使用的无界队列,这个参数就没啥效果】
  • workQueue: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,若是达到核心线程数的话,新任务就会被存放在队列中
  • keepAliveTime:当线程池中的线程数量大于 corePoolSize 的时候,若是这时没有新的任务提交,核心线程外的线程不会当即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁。
  • unit:keepAliveTime 的时间单位。
  • threadFactory:用于设置建立线程的工厂,能够经过线程工厂给每一个建立出来的线程设置更有意义的名字。
  • handler:饱和策略,当前同时运行的线程数量达到最大线程数量【maximumPoolSize】而且队列也已经被放满时,执行饱和策略。

线程池的简单使用

public class ThreadPoolTest {
    private static final int CORE_POOL_SIZE = 5; //核心线程数
    private static final int MAX_POOL_SIZE = 10; //最大线程数
    private static final int QUEUE_CAPACITY = 100; //任务队列的容量
    private static final Long KEEP_ALIVE_TIME = 1L; //等待时间
    public static void main(String[] args) {        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(                CORE_POOL_SIZE,                MAX_POOL_SIZE,                KEEP_ALIVE_TIME,                TimeUnit.SECONDS,                new ArrayBlockingQueue<>(QUEUE_CAPACITY),                new ThreadPoolExecutor.AbortPolicy());        for(int i = 0; i < 10 ; i ++){
            Runnable worker = new MyRunnable(""+ i); //建立任务
            threadPool.execute(worker); //经过execute提交
        }        threadPool.shutdown();        while(!threadPool.isTerminated()){
        }        System.out.println("Finished all threads");
    }}class MyRunnable implements Runnable {    private String command;    MyRunnable(String s) {
        this.command = s;
    }    @Override    public void run() {        System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date());
        processCommand();        System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date());
    }    private void processCommand() {        try {            Thread.sleep(5000);
        } catch (InterruptedException e) {            e.printStackTrace();        }    }    @Override    public String toString() {        return this.command;
    }}

任务队列有哪些?

  • ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO原则对元素进行排序。
  • LinkedBlockingQueue:基于链表结构的阻塞队列,按FIFO排序元素,吞吐量一般要高于ArrayBlockingQueue,Executors.newFixedThreadPool()就是使用了这个队列。
  • SynchronousQueue:一个不存储元素的阻塞队列,每一个插入操做必须等到另外一个线程调用移除操做,不然插入操做一直处于阻塞状态,吞吐量一般要高于LinkedBlockingQueue,Executors.newCachedThreadPool()就是使用了这个队列。
  • PriorityBlockingQueue:一个具备优先级的无限阻塞队列。

饱和策略有哪些呢?

  • ThreadPoolExecutor.AbortPolicy:抛出 RejectedExecutionException来拒绝新任务的处理。【默认的饱和策略】
  • ThreadPoolExecutor.CallerRunsPolicy【提供可伸缩队列】:调用执行本身的线程运行任务,也就是直接在调用execute方法的线程中运行(run)被拒绝的任务,若是执行程序已关闭,则会丢弃该任务。所以这种策略会下降对于新任务提交速度,影响程序的总体性能。若是您的应用程序能够承受此延迟而且你要求任何一个任务请求都要被执行的话,你能够选择这个策略。public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }
  • ThreadPoolExecutor.DiscardPolicy: 不处理新任务,直接丢弃掉。
  • ThreadPoolExecutor.DiscardOldestPolicy: 此策略将丢弃最先的未处理的任务请求,并执行当前任务。public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } }

固然,也能够根据须要自定义拒绝策略,须要实现RejectedExecutionHandler。ide

如何建立线程池?

1、使用ThreadPoolExecutor的各类构造方法。工具

2、经过Executor框架的工具类Executors能够建立三种类型的ThreadPoolExecutor。性能

《阿里巴巴 Java 开发手册》中强制线程池不容许使用 Executors 去建立,而是经过 ThreadPoolExecutor 的方式,这样的处理方式让写的同窗更加明确线程池的运行规则,规避资源耗尽的风险this

Executors 返回线程池对象的弊端以下: FixedThreadPool 和 SingleThreadExecutor: 容许请求的队列长度为 Integer.MAX_VALUE ,可能堆积大量的请求,从而致使 OOM。 CachedThreadPool 和 ScheduledThreadPool : 容许建立的线程数量为 Integer.MAX_VALUE ,可能会建立大量线程,从而致使 OOM。

执行execute方法和submit方法的区别?

  • execute()方法用于提交不须要返回值的任务,因此没法判断任务是否被线程池执行成功与否;
threadPool.execute(new Runnable() {
    @Override
    public void run() {
    }}); //经过execute提交
  • submit()方法用于提交须要返回值的任务。线程池会返回一个 Future 类型的对象,经过这个 Future 对象能够判断任务是否执行成功,而且能够经过 Future 的 get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后当即返回,这时候有可能任务没有执行完。
Future<Object> future = threadPool.submit(hasReturnValueTask);
try{
    Object s = future.get();
}catch(InterruptedException e){
    //处理中断异常
}catch(ExecutionException e){
 //处理没法执行任务异常
}finally{
 threadPool.shutdown();
}

若是以为本文对你有帮助,能够点赞关注支持一下spa