使用 Executors,ThreadPoolExecutor,建立线程池,源码分析理解

以前建立线程的时候都是用的 newCachedThreadPoo,newFixedThreadPool,newScheduledThreadPool,newSingleThreadExecutor 这四个方法。
固然 Executors 也是用不一样的参数去 new ThreadPoolExecutor 实现的,本文先分析前四种线程建立方式,后在分析 new ThreadPoolExecutor 建立方式java

使用 Executors 建立线程池

1.newFixedThreadPool()

因为使用了LinkedBlockingQueue因此maximumPoolSize没用,当corePoolSize满了以后就加入到LinkedBlockingQueue队列中。 每当某个线程执行完成以后就从LinkedBlockingQueue队列中取一个。 因此这个是建立固定大小的线程池。git

源码分析github

public static ExecutorService newFixedThreadPool(int nThreads) {
	return new ThreadPoolExecutor(
			nThreads,
			nThreads,
			0L,
			TimeUnit.MILLISECONDS,
			new LinkedBlockingQueue<Runnable>());
}
复制代码

2.newSingleThreadPool()

建立线程数为1的线程池,因为使用了LinkedBlockingQueue因此maximumPoolSize 没用,corePoolSize为1表示线程数大小为1,满了就放入队列中,执行完了就从队列取一个。缓存

源码分析函数

public static ExecutorService newSingleThreadExecutor() {
	return new Executors.FinalizableDelegatedExecutorService
			(
					new ThreadPoolExecutor(
							1,
							1,
							0L,
							TimeUnit.MILLISECONDS,
							new LinkedBlockingQueue<Runnable>())
			);
}

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
	this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
			Executors.defaultThreadFactory(), defaultHandler);
}
复制代码

3.newCachedThreadPool()

建立可缓冲的线程池。没有大小限制。因为corePoolSize为0因此任务会放入SynchronousQueue队列中,SynchronousQueue只能存放大小为1,因此会马上新起线程,因为maxumumPoolSizeInteger.MAX_VALUE因此能够认为大小为2147483647。受内存大小限制。oop

源码分析源码分析

public static ExecutorService newCachedThreadPool() {
	return new ThreadPoolExecutor(
			0,
			Integer.MAX_VALUE,
			60L,
			TimeUnit.SECONDS,
			new SynchronousQueue<Runnable>());
}

public ThreadPoolExecutor(int corePoolSize,
						  int maximumPoolSize,
						  long keepAliveTime,
						  TimeUnit unit,
						  BlockingQueue<Runnable> workQueue) {
	this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
			Executors.defaultThreadFactory(), defaultHandler);
}
复制代码

使用 ThreadPoolExecutor 建立线程池

源码分析 ,ThreadPoolExecutor 的构造函数测试

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
	if (corePoolSize < 0 ||
			maximumPoolSize <= 0 ||
			maximumPoolSize < corePoolSize ||
			keepAliveTime < 0)
		throw new IllegalArgumentException();
	if (workQueue == null || threadFactory == null || handler == null)
		throw new NullPointerException();
	this.corePoolSize = corePoolSize;
	this.maximumPoolSize = maximumPoolSize;
	this.workQueue = workQueue;
	this.keepAliveTime = unit.toNanos(keepAliveTime);
	this.threadFactory = threadFactory;
	this.handler = handler;
}
复制代码

构造函数参数

一、corePoolSize 核心线程数大小,当线程数 < corePoolSize ,会建立线程执行 runnableui

二、maximumPoolSize 最大线程数, 当线程数 >= corePoolSize的时候,会把 runnable 放入 workQueue中this

三、keepAliveTime 保持存活时间,当线程数大于corePoolSize的空闲线程能保持的最大时间。

四、unit 时间单位

五、workQueue 保存任务的阻塞队列

六、threadFactory 建立线程的工厂

七、handler 拒绝策略

任务执行顺序

一、当线程数小于 corePoolSize时,建立线程执行任务。

二、当线程数大于等于 corePoolSize而且 workQueue 没有满时,放入workQueue

三、线程数大于等于 corePoolSize而且当 workQueue 满时,新任务新建线程运行,线程总数要小于 maximumPoolSize

四、当线程总数等于 maximumPoolSize 而且 workQueue 满了的时候执行 handlerrejectedExecution。也就是拒绝策略。

四个拒绝策略

ThreadPoolExecutor默认有四个拒绝策略:

一、ThreadPoolExecutor.AbortPolicy() 直接抛出异常RejectedExecutionException

