Java多线程知识点整理(线程池)

1.线程池的使用java

    线程池通常配合队列一块儿工做,是线程池限制并发处理任务的数量。而后设置队列的大小,当任务超过队列大小时,经过必定的拒绝策略来处理,这样能够保护系统免受大流量而致使崩溃--只是部分拒绝服务,仍是有一部分是能够正常服务的。程序员

    线程池通常有核心线程池大小和线程池最大大小配置,当线程池中的线程空闲一段时间时将会回收,而核心线程池中的线程不会被回收。算法

    多少个线程合适呢?建议根据实际业务状况来压测决定,或者根据利特法则来算出一个合理的线程池大小。Java提供了ExecutorService的几种实现:缓存

    a.ThreadPoolExecutor:标准线程池。并发

    b.newCachedThreadPool建立一个可缓存线程池,若是线程池长度超过处理须要,可灵活回收空闲线程,若无可回收,则新建线程。this

    c.newFixedThreadPool 建立一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。url

    d.newScheduledThreadPool 建立一个定长线程池,支持定时及周期性任务执行。spa

    e.newSingleThreadExecutor 建立一个单线程化的线程池,它只会用惟一的工做线程来执行任务,保证全部任务按照指定顺序(FIFO, LIFO, 优先级)执行。线程

    f.ForkJoinPool:相似于ThreadPoolExecutor,可是使用work-stealing模式,其会为线程池中的每一个线程建立一个队列,从而用work-stealing(任务窃取)算法使得线程能够从其余线程队列里窃取任务来执行。即若是本身的任务处理完成了,则能够去忙碌的工做线程那里窃取任务执行。code

2.线程池简单分析

 2.1 、建立单线程的线程池:newSingleThreadExecutor 

ExecutorService  executorService=  Executors.newSingleThreadExecutor();

等价于

return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));

2.二、建立固定数量的线程池

ExecutorService  executorService1=  Executors.newFixedThreadPool(10);

等价于

return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());

注意:单线程的线程池与固定数量的线程池使用队列的策略是同样的,若是固定数量的线程池为1,则至关于单线程的线程池。

dubbo源码的使用:

// 文件缓存定时写入
    private final ExecutorService registryCacheExecutor = 
Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));

//执行代码
if (syncSaveFile) {
      doSaveProperties(version);
 } else {
         registryCacheExecutor.execute(new SaveProperties(version));
 }

 

2.三、建立可缓存的线程池,初始大小为0,线程池最大大小为Integer.MAX_VALUE。其使用SynchronousQueue队列,一个没有数据缓冲的阻塞队列。对其执行put操做后必须等待take操做消费该数据,反之亦然。该线程池不限制最大大小,若是线程池有空闲则复用,不然会建立一个新线程。若是线程池中的线程空闲60秒,则将被回收。该线程默认最大大小为Integer.MAX_VALUE,请肯定必要后再使用该线程池。

ExecutorService  executorService2=  Executors.newCachedThreadPool();

等价于

return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());

dubbo源码:在集群的时候使用到了,由于并不知道,传递过来的集群参数是多少。

private ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("mergeable-cluster-executor", true));
   

    @SuppressWarnings("rawtypes")
	public Result invoke(final Invocation invocation) throws RpcException {
        List<Invoker<T>> invokers = directory.list(invocation);

        
        Map<String, Future<Result>> results = new HashMap<String, Future<Result>>();
        for( final Invoker<T> invoker : invokers ) {
            Future<Result> future = executor.submit( new Callable<Result>() {
                public Result call() throws Exception {
                    return invoker.invoke(new RpcInvocation(invocation, invoker));
                }
            } );
            results.put( invoker.getUrl().getServiceKey(), future );
        }

2.四、支持延迟执行的线程池,其使用DelayedWorkQueue实现任务延迟。

ExecutorService  executorService3=  Executors.newScheduledThreadPool(10);

等价于

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

使用的案例:

Dubbo源码

//1.检测并链接注册中心,使用的是newScheduledThreadPool
  //定义一个全局的线程池:
// 定时任务执行器
private final ScheduledExecutorService retryExecutor = 
Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true));

    // 失败重试定时器,定时检查是否有请求失败,若有,无限次重试
    private final ScheduledFuture<?> retryFuture;


public FailbackRegistry(URL url) {
        super(url);
        int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, 
Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
        this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                // 检测并链接注册中心
                try {
                    retry();
                } catch (Throwable t) { // 防护性容错
                    logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
                }
            }
        }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
    }

