如何让ThreadPoolExecutor更早地建立非核心线程

最近在项目中遇到一个须要用线程池来处理任务的需求,因而我用ThreadPoolExecutor来实现,可是在实现过程当中我发现提交大量任务时它的处理逻辑是这样的(提交任务还有一个submit方法内部也调用了execute方法):java

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

注释中已经写的很是明白:浏览器

  1. 若是线程数量小于corePoolSize,直接建立新线程处理任务
  2. 若是线程数量等于corePoolSize,尝试将任务放到等待队列里
  3. 若是等待队列已满,尝试建立非核心线程处理任务(若是maximumPoolSIze > corePoolSize

可是在个人项目中一个线程启动须要10s左右的时间(须要启动一个浏览器对象),所以我但愿实现一个更精细的逻辑提高资源的利用率:安全

  1. 线程池保持corePoolSize个线程确保有新任务到来时能够当即获得执行
  2. 当没有空闲线程时,先把任务放到等待队列中(由于开启一个线程须要10s,因此若是在等待队列比较小的时候,等待其余任务完成比等待新线程建立更快)
  3. 当等待队列的大小大于设定的阈值threshold时,说明堆积的任务已经太多了,这个时候开始建立非核心线程直到线程数量已经等于maximumPoolSize
  4. 当线程数量已经等于maximumPoolSize,再将新来的任务放回到任务队列中等待(直到队列满后开始拒绝任务)
  5. 长时间空闲后退出非核心线程回收浏览器占用的内存资源

当我研究了常见的CachedThreadPoolFixedThreadPool以及尝试本身配置ThreadPoolExecutor的构造函数后,发现不管如何都不能实现上面提到的逻辑,由于默认的实现只有在workQueue达到容量上限后才会开始建立非核心线程,所以须要经过继承的方法实现一个新的类来完成需求。多线程

怎么实如今workQueue到达容量上限前就建立非核心线程?还要回顾下execute函数的代码ide

//尝试将任务插入等待队列,若是返回false
					//说明队列已经到达容量上限,进入else if逻辑
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}
//尝试建立非核心线程
else if (!addWorker(command, false))
    reject(command);

那么只要改变workQueue.offer()的逻辑,在线程数量还小于maximumPoolSize的时候就返回false拒绝插入,让线程池调用addWoker,等不能再建立更多线程时再容许添加到队列便可。函数

能够经过子类重写offer方法来实现添加逻辑的改变oop

@Override
public boolean offer(E e) {
    if (threadPoolExecutor == null) {
        throw new NullPointerException();
    }
    //当调用该方法时,已经肯定了workerCountOf(c) > corePoolSize
    //当数量小于threshold,在队列里等待
    if (size() < threshold) {
        return super.offer(e);
	//当数量大于等于threshold,说明堆积的任务太多,返回false
	//让线程池来建立新线程处理
    } else {
        //此处可能会由于多线程致使错误的拒绝
        if (threadPoolExecutor.getPoolSize() < threadPoolExecutor.getMaximumPoolSize()) {
            return false;
        //线程池中的线程数量已经到达上限,只能添加到任务队列中
        } else {
            return super.offer(e);
        }
    }
}

这样就实现了基本实现了我须要的功能,可是在写代码的过程当中我找到了一个可能出错的地方:ThreadPoolExecutor线程安全的,那么重写的offer方法也可能遇到多线程调用的状况测试

//设想当poolSize = maximumPoolSize-1时,两个任务到达此处同时返回false
if (threadPoolExecutor.getPoolSize() < threadPoolExecutor.getMaximumPoolSize()) {
	return false;
}

因为添加到队列返回falseexecute方法进入到else if (!addWorker(command, false))ui

if (isRunning(c) && workQueue.offer(command)) {
	int recheck = ctl.get();
	if (! isRunning(recheck) && remove(command))
		reject(command);
	else if (workerCountOf(recheck) == 0)
		addWorker(null, false);
}
//添加到队列失败后进入addWorker方法中
else if (!addWorker(command, false))
	reject(command);
}

再来看一下addWorker方法的代码,这里只截取须要的一部分this

for (;;) {
    int wc = workerCountOf(c);
    if (wc >= CAPACITY ||
    	//两个线程都认为还能够建立再建立一个新线程
        wc >= (core ? corePoolSize : maximumPoolSize))
        return false;
        //两个线程同时调用cas方法只有一个可以成功
        //成功的线程break retry;进入后面的建立线程的逻辑
        //失败的线程从新回到上面的检查并返回false
    if (compareAndIncrementWorkerCount(c))
        break retry;
    c = ctl.get();  // Re-read ctl
    if (runStateOf(c) != rs)
        continue retry;
    // else CAS failed due to workerCount change; retry inner loop
}

最终,在竞争中失败的线程因为addWorker方法返回了false最终调用了reject(command)。在前面写的要实现的逻辑里提到了,只有在等待队列容量达到上限没法再插入时才拒绝任务,可是因为多线程的缘由,这里只是超过了threshold但没有超过capacity的时候就拒绝任务了,因此要对拒绝策略的触发作出修改:第一次触发Reject时,尝试从新添加到任务队列中(不进行poolSize的检测),若是仍然不能添加,再拒绝任务
这里经过对execute方法进行重写来实现重试

@Override
public void execute(Runnable command) {
    try {
        super.execute(command);
    } catch (RejectedExecutionException e) {
    	/*
    	这里参考源码中将任务添加到任务队列的实现
    	可是其中经过(workerCountOf(recheck) == 0)
    	检查当任务添加到队列后是否还有线程存活的部分
    	因为是private权限的,没法实现相似的逻辑,所以须要作必定的特殊处理
		if (isRunning(c) && workQueue.offer(command)) {
		     int recheck = ctl.get();
		     if (! isRunning(recheck) && remove(command))
		         reject(command);
		     else if (workerCountOf(recheck) == 0)
		         addWorker(null, false);
		 }
		*/
        if (!this.isShutdown() && ((MyLinkedBlockingQueue)this.getQueue()).offerWithoutCheck(command)) {
            if (this.isShutdown() && remove(command))
                //二次检查
                realRejectedExecutionHandler.rejectedExecution(command, this);
            } else {
                //插入失败,队列已经满了
                realRejectedExecutionHandler.rejectedExecution(command, this);
            }
        }
    }
}

这里有两个小问题:

  1. 初始化线程池传入的RejectedExecutionHandler不必定会抛出异常(事实上,ThreadPoolExecutor本身实现的4中拒绝策略中只有AbortPolicy可以抛出异常并被捕捉到),所以须要在初始化父类时传入AbortPolicy拒绝策略并将构造函数中传入的自定义拒绝策略保存下来,在重试失败后才调用本身的rejectedExecution
  2. corePoolSize = 0 的极端状况下,可能出现一个任务刚被插入队列的同时,全部的线程都结束任务而后被销毁了,此使这个被加入的任务就没法被执行,在ThreadPoolExecutor中是经过
    else if (workerCountOf(recheck) == 0)
    	addWorker(null, false);
    在添加后再检查工做线程是否为0来确保任务能够被执行,可是其中使用的方法是私有的,没法在子类中实现相似的逻辑,所以在初始化时只能强制corePoolSize至少为1来解决这个问题。

所有代码以下

public class MyThreadPool extends ThreadPoolExecutor {

    private RejectedExecutionHandler realRejectedExecutionHandler;

    public MyThreadPool(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        int queueCapacity) {
        this(corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                queueCapacity,
                new AbortPolicy());
    }

    public MyThreadPool(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        int queueCapacity,
                        RejectedExecutionHandler handler) {
        super(corePoolSize == 0 ? 1 : corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                new MyLinkedBlockingQueue<>(queueCapacity),
                new AbortPolicy());
        ((MyLinkedBlockingQueue)this.getQueue()).setThreadPoolExecutor(this);
        realRejectedExecutionHandler = handler;
    }

    @Override
    public void execute(Runnable command) {
        try {
            super.execute(command);
        } catch (RejectedExecutionException e) {
            if (!this.isShutdown() && ((MyLinkedBlockingQueue)this.getQueue()).offerWithoutCheck(command)) {
                if (this.isShutdown() && remove(command))
                    //二次检查
                    realRejectedExecutionHandler.rejectedExecution(command, this);
            } else {
                //插入失败,队列已经满了
                realRejectedExecutionHandler.rejectedExecution(command, this);
            }
        }
    }
}


public class MyLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {

    private int threshold = 20;

    private ThreadPoolExecutor threadPoolExecutor = null;

    public MyLinkedBlockingQueue(int queueCapacity) {
        super(queueCapacity);
    }

    public void setThreadPoolExecutor(ThreadPoolExecutor threadPoolExecutor) {
        this.threadPoolExecutor = threadPoolExecutor;
    }

    @Override
	public boolean offer(E e) {
	    if (threadPoolExecutor == null) {
	        throw new NullPointerException();
	    }
	    //当调用该方法时,已经肯定了workerCountOf(c) > corePoolSize
	    //当数量小于threshold,在队列里等待
	    if (size() < threshold) {
	        return super.offer(e);
		//当数量大于等于threshold,说明堆积的任务太多,返回false
		//让线程池来建立新线程处理
	    } else {
	        //此处可能会由于多线程致使错误的拒绝
	        if (threadPoolExecutor.getPoolSize() < threadPoolExecutor.getMaximumPoolSize()) {
	            return false;
	        //线程池中的线程数量已经到达上限,只能添加到任务队列中
	        } else {
	            return super.offer(e);
	        }
	    }
	}

    public boolean offerWithoutCheck(E e) {
        return super.offer(e);
    }
}

最后进行简单的测试

corePoolSize:2
maximumPoolSize:5
queueCapacity:10
threshold:7
任务2
线程数量:2
等待队列大小:0
等待队列大小小于阈值,继续等待。
任务3
线程数量:2
等待队列大小:1
等待队列大小小于阈值,继续等待。
任务4
线程数量:2
等待队列大小:2
等待队列大小小于阈值,继续等待。
任务5
线程数量:2
等待队列大小:3
等待队列大小小于阈值,继续等待。
任务6
线程数量:2
等待队列大小:4
等待队列大小小于阈值,继续等待。
任务7
线程数量:2
等待队列大小:5
等待队列大小小于阈值,继续等待。
任务8
线程数量:2
等待队列大小:6
等待队列大小小于阈值,继续等待。
任务9
线程数量:2
等待队列大小:7
等待队列大小大于等于阈值,线程数量小于MaximumPoolSize,建立新线程处理。
任务10
线程数量:3
等待队列大小:7
等待队列大小大于等于阈值,线程数量小于MaximumPoolSize,建立新线程处理。
任务11
线程数量:4
等待队列大小:7
等待队列大小大于等于阈值,线程数量小于MaximumPoolSize,建立新线程处理。
任务12
线程数量:5
等待队列大小:7
等待队列大小大于等于阈值,但线程数量大于等于MaximumPoolSize,只能添加到队列中。
任务13
线程数量:5
等待队列大小:8
等待队列大小大于等于阈值,但线程数量大于等于MaximumPoolSize,只能添加到队列中。
任务14
线程数量:5
等待队列大小:9
等待队列大小大于等于阈值,但线程数量大于等于MaximumPoolSize,只能添加到队列中。
任务15
线程数量:5
等待队列大小:10
等待队列大小大于等于阈值,但线程数量大于等于MaximumPoolSize,只能添加到队列中。
队列已满
任务16
线程数量:5
等待队列大小:10
等待队列大小大于等于阈值,但线程数量大于等于MaximumPoolSize,只能添加到队列中。
队列已满

再从新复习一遍要实现的功能:

  1. 线程池保持corePoolSize个线程确保有新任务到来时能够当即获得执行
  2. 当没有空闲线程时,先把任务放到等待队列中(由于开启一个线程须要10s,因此若是在等待队列比较小的时候,等待其余任务完成比等待新线程建立更快)
  3. 当等待队列的大小大于设定的阈值threshold时,说明堆积的任务已经太多了,这个时候开始建立非核心线程直到线程数量已经等于maximumPoolSize
  4. 当线程数量已经等于maximumPoolSize,再将新来的任务放回到任务队列中等待(直到队列满后开始拒绝任务)
  5. 长时间空闲后退出非核心线程回收浏览器占用的内存资源

能够看出,线程池运行的逻辑和要实现的目标是相同的。

相关文章
相关标签/搜索