Java ThreadPoolExecutor 线程池

Executors

Executors 是一个Java中的工具类. 提供工厂方法来建立不一样类型的线程池.java

clipboard.png

从上图中也能够看出, Executors的建立线程池的方法, 建立出来的线程池都实现了 ExecutorService接口. 经常使用方法有如下几个:apache

  • newFixedThreadPool(int Threads): 建立固定数目线程的线程池, 超出的线程会在队列中等待.
  • newCachedThreadPool(): 建立一个可缓存线程池, 若是线程池长度超过处理须要, 可灵活回收空闲线程(60秒), 若无可回收,则新建线程.
  • newSingleThreadExecutor(): 建立一个单线程化的线程池, 它只会用惟一的工做线程来执行任务, 保证全部任务按照指定顺序(FIFO, LIFO, 优先级)执行. 若是某一个任务执行出错, 将有另外一个线程来继续执行.
  • newScheduledThreadPool(int corePoolSize): 建立一个支持定时及周期性的任务执行的线程池, 多数状况下可用来替代Timer类.

Executors 例子

newCachedThreadPool

线程最大数为 Integer.MAX_VALUE, 当咱们往线程池添加了 n 个任务, 这 n 个任务都是一块儿执行的.数组

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        cachedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        Thread.currentThread().sleep(1000);
                        System.out.println(Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        cachedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        Thread.currentThread().sleep(1000);
                        System.out.println(Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

使用 execute 方法添加任务时要注意: execute 提交的方式只能提交一个 Runnable 的对象, 且该方法的返回值是 void, 而且当线程的执行过程当中抛出了异常一般来讲主线程也没法获取到异常的信息的, 只有经过 ThreadFactory 主动设置线程的异常处理类才能感知到提交的线程中的异常信息.缓存

若是想要获取返回信息可使用 submit 方法.多线程

newFixedThreadPool

ExecutorService cachedThreadPool = Executors.newFixedThreadPool(1);
        cachedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        Thread.currentThread().sleep(1000);
                        System.out.println(Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        cachedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        Thread.currentThread().sleep(1000);
                        System.out.println(Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

newScheduledThreadPool

三秒执行一次, 只有执行完这一次后, 才会执行.ide

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
        scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        Thread.currentThread().sleep(2000);
                        System.out.println(Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }, 3, TimeUnit.SECONDS);

newSingleThreadExecutor

顺序执行各个任务, 第一个任务执行完, 才会执行下一个.函数

ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        System.out.println(Thread.currentThread().getName());
                        Thread.currentThread().sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        executorService.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        System.out.println(Thread.currentThread().getName());
                        Thread.currentThread().sleep(2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

Executors存在什么问题

clipboard.png

在阿里巴巴Java开发手册中提到,使用Executors建立线程池可能会致使OOM(OutOfMemory ,内存溢出),可是并无说明为何,那么接下来咱们就来看一下到底为何不容许使用Executors?工具

咱们先来一个简单的例子,模拟一下使用Executors致使OOM的状况.this

/**
 * @author Hollis
 */
public class ExecutorsDemo {
    private static ExecutorService executor = Executors.newFixedThreadPool(15);
    public static void main(String[] args) {
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            executor.execute(new SubThread());
        }
    }
}

class SubThread implements Runnable {
    @Override
    public void run() {
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            //do nothing
        }
    }
}

经过指定JVM参数:-Xmx8m -Xms8m 运行以上代码,会抛出OOM:spa

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
    at com.hollis.ExecutorsDemo.main(ExecutorsDemo.java:16)

以上代码指出,ExecutorsDemo.java 的第16行,就是代码中的 executor.execute(new SubThread());

Java中的 BlockingQueue 主要有两种实现, 分别是 ArrayBlockingQueueLinkedBlockingQueue.

ArrayBlockingQueue 是一个用数组实现的有界阻塞队列, 必须设置容量.

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

LinkedBlockingQueue 是一个用链表实现的有界阻塞队列, 容量能够选择进行设置, 不设置的话, 将是一个无边界的阻塞队列, 最大长度为 Integer.MAX_VALUE.

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

这里的问题就出在若是咱们不设置 LinkedBlockingQueue 的容量的话, 其默认容量将会是 Integer.MAX_VALUE.

newFixedThreadPool 中建立 LinkedBlockingQueue 时, 并未指定容量. 此时, LinkedBlockingQueue 就是一个无边界队列, 对于一个无边界队列来讲, 是能够不断的向队列中加入任务的, 这种状况下就有可能由于任务过多而致使内存溢出问题.

newCachedThreadPoolnewScheduledThreadPool 这两种方式建立的最大线程数多是Integer.MAX_VALUE, 而建立这么多线程, 必然就有可能致使OOM.

ThreadPoolExecutor 建立线程池

避免使用 Executors 建立线程池, 主要是避免使用其中的默认实现, 那么咱们能够本身直接调用 ThreadPoolExecutor 的构造函数来本身建立线程池. 在建立的同时, 给 BlockQueue 指定容量就能够了.

ExecutorService executor = new ThreadPoolExecutor(10, 10,
        60L, TimeUnit.SECONDS,
        new ArrayBlockingQueue(10));

这种状况下, 一旦提交的线程数超过当前可用线程数时, 就会抛出 java.util.concurrent.RejectedExecutionException, 这是由于当前线程池使用的队列是有边界队列, 队列已经满了便没法继续处理新的请求.

除了本身定义 ThreadPoolExecutor 外. 还有其余方法. 如apache和guava等.

四个构造函数

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue)
             
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory)
             
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler)
             
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