2.五、work-stealing线程池,默认为并行行数为Runtime.getRuntime().availableProcessors()

ExecutorService  executorService4=  Executors.newWorkStealingPool(2);

等价于

return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);

 

3.线程池终止

线程池不在使用记得中止线程,能够调用shutdown以确保不接受新任务,并等待线程池中任务处理完成后再退出,或调用shutdownNow清除未执行任务,并用Thread.interrupt中止正在执行的任务。而后调用awaitTermination方法等待终止操做执行完成。

static ExecutorService  executorService3=  Executors.newScheduledThreadPool(10);
	public static void main(String[] args) {
		executorService3.shutdown();
		try {
			executorService3.awaitTermination(30, TimeUnit.SECONDS);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
	}

总结:在使用线程池时务必须设置池大小、队列大小并设置相应的拒绝策略(RejectedExcutionHandler)。线程池执行状况下没法捕获堆栈上下文,所以任务要记录相关参数,以方便定位提交任务的源头及定位引发问题的源头。

4.ThreadPoolExecutor六个核心参数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

4.一、corePoolSize

核心池的大小。在建立了线程池以后,默认状况下,线程池中没有任何线程,而是等待有任务到来才建立线程去执行任务。默认状况下,在建立了线程池以后,线程池钟的线程数为0,当有任务到来后就会建立一个线程去执行任务。

4.二、maximumPoolSize

池中容许的最大线程数,这个参数表示了线程池中最多能建立的线程数量,当任务数量比corePoolSize大时,任务添加到workQueue,当workQueue满了,将继续建立线程以处理任务,maximumPoolSize表示的就是wordQueue满了,线程池中最多能够建立的线程数量。

4.三、keepAliveTime

只有当线程池中的线程数大于corePoolSize时,这个参数才会起做用。当线程数大于corePoolSize时,终止前多余的空闲线程等待新任务的最长时间。

4.四、unit

keepAliveTime时间单位。

4.五、workQueue

存储还没来得及执行的任务。

4.六、threadFactory

执行程序建立新线程时使用的工厂。

4.七、handler

因为超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。

总结:上面的内容,其余应该都相对比较好理解,只有corePoolSize和maximumPoolSize须要多思考。这里要特别再举例以四条规则解释一下这两个参数:

一、池中线程数小于corePoolSize,新任务都不排队而是直接添加新线程

二、池中线程数大于等于corePoolSize,workQueue未满,首选将新任务加入workQueue而不是添加新线程

三、池中线程数大于等于corePoolSize,workQueue已满,可是线程数小于maximumPoolSize,添加新的线程来处理被添加的任务

四、池中线程数大于大于corePoolSize,workQueue已满,而且线程数大于等于maximumPoolSize,新任务被拒绝,使用handler处理被拒绝的任务

ThreadPoolExecutor的使用很简单,前面的代码也写过例子了。经过execute(Runnable command)方法来发起一个任务的执行,经过shutDown()方法来对已经提交的任务作一个有效的关闭。尽管线程池很好,但咱们要注意JDK API的一段话:

强烈建议程序员使用较为方便的Executors工厂方法Executors.newCachedThreadPool()(无界线程池,能够进行线程自动回收)、Executors.newFixedThreadPool(int)(固定大小线程池)和Executors.newSingleThreadExecutor()(单个后台线程),它们均为大多数使用场景预约义了设置。

因此,跳开对ThreadPoolExecutor的关注(仍是那句话,有问题查询JDK API),重点关注一下JDK推荐的Executors。

4.八、四种拒绝策略

所谓拒绝策略以前也提到过了,任务太多,超过maximumPoolSize了怎么把?固然是接不下了,接不下那只有拒绝了。拒绝的时候能够指定拒绝策略,也就是一段处理程序。

决绝策略的父接口是RejectedExecutionHandler,JDK自己在ThreadPoolExecutor里给用户提供了四种拒绝策略,看一下:

一、AbortPolicy

直接抛出一个RejectedExecutionException,这也是JDK默认的拒绝策略。

二、CallerRunsPolicy

尝试直接运行被拒绝的任务,若是线程池已经被关闭了,任务就被丢弃了。

三、DiscardOldestPolicy

移除最晚的那个没有被处理的任务,而后执行被拒绝的任务。一样,若是线程池已经被关闭了,任务就被丢弃了。

四、DiscardPolicy

不能执行的任务将被删除。

相关文章
相关标签/搜索