使用线程池管理线程有以下优势:java
Java 为咱们提供了 ThreadPoolExecutor 来建立一个线程池,其完整构造函数以下所示:git
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 复制代码
int corePoolSize(核心线程数):线程池新建线程的时候,若是当前线程总数小于corePoolSize,则新建的是核心线程,若是超过corePoolSize,则新建的是非核心线程;核心线程默认状况下会一直存活在线程池中,即便这个核心线程啥也不干(闲置状态);若是设置了 allowCoreThreadTimeOut 为 true,那么核心线程若是不干活(闲置状态)的话,超过必定时间(时长下面参数决定),就会被销毁掉。github
int maximumPoolSize(线程池能容纳的最大线程数量):线程总数 = 核心线程数 + 非核心线程数。缓存
long keepAliveTime(非核心线程空闲存活时长):非核心线程空闲时长超过该时长将会被回收,主要应用在缓存线程池中,当设置了 allowCoreThreadTimeOut 为 true 时,对核心线程一样起做用。bash
TimeUnit unit(keepAliveTime 的单位):它是一个枚举类型,经常使用的如:TimeUnit.SECONDS(秒)、TimeUnit.MILLISECONDS(毫秒)。并发
BlockingQueue workQueue(任务队列):当全部的核心线程都在干活时,新添加的任务会被添加到这个队列中等待处理,若是队列满了,则新建非核心线程执行任务,经常使用的 workQueue 类型:异步
SynchronousQueue:这个队列接收到任务的时候,会直接提交给线程处理,而不保留它,若是全部线程都在工做怎么办?那就新建一个线程来处理这个任务!因此为了保证不出现 线程数达到了 maximumPoolSize 而不能新建线程 的错误,使用这个类型队列的时候,maximumPoolSize 通常指定成 Integer.MAX_VALUE,即无限大。async
LinkedBlockingQueue:这个队列接收到任务的时候,若是当前线程数小于核心线程数,则新建线程(核心线程)处理任务;若是当前线程数等于核心线程数,则进入队列等待。因为这个队列没有最大值限制,即全部超过核心线程数的任务都将被添加到队列中,这也就致使了 maximumPoolSize 的设定失效,由于总线程数永远不会超过 corePoolSize。ide
ArrayBlockingQueue:能够限定队列的长度,接收到任务的时候,若是没有达到 corePoolSize 的值,则新建线程(核心线程)执行任务,若是达到了,则入队等候,若是队列已满,则新建线程(非核心线程)执行任务,又若是总线程数到了 maximumPoolSize,而且队列也满了,则发生错误。函数
DelayQueue:队列内元素必须实现 Delayed 接口,这就意味着你传进去的任务必须先实现 Delayed 接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务。
ThreadFactory threadFactory(线程工厂):用来建立线程池中的线程,一般用默认的便可。
RejectedExecutionHandler handler(拒绝策略):在线程池已经关闭的状况下和任务太多致使最大线程数和任务队列已经饱和,没法再接收新的任务,在上面两种状况下,只要知足其中一种时,在使用 execute() 来提交新的任务时将会拒绝,线程池提供了如下 4 种策略:
AbortPolicy:默认策略,在拒绝任务时,会抛出RejectedExecutionException。
CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前的被丢弃的任务。
DiscardOldestPolicy:该策略将丢弃最老的一个请求,也就是即将被执行的任务,并尝试再次提交当前任务。
DiscardPolicy:该策略默默的丢弃没法处理的任务,不予任何处理。
当一个任务要被添加进线程池时,有如下四种执行策略:
其流程图以下所示:
常见的四类线程池分别有 FixedThreadPool、SingleThreadExecutor、ScheduledThreadPool 和 CachedThreadPool,它们其实都是经过 ThreadPoolExecutor 建立的,其参数以下表所示:
参数 | FixedThreadPool | SingleThreadExecutor | ScheduledThreadPool | CachedThreadPool |
---|---|---|---|---|
corePoolSize | nThreads | 1 | corePoolSize | 0 |
maximumPoolSize | nThreads | 1 | Integer.MAX_VALUE | Integer.MAX_VALUE |
keepAliveTime | 0 | 0 | 10 | 60 |
unit | MILLISECONDS | MILLISECONDS | MILLISECONDS | SECONDS |
workQueue | LinkedBlockingQueue | LinkedBlockingQueue | DelayedWorkQueue | SynchronousQueue |
threadFactory | defaultThreadFactory | defaultThreadFactory | defaultThreadFactory | defaultThreadFactory |
handler | defaultHandler | defaultHandler | defaultHandler | defaultHandler |
适用场景 | 已知并发压力的状况下,对线程数作限制 | 须要保证顺序执行的场景,而且只有一个线程在执行 | 须要多个后台线程执行周期任务的场景 | 处理执行时间比较短的任务 |
若是你不想本身写一个线程池,那么你能够从上面看看有没有符合你要求的(通常都够用了),若是有,那么很好你直接用就好了,若是没有,那你就老老实实本身去写一个吧。
须要针对具体状况而具体处理,不一样的任务类别应采用不一样规模的线程池,任务类别可划分为 CPU 密集型任务、IO 密集型任务和混合型任务。
CPU 密集型任务:线程池中线程个数应尽可能少,推荐配置为 (CPU 核心数 + 1);
IO 密集型任务:因为 IO 操做速度远低于 CPU 速度,那么在运行这类任务时,CPU 绝大多数时间处于空闲状态,那么线程池能够配置尽可能多些的线程,以提升 CPU 利用率,推荐配置为 (2 * CPU 核心数 + 1);
混合型任务:能够拆分为 CPU 密集型任务和 IO 密集型任务,当这两类任务执行时间相差无几时,经过拆分再执行的吞吐率高于串行执行的吞吐率,但若这两类任务执行时间有数据级的差距,那么没有拆分的意义。
为了提高开发效率及更好地使用和管理线程池,我已经为大家封装好了线程工具类----ThreadUtils,依赖 AndroidUtilCode 1.16.1 版本便可使用,其 API 以下所示:
isMainThread : 判断当前是否主线程
getFixedPool : 获取固定线程池
getSinglePool : 获取单线程池
getCachedPool : 获取缓冲线程池
getIoPool : 获取 IO 线程池
getCpuPool : 获取 CPU 线程池
executeByFixed : 在固定线程池执行任务
executeByFixedWithDelay : 在固定线程池延时执行任务
executeByFixedAtFixRate : 在固定线程池按固定频率执行任务
executeBySingle : 在单线程池执行任务
executeBySingleWithDelay: 在单线程池延时执行任务
executeBySingleAtFixRate: 在单线程池按固定频率执行任务
executeByCached : 在缓冲线程池执行任务
executeByCachedWithDelay: 在缓冲线程池延时执行任务
executeByCachedAtFixRate: 在缓冲线程池按固定频率执行任务
executeByIo : 在 IO 线程池执行任务
executeByIoWithDelay : 在 IO 线程池延时执行任务
executeByIoAtFixRate : 在 IO 线程池按固定频率执行任务
executeByCpu : 在 CPU 线程池执行任务
executeByCpuWithDelay : 在 CPU 线程池延时执行任务
executeByCpuAtFixRate : 在 CPU 线程池按固定频率执行任务
executeByCustom : 在自定义线程池执行任务
executeByCustomWithDelay: 在自定义线程池延时执行任务
executeByCustomAtFixRate: 在自定义线程池按固定频率执行任务
cancel : 取消任务的执行
复制代码
若是你使用 RxJava 很 6,并且项目中已经使用了 RxJava,那么你能够继续使用 RxJava 来作线程切换的操做;若是你并不会 RxJava 或者是在开发 SDK,那么这个工具类再适合你不过了,它能够为你统一管理线程池的使用,不至于让你的项目中出现过多的线程池。
ThreadUtils 使用极为方便,看 API 便可明白相关意思,FixedPool、SinglePool、CachedPool 分别对应了上面介绍的 FixedThreadPool、SingleThreadExecutor、CachedThreadPool 这三种,IoPool 是建立 (CPU_COUNT * 2 + 1) 个核心线程数,CpuPool 是创建 (CPU_COUNT + 1) 个核心线程数;而全部的 execute 都是线程池外围裹了一层 ScheduledThreadPool,这里和 RxJava 线程池的实现有所类似,能够更方便地提供延时任务和固定频率执行的任务,固然也能够更方便地取消任务的执行,下面让咱们来简单地来介绍其使用,以从 assets 中拷贝 APK 到 SD 卡为例,其代码以下所示:
public static void releaseInstallApk(final OnReleasedListener listener) {
if (!FileUtils.isFileExists(Config.TEST_APK_PATH)) {
ThreadUtils.executeByIo(new ThreadUtils.SimpleTask<Void>() {
@Override
public Void doInBackground() throws Throwable {
ResourceUtils.copyFileFromAssets("test_install", Config.TEST_APK_PATH);
return null;
}
@Override
public void onSuccess(Void result) {
if (listener != null) {
listener.onReleased();
}
}
});
} else {
if (listener != null) {
listener.onReleased();
}
LogUtils.d("test apk existed.");
}
}
复制代码
看起来还不是很优雅是吧,你能够把相关的 Task 都抽出来放到合适的包下,这样每一个 Task 的职责一看便知,如上例子能够改装成以下所示:
public class ReleaseInstallApkTask extends ThreadUtils.SimpleTask<Void> {
private OnReleasedListener mListener;
public ReleaseInstallApkTask(final OnReleasedListener listener) {
mListener = listener;
}
@Override
public Void doInBackground() throws Throwable {
ResourceUtils.copyFileFromAssets("test_install", Config.TEST_APK_PATH);
return null;
}
@Override
public void onSuccess(Void result) {
if (mListener != null) {
mListener.onReleased();
}
}
public void execute() {
ThreadUtils.executeByIo(this);
}
}
public static void releaseInstallApk(final OnReleasedListener listener) {
if (!FileUtils.isFileExists(Config.TEST_APK_PATH)) {
new ReleaseInstallApkTask(listener).execute();
} else {
if (listener != null) {
listener.onReleased();
}
LogUtils.d("test apk existed.");
}
}
复制代码
是否是瞬间清爽了不少,若是执行成功的回调中涉及了 View 相关的操做,那么你须要在 destroy 中取消 task 的执行哦,不然会内存泄漏哦,继续以上面的例子为例,代码以下所示:
public class XXActivity extends Activity {
···
@Override
protected void onDestroy() {
// ThreadUtils.cancel(releaseInstallApkTask);// 或者下面的取消均可以
releaseInstallApkTask.cancel();
super.onDestroy();
}
}
复制代码
以上是以 SimpleTask 为例,Task 的话会多两个回调,onCancel() 和 onFail(Throwable t),它们和 onSuccess(T result) 都是互斥的,最终回调只会走它们其中之一,而且在 Android 端是发送到主线程中执行,若是是 Java 端的话那就仍是会在相应的线程池中执行,这点也方便了我作单元测试。
若是遇到了异步的单测,你会发现单测很快就跑完呢,并无等待咱们线程跑完再结束,咱们能够用 CountDownLatch 来等待线程的结束,或者化异步为同步的作法,这里咱们使用 CountDownLatch 来实现,我进行了简单的封装,测试 Fixed 的代码以下所示:
public class ThreadUtilsTest {
@Test
public void executeByFixed() throws Exception {
asyncTest(10, new TestRunnable<String>() {
@Override
public void run(final int index, CountDownLatch latch) {
final TestTask<String> task = new TestTask<String>(latch) {
@Override
public String doInBackground() throws Throwable {
Thread.sleep(500 + index * 10);
if (index < 4) {
return Thread.currentThread() + " :" + index;
} else if (index < 7) {
cancel();
return null;
} else {
throw new NullPointerException(String.valueOf(index));
}
}
@Override
void onTestSuccess(String result) {
System.out.println(result);
}
};
ThreadUtils.executeByFixed(3, task);
}
});
}
@Test
public void executeByFixedWithDelay() throws Exception {
asyncTest(10, new TestRunnable<String>() {
@Override
public void run(final int index, CountDownLatch latch) {
final TestTask<String> task = new TestTask<String>(latch) {
@Override
public String doInBackground() throws Throwable {
Thread.sleep(500);
if (index < 4) {
return Thread.currentThread() + " :" + index;
} else if (index < 7) {
cancel();
return null;
} else {
throw new NullPointerException(String.valueOf(index));
}
}
@Override
void onTestSuccess(String result) {
System.out.println(result);
}
};
ThreadUtils.executeByFixedWithDelay(3, task, 500 + index * 10, TimeUnit.MILLISECONDS);
}
});
}
@Test
public void executeByFixedAtFixRate() throws Exception {
asyncTest(10, new TestRunnable<String>() {
@Override
public void run(final int index, CountDownLatch latch) {
final TestScheduledTask<String> task = new TestScheduledTask<String>(latch, 3) {
@Override
public String doInBackground() throws Throwable {
Thread.sleep(500 + index * 10);
if (index < 4) {
return Thread.currentThread() + " :" + index;
} else if (index < 7) {
cancel();
return null;
} else {
throw new NullPointerException(String.valueOf(index));
}
}
@Override
void onTestSuccess(String result) {
System.out.println(result);
}
};
ThreadUtils.executeByFixedAtFixRate(3, task, 3000 + index * 10, TimeUnit.MILLISECONDS);
}
});
}
abstract static class TestScheduledTask<T> extends ThreadUtils.Task<T> {
private static final AtomicInteger ATOMIC_INTEGER = new AtomicInteger();
private int mTimes;
CountDownLatch mLatch;
TestScheduledTask(final CountDownLatch latch, final int times) {
mLatch = latch;
mTimes = times;
}
abstract void onTestSuccess(T result);
@Override
public void onSuccess(T result) {
onTestSuccess(result);
if (ATOMIC_INTEGER.addAndGet(1) % mTimes == 0) {
mLatch.countDown();
}
}
@Override
public void onCancel() {
System.out.println(Thread.currentThread() + " onCancel: ");
mLatch.countDown();
}
@Override
public void onFail(Throwable t) {
System.out.println(Thread.currentThread() + " onFail: " + t);
mLatch.countDown();
}
}
abstract static class TestTask<T> extends ThreadUtils.Task<T> {
CountDownLatch mLatch;
TestTask(final CountDownLatch latch) {
mLatch = latch;
}
abstract void onTestSuccess(T result);
@Override
public void onSuccess(T result) {
onTestSuccess(result);
mLatch.countDown();
}
@Override
public void onCancel() {
System.out.println(Thread.currentThread() + " onCancel: ");
mLatch.countDown();
}
@Override
public void onFail(Throwable t) {
System.out.println(Thread.currentThread() + " onFail: " + t);
mLatch.countDown();
}
}
<T> void asyncTest(int threadCount, TestRunnable<T> runnable) throws Exception {
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
runnable.run(i, latch);
}
latch.await();
}
interface TestRunnable<T> {
void run(final int index, CountDownLatch latch);
}
}
复制代码
感谢你们一块儿陪伴 AndroidUtilCode 的成长,核心工具类几乎都已囊括,也是聚集了我大量的心血,把开源作到了极致,但愿你们能够用的舒心,大大提高开发效率,早日赢取白富美,走上人生巅峰。
欢迎来个人 狗窝 坐坐哈
后文再添加一个我的对 OkHttp 的线程池的使用分析,算是送上个小福利。
查看 OkHttp 的源码发现,不管是同步请求仍是异步请求,最终都是交给 Dispatcher 作处理,咱们看下该类和线程池有关的的主要代码:
public final class Dispatcher {
// 最大请求数
private int maxRequests = 64;
// 相同 host 最大请求数
private int maxRequestsPerHost = 5;
// 请求执行线程池,懒加载
private @Nullable ExecutorService executorService;
// 就绪状态的异步请求队列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
// 运行中的异步请求队列,包括还没完成的请求
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
}
public Dispatcher() {
}
public synchronized ExecutorService executorService() {
if (executorService == null) {
// 和 CachedThreadPool 很类似
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
synchronized void enqueue(AsyncCall call) {
// 不超过最大请求数而且不超过 host 最大请求数
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
// 添加到运行中的异步请求队列
runningAsyncCalls.add(call);
// 添加到线程池中运行
executorService().execute(call);
} else {
// 添加到就绪的异步请求队列
readyAsyncCalls.add(call);
}
}
// 当该异步请求结束的时候,会调用此方法,用于将运行中的异步请求队列中的该请求移除并调整请求队列
// 此时就绪队列中的请求就能够进入运行中的队列
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
// 根据 maxRequests 和 maxRequestsPerHost 来调整 runningAsyncCalls 和 readyAsyncCalls
// 使运行中的异步请求不超过两种最大值,而且若是队列有空闲,将就绪状态的请求归类为运行中。
private void promoteCalls() {
// 若是运行中的异步队列不小于最大请求数,直接返回
if (runningAsyncCalls.size() >= maxRequests) return;
// 若是就绪队列为空,直接返回
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
// 遍历就绪队列并插入到运行队列
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
// 运行队列中的数量到达最大请求数,直接返回
if (runningAsyncCalls.size() >= maxRequests) return;
}
}
}
复制代码
能够发现 OkHttp 不是在线程池中维护线程的个数,线程是经过 Dispatcher 间接控制,线程池中的请求都是运行中的请求,这也就是说线程的重用不是线程池控制的,经过源码咱们发现线程重用的地方是请求结束的地方 finished(AsyncCall call)
,而真正的控制是经过 promoteCalls
方法, 根据 maxRequests
和 maxRequestsPerHost
来调整 runningAsyncCalls
和 readyAsyncCalls
,使运行中的异步请求不超过两种最大值,而且若是队列有空闲,将就绪状态的请求归类为运行中。