二. 线程管理之线程池

不忘初心 砥砺前行, Tomorrow Is Another Day !html

相关文章

本文概要:java

  1. 认识Executor与ExecutorService
  2. 理解Future与FuturTask
  3. ThreadPoolExecutor介绍
  4. Executors工厂类

线程池的优势:bash

  • 重用线程,避免没必要要的对象建立和销毁.
  • 可有效控制最大并发线程数,提升系统资源的使用率.避免因线程过多强占系统资源致使阻塞.

一. 认识Executor与ExecutorService

  • 认识Executor

Executor做为线程池的顶级接口. 在Java的设计中,Runnable负责任务的提交. Executor负责任务的执行.将任务进行了解耦.多线程

public interface Executor {

    void execute(Runnable command);//执行已提交的 Runnable 任务对象
}
复制代码
  • 认识ExecutorService

ExecutorService接口继承了Executor接口,定义了一些生命周期的方法并发

public interface ExecutorService extends Executor {

   
    void shutdown();//顺次地关闭ExecutorService,中止接收新的任务,等待全部已经提交的任务执行完毕以后,关闭ExecutorService


    List<Runnable> shutdownNow();//阻止等待任务启动并试图中止当前正在执行的任务,中止接收新的任务,返回处于等待的任务列表


    boolean isShutdown();//判断线程池是否已经关闭

    boolean isTerminated();//若是关闭后全部任务都已完成,则返回 true。注意,除非首先调用 shutdown 或 shutdownNow,不然 isTerminated 永不为 true。

    
    boolean awaitTermination(long timeout, TimeUnit unit)//等待(阻塞)直到关闭或最长等待时间或发生中断,timeout - 最长等待时间 ,unit - timeout 参数的时间单位  若是此执行程序终止,则返回 true;若是终止前超时期满,则返回 false 

 
    <T> Future<T> submit(Callable<T> task);//提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。该 Future 的 get 方法在成功完成时将会返回该任务的结果。


    <T> Future<T> submit(Runnable task, T result);//提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功完成时将会返回给定的结果。

 
    Future<?> submit(Runnable task);//提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功 完成时将会返回 null


    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)//执行给定的任务,当全部任务完成时,返回保持任务状态和结果的 Future 列表。返回列表的全部元素的 Future.isDone() 为 true。
        throws InterruptedException;


    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)//执行给定的任务,当全部任务完成时,返回保持任务状态和结果的 Future 列表。返回列表的全部元素的 Future.isDone() 为 true。
        throws InterruptedException;


    <T> T invokeAny(Collection<? extends Callable<T>> tasks)//执行给定的任务,若是在给定的超时期满前某个任务已成功完成(也就是未抛出异常),则返回其结果。一旦正常或异常返回后,则取消还没有完成的任务。
        throws InterruptedException, ExecutionException;


    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
复制代码

在ExecutorService中运用到了Future相关知识,下面对Futue作一个简单了解.异步

二. 理解Future与FutureTask

  • 理解Future

Future简单理解就是对异步任务的统计类,包含进行取消、查询是否完成、获取结果等操做.ide

Future类位于java.util.concurrent包下,对应源码.post

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    //表示若是在任务完成前被取消成功,则返回true
    boolean isCancelled();
    ////表示任务执行结束,不管是正常结束/中断/发生异常,都返回true
    boolean isDone();
    //用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回
    V get() throws InterruptedException, ExecutionException;
    //用来获取执行结果,有超时机制,若是阻塞时间超过了指定时间,会抛出异常
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
复制代码
  • 理解FutureTask

FutureTask既实现了Future接口,又实现了Runnable接口.学习

对应源码ui

public class FutureTask<V> implements RunnableFuture<V>

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

//构造方法
public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
}

public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
}

public void run() {
        if (state != NEW ||
            !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //回调callable的Call方法,获取异步任务返回值.
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            //...省略部分代码
        }
    }
复制代码

因此咱们能够经过Runnable接口实现线程,又能够经过Future接口获取线程执行完后的结果.

使用示例 上一篇文章Thread基础中已经使用过Future了,这里直接换成FutureTask的使用.

public class MyCallable implements Callable<String> {
    @Override
    public String call() throws Exception {
        System.out.println("子线程正在干活");
        Thread.sleep(3000);
        return "实现Callable接口,重写Call方法";
    }

    public static void main(String[] args) throws Exception {
        useFutureTask();
    }

    private static void  useFutureTask() throws InterruptedException, ExecutionException {
        MyCallable myCallable = new MyCallable();
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        //定义一个Task,再提交任务.
        FutureTask<String> futureTask = new FutureTask<>(myCallable);
        executorService.submit(futureTask);

        executorService.shutdown();

        Thread.sleep(1000);//模拟正在干活
        System.out.println("主线程正在干活");
        //阻塞当前线程,等待返回结果.
        System.out.println("等待返回结果:" + futureTask.get());
        System.out.println("主线程全部的活都干完了");
    }
}

//调用输出
子线程正在干活
主线程正在干活
等待返回结果:实现Callable接口,重写Call方法
主线程全部的活都干完了
复制代码

说到这里,可能有些人和我同样迷糊,ExecutorService能够经过submit和execute进行执行任务.那么这二者区别是啥.咱们来简单的总结一下.

对应源码

//submit方法
<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);

//execute方法
void execute(Runnable command);
复制代码

从源码对比能够知道.

  • 接收的参数不同.execute仅能接收Runnable类型.
  • submit()有返回值,而execute()没有
  • submit()能够进行Exception处理.

经过总结会发现这其实就与前面介绍的实现Runable与Callable接口的特色相似.

三. ThreadPoolExecutor介绍

全部线程池都是经过ThreadPoolExecutor来建立.

对应源码

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
复制代码

参数解析

  • corePoolSize : 核心线程数
  • maximumPoolSize : 最大线程数
  • keepAliveTime : 非核心线程闲置时的超时时间.若是设置了allowCoreThreadTimeOut为true,那么也将做用于核心线程.
  • unit : 时间单位
  • workQueue : 任务队列.存放Runnable任务.

另外还有2个参数,通常咱们不须要去手动设定.

  • ThreadFactory : 线程工厂,建立线程用.
  • RejectedExecutionHandler : 任务队列已满或者没法成功执行任务时,会调用此handler的rejectedExecution方法抛出异常通知调用者,俗称饱和策略.

理解ThreadPoolExecutor处理的流程


线程池工做流程
  1. 在提交任务时,检查是否达到核心线程数量.
    • 未达到,则启动核心线程去执行任务.
    • 已达到,进行下一步.
  2. 检查任务队列是否已满.
    • 队列未满,加入任务队列.
    • 队列已满,进行下一步.
  3. 检查是否到达最大线程数.
    • 未达到,启动非核心线程去执行任务.
    • 已达到,执行饱和策略.
  • 一旦有线程处于闲置时,就会去队列中获取任务执行.

四. Executors工厂类

经过Executors提供四种线程池,newFixedThreadPool、newSingleThreadExecutor、newScheduledThreadPool、ewCachedThreadPool.

  • FixedThreadPool

可重用固定线程数的线程池.

对应源码

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

复制代码
  • 只有核心线程.数量固定
  • 无超时机制,队列大小无限制.

使用示例

private static void executeFixedThreadPool() {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 20; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("线程:" + Thread.currentThread().getName());
                }
            });
        }
    }
复制代码
  • SingleThreadExecutor

单线程的线程池

对应源码

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
复制代码

同FixedThreadPool相似.

  • 固定只有一个核心线程. 所以它能确保全部任务在一个线程中按顺序执行.

使用示例

private static void executeSingleThreadExecutor() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 20; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("线程:" + Thread.currentThread().getName());
                }
            });
        }
    }
复制代码
  • ScheduledThreadPool

支持定时和周期性任务的线程池

对应源码

ScheduledExecutorService.java
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

ScheduledThreadPoolExecutor.java    
public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }
复制代码

同FixedThreadPool相似.

  • 核心线程数量固定,非核心线程不定.

使用示例

private static void executeScheduledThreadPool(int flag) {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println("后"+System.currentTimeMillis());
//                try {
//                    Thread.sleep(10000);
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
                System.out.println("线程:" + Thread.currentThread().getName());

            }
        };
        System.out.println("前"+System.currentTimeMillis());
        switch (flag) {
            case 0:
                //延迟1000毫秒后开始执行
                executorService.schedule(runnable, 1000, TimeUnit.MILLISECONDS);
                break;
            case 1:
                //延迟1000毫秒后开始执行,后面每隔2000毫秒执行一次.强调任务的执行频率,不受任务执行时间影响,过期不候.
                executorService.scheduleAtFixedRate(runnable, 1000, 2000, TimeUnit.MILLISECONDS);
                break;
            case 2:
                //延迟1000毫秒后开始执行,后面每次延迟3000毫秒执行一次.强调任务执行的间隔.
                executorService.scheduleWithFixedDelay(runnable, 1000, 3000, TimeUnit.MILLISECONDS);

                break;
            default:
                break;
        }
    }

复制代码
  • CachedThreadPool

对应源码

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
复制代码
  • 只有非核心线程.
  • 有超时机制,队列没法存储,被当即执行.

所以适合大量的耗时较少的任务.

使用示例

private static void executeCachedThreadPool() {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 20; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("线程:" + Thread.currentThread().getName());
                }
            });
        }
    }
复制代码

关于线程池相关就介绍到这里了.

因为本人技术有限,若有错误的地方,麻烦你们给我提出来,本人不胜感激,你们一块儿学习进步.

参考连接:

相关文章
相关标签/搜索