二、ThreadPoolExecutor.CallerRunsPolicy() 直接调用run方法而且阻塞执行

三、ThreadPoolExecutor.DiscardPolicy() 直接丢弃后来的任务

四、ThreadPoolExecutor.DiscardOldestPolicy() 丢弃在队列中队首的任务

固然能够本身继承RejectedExecutionHandler来写拒绝策略.

TestThreadPoolExecutor 示例

TestThreadPoolExecutor.java

package io.ymq.thread.TestThreadPoolExecutor;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** * 描述: * * @author yanpenglei * @create 2017-10-12 15:39 **/
public class TestThreadPoolExecutor {
    public static void main(String[] args) {

        long currentTimeMillis = System.currentTimeMillis();

        // 构造一个线程池
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 6, 3,
                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3)
        );

        for (int i = 1; i <= 10; i++) {
            try {
                String task = "task=" + i;
                System.out.println("建立任务并提交到线程池中:" + task);
                threadPool.execute(new ThreadPoolTask(task));

                Thread.sleep(100);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        try {
            //等待全部线程执行完毕当前任务。
            threadPool.shutdown();

            boolean loop = true;
            do {
                //等待全部线程执行完毕当前任务结束
                loop = !threadPool.awaitTermination(2, TimeUnit.SECONDS);//等待2秒
            } while (loop);

            if (loop != true) {
                System.out.println("全部线程执行完毕");
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println("耗时:" + (System.currentTimeMillis() - currentTimeMillis));
        }


    }
}

复制代码

ThreadPoolTask.java

package io.ymq.thread.TestThreadPoolExecutor;

import java.io.Serializable;

/** * 描述: * * @author yanpenglei * @create 2017-10-12 15:40 **/
public class ThreadPoolTask implements Runnable, Serializable {

    private Object attachData;

    ThreadPoolTask(Object tasks) {
        this.attachData = tasks;
    }

    public void run() {

        try {

            System.out.println("开始执行任务:" + attachData + "任务,使用的线程池,线程名称:" + Thread.currentThread().getName());

            System.out.println();

        } catch (Exception e) {
            e.printStackTrace();
        }
        attachData = null;
    }

}
复制代码

遇到java.util.concurrent.RejectedExecutionException

第一

你的线程池 ThreadPoolExecutor 显示的 shutdown() 以后,再向线程池提交任务的时候。 若是你配置的拒绝策略是 AbortPolicy 的话,这个异常就会抛出来。

第二

当你设置的任务缓存队列太小的时候,或者说, 你的线程池里面全部的线程都在干活(线程数== maxPoolSize),而且你的任务缓存队列也已经充满了等待的队列, 这个时候,你再向它提交任务,则会抛出这个异常。

响应

能够看到线程 pool-1-thread-1 到5 循环使用

建立任务并提交到线程池中:task=1
开始执行任务:task=1任务,使用的线程池,线程名称:pool-1-thread-1

建立任务并提交到线程池中:task=2
开始执行任务:task=2任务,使用的线程池,线程名称:pool-1-thread-2

建立任务并提交到线程池中:task=3
开始执行任务:task=3任务,使用的线程池,线程名称:pool-1-thread-3

建立任务并提交到线程池中:task=4
开始执行任务:task=4任务,使用的线程池,线程名称:pool-1-thread-4

建立任务并提交到线程池中:task=5
开始执行任务:task=5任务,使用的线程池,线程名称:pool-1-thread-5

建立任务并提交到线程池中:task=6
开始执行任务:task=6任务,使用的线程池,线程名称:pool-1-thread-1

建立任务并提交到线程池中:task=7
开始执行任务:task=7任务,使用的线程池,线程名称:pool-1-thread-2

建立任务并提交到线程池中:task=8
开始执行任务:task=8任务,使用的线程池,线程名称:pool-1-thread-3

建立任务并提交到线程池中:task=9
开始执行任务:task=9任务,使用的线程池,线程名称:pool-1-thread-4

建立任务并提交到线程池中:task=10
开始执行任务:task=10任务,使用的线程池,线程名称:pool-1-thread-5

全部线程执行完毕
耗时:1015
复制代码

测试代码

github github.com/souyunku/ym…

Contact

  • 做者:鹏磊
  • 出处:www.ymq.io
  • Email:admin@souyunku.com
  • 版权归做者全部,转载请注明出处
  • Wechat:关注公众号,搜云库,专一于开发技术的研究与知识分享

关注公众号-搜云库
搜云库
相关文章
相关标签/搜索