1.线程池的使用java
线程池通常配合队列一块儿工做,是线程池限制并发处理任务的数量。而后设置队列的大小,当任务超过队列大小时,经过必定的拒绝策略来处理,这样能够保护系统免受大流量而致使崩溃--只是部分拒绝服务,仍是有一部分是能够正常服务的。程序员
线程池通常有核心线程池大小和线程池最大大小配置,当线程池中的线程空闲一段时间时将会回收,而核心线程池中的线程不会被回收。算法
多少个线程合适呢?建议根据实际业务状况来压测决定,或者根据利特法则来算出一个合理的线程池大小。Java提供了ExecutorService的几种实现:缓存
a.ThreadPoolExecutor:标准线程池。并发
b.newCachedThreadPool建立一个可缓存线程池,若是线程池长度超过处理须要,可灵活回收空闲线程,若无可回收,则新建线程。this
c.newFixedThreadPool 建立一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。url
d.newScheduledThreadPool 建立一个定长线程池,支持定时及周期性任务执行。spa
e.newSingleThreadExecutor 建立一个单线程化的线程池,它只会用惟一的工做线程来执行任务,保证全部任务按照指定顺序(FIFO, LIFO, 优先级)执行。线程
f.ForkJoinPool:相似于ThreadPoolExecutor,可是使用work-stealing模式,其会为线程池中的每一个线程建立一个队列,从而用work-stealing(任务窃取)算法使得线程能够从其余线程队列里窃取任务来执行。即若是本身的任务处理完成了,则能够去忙碌的工做线程那里窃取任务执行。code
2.线程池简单分析
2.1 、建立单线程的线程池:newSingleThreadExecutor
ExecutorService executorService= Executors.newSingleThreadExecutor();
等价于
return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
2.二、建立固定数量的线程池
ExecutorService executorService1= Executors.newFixedThreadPool(10);
等价于
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
注意:单线程的线程池与固定数量的线程池使用队列的策略是同样的,若是固定数量的线程池为1,则至关于单线程的线程池。
dubbo源码的使用:
// 文件缓存定时写入 private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true)); //执行代码 if (syncSaveFile) { doSaveProperties(version); } else { registryCacheExecutor.execute(new SaveProperties(version)); }
2.三、建立可缓存的线程池,初始大小为0,线程池最大大小为Integer.MAX_VALUE。其使用SynchronousQueue队列,一个没有数据缓冲的阻塞队列。对其执行put操做后必须等待take操做消费该数据,反之亦然。该线程池不限制最大大小,若是线程池有空闲则复用,不然会建立一个新线程。若是线程池中的线程空闲60秒,则将被回收。该线程默认最大大小为Integer.MAX_VALUE,请肯定必要后再使用该线程池。
ExecutorService executorService2= Executors.newCachedThreadPool();
等价于
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
dubbo源码:在集群的时候使用到了,由于并不知道,传递过来的集群参数是多少。
private ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("mergeable-cluster-executor", true)); @SuppressWarnings("rawtypes") public Result invoke(final Invocation invocation) throws RpcException { List<Invoker<T>> invokers = directory.list(invocation); Map<String, Future<Result>> results = new HashMap<String, Future<Result>>(); for( final Invoker<T> invoker : invokers ) { Future<Result> future = executor.submit( new Callable<Result>() { public Result call() throws Exception { return invoker.invoke(new RpcInvocation(invocation, invoker)); } } ); results.put( invoker.getUrl().getServiceKey(), future ); }
2.四、支持延迟执行的线程池,其使用DelayedWorkQueue实现任务延迟。
ExecutorService executorService3= Executors.newScheduledThreadPool(10);
等价于
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
使用的案例:
Dubbo源码
//1.检测并链接注册中心,使用的是newScheduledThreadPool //定义一个全局的线程池: // 定时任务执行器 private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true)); // 失败重试定时器,定时检查是否有请求失败,若有,无限次重试 private final ScheduledFuture<?> retryFuture; public FailbackRegistry(URL url) { super(url); int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() { public void run() { // 检测并链接注册中心 try { retry(); } catch (Throwable t) { // 防护性容错 logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t); } } }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS); }
2.五、work-stealing线程池,默认为并行行数为Runtime.getRuntime().availableProcessors()
ExecutorService executorService4= Executors.newWorkStealingPool(2);
等价于
return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
3.线程池终止
线程池不在使用记得中止线程,能够调用shutdown以确保不接受新任务,并等待线程池中任务处理完成后再退出,或调用shutdownNow清除未执行任务,并用Thread.interrupt中止正在执行的任务。而后调用awaitTermination方法等待终止操做执行完成。
static ExecutorService executorService3= Executors.newScheduledThreadPool(10); public static void main(String[] args) { executorService3.shutdown(); try { executorService3.awaitTermination(30, TimeUnit.SECONDS); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
总结:在使用线程池时务必须设置池大小、队列大小并设置相应的拒绝策略(RejectedExcutionHandler)。线程池执行状况下没法捕获堆栈上下文,所以任务要记录相关参数,以方便定位提交任务的源头及定位引发问题的源头。
4.ThreadPoolExecutor六个核心参数
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
4.一、corePoolSize
核心池的大小。在建立了线程池以后,默认状况下,线程池中没有任何线程,而是等待有任务到来才建立线程去执行任务。默认状况下,在建立了线程池以后,线程池钟的线程数为0,当有任务到来后就会建立一个线程去执行任务。
4.二、maximumPoolSize
池中容许的最大线程数,这个参数表示了线程池中最多能建立的线程数量,当任务数量比corePoolSize大时,任务添加到workQueue,当workQueue满了,将继续建立线程以处理任务,maximumPoolSize表示的就是wordQueue满了,线程池中最多能够建立的线程数量。
4.三、keepAliveTime
只有当线程池中的线程数大于corePoolSize时,这个参数才会起做用。当线程数大于corePoolSize时,终止前多余的空闲线程等待新任务的最长时间。
4.四、unit
keepAliveTime时间单位。
4.五、workQueue
存储还没来得及执行的任务。
4.六、threadFactory
执行程序建立新线程时使用的工厂。
4.七、handler
因为超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。
总结:上面的内容,其余应该都相对比较好理解,只有corePoolSize和maximumPoolSize须要多思考。这里要特别再举例以四条规则解释一下这两个参数:
一、池中线程数小于corePoolSize,新任务都不排队而是直接添加新线程
二、池中线程数大于等于corePoolSize,workQueue未满,首选将新任务加入workQueue而不是添加新线程
三、池中线程数大于等于corePoolSize,workQueue已满,可是线程数小于maximumPoolSize,添加新的线程来处理被添加的任务
四、池中线程数大于大于corePoolSize,workQueue已满,而且线程数大于等于maximumPoolSize,新任务被拒绝,使用handler处理被拒绝的任务
ThreadPoolExecutor的使用很简单,前面的代码也写过例子了。经过execute(Runnable command)方法来发起一个任务的执行,经过shutDown()方法来对已经提交的任务作一个有效的关闭。尽管线程池很好,但咱们要注意JDK API的一段话:
强烈建议程序员使用较为方便的Executors工厂方法Executors.newCachedThreadPool()(无界线程池,能够进行线程自动回收)、Executors.newFixedThreadPool(int)(固定大小线程池)和Executors.newSingleThreadExecutor()(单个后台线程),它们均为大多数使用场景预约义了设置。
因此,跳开对ThreadPoolExecutor的关注(仍是那句话,有问题查询JDK API),重点关注一下JDK推荐的Executors。
4.八、四种拒绝策略
所谓拒绝策略以前也提到过了,任务太多,超过maximumPoolSize了怎么把?固然是接不下了,接不下那只有拒绝了。拒绝的时候能够指定拒绝策略,也就是一段处理程序。
决绝策略的父接口是RejectedExecutionHandler,JDK自己在ThreadPoolExecutor里给用户提供了四种拒绝策略,看一下:
一、AbortPolicy
直接抛出一个RejectedExecutionException,这也是JDK默认的拒绝策略。
二、CallerRunsPolicy
尝试直接运行被拒绝的任务,若是线程池已经被关闭了,任务就被丢弃了。
三、DiscardOldestPolicy
移除最晚的那个没有被处理的任务,而后执行被拒绝的任务。一样,若是线程池已经被关闭了,任务就被丢弃了。
四、DiscardPolicy
不能执行的任务将被删除。