RxJava很方便的切换主子线程、指定任务运行的线程,在这个便利以后还隐藏着不少问题。好比IO scheduler是一个无上限线程池,若是短期并发量过大,在手机端可能出现OOM或者pthread_create错误。另外,在实际业务中咱们须要对执行的业务进行优先级区分,以便优先级高的任务先执行,想实现这个需求必然须要对RxJava默认的scheduler进行改造。本文将从RxJava IO scheduler分析、介绍线程池相关知识、如何对IO scheduler进行改造等方向进行介绍,而且对应用旧代码作到无侵入式的替换。java
在介绍主体内容以前,咱们先回顾下线程池的相关知识,这样能更好的理解本文章内容。从字面意思上来讲,线程池确定是一个装着线程的"池",小则是鱼塘,装的少,可是家里没矿只能承包这么大的鱼塘;固然若是是大佬,说不定这一片海都是他的。线程池确定不是简单的承载容纳线程的池子,既然做为相似仓库的属性,必然有管理之意。git
线程是做为一个任务的执行承载者,接收来自程序的诸多执行请求,其中子线程是合理利用cpu性能避免阻塞主线程的存在。好东西都是不可贪多,线程若是不加以管理,确定会被程序各处的代码随意建立,这样会浪费或者影响cpu某个时刻的性能,甚至致使当前进程出现异常。github
阻塞队列,这是线程池待执行任务的容器,负责管理要执行的任务。阻塞二字说明它的入队和出队是可控的,所谓阻塞,在某些状况下会挂起线程(即阻塞),一旦条件知足,被挂起的线程又会自动被唤醒。按照队列的数据结构进行出入某一个任务时会阻塞,这样在多线程环境下才更安全的生产任务和消费任务。缓存
比较通俗的一个例子有仓库提取货物,若是仓库里只有有限的几个小车运输货物,此时有不少运输员来提货,确定要遇到争夺小车、等待小车。小车相似于cpu的核数,也能够理解为线程池容许建立的总数。总而言之,当前仓库(线程池)只有有限的几辆小车同时工做,每一个运输员(程序代码块)想要获取货物(执行代码)就必需要争夺小车资源。当有空闲小车时,会按照必定规则分配给某个经销商,多是队列的简单前后入队等待顺序,也多是优先级(毕竟氪金无敌VIP)。固然有多是大佬,家里矿多,说小车不够,我去买,这样会造成一个无限制上限线程池模型。不过这样作有一种风险就是仓库体积没法随意扩容(整个程序所承载的机器性能有限),买了太多小车放不下,而后整个仓库就瘫痪了,这时候可能OOM或pthread_create错误。小车每次使用完后,都会继续被分配到下一个任务,固然若是经销商的事情都处理完了,可能就都闲置了,有可能晚上没活,仓库就把小车都封存起来,整理回收到固定地方(超时闲置后回收非核心线程),有可能留下几辆预先说好的小车以便晚上有紧急货物时处理(核心线程常驻)。 以下图所示,大体概念以下。绿色的取货车多是由于取货车不够,临时采购或者借调的,相似于线程池临时开启新线程。红色的取货车区域是核心线程,有限的。安全
通常核心线程数根据CPU数量来肯定,线程池数大于CPU数量,看似是并发执行任务,实际上是操做系统帮咱们在按照必定时间片进行调度来执行任务,达到一种同时执行的效果,因此大量线程同时执行对CPU负载性能要求,会让机器达到50%甚至100%高负荷运做,此时整机机器发生出错的几率增大。因此同时执行不少任务致使频繁切换线程自己也是一种额外的开销 ,不建议如此操做,尤为是部分任务是低优先级且不重要、可延迟的。bash
单个线程大概1MB左右开销,在Java内部开发版的JDK中,加入fiber这种新的任务调度模型,开销只有200KB左右,经过100W级任务调度测试,据介绍性能比thread优异不少,release版预测还会提升,特性相似于协程。不过咱们Android不太可能使用付费JDK。数据结构
经过上面的描述咱们简单的了解线程池与线程之间的关系,线程和调用者之间的联系,而且线程池运做时是有本身的规则设定,调用者和每一个被管理的线程必须遵照规则。多线程
java经过Executors提供了四种经常使用线程池,这四种本质上也是封装了自定义线程池的基本参数,简化了建立流程,提供特定的功能。并发
先介绍下自定义线程池的基本参数和含义,这样更好理解下面的java封装好的线程池。ide
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
复制代码
一个任务被提交时,它的大体流程都是类似的。下面是流程图(下图取自文章https://juejin.im/post/5b052dd7f265da0ba567e7f1)
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
复制代码
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
复制代码
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
复制代码
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
复制代码
通常都是继承ThreadPoolExecutor类(不继承也能够,可是继承是为了作更多的方法执行监控),而后根据须要设置下面7种参数。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
复制代码
比较重要的就是阻塞队列workQueue和拒绝策略handler的选择,这个会在后续RxJava的IO scheduler监控方案里再次介绍。java提供的默认队列的种类有无限大小和有限,带优先级、带延时等,根据须要能够选择不一样类型的队列。ThreadFactory 是构造新的thread的工厂,这里自定义一个能够进行新建线程的监控。拒绝策略handler有提供四种默认的策略,也能够本身实现接口RejectedExecutionHandler本身作特殊策略,好比移交任务到另一个执行者,或者判断下这个任务的重要性,而后再抛弃。
//ThreadPoolExecutor.CallerRunsPolicy:在调用者所在线程执行该任务
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
//ThreadPoolExecutor.AbortPolicy:放弃执行任务,抛出RejectedExecutionException异常。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
//ThreadPoolExecutor.DiscardPolicy:放弃执行任务,不抛出异常。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
//ThreadPoolExecutor.DiscardOldestPolicy:poll出队一个最先任务,而后尝试执行它
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
复制代码
RxJavaSchedulersHook类 里会生成IO Scheduler,默认调用CachedThreadScheduler。 里面的CachedWorkerPool维护了一个线程管理的队列expiringWorkerQueue, 默认是每隔60s就去经过evictor清除已通过期的线程,线程池没有上限。所以若是短期内有大量任务要执行,会致使不停地建立新线程,因此存在出现pthread_create、OOM、耗费大量系统资源形成卡顿等问题。
public final class CachedThreadScheduler extends Scheduler implements SchedulerLifecycle {
private static final long KEEP_ALIVE_TIME;
private static final TimeUnit KEEP_ALIVE_UNIT = TimeUnit.SECONDS;
static final ThreadWorker SHUTDOWN_THREADWORKER;
static final CachedWorkerPool NONE;
final ThreadFactory threadFactory;
final AtomicReference<CachedWorkerPool> pool;
static {
SHUTDOWN_THREADWORKER = new ThreadWorker(RxThreadFactory.NONE);
SHUTDOWN_THREADWORKER.unsubscribe();
NONE = new CachedWorkerPool(null, 0, null);
NONE.shutdown();
KEEP_ALIVE_TIME = Integer.getInteger("rx.io-scheduler.keepalive", 60);
}
static final class CachedWorkerPool {
private final ThreadFactory threadFactory;
private final long keepAliveTime;
private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
private final CompositeSubscription allWorkers;
private final ScheduledExecutorService evictorService;
private final Future<?> evictorTask;
CachedWorkerPool(final ThreadFactory threadFactory, long keepAliveTime, TimeUnit unit) {
this.threadFactory = threadFactory;
this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
this.allWorkers = new CompositeSubscription();
ScheduledExecutorService evictor = null;
Future<?> task = null;
if (unit != null) {
evictor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override public Thread newThread(Runnable r) {
Thread thread = threadFactory.newThread(r);
thread.setName(thread.getName() + " (Evictor)");
return thread;
}
});
NewThreadWorker.tryEnableCancelPolicy(evictor);
task = evictor.scheduleWithFixedDelay(
new Runnable() {
@Override
public void run() {
evictExpiredWorkers();
}
}, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS
);
}
evictorService = evictor;
evictorTask = task;
}
ThreadWorker get() {
if (allWorkers.isUnsubscribed()) {
return SHUTDOWN_THREADWORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
// No cached worker found, so create a new one.
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
void release(ThreadWorker threadWorker) {
// Refresh expire time before putting worker back in pool
threadWorker.setExpirationTime(now() + keepAliveTime);
expiringWorkerQueue.offer(threadWorker);
}
//每60s执行一次清除队列中的已过期线程
void evictExpiredWorkers() {
if (!expiringWorkerQueue.isEmpty()) {
long currentTimestamp = now();
for (ThreadWorker threadWorker : expiringWorkerQueue) {
if (threadWorker.getExpirationTime() <= currentTimestamp) {
if (expiringWorkerQueue.remove(threadWorker)) {
allWorkers.remove(threadWorker);
}
} else {
// Queue is ordered with the worker that will expire first in the beginning, so when we
// find a non-expired worker we can stop evicting.
break;
}
}
}
}
long now() {
return System.nanoTime();
}
void shutdown() {
try {
if (evictorTask != null) {
evictorTask.cancel(true);
}
if (evictorService != null) {
evictorService.shutdownNow();
}
} finally {
allWorkers.unsubscribe();
}
}
}
public CachedThreadScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}
@Override
public void start() {
CachedWorkerPool update =
new CachedWorkerPool(threadFactory, KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
}
@Override
public void shutdown() {
for (;;) {
CachedWorkerPool curr = pool.get();
if (curr == NONE) {
return;
}
if (pool.compareAndSet(curr, NONE)) {
curr.shutdown();
return;
}
}
}
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
static final class EventLoopWorker extends Scheduler.Worker implements Action0 {
private final CompositeSubscription innerSubscription = new CompositeSubscription();
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once;
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.once = new AtomicBoolean();
this.threadWorker = pool.get();
}
@Override
public void unsubscribe() {
if (once.compareAndSet(false, true)) {
// unsubscribe should be idempotent, so only do this once
// Release the worker _after_ the previous action (if any) has completed
threadWorker.schedule(this);
}
innerSubscription.unsubscribe();
}
@Override
public void call() {
pool.release(threadWorker);
}
@Override
public boolean isUnsubscribed() {
return innerSubscription.isUnsubscribed();
}
@Override
public Subscription schedule(Action0 action) {
return schedule(action, 0, null);
}
@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
if (innerSubscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed return Subscriptions.unsubscribed(); } ScheduledAction s = threadWorker.scheduleActual(new Action0() { @Override public void call() { if (isUnsubscribed()) { return; } action.call(); } }, delayTime, unit); innerSubscription.add(s); s.addParent(innerSubscription); return s; } } static final class ThreadWorker extends NewThreadWorker { private long expirationTime; ThreadWorker(ThreadFactory threadFactory) { super(threadFactory); this.expirationTime = 0L; } public long getExpirationTime() { return expirationTime; } public void setExpirationTime(long expirationTime) { this.expirationTime = expirationTime; } } } 复制代码
在NewThreadWorker里最终设置executor的初始化构造,这里能够看到是一个定时周期任务线程池,核心线程为1.
/* package */
public NewThreadWorker(ThreadFactory threadFactory) {
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
// Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak
boolean cancelSupported = tryEnableCancelPolicy(exec);
if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) {
registerExecutor((ScheduledThreadPoolExecutor)exec);
}
executor = exec;
}
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
ScheduledAction run = new ScheduledAction(decoratedAction);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);
return run;
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory);
}
复制代码
本方案提供一些自定义的基础类,也能够选择实现接口,自由替换为本身想要的策略。
主要是利用RxJava自己RxJavaPlugins作替换线程池的操做。而后自定义一些scheduler所须要的类,完成相关逻辑。
RxJavaPlugins.getInstance().registerSchedulersHook(new RxJavaSchedulersHookImpl());
复制代码
管理逻辑类:IOTaskPriorityType、IOMonitorManager、AppSchedulers IOMonitorManager:负责调用基础监控类,而且替换默认RxJava的IO scheduler,做为主要的逻辑管理类,负责初始化整个监控模块,以及配置基本参数。
AppSchedulers做为替换Schedulers的存在,用于应用层显式地使用自定义IO Scheduler,区别于原来的RxJava的Sheduler.IO。
IOTaskPriorityType:定义不一样的任务类型,用于区分管理。
大体结构以下图所示。
改造后的任务执行流程也作了相应的改变,有别于上面提到的原生流程。对何时建立线程作了调整,优先在限制线程池大小范围内建立新线程执行任务,而不是之前的无限建立或者先阻塞队列等待执行。为了方便对比,把上面的图片也拷贝下来。
关于custominterface包:自定义IO scheduler或者scheduler所使用的线程池时,须要关注这个包下面的接口和抽象类 customScheduler包:已经自定义好的scheduler相关以及提供的基础线程池,能够参考这里的实现,去自定义应用本身的线程池管理的scheduler
大部分时候你只须要关心IOMonitorManager这个入口管理类,其它只在须要自定义或者策略改动时才修改。
/*
* description 自定义IO线程监控管理类,配置监视器的基本参数,开启各类debug方法等
* 监控要求:
* 一、自定义的scheduler须要继承AbstractScheduler
* 二、参考或者直接使用ExecutorSchedulerWorker建立任务
* 三、若是是要替换原始RxJava的IO线程池,须要额外实现IThreadPool,建立本身的线程池类
* 四、IThreadPool的实现类里,线程池的构造使用MonitorThreadPoolExecutor类,便于监控
* 五、编写新的scheduler参考自定义的IOScheduler类
* 六、若是须要自定义SchedulerWork,须要实现Runnable, IBaseWork 接口,继承Scheduler.Worker
* 七、默认提供IOThreadPool和LimitCoreThreadPool两种基础线程池,还有自定义的IOScheduler(用来替换本来RxJava的IOScheduler)
* 八、IOTaskPriorityType优先级类型,RxJava observable在subscribeOn时能够选择传入
* 九、AbstractRejectedExecutionHandler能够作一些拒绝任务的策略动做
*
* @see ExecutorSchedulerWorker
* 使用方式:
* 一、全部public方法提供配置参数
* 二、在基础参数配置完后调用此方法startMonitor
* modify by
*/
//必须在应用第一次使用observable以前设置,这里会替换rxjava的默认IO scheduler。
// 若是只调用setReplaceIOScheduler方法,则替换时用基础库里自带的自定义IO scheduler
//LimitCoreThreadPool不是基础库默认的IO scheduler实现,通常都是替换线程池实现,不直接修改自定义的IO scheduler
IOMonitorManager.getInstance().setReplaceIOScheduler(true)
.setIOThreadPool(LimitCoreThreadPool.getInstance()
.build(2, BuildConfig.DEBUG ? 35 : 35, 15, 1000, false));
List<String> targetList = new ArrayList<>(2);
targetList.add("com.xtc");
//配置基本的监视器参数,下面参数能够在IOMonitorManager的startMonitor后修改
IOMonitorManager.getInstance().setCostTimeLimit(0)
//超出这个活跃线程数就输出到日志
.setThreadActiveCountLimit(30)
//打印当前被监控的线程池信息
.setLogThreadPoolInfo(BuildConfig.DEBUG)
//打印其它非RxJava的IO线程信息
.setLogOtherThreadInfo(true)
.setPackageName(getPackageName())
// .setTargetNameList(list)
//监控器轮询时间,每隔这么久打印一些线程信息
.setMonitorIntervalTime(10)
//是否输出更多日志信息,看方法注释
.setLogMoreInfo(BuildConfig.DEBUG)
//监控器是否启用
.setMonitorEnable(BuildConfig.DEBUG)
//通常在调试时才开启,监控子线程重复切换线程
.setLogRepeatChangeThread(false)
//打印全部任务执行状况,适合打桩分析当前时间段有哪些任务触发
.setLogAllTaskRunningInfo(true)
//是否过滤堆栈
.setFilterStack(!BuildConfig.DEBUG);
复制代码