【强制】线程池不容许使用Executors去建立,而是经过ThreadPoolExecutor的方式,这样的处理方式让写的同窗更加明确线程池的运行规则,规避资源耗尽的风险。以上是《阿里巴巴》java开发手册中关于并发的强制规定,为何作这个规定?一下是我我的解惑和作出的相关延伸,此篇做为我我的学习笔记的开篇。java
一.为何要手动建立:
1.单例线程池web
Executor executor = Executors.newSingleThreadExecutor();
2.可扩容线程池设计模式
Executor executor1 = Executors.newCachedThreadPool();
3.定长线程池安全
Executor executor2 = Executors.newFixedThreadPool(10);
线程池的工做原理以下:并发
以上是三种平时常见的线程池,下面看建立源码:jvm
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
都是和ThreadPoolExecutor相关,ThreadPoolExecutor构造函数:函数
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
采用Executor建立线程池会出现资源耗尽即OOM错误,主要出问题的地方是阻塞队列的问题即workQueue。
1.看newSingleThreadExecutor建立源码,线程池核心线程数和最大线程数都是1,剩下多余的线程任务全都塞到LinkedBlockingQueue<Runnable>()中,如下是LinkedBlockingQueue<Runnable>()无参构造函数源码,高并发
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }
Integer.MAX_VALUE=2147483647,根据线程池的工做原理,若是自动建立线程池即表示不限制LinkedBlockingQueue的长度,这个阻塞队列在高并发的状况下可能会撑爆线程池。
2.看newFixedThreadPool建立源码,和1同理。
3.看newCachedThreadPool源码,此线程池容许最多Integer.MAX_VALUE个线程的建立,工做队列是SynchronousQueue,如下是SynchronousQueue建立源码:学习
/** * Creates a {@code SynchronousQueue} with nonfair access policy. */ public SynchronousQueue() { this(false); } /** * Creates a {@code SynchronousQueue} with the specified fairness policy. * * @param fair if true, waiting threads contend in FIFO order for * access; otherwise the order is unspecified. */ public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }
看注释,SynchronousQueue在这里是一个非公平的栈,这里SynchronousQueue的问题不深究,往后会专门学习jdk中阻塞队列。
newCachedThreadPool线程池的大小由线程池最大线程数控制,自动建立的线程池极可能建立很是多的线程撑爆内存。
以上简单从源码角度回答问题缘由。
自动建立线程池就是直接使用Executors建立,弊端以上是简单分析,可是看源码,手动建立线程池就是脱了一层壳直接new ThreadPoolExecutor 而已,本质同样的。ui
二.ThreadFactory
看上面ThreadPoolExecutor的构造函数源码,我发现了ThreadFactory。~~~~
咱们建立一个task,交付给Executor,Executor会帮咱们建立一个线程去执行这个任务。ThreadFactory就是一个建立线程的工厂。
先看源码
public interface ThreadFactory { /** * Constructs a new {@code Thread}. Implementations may also initialize * priority, name, daemon status, {@code ThreadGroup}, etc. * * @param r a runnable to be executed by new thread instance * @return constructed thread, or {@code null} if the request to * create a thread is rejected */ Thread newThread(Runnable r); } /** * The default thread factory */ static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
ThreadFactory是个接口,DefaultThreadFactory是JUC包中对此接口的实现。
先简单看下这个工厂中的newThread方法,入参实现了Runnable接口,方法里有Thread的构造函数,两个if判断,默认状况下是非守护线程,线程优先级是5。线程的名字就是1的数据累加,在根据jvm是否开启了安全管理器(jvm默认关闭)来决定线程组。若是咱们不自定一个ThreadFactory,那么不管是采用自动建立线程池Executor,仍是手动建立线程池ThreadPoolExecutor,都是由DefaultThreadFactory来建立线程。
手动建立线程和自动建立线程有什么不同的吗?最粗略简单的不同就是线程名字的不同,不一样业务须要的线程池的线程取不一样的名字,这样在须要用到线程池的复杂业务状况下,日志会清爽不少,在实际开发中,这个优点很重要。
guava提供一个ThreadFactoryBuilder能够帮助咱们实现本身的ThreadFactory。
ThreadFactoryBuilder中核心功能源码
private static ThreadFactory build(ThreadFactoryBuilder builder) { final String nameFormat = builder.nameFormat; final Boolean daemon = builder.daemon; final Integer priority = builder.priority; final UncaughtExceptionHandler uncaughtExceptionHandler = builder.uncaughtExceptionHandler; final ThreadFactory backingThreadFactory = builder.backingThreadFactory != null ? builder.backingThreadFactory : Executors.defaultThreadFactory(); final AtomicLong count = nameFormat != null ? new AtomicLong(0L) : null; return new ThreadFactory() { public Thread newThread(Runnable runnable) { Thread thread = backingThreadFactory.newThread(runnable); if (nameFormat != null) { thread.setName(ThreadFactoryBuilder.format(nameFormat, count.getAndIncrement())); } if (daemon != null) { thread.setDaemon(daemon); } if (priority != null) { thread.setPriority(priority); } if (uncaughtExceptionHandler != null) { thread.setUncaughtExceptionHandler(uncaughtExceptionHandler); } return thread; } }; }
build设计模式,咱们能够本身去定义线程名字,是否为守护线程,线程优先级,线程的异常处理器。若是要选择自定义ThreadFactory,强烈推荐ThreadFactoryBuilder。
三.submit和execute
task提交到线程池有两种方式
1.submit 2.execute
demo:
public static void testSubmitAndExecute() throws ExecutionException, InterruptedException { ThreadFactoryBuilder orderThreadFactoryBuilder = new ThreadFactoryBuilder(); ThreadFactory orderThread = orderThreadFactoryBuilder.setNameFormat("---order---").build(); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 30, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10), orderThread); for (int i = 0; i < 5; i++) { int fina = i; Runnable runnable = () -> { if (fina == 3) { throw new RuntimeException("demo exception"); } else { System.out.println("runnable---" + fina + "---" + Thread.currentThread().getName()); System.out.println(); } }; // threadPoolExecutor.submit(runnable); threadPoolExecutor.execute(runnable); // Future<?> submit = threadPoolExecutor.submit(runnable); // System.out.println(submit.get()); } }
若是采用了execute执行,那么结果就是
发现当某个线程出现异常的时候,其实并不会影响到其余线程的执行。这里是直接抛出了一个异常。也能够经过ThreadFactory来定义Thread本身的UncaughtExceptionHandler属性,即实现UncaughtExceptionHandler接口的类。
改为submit形式处理:
Future<?> submit = threadPoolExecutor.submit(runnable); System.out.println(submit.get());
发现get会拿到一个返回结果,这里是null,这表示线程执行成功没有异常,并且会卡住,即当第i=3的线程异常以后,程序就抛出异常中止了。这是和execute不同的地方。
改为submit形式处理2:
threadPoolExecutor.submit(runnable);
看日志彻底看不到异常,明明是有异常抛出的。没有异常打印,可是不影响其余线程的运行。
这里简单记录下实际线程池提交的不一样状况。
写到目前为止,从手册中的一条规则开始思考,看源码,简单了解了为何,主要是跟线程池的工做队列有关,线程池中的线程怎么来的,找到了ThreadFactory,
1.newCachedThreadPool的SynchronousQueue这个须要学习2.ThreadGroup须要学习,System.getSecurityManager()java安全器须要去了解。3.线程池submit和execute两种方式在源码层析的学习。4.Thread,runnable,callable这三种的本质不一样和联系。