【Java并发编程六】线程池

1、概述

  在执行并发任务时,咱们能够把任务传递给一个线程池,来替代为每一个并发执行的任务都启动一个新的线程,只要池里有空闲的线程,任务就会分配一个线程执行。在线程池的内部,任务被插入一个阻塞队列(BlockingQueue),线程池里的线程会去取这个队列里的任务。编程

  利用线程池有三个好处:数组

  1. 下降资源消耗。经过重复利用已建立的线程下降线程建立和销毁形成的消耗
  2. 提升响应速度。当任务到达时,任务能够不须要的等到线程建立就能当即执行
  3. 提升线程的可管理性。线程是稀缺资源,若是无限制的建立,不只会消耗系统资源,还会下降系统的稳定性,使用线程池能够进行统一的分配,调优和监控

2、线程池的使用

  一、建立线程池

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
  TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

  建立一个线程池须要的几个参数:缓存

  • corePoolSize:线程池的基本大小,当提交一个任务到线程池,线程池会建立一个线程来执行任务,即便其余空闲的基本线程可以执行新任务也会建立线程,等到须要执行的任务数大于corePoolSize时就再也不建立。若是调用了线程池的prestartAllCoreThreads方法,线程池会提早建立并启动全部基本线程。
  • maximumPoolSize:线程池最大大小,线程池容许建立的最大线程数,若是队列满了,而且已建立的线程数小于最大线程数,则线程池会再建立新的线程执行任务。
  • keepAliveTime:线程活动保持时间,线程池的工做线程空闲后,保持存活的时间。
  • unit:线程活动保持时间的单位
  • workQueue:任务队列,用于保存等待执行的任务的阻塞队列,能够选择如下几个队列:

一、ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。并发

二、LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量一般要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。ide

三、SynchronousQueue:一个不存储元素的阻塞队列。每一个插入操做必须等到另外一个线程调用移除操做,不然插入操做一直处于阻塞状态,吞吐量一般要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。工具

四、PriorityBlockingQueue:一个具备优先级得无限阻塞队列。this

  • threadFactory:用于设置建立线程的工厂,能够经过线程工程给每一个建立出来的线程设置更有意义的名字。
  • handler:饱和策略,当线程池和队列都满了,说明线程池处于饱和状态,那么必须采起一种策略处理提交的新任务。这个策略默认状况下是AbortPolicy,表示没法处理新任务时抛出异常。

  此外,咱们还能够经过调用Ececutors中的某个静态工厂方法来建立一个线程池(它们的内部实现原理都是相同的,仅仅是使用了不一样的工做队列或线程池大小):spa

  newFixedThreadPool:建立一个定长的线程池,每当提交一个任务就建立一个线程,直到达到池的最大长度,这时线程池会保持长度不在变化线程

  newCachedThreadPool:建立一个可缓存的线程池,若是当前的线程池的长度超过了处理的须要时,它能够灵活的回收空闲的线程,当需求增长时,它能够灵活的添加新的线程,并不会对池的长度作任何限制rest

  newSingleThreadPool:建立一个单线程化的executor,它只会建立惟一的工做者线程来执行任务

  newScheduledThreadPool:建立一个定长的线程池,并且支持定时的以及周期性的任务执行,相似于Timer

  二、向线程池提交任务

  可使用execute向线程池提交任务:

public class Test2
{
    public static void main(String[] args)
    {
        BlockingQueue<Runnable> workQueue=new LinkedBlockingDeque<Runnable>();
        ThreadPoolExecutor poolExecutor=new ThreadPoolExecutor(2, 3, 60, TimeUnit.SECONDS, workQueue);
        poolExecutor.execute(new Task1());
        poolExecutor.execute(new Task2());
     poolExecutor.shutdown(); } }
class Task1 implements Runnable { public void run() { System.out.println("执行任务1"); } } class Task2 implements Runnable { public void run() { System.out.println("执行任务2"); } }

  也可使用submit方法来提交任务,它会返回一个future,咱们能够经过这个future来判断任务是否执行成功,经过future的get方法获取返回值,get方法会阻塞直到任务完成。

