工做中不少时候都会用到线程池,可是线程池内部是怎么实现的呢oop
先看一下ThreadPoolExecutor类的构造方法ui
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);
corePoolSize: 核心线程池大小,当线程池中的线程数小于corePoolSize时,每提交一个任务,都会新起一个线程来处理任务。线程会不断的从workQueue中取出任务执行。线程通常状况下即便空闲,也不会回收,除非设置了allowCoreThreadTimeOut
参数this
workQueue:工做队列,当线程数达到了corePoolSize后,后续提交的任务就会插入到workQueue中线程
maximumPoolSize:线程池最大线程数, 当workQueue满了以后,线程池就会启动新的线程来处理任务,可是整个线程池的线程数最大不会超过maximumPoolSize设计
keepAliveTime和unit:非core线程的最大空闲时间和时间单位code
threadFactory: 线程工厂,线程池会使用线程工厂来建立线程接口
handler:饱和策略,当线程池的线程达到maximumPoolSize且workQueue满了后,会使用handler处理新提交的任务队列
注意:不少人会搞错corePoolSize,maximumPoolSize,workQueue之间的关系,认为是core线程满了以后,会直接建立新的线程处理任务而不用插入到workQueue中。其实是workQueue满了以后才会建立新的线程,总的线程数量不超过maximumPoolSizeci
这里是一个线程池使用demorem
static void demo()throws Exception{ ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); Future<String> future = executorService.submit(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world"; }); System.out.println(future.get());; }
AbstractExecutorService类
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); //执行任务 return ftask; } protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); }
因为submit方法返回的是提供Future,因此提交任务的时候实际上提交的是一个RunnableFuture接口的实现类FutureTask。而execure(ftask)
则是任务执行的核心
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 当worker数小于corePoolSize时则建立worker if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 当worker大于等于corePoolSize且线程池是运行中时,则尝试插入任务到workerQueue中 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 当线程数大于等于coreSize且workerQueue满了时,则再次尝试增长worker else if (!addWorker(command, false)) reject(command); }
代码中的worker能够理解为线程池中执行任务的线程,能够看到corePoolSize,workQueue间的关系是:
这里有个比较有意思的设计就是 private final AtomicInteger ctl;
这个变量。它是一个32位的整数类型,高3位表明了线程池的状态,低29位表明线程池中活跃的线程数。
为何要把两个变量合并到一个变量中呢?个人理解就是这样设计就能够在同一个cas操做中保证在设置数量的时候,状态是不变的。若是分开成两个变量,除非加更重的锁,不然在增长数量的过程当中,状态是有可能改变的。
那么问题来了:maximumPoolSize的做用是怎么体现的呢?
先看看private boolean addWorker(Runnable firstTask, boolean core)
方法
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); //核心worker大于corePoolSize,非核心线程大于maximumPoolSize则增长失败 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
第17行能够看到在增长worker时,是会校验当前的worker数量的
在方法的第一个嵌套自旋中能够看到,里面有不少的状态判断和worker数量判断,当全部判断成功时会经过compareAndIncrementWorkerCount
方法去修改ctl
变量的worker数量
在JUC包中,做者大量的使用了自旋和CAS操做来代替锁操做,这种操做属于乐观锁
上面提到了线程池状态,而线程池存在五个状态,且各个状态间可以转化
五个状态: RUNNING: Accept new tasks and process queued tasks SHUTDOWN: Don't accept new tasks, but process queued tasks STOP: Don't accept new tasks, don't process queued tasks, and interrupt in-progress tasks TIDYING: All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method TERMINATED: terminated() has completed 状态间的转化 RUNNING -> SHUTDOWN On invocation of shutdown(), perhaps implicitly in finalize() (RUNNING or SHUTDOWN) -> STOP On invocation of shutdownNow() SHUTDOWN -> TIDYING When both queue and pool are empty STOP -> TIDYING When pool is empty TIDYING -> TERMINATED When the terminated() hook method has completed
先看看Worker类的
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ private static final long serialVersionUID = 6138294804551838833L; final Thread thread; Runnable firstTask; volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); } // }
Worker自己就是一个Runnable,它包含了一个Thread字段用于执行认为。线程池中线程的数量其实就是Worker的数量。而Worker中的线程最终执行的就是里面的runWorker方法
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
能够看到有一个while循环会不断的获取任务执行,当获取到task后,接下来就会执行task.run方法。
那么假如队列为空时,core线程不是会继续保存在线程池中,非core线程会等待一段时间后再销毁吗?这个逻辑是怎么实现的?答案就在getTask()
方法中
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
能够看到getTask方法会根据线程数是否大于corePoolSize来或者allowCoreThreadTimeOut是否为true来决定从workQueue中获取任务时可否超时返回。
当容许超时返回,则超时后getTask
会返回null,且在runWorker
中当getTask
返回null时则会调用processWorkerExit
方法终止当前worker的线程。
当不容许超时返回时,则会一直阻塞在workQueue.take()
中
到这里为止就搞懂这3个问题了