int corePoolSize => 该线程池中核心线程数最大值
线程池新建线程的时候,若是当前线程总数小于corePoolSize, 则新建的是核心线程, 若是超过corePoolSize, 则新建的是非核心线程

核心线程默认状况下会一直存活在线程池中, 即便这个核心线程啥也不干(闲置状态).

若是指定 ThreadPoolExecutor 的 allowCoreThreadTimeOut 这个属性为 true, 那么核心线程若是不干活(闲置状态)的话, 超过必定时间(时长下面参数决定), 就会被销毁掉

很好理解吧, 正常状况下你不干活我也养你, 由于我总有用到你的时候, 但有时候特殊状况(好比我本身都养不起了), 那你不干活我就要把你干掉了

int maximumPoolSize
该线程池中线程总数最大值

线程总数 = 核心线程数 + 非核心线程数.

long keepAliveTime
该线程池中非核心线程闲置超时时长

一个非核心线程, 若是不干活(闲置状态)的时长超过这个参数所设定的时长, 就会被销毁掉

若是设置 allowCoreThreadTimeOut = true, 则会做用于核心线程

TimeUnit unit

keepAliveTime的单位, TimeUnit是一个枚举类型, 其包括:

TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小时
TimeUnit.MINUTES;           //分钟
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //纳秒

BlockingQueue workQueue

一个阻塞队列, 用来存储等待执行的任务. 也就是说如今有10个任务, 核心线程 有四个, 非核心线程有六个, 那么这六个线程会被添加到 workQueue 中, 等待执行.

这个参数的选择也很重要, 会对线程池的运行过程产生重大影响, 通常来讲, 这里的阻塞队列有如下几种选择:

SynchronousQueue: 这个队列接收到任务的时候, 会直接提交给线程处理, 而不保留它, 若是全部线程都在工做怎么办? 那就*新建一个线程来处理这个任务!因此为了保证不出现<线程数达到了maximumPoolSize而不能新建线程>的错误, 使用这个类型队列的时候, maximumPoolSize 通常指定成 Integer.MAX_VALUE, 即无限大.

LinkedBlockingQueue: 这个队列接收到任务的时候, 若是当前线程数小于核心线程数, 则核心线程处理任务; 若是当前线程数等于核心线程数, 则进入队列等待. 因为这个队列最大值为 Integer.MAX_VALUE , 即全部超过核心线程数的任务都将被添加到队列中,这也就致使了 maximumPoolSize 的设定失效, 由于总线程数永远不会超过 corePoolSize.

ArrayBlockingQueue: 能够限定队列的长度, 接收到任务的时候, 若是没有达到 corePoolSize 的值, 则核心线程执行任务, 若是达到了, 则入队等候, 若是队列已满, 则新建线程(非核心线程)执行任务, 又若是总线程数到了maximumPoolSize, 而且队列也满了, 则发生错误.

DelayQueue: 队列内元素必须实现 Delayed 接口, 这就意味着你传进去的任务必须先实现Delayed接口. 这个队列接收到任务时, 首先先入队, 只有达到了指定的延时时间, 才会执行任务.

ThreadFactory threadFactory

它是ThreadFactory类型的变量, 用来建立新线程.

默认使用 Executors.defaultThreadFactory() 来建立线程. 使用默认的 ThreadFactory 来建立线程时, 会使新建立的线程具备相同的 NORM_PRIORITY 优先级而且是非守护线程, 同时也设置了线程的名称.

RejectedExecutionHandler handler

表示当拒绝处理任务时的策略, 有如下四种取值:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常(默认).
ThreadPoolExecutor.DiscardPolicy:直接丢弃任务, 可是不抛出异常.
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务, 而后从新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:用调用者所在的线程来执行任务.

submit 方法

返回一个 Future 对象, 这个 Future 对象表明这线程的执行结果. 当主线程调用 Futureget 方法的时候会获取到从线程中返回的结果数据. 若是在线程的执行过程当中发生了异常, get 会获取到异常的信息.

submit 提交的方式有以下三种状况.

<T> Future<T> submit(Callable<T> task);

Callable 接口和 Runnable 接口的定义很相似, 只不过 Runnable 接口中是一个没有返回值的 run 方法, 而 Callable接口中是一个有返回值的 call 方法.

Future<?> submit(Runnable task);

也能够提交一个 Runable 接口的对象, 这样当调用 get 方法的时候, 若是线程执行成功会直接返回 null, 若是线程执行异常会返回异常的信息.

<T> Future<T> submit(Runnable task, T result);

这个接口就比较有意思了, 除了 task 以外还有一个 result 对象, 当线程正常结束的时候调用 Futureget 方法会返回 result 对象, 当线程抛出异常的时候会获取到对应的异常的信息.

值得注意的是:

submit 在执行过程当中与 execute 不同, 不会抛出异常而是把异常保存在成员变量中, 在 FutureTask.get 阻塞获取的时候再把异常抛出来.

execute 直接抛出异常以后线程就死掉了, submit 保存异常线程没有死掉, 所以 execute 的线程池可能会出现没有意义的状况, 由于线程没有获得重用. 而 submit 不会出现这种状况.

线程池好处

"在线程池中执行任务" 比 "为每一个任务分配一个线程" 优点更多. 经过重用现有的线程而不是建立新线程, 能够在处理多个请求时分摊在线程建立和销毁过程当中产生的巨大开销.

另外一个额外的好处是, 当请求到达时, 工做线程一般已经存在, 所以不会因为等待建立线程而延迟任务执行, 从而提升了响应性.

经过适当调整线程池打的大小, 能够建立足够多的线程以便使处理器保持忙碌状态, 同时还能够防止过多线程相互竞争资源而使应用程序耗尽内存或失败.

相关文章
相关标签/搜索