public class Test3
{
    public static void main(String[] args) throws InterruptedException, ExecutionException
    {
        ExecutorService executorService = Executors.newCachedThreadPool();
        List<Future<String>> resultList = new ArrayList<Future<String>>();   
        // 建立10个任务并执行
        for (int i = 0; i < 10; i++)
        {
            // 使用ExecutorService执行Callable类型的任务,并将结果保存在future变量中
            Future<String> future = executorService.submit(new TaskWithResult(i));
            resultList.add(future);
        }
        for (Future<String> future : resultList)
        {
            while (!future.isDone());// Future返回若是没有完成,则一直循环等待,直到Future返回完成
            {
                System.out.println(future.get()); // 打印各个线程(任务)执行的结果
            }
        }
        executorService.shutdown();
    }
}
class TaskWithResult implements Callable<String>
{
    private int id;
    public TaskWithResult(int id)
    {
        this.id = id;
    }
    public String call() throws Exception
    {
        return "执行结果"+id;
    }
}

 

  三、线程关闭

  能够经过调用线程池的shutdown或shutdownNow方法来关闭线程池,可是它们的实现原理不一样,shutdown的原理是只是将线程池的状态设置成SHUTDOWN状态,而后中断全部没有正在执行任务的线程。shutdownNow的原理是遍历线程池中的工做线程,而后逐个调用线程的interrupt方法来中断线程,因此没法响应中断的任务可能永远没法终止。shutdownNow会首先将线程池的状态设置成STOP,而后尝试中止全部的正在执行或暂停任务的线程,并返回等待执行任务的列表。

  只要调用了这两个关闭方法的其中一个,isShutdown方法就会返回true。当全部的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于咱们应该调用哪种方法来关闭线程池,应该由提交到线程池的任务特性决定,一般调用shutdown来关闭线程池,若是任务不必定要执行完,则能够调用shutdownNow。

3、线程池的执行过程

  整个ThreadExecutor的任务处理通过下面4个步骤,以下图所示:

  

  一、若是当前的线程数<corePoolSize,提交的Runnable任务,会直接做为new Thread的参数,当即执行,当提交的任务数超过了corePoolSize,就进入第二部操做

  二、将当前的任务提交到BlockingQueue阻塞队列中,若是Block Queue是个有界队列,当队列满了以后就进入第三步

  三、若是poolSize < maximumPoolsize时,会尝试new 一个Thread的进行救急处理,执行对应的runnable任务

  四、若是第三步也没法处理,就会用RejectedExecutionHandler来作拒绝处理

4、执行定时及周期性任务

  Timer工具管理任务的定时以及周期性执行。示例代码以下:

public class TimerTest
{
    final static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 
    public static void main(String[] args)
    {
        TimerTask timerTask1=new TimerTask()
        {
            @Override
            public void run()
            {
                System.out.println("任务1执行时间:"+sdf.format(new Date()));
                try
                {
                    Thread.sleep(3000);//模拟任务1执行时间为3秒
                }
                catch (InterruptedException e)
                {
                    // TODO 自动生成的 catch 块
                    e.printStackTrace();
                }
            }
        };
        System.out.println("当前时间:"+sdf.format(new Date()));
        Timer timer=new Timer();
        timer.schedule(timerTask1, new Date(),4000); //间隔4秒周期性执行
    }
}

  执行结果:

  

  能够看到上述任务1以4秒为间隔周期性执行。可是Timer存在一些缺陷,主要是下面两个方面的问题:

  缺陷1Timer只建立惟一的线程的来执行全部的Timer任务,若是一个time任务的执行很耗时,会致使其余的TimeTask的时效准确性出问题。看下面的例子:

