Spring Cloud Alibaba-ThreadPool(十)

线程池做用

  • 避免建立和销毁线程的资源浪费
    • 类加载和GC销毁须要消耗资源
  • 提升响应速度
    • 省去建立时间
  • 重复利用

重要参数(ThreadPoolExecutor)

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
复制代码
属性 类型 含义
corePoolSize int 核心线程池大小
maximumPoolSize int 最大线程池大小
keepAliveTime long 线程池中非核心线程空闲的存活时间大小
unit TimeUnit 线程空闲存活时间单位
workQueue BlockingQueue 线程等待队列
threadFactory ThreadFactory 线程建立工厂
handler RejectedExecutionHandler 拒绝策略
  • handler(拒绝策略)
    private static final RejectedExecutionHandler defaultHandler =
            new AbortPolicy();
    复制代码
    • AbortPolicy(默认):抛出一个异常
    • DiscardPolicy:直接丢弃任务
    • DiscardOldestPolicy:丢弃队列里最老的任务,将当前这个任务继续提交给线程池
    • CallerRunsPolicy:交给线程池调用所在的线程进行处理

执行逻辑

  • 核心线程池==>>队列==>>非核心线程==>>拒绝策略
  • 提交一个任务,线程池里存活的核心线程数小于线程数corePoolSize时,线程池会建立一个核心线程去处理提交的任务
  • 若是线程池核心线程数已满,即线程数已经等于corePoolSize,一个新提交的任务,会被放进任务队列workQueue排队等待执行
  • 当线程池里面存活的线程数已经等于corePoolSize了,而且任务队列workQueue也满,判断线程数是否达到maximumPoolSize,即最大线程数是否已满,若是没到达,建立一个非核心线程执行提交的任务
  • 若是当前的线程数达到了maximumPoolSize,还有新的任务过来的话,直接采用拒绝策略处理

异常处理

  • try-catch捕获异常
  • submit执行,Future.get接受异常:执行过程保留了异常信息
  • 重写ThreadPoolExecutor.afterExecute方法,处理传递的异常引用:ExtendedExecutor extends ThreadPoolExecutor
  • 实例化传入本身的ThreadFactory,设置Thread.uncaughtExceptionHandler处理未检测的异常:setUncaughtExceptionHandler

队列类型

  • ArrayBlockingQueue(有界队列):是一个用数组实现的有界阻塞队列,按FIFO排序量
  • LinkedBlockingQueue(可设置容量队列):基于链表结构的阻塞队列,按FIFO排序任务,容量能够选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE,吞吐量一般要高于ArrayBlockingQuene;newFixedThreadPool线程池使用了这个队列
  • DelayQueue(延迟队列):是一个任务定时周期的延迟执行的队列。根据指定的执行时间从小到大排序,不然根据插入到队列的前后排序。newScheduledThreadPool线程池使用了这个队列
  • PriorityBlockingQueue(优先级队列):具备优先级的无界阻塞队列
  • SynchronousQueue(同步队列):一个不存储元素的阻塞队列,每一个插入操做必须等到另外一个线程调用移除操做,不然插入操做一直处于阻塞状态,吞吐量一般要高于LinkedBlockingQuene,newCachedThreadPool线程池使用了这个队列

线程池类型

  • newFixedThreadPool(固定数目线程的线程池)
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    复制代码
    • 特色
      • 核心线程数和最大线程数大小同样
      • 没有所谓的非空闲时间,即keepAliveTime为0
      • 阻塞队列为无界队列LinkedBlockingQueue
    • 工做机制
      • 提交任务
      • 若是线程数少于核心线程,建立核心线程执行任务
      • 若是线程数等于核心线程,把任务添加到LinkedBlockingQueue阻塞队列(OOM)
      • 若是线程执行完任务,去阻塞队列取任务,继续执行
    • 使用场景 CPU密集型的任务,确保CPU在长期被工做线程使用的状况下,尽量的少的分配线程,即适用执行长期的任务
  • newCachedThreadPool(可缓存线程的线程池)
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    复制代码
    • 特色
      • 核心线程数为0
      • 最大线程数为Integer.MAX_VALUE
      • 阻塞队列是SynchronousQueue
      • 非核心线程空闲存活时间为60秒
    • 工做机制
      • 提交任务
      • 由于没有核心线程,因此任务直接加到SynchronousQueue队列
      • 判断是否有空闲线程,若是有,就去取出任务执行
      • 若是没有空闲线程,就新建一个线程执行
      • 执行完任务的线程,还能够存活60秒,若是在这期间,接到任务,能够继续活下去;不然,被销毁
    • 使用场景
      • 并发执行大量短时间的小任务
  • newSingleThreadExecutor(单线程的线程池)
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    复制代码
    • 特色
      • 核心线程数为1
      • 最大线程数也为1
      • keepAliveTime为0
      • 阻塞队列是LinkedBlockingQueue
    • 工做机制
      • 提交任务
      • 线程池是否有一条线程在,若是没有,新建线程执行任务
      • 若是有,任务加到阻塞队列
      • 当前的惟一线程,从队列取任务,执行完一个,再继续取,一我的(一条线程)夜以继日地干活
    • 使用场景
  • newScheduledThreadPool(定时及周期执行的线程池)
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }
    复制代码
    • 特色
      • 最大线程数为Integer.MAX_VALUE
      • 阻塞队列是DelayedWorkQueue
      • keepAliveTime为0
      • scheduleAtFixedRate() :按某种速率周期执行
      • scheduleWithFixedDelay():在某个延迟后执行
    • 工做机制
      • 添加一个任务
      • 线程池中的线程从 DelayQueue 中取任务
      • 线程从 DelayQueue 中获取 time 大于等于当前时间的task
      • 执行完后修改这个 task 的 time 为下次被执行的时间
      • 这个 task 放回DelayQueue队列中
    • 使用场景
      • 周期性执行任务的场景,须要限制线程数量的场景

自定义ThreadPool

  • copy一份AbortPolicy改个名字,加些本身想加的内容(例如加个输出)。
public static class MyPolicy implements RejectedExecutionHandler {
        public MyPolicy() { }
        public void rejectedExecution(Runnable r, java.util.concurrent.ThreadPoolExecutor e) {
            // 新增
            System.out.println("拒绝");
            throw new RejectedExecutionException("Task " + r.toString() +
                    " rejected from " +
                    e.toString());
        }
    }
复制代码
  • copy一份DefaultThreadFactory改个名字,加些本身想加的内容(例如加个输出)。
static class MyThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        MyThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                    Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                    poolNumber.getAndIncrement() +
                    "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                    namePrefix + threadNumber.getAndIncrement(),
                    0);
            // 新增
            System.out.println(t.getName() + " has been created");
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
复制代码
  • 测试
public static void main(String agrs[]) throws Exception{
        int corePoolSize = 2;
        int maximumPoolSize = 4;
        long keepAliveTime = 10;
        TimeUnit unit = TimeUnit.SECONDS;
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
        ThreadFactory threadFactory = new MyThreadFactory();

        ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
                workQueue, threadFactory,new MyPolicy());
        executor.prestartAllCoreThreads(); // 预启动全部核心线程
        for (int i = 1; i <= 10; i++) {
            executor.execute(()-> System.out.println(Thread.currentThread().getId()));
        }

        System.in.read(); //阻塞主线程
    }
复制代码
  • 结果
    • 核心线程2个,非核心线程2个,队列缓存2个,等前四个运行完了,继续运行缓存中的2个,符合预期

线程池的状态

  • RUNNING = -1
    • 该状态的线程池会接收新任务,并处理阻塞队列中的任务
    • 调用线程池的shutdown()方法,能够切换到SHUTDOWN状态
    • 调用线程池的shutdownNow()方法,能够切换到STOP状态
  • SHUTDOWN = 0
    • 该状态的线程池不会接收新任务,但会处理阻塞队列中的任务
    • 队列为空,而且线程池中执行的任务也为空,进入TIDYING状态
  • STOP = 1
    • 该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,并且会中断正在运行的任务
    • 线程池中执行的任务为空,进入TIDYING状态
  • TIDYING = 2
    • 该状态代表全部的任务已经运行终止,记录的任务数量为0
    • terminated()执行完毕,进入TERMINATED状态
  • TERMINATED = 3
    • 该状态表示线程池完全终止
相关文章
相关标签/搜索