JAVA线程池的原理

 

提交一个线程到线程池:java

1.判断核心线程是否都在执行任务,若是有空闲的或者没有被建立的,则建立去执行新的工做线程执行任务,若是核心线程都在执行,则下一步;缓存

2.线程池判断工做队列是否已满,没有满,则提交任务到工做等待队列,若是工做等待队列已满,则下一步;bash

3.判断线程池里的全部工做线程是否都在工做,若是没有,则建立一个新的工做线程执行任务,若是已满,则根据饱和策略处理任务。服务器

饱和执行策略:并发

1.AbortPolicy 拒绝新任务,直接抛出异常异步

2.DiscardPolicy 直接丢弃任务spa

3.DiscardOldestPolicy 丢弃队列中最老的任务 先将阻塞队列中的头元素出队抛弃,再尝试提交任务。线程

4.CallerRunsPolicy 将任务丢给调用线程执行rest

 

接口继承关系code

Java API对ExecutorService接口的实现有两个,因此这两个便是Java线程池具体实现类:

1. ThreadPoolExecutor
2. ScheduledThreadPoolExecutor

ExecutorService还继承了Executor接口(注意区分Executor接口和Executors工厂类),这个接口只有一个execute()方法,最后咱们看一下整个继承树

 

ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
}

参数详情:

corePoolSize 核心线程池大小,在刚开始建立线程池时,核心线程建立完不会当即启动,有任务提交时才会启动达到核心线程数大小,若是一开始就要建立能够调用prestartAllCoreThreads。

maximumPoolSize 容许的最大线程数量,当核心线程满和阻塞队列满时才会判断最大线程数量,决定是否建立普通线程。

keepAliveTime 线程数大于核心线程数时,多余空闲线程存活时间

unit keepAliveTime的时间单位

workQueue 线程数量超过核心线程数时用于保存任务,包含三种类型的BlockingQueue:有界队列,无界队列,同步移交。

    可选择的BlockingQueue:

        1.LinkedBlockingQueue 无界队列,遵循先进先出,使用该队列作为阻塞队列时要尤为小心,当任务耗时较长时可能会致使大量新任务在队列中堆积最终致使OOM。阅读代码发现,Executors.newFixedThreadPool 采用就是 LinkedBlockingQueue,当QPS很高,发送数据很大,大量的任务被添加到这个无界LinkedBlockingQueue 中,致使cpu和内存飙升服务器挂掉。

        2.ArrayBlockingQueue 有界队列 遵循先进先出,

        3.PriorityBlockingQueue 优先级队列,任务的优先级由Comparator决定。

        4.SychronousQueue 将任务直接移交给工做线程,不是一个真正的队列,是一种线程移交机制。必须有另外一个线程正在等待接收这个元素。只有在使用无界线程池或者有饱和策略时才建议使用该队列。

 

threadPoolFactory 建立新线程时使用的工厂类

handler 阻塞队列已满,且线程数达到最大线程后所采起的饱和策略

 

ExecutorService的建立

Java给咱们提供了一个Executors工厂类,它能够帮助咱们很方便的建立各类类型ExecutorService线程池,Executors一共能够建立下面这四类线程池:

newCachedThreadPool:建立一个可缓存线程池,若是线程池长度超过处理须要,可灵活回收空闲线程,若无可回收,则新建线程。

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

初始化时核心线程数为0,若是线程池长度超过处理须要,可当即回收空闲线程,无可回收则新建。

要将一个元素放入SynchronousQueue中,必须有另外一个线程正在等待接收这个元素。所以即使SynchronousQueue一开始为空且大小为1,第一个任务也没法放入其中,由于没有线程在等待从SynchronousQueue中取走元素。所以第一个任务到达时便会建立一个新线程执行该任务。

 

newFixedThreadPool:建立一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
 }

固定大小线程数,使用无限大的LinkedBlockingQueue存听任务。

 

newScheduledThreadPool:建立一个定长线程池,支持定时及周期性任务执行。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

DelayedWorkQueue是一个无界阻塞队列,是ScheduledThreadPoolExecutor的静态内部类

newSingleThreadExecutor:建立一个单线程化的线程池,它只会用惟一的工做线程来执行任务,保证全部任务按照指定顺序(FIFO, LIFO, 优先级)执行。

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
 }

首先new了一个线程数目为 1 的ScheduledThreadPoolExecutor,再把该对象传入DelegatedScheduledExecutorService中,看看DelegatedScheduledExecutorService的实现代码:

DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
            super(executor);
            e = executor;
} 

#父类:
DelegatedExecutorService(ExecutorService executor) { 
           e = executor; 
}

其实就是使用装饰模式加强了ScheduledExecutorService(1)的功能,不只确保只有一个线程顺序执行任务,也保证线程意外终止后会从新建立一个线程继续执行任务。

 

Executors只是一个工厂类,它全部的方法返回的都是ThreadPoolExecutorScheduledThreadPoolExecutor这两个类的实例。

ExecutorService的使用

ExecutorService executorService = Executors.newFixedThreadPool(10);

executorService.execute(new Runnable() {
public void run() {
    System.out.println("Asynchronous task");
}
});

executorService.shutdown();

ExecutorService的执行

ExecutorService有以下几个执行方法:

- execute(Runnable)
- submit(Runnable)
- submit(Callable)
- invokeAny(...)
- invokeAll(...)

execute(Runnable)

这个方法接收一个Runnable实例,而且异步的执行,请看下面的实例:

ExecutorService executorService = Executors.newSingleThreadExecutor();

executorService.execute(new Runnable() {
public void run() {
    System.out.println("Asynchronous task");
}
});

executorService.shutdown();

这个方法有个问题,就是没有办法获知task的执行结果。若是咱们想得到task的执行结果,咱们能够传入一个Callable的实例。

submit(Runnable)

submit(Runnable)execute(Runnable)区别是前者能够返回一个Future对象,经过返回的Future对象,咱们能够检查提交的任务是否执行完毕,请看下面执行的例子:

Future future = executorService.submit(new Runnable() {
public void run() {
    System.out.println("Asynchronous task");
}
});

future.get();  //returns null if the task has finished correctly.

若是任务执行完成,future.get()方法会返回一个null。注意,future.get()方法会产生阻塞。

submit(Callable)

submit(Callable)submit(Runnable)相似,也会返回一个Future对象,可是除此以外,submit(Callable)接收的是一个Callable的实现,Callable接口中的call()方法有一个返回值,能够返回任务的执行结果,而Runnable接口中的run()方法是void的,没有返回值。请看下面实例:

Future future = executorService.submit(new Callable(){
public Object call() throws Exception {
    System.out.println("Asynchronous Callable");
    return "Callable Result";
}
});

System.out.println("future.get() = " + future.get());

若是任务执行完成,future.get()方法会返回Callable任务的执行结果。注意,future.get()方法会产生阻塞。

invokeAny(…)

invokeAny(...)方法接收的是一个Callable的集合,执行这个方法不会返回Future,可是会返回全部Callable任务中其中一个任务的执行结果。这个方法也没法保证返回的是哪一个任务的执行结果,反正是其中的某一个。请看下面实例:

ExecutorService executorService = Executors.newSingleThreadExecutor();

Set<Callable<String>> callables = new HashSet<Callable<String>>();

callables.add(new Callable<String>() {
public String call() throws Exception {
    return "Task 1";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
    return "Task 2";
}
});
callables.add(new Callable<String>() {
    public String call() throws Exception {
    return "Task 3";
}
});

String result = executorService.invokeAny(callables);
System.out.println("result = " + result);
executorService.shutdown();

invokeAll(…)

invokeAll(...)与 invokeAny(...)相似也是接收一个Callable集合,可是前者执行以后会返回一个Future的List,其中对应着每一个Callable任务执行后的Future对象。状况下面这个实例

ExecutorService executorService = Executors.newSingleThreadExecutor();

Set<Callable<String>> callables = new HashSet<Callable<String>>();

callables.add(new Callable<String>() {
public String call() throws Exception {
    return "Task 1";
}
});
callables.add(new Callable<String>() {
    public String call() throws Exception {
    return "Task 2";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
    return "Task 3";
}
});

List<Future<String>> futures = executorService.invokeAll(callables);

for(Future<String> future : futures){
System.out.println("future.get = " + future.get());
}

executorService.shutdown();

ExecutorService的关闭

当咱们使用完成ExecutorService以后应该关闭它,不然它里面的线程会一直处于运行状态。

举个例子,若是的应用程序是经过main()方法启动的,在这个main()退出以后,若是应用程序中的ExecutorService没有关闭,这个应用将一直运行。之因此会出现这种状况,是由于ExecutorService中运行的线程会阻止JVM关闭。

若是要关闭ExecutorService中执行的线程,咱们能够调用ExecutorService.shutdown()方法。在调用shutdown()方法以后,ExecutorService不会当即关闭,可是它再也不接收新的任务,直到当前全部线程执行完成才会关闭,全部在shutdown()执行以前提交的任务都会被执行。

若是咱们想当即关闭ExecutorService,咱们能够调用ExecutorService.shutdownNow()方法。这个动做将跳过全部正在执行的任务和被提交尚未执行的任务。可是它并不对正在执行的任务作任何保证,有可能它们都会中止,也有可能执行完成。

正确结束线程应该在线程内先使用break结束任务。

相关文章
相关标签/搜索