public class TimerTest
{
    final static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 
    public static void main(String[] args)
    {
        TimerTask timerTask1=new TimerTask()
        {
            @Override
            public void run()
            {
                System.out.println("任务1执行时间:"+sdf.format(new Date()));
                try
                {
                    Thread.sleep(10000);
                }
                catch (InterruptedException e)
                {
                    // TODO 自动生成的 catch 块
                    e.printStackTrace();
                }
            }
        };
        TimerTask timerTask2=new TimerTask()
        {
            @Override
            public void run()
            {
                
                System.out.println("任务2执行时间:"+sdf.format(new Date()));
            }
        };
        System.out.println("当前时间:"+sdf.format(new Date()));
        Timer timer=new Timer();
        timer.schedule(timerTask1, new Date(),1000); //间隔1秒周期性执行
        timer.schedule(timerTask2, new Date(),4000); //间隔4秒周期性执行
    }
}

  执行结果:

  

  缺陷2:若是TimeTask抛出未检查的异常,Timer将产生没法预料的行为。Timer线程并不捕获线程,全部TimerTask抛出的未检查的异常会终止timer线程。看下面的代码:

public class TimerTest2
{
    final static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 
    public static void main(String[] args)
    {
        TimerTask timerTask1=new TimerTask()
        {
            @Override
            public void run()
            {
                System.out.println("任务1执行时间:"+sdf.format(new Date()));
                throw new RuntimeException();
            }
        };
        
        TimerTask timerTask2=new TimerTask()
        {
            @Override
            public void run()
            {
                System.out.println("任务2执行时间:"+sdf.format(new Date()));
            }
        };
        System.out.println("当前时间:"+sdf.format(new Date()));
        Timer timer=new Timer();
        timer.schedule(timerTask1, new Date(),1000); //周期1秒执行任务1
        timer.schedule(timerTask2, new Date() ,3000); //周期3秒执行任务2
        
    }
}

  执行结果为:

  

  

  针对上述的两个问题,咱们可使用ScheduledThreadPoolExecutor来做为Timer的替代。

  针对问题1,有下面代码:

public class ScheduledThreadPoolExecutorTest
{
    final static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    public static void main(String[] args)
    {
        TimerTask timerTask1 = new TimerTask()
        {

            @Override
            public void run()
            {
                System.out.println("任务1执行时间:" + sdf.format(new Date()));
                try
                {
                    Thread.sleep(10000);
                } catch (InterruptedException e)
                {
                    // TODO 自动生成的 catch 块
                    e.printStackTrace();
                }
            }
        };
        TimerTask timerTask2 = new TimerTask()
        {
            @Override
            public void run()
            {
                System.out.println("任务2执行时间:" + sdf.format(new Date()));
            }
        };
        System.out.println("当前时间:" + sdf.format(new Date()));
        ScheduledThreadPoolExecutor poolExecutor=new ScheduledThreadPoolExecutor(2);
        poolExecutor.scheduleAtFixedRate(timerTask1, 0, 1000,TimeUnit.MILLISECONDS);
        poolExecutor.scheduleAtFixedRate(timerTask2,  0, 4000,TimeUnit.MILLISECONDS);
    }
}

  执行的结果为:

  

  针对问题2,有下面代码:

public class ScheduledThreadPoolExecutorTest2
{
    final static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    public static void main(String[] args)
    {
        TimerTask timerTask1 = new TimerTask()
        {

            @Override
            public void run()
            {
                System.out.println("任务1执行时间:" + sdf.format(new Date()));
                throw new RuntimeException();
            }
        };
        TimerTask timerTask2 = new TimerTask()
        {
            @Override
            public void run()
            {

                System.out.println("任务2执行时间:" + sdf.format(new Date()));
            }
        };
        System.out.println("当前时间:" + sdf.format(new Date()));
        ScheduledThreadPoolExecutor poolExecutor=new ScheduledThreadPoolExecutor(2);
        poolExecutor.scheduleAtFixedRate(timerTask1, 0, 1000, TimeUnit.MILLISECONDS);  
        poolExecutor.scheduleAtFixedRate(timerTask2, 0, 2000, TimeUnit.MILLISECONDS);
    }
}

  执行结果为:

  

5、参考资料

  一、Java并发编程实践

相关文章
相关标签/搜索