Java多线程ThreadPoolExecutor初探

我的博客项目地址java

但愿各位帮忙点个star,给我加个小星星✨git


在java中,使用线程时经过new Thread实现很简单,可是若是并发数量不少时,频繁地建立线程就会大大下降系统的效率。github

因此能够经过线程池,使得线程能够复用,每执行完一个任务,并非被销毁,而是能够继续执行其余任务。编程

花了两天时间去看了高洪岩写的《JAVA并发编程》,是想要知其然,知其因此然,在使用的状况下,了解学习了一下原理记录下java.util.concurrent并发包下的ThreadPoolExecutor特性和实现缓存


使用示例

粗暴点,咱们直接看如何使用吧多线程

(一)使用Executors

简单举个🌰:
Executors.newCachedThreadPool();        //建立一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE
Executors.newSingleThreadExecutor();   //建立容量为1的缓冲池
Executors.newFixedThreadPool(int);    //建立固定容量大小的缓冲池
具体实现逻辑:
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
	public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

	public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
复制代码

经过该Executors的静态方法进行线程池的建立,并且从具体实现来看,仍是调用了new ThreadPoolExecutor(),只是内部参数已经帮咱们配置好了。并发

(二) 使用ThreadPoolExecutor

既然真正实现都是用ThreadPoolExecutor,那就本身设定好方法的参数吧。ide

public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 5, TimeUnit.HOURS, new LinkedBlockingDeque<>());

        for(int i=0;i<10;i++){
            MyTask myTask = new MyTask(i);
            executor.execute(myTask);
            System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+
                    executor.getQueue().size()+",已执行完别的任务数目:"+executor.getCompletedTaskCount());
        }
        executor.shutdown();

    }

    static class MyTask implements Runnable {
        private int taskNum;

        public MyTask(int num) {
            this.taskNum = num;
        }

        @Override
        public void run() {
            System.out.println("正在执行task "+taskNum);
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task "+taskNum+"执行完毕");
        }
    }
复制代码

打印效果以下:函数

正在执行task 0
线程池中线程数目:1,队列中等待执行的任务数目:0,已执行完别的任务数目:0
线程池中线程数目:2,队列中等待执行的任务数目:0,已执行完别的任务数目:0
正在执行task 1
线程池中线程数目:3,队列中等待执行的任务数目:0,已执行完别的任务数目:0
正在执行task 2
线程池中线程数目:4,队列中等待执行的任务数目:0,已执行完别的任务数目:0
正在执行task 3
线程池中线程数目:5,队列中等待执行的任务数目:0,已执行完别的任务数目:0
正在执行task 4
线程池中线程数目:5,队列中等待执行的任务数目:1,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:2,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:3,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:4,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:5,已执行完别的任务数目:0
task 2执行完毕
task 0执行完毕
task 3执行完毕
task 1执行完毕
正在执行task 8
task 4执行完毕
正在执行task 7
正在执行task 6
正在执行task 5
正在执行task 9
task 8执行完毕
task 6执行完毕
task 7执行完毕
task 5执行完毕
task 9执行完毕
复制代码

任务Task提交以后,因为是多线程状态下,因此打印效果并非同步的,能够看出任务都已经顺利执行。学习

我这个实现参数是5个corePoolSize核心线程数和5个maximumPoolSize最大线程数,当线程池中的线程数超过5个的时候,将新来的任务放进缓存队列中,小伙伴能够试下把任务数(for循环的个数)提升一点,让缓存等待的任务数超过5个,看看默认的任务拒绝策略(AbortPolicy)会抛出什么错误hhh

下面来看看ThreadPoolExecutor的庐山真面目吧~


ThreadPoolExecutor

它有如下四个构造方法:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
复制代码

从构造方法能够看出,前三个方法最终都是调用第四个构造器进行初始化工做的。

参数解释:

  • corePoolSize:池中保持的线程数,包括空闲的线程,也就是核心池的大小
  • maximumPoolSize:池中锁容许最大线程数
  • keepAliveTime:当线程数量超过corePoolSize,在没有超过指定的时间内不从线程池中删除,若是超过该时间,则删除
  • unit:keepAliveTime的时间单位
  • workQueue:执行前用来保存任务的队列,此队列只保存由execute方法提交的Runnable任务

workQueue(任务队列,是一个阻塞队列)

ArrayBlockingQueue:
public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

复制代码
LinkedBlockingDeque:(支持列头和列尾操做,pollFirst/pollLast)
public LinkedBlockingDeque() {
        this(Integer.MAX_VALUE);
    }


    public LinkedBlockingDeque(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
    }

复制代码

从源码构造函数能够看到,不传参数的时候,默认阻塞队列中的大小是Integer.MAX_VALUE;

SynchronousQueue:
public SynchronousQueue() {
        this(false);
    }

    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }
复制代码

Array和Linked在传入大小小于0时将会报错,比较经常使用的是LinkedBlockingDeque和SynchronousQueue,线程池的排队策略与BlockingQueue有关

ThreadFactory:线程工厂

主要用来建立线程,能够在newThread()方法中自定义线程名字和设置线程异常状况的处理逻辑。

举个🌰:

static class MyThreadFactory implements ThreadFactory {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread();
            thread.setName("JingQ" + new Date());
            thread.setUncaughtExceptionHandler((t, e) -> {
                doSomething();
                e.printStackTrace();
            });
            return thread;
        }
    }
复制代码

handler:拒绝策略

有如下四种:

  • ThreadPoolExecutor.AbortPolicy:当任务添加到线程中被拒绝时,它会抛出RejectedExecutionException异常。
  • ThreadPoolExecutor.DiscardPolicy:任务被拒绝时,线程池丢弃被拒绝的任务
  • ThreadPoolExecutor.DiscardOldestPolicy:任务被拒绝时,线程池会放弃等待队列中最旧的未处理文物,而后将被拒绝的任务添加到等待队列中
  • ThreadPoolExecutor.CallerRunsPolicy:任务被拒绝时,会使用调用线程池的Thread线程对象处理被拒绝的任务

ThreadPoolExecutor继承结构

能够看出,实际上ThreadPoolExecutor是继承了AbstractExecutorService类和引用了ExecutorService、Executor接口。

AbstractExecutorService

public abstract class AbstractExecutorService implements ExecutorService {
 
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
    public Future<?> submit(Runnable task) {};
    public <T> Future<T> submit(Runnable task, T result) { };
    public <T> Future<T> submit(Callable<T> task) { };
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException {
    };
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
    };
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    };
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
    };
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
    };
}
复制代码

AbstarctExecutorService是一个抽象类,它实现的是ExecutorService接口

ExecutorService

public interface ExecutorService extends Executor {
    void shutdown();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
 
    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
复制代码

接口ExecutorService引用了Executor接口,Executor接口比较简单,只有一个execute方法定义

Executor

public interface Executor {
    void execute(Runnable command);
}
复制代码

小结:

Executor是一个顶级接口,定义了一个execute方法,返回值为空,参数为Runnable。

ExecutorService继承了Executor而且定义了其它一些方法,结果以下图:

抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的全部方法。

最后ThreadPoolExecutor继承了AbstractExecutorService,咱们最经常使用到它两个方法,submit和execute,下面介绍一下这二者:

execute():
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /* * (如下是我的渣翻译,有误请轻喷~) * 有如下三步流程: * * 1. 若是少于核心池大小的线程正在运行, * 那么尝试以给定的命令做为它的第一个任务启动一个新线程。 * 调用添加worker原子性检查运行状态和workder的数量, * 这样能够防止错误警报在不该该返回的状况下添加线程,返回false。 * * 2. 若是一个任务能够成功地排队,那么咱们仍然须要再次检查是否应该添加一个线程 * (由于现有的线程在上次检查后死亡),或者是在该方法进入后关闭了池。 * 所以,咱们从新检查状态,若是必要的话,若是中止的话,须要回滚队列。 * 若是没有新的线程,就去启动它 * * 3. 若是咱们不能排队任务,那么咱们尝试添加一个新线程。 * 若是失败了,咱们知道任务队列已经被关闭或饱和,因此拒绝这个任务。 */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
复制代码
submit:
	public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

   
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

复制代码

小结

execute()方法在ThreadPoolExecutor中进行了重写,submit()方法是在AbstractExecutorService实现的,ThreadPoolExecutor并无重写,而且execute方法是没有返回结果的,submit的返回类型是Future,可以得到任务的结果,可是实际执行的仍是execute方法。

固然,还有例如shutdown、getQueue、getActiveCount、getPoolSize等方法没有介绍到,推荐胖友们打开IDE进行查看吧~

ps:关于线程池的原理并未深刻记录,有关它的任务拒绝策略、线程初始化、ThreadPoolExecutor构造以后,当任务超过设定值,它的执行策略等原理都值得去深刻学习,下回记录~

相关文章
相关标签/搜索