ThreadPoolExecutor 基本使用参考:ThreadPoolExecutor执行过程分析java
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl 保存了线程池的运行状态(runState)和线程池内有效线程数量(workerCount)。git
// Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
用 ctl 的高3位来表示线程池的运行状态, 用低29位来表示线程池内有效线程的数量。ctlOf() 方法用于计算出ctl的值。runStateOf()和workerCountOf()方法分别经过CAPACITY来计算获得其runState和workerCount,CAPACITY=29个1。github
线程池的运行状态:less
// runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; //shutdown() -> SHUTDONW , 不加新任务,继续执行阻塞队列中的任务 private static final int SHUTDOWN = 0 << COUNT_BITS; //shutdownNow() -> STOP, 中断一切操做。 private static final int STOP = 1 << COUNT_BITS; //线程池没有线程,阻塞队列没有任务 -> TIDYING private static final int TIDYING = 2 << COUNT_BITS; //terminated() -> TERMINATED private static final int TERMINATED = 3 << COUNT_BITS;
/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { //若是线程池中线程数没有达到corePoolSize,则新增线程(worker) if (addWorker(command, true)) return; //更新c值。 c = ctl.get(); } //线程池处于RUNNING状态,而且阻塞队列未满 //workQueue.offer(command)是非阻塞方法,当队列满时直接返回false(例如,SynchronousQueue若是没有线程在阻塞take,则返回false) if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //再次检查状态,若是发现不是RUNNING状态,则remove掉刚才offer的任务。 if (! isRunning(recheck) && remove(command)) reject(command); //若是有效线程数==0,添加一个线程,而不去启动它。?? //怎么会==0? else if (workerCountOf(recheck) == 0) addWorker(null, false); } //若是不是RUNNING状态,或者阻塞队列已满,则添加线程 //若是不能添加,则reject。 //false 表示添加的线程属于maximumPoolSize,若是线程数已经达到maximumPoolSize,则reject else if (!addWorker(command, false)) reject(command); }
BlockingQueue
的一些操做方法ide
抛出异常 特殊值 阻塞 超时 插入 add(e)
offer(e)
put(e)
offer(e, time, unit)
移除 remove()
poll()
take()
poll(time, unit)
检查 element()
peek()
不可用 不可用
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) //1. 处于 STOP, TYDING 或 TERMINATD 状态 而且 //2. 不是SUHTDOWN 或者 firsttask != null 或 queue不为空 return false; for (;;) { int wc = workerCountOf(c); //wc大于最大容量。 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) //没有空余的线程了。 return false; //有效线程数加一,加一成功后break if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl //runState改变,从头执行逻辑。 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop //else runState 没变,从新去执行加一操做。 } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //建立worker w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //添加成功,启动线程 //启动后执行runWorker(this); t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
运行worker,该线程不断的getTask()从队列中获取任务,而后 task.run();运行。只要队列中有值则不断循环。oop
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //getTask()方法是个无限循环, 会从阻塞队列 workQueue中不断取出任务来执行. //addWorker(null, false);状况,task==null,这样就须要getTask从队列中取任务执行(本身不带任务)。直到getTask返回null while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { //执行 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // STOP以上状态,或者SHUTDOWN状态下queue为空,即都没有任务要执行了。 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { //线程数减一 decrementWorkerCount(); //该线程退出。 return null; } //下面都是RUNNING状态,或SHUTDOWN状态queue!=null int wc = workerCountOf(c); // Are workers subject to culling? //设置了allowCoreThreadTimeOut,或者线程数大于core线程数。 //是否剔除超时的线程? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 经过返回 null 结束线程。 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); //线程已经准备好,正在take(),没有什么标志位? //取出runnable 返回 if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
考虑到当线程池满时(任务数 > maximumPoolSize + Queue.size()),会执行饱和策略。默认AbortPolicy ,抛出RejectedExecutionException。源码分析
怎么能避免线程池拒绝提交的任务呢?首先想到经过信号量Semaphore来控制任务的添加。代码以下:ui
注意:该代码是无效的。this
Semaphore semaphore; /** * 使用semaphore,控制提交任务速度 * @throws InterruptedException * @throws ExecutionException */ @Test public void test555() throws InterruptedException, ExecutionException { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 7, 10, TimeUnit.SECONDS, new SynchronousQueue<>()); //信号量设置为线程池最大线程数 semaphore = new Semaphore(threadPoolExecutor.getMaximumPoolSize()); ExecutorCompletionService<String> executorCompletionService = new ExecutorCompletionService(threadPoolExecutor); Runnable runnable = new Runnable() { @Override public void run() { for (int i = 0; i < 50; i++) { String name = "name_" + i; TestCallable testCallable = new TestCallable(name); try { //RetryUtil.createThreadPoolExecutor() semaphore.acquire(); executorCompletionService.submit(testCallable); logger.info("+++添加任务 name: " + name + poolInfo(threadPoolExecutor)); //threadPoolExecutor.submit(testCallable); } catch (RejectedExecutionException e) { logger.info("拒绝:" + name); } catch (InterruptedException e) { e.printStackTrace(); } try { //添加任务间隔200ms Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } finishState = 1; } }; Thread addThread = new Thread(runnable); addThread.start(); //logger.info(" taskCount: " + threadPoolExecutor.getTaskCount()); //添加的任务有被抛弃的。taskCount不必定等于添加的任务。 int completeCount = 0; while (!(completeCount == threadPoolExecutor.getTaskCount() && finishState == 1)) { Future<String> take = executorCompletionService.take(); String taskName = null; try { taskName = take.get(); //有可能线程池还没准备好? semaphore.release(); System.out.println("???" + take.isDone()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { logger.info(e.getMessage()); } logger.info("---完成任务 name: " + taskName + poolInfo(threadPoolExecutor) + " finishTask:" + (++completeCount)); } addThread.join(); while (threadPoolExecutor.getPoolSize() > 0) { Thread.sleep(1000); SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss"); logger.info(simpleDateFormat.format(new Date()) + poolInfo(threadPoolExecutor)); } // Tell threads to finish off. threadPoolExecutor.shutdown(); // Wait for everything to finish. while (!threadPoolExecutor.awaitTermination(10, TimeUnit.SECONDS)) { logger.info("complete"); } } public String poolInfo(ThreadPoolExecutor threadPoolExecutor) { return " ActiveCount: " + threadPoolExecutor.getActiveCount() + " poolSize: " + threadPoolExecutor.getPoolSize() + " queueSize: " + threadPoolExecutor.getQueue().size() + " taskCount: " + threadPoolExecutor.getTaskCount(); }
只是在submit以前添加semaphore.acquire(); 在获取future后,添加semaphore.release();。atom
但这样依然会产生RejectedExecutionException。
经过源码分析缘由,
当线程池中线程已满,而且都处于忙碌状态。此时semaphore的值==线程池线程数,addThread被semaphore.acquire()阻塞,禁止submit新任务。当线程池中一个线程t1执行了runWorker(Worker w)中的task.run(),main线程就能够执行Future<String> take = executorCompletionService.take()获取结果并semaphore.release()释放信号量。
释放信号量semaphore后,addThread线程能够submit新任务,假设此时t1线程尚未执行到getTask() 中的poll()和take()方法。此时workQueue队列依然是满的。
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
而addThread已经执行到execute()的
if (isRunning(c) && workQueue.offer(command)) {
当workQueue已满,offer() 直接返回false(正确的顺序应该是等t1线程执行到workQueue.take()后addThread再开始执行workQueue.offer(command)。)。执行execute() 以下逻辑
else if (!addWorker(command, false)) reject(command);
addWork()中,wc = maximumPoolSize 返回false。
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) //没有空余的线程了。 return false;
执行reject(),抛出RejectedExecutionException。
public class LimitedQueue<E> extends LinkedBlockingQueue<E> { public LimitedQueue(int maxSize) { super(maxSize); } @Override public boolean offer(E e) { // turn offer() and add() into a blocking calls (unless interrupted) try { put(e); return true; } catch(InterruptedException ie) { Thread.currentThread().interrupt(); } return false; } }
其思想就是替换BlockingQueue中的offer()方法为put()方法,这样execute() 中的workQueue.offer(command),就变成put(),阻塞添加任务,不会存在workQueue.offer() 返回false的状况。
//void execute(Runnable command) 中代码 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } //一下代码,没法执行 else if (!addWorker(command, false)) reject(command);
但这样的问题是下面的else if (!addWorker(command, false)) 代码逻辑将没法执行,致使的结果就是,只针对corePoolSize==maxPoolSize 时有效。不建议这么作。
RejectedExecutionHandler block = new RejectedExecutionHandler() { rejectedExecution(Runnable r, ThreadPoolExecutor executor) { executor.getQueue().put( r ); } }; ThreadPoolExecutor pool = new ... pool.setRejectedExecutionHandler(block);
经过自定义RejectedExecutionHandler,在reject时调用Queue的put()方法,阻塞式添加任务。
其实忙活一圈,发现最简单的方式就是使用ThreadPoolExecutor.CallerRunsPolicy。
CallerRunsPolicy被拒绝的任务,谁submit的谁执行。想一想以前的各类阻塞也对,负责添加任务的线程由于线程池满了就阻塞在那里,还不如帮着执行一些任务..