先贴上这个系列的连接。
rxjava2源码解析(一)基本流程分析
rxjava2源码解析(二)线程切换分析
上一篇说了rxjava2
的线程切换,可是没有深刻说其中的线程池。这篇咱们来深扒一下。java
仍是先说observeOn
,直接看源码:面试
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
复制代码
这段代码咱们上篇看到过,这里再重复一下。obsererOn
是切换下游观察者线程,咱们看ObserveOnObserver
中的onNext
方法是如何切换线程的。bash
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.downstream = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
//ObserveOnObserver继承了runnable接口,意味着能够当作是线程任务来执行。这里表明着在新线程中执行run方法。
worker.schedule(this);
}
}
//ObserveOnObserver继承了runnable接口
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = downstream;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
····//省略一些判断的代码
v = q.poll();
//这里就能够看到,将下游的onNext方法,切换到新线程执行。
a.onNext(v);
}
···
}
}
}
复制代码
这是上游的处理器执行onNext
,传到这里,使用以前设置的线程执行下游的onNext
方法。app
这个worker
究竟是什么?咱们先看schedule
r的createWorker
方法:ide
public abstract Worker createWorker();
复制代码
在Scheduler
类中,createWorker
只是一个接口,子类会重写这个方法,咱们就以Schedulers.newThread()
这个方法建立的Scheduler
为例,来看看这里面的原理。post
//Schedulers类中的newThread静态方法,这里的hock咱们暂且不理,直接返回NEW_THREAD
public static Scheduler newThread() {
return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}
//Schedulers类中定义了NEW_THREAD和其余THREAD
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
//NewThreadTask是Schedulers的静态内部类,继承自Callable接口,其中call方法返回一个Scheduler
static final class NewThreadTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return NewThreadHolder.DEFAULT;
}
}
//NewThreadHolder一样是一个静态内部类,里面只有一个静态参数DEFAULT,这里咱们就找到了newThread方法返回的本尊NewThreadScheduler
static final class NewThreadHolder {
static final Scheduler DEFAULT = new NewThreadScheduler();
}
复制代码
如上面代码和注释所示,咱们直接看NewThreadScheduler
的源码:ui
/**
* Schedules work on a new thread.
*/
public final class NewThreadScheduler extends Scheduler {
final ThreadFactory threadFactory;
private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
private static final RxThreadFactory THREAD_FACTORY;
/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";
static {
//这里Thread.MIN_PRIORITY为1,Thread.MAX_PRIORITY为10.Thread.NORM_PRIORITY为5.若是咱们不作任何更改,这里的priority的值就为5.
int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));
THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
}
public NewThreadScheduler() {
this(THREAD_FACTORY);
}
public NewThreadScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}
@NonNull
@Override
public Worker createWorker() {
return new NewThreadWorker(threadFactory);
}
}
复制代码
这里createWorker
方法返回的是一个NewThreadWorker
对象。咱们总算找到了worker
的来源,须要注意这里的构造参数是threadFactory
。来看看NewThreadWorker
的源码。this
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
@NonNull
@Override
public Disposable schedule(@NonNull final Runnable run) {
return schedule(run, 0, null);
}
@NonNull
@Override
public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}
return scheduleActual(action, delayTime, unit, null);
}
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
//hock机制
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//用一个ScheduledRunnable把传入的runnable包装一下,本质上没区别。
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
....//省略判断性代码
Future<?> f;
try {
if (delayTime <= 0) {
//executor由构造方法中建立
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
....
}
复制代码
这里咱们就能够看到,前面调用worker.schedule(this)
,最终走到了executor.submit(sr)
。这里的sr
只是前面ObserveOnObserver
的包装。executor
在构造方法中建立。来看看executor
是什么:spa
//SchedulerPoolFactory类中的静态方法
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
tryPutIntoPool(PURGE_ENABLED, exec);
return exec;
}
复制代码
//Executors类的静态方法
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
复制代码
OK,executor
是一个ScheduledThreadPoolExecutor
,标准的工做线程池。核心线程数为1,threadFactory
是前面NewThreadWorker
构造参数中的RxThreadFactory
。他会给thread
按照命名格式进行命名。线程
public final class RxThreadFactory extends AtomicLong implements ThreadFactory {
public RxThreadFactory(String prefix) {
this(prefix, Thread.NORM_PRIORITY, false);
}
public RxThreadFactory(String prefix, int priority) {
this(prefix, priority, false);
}
public RxThreadFactory(String prefix, int priority, boolean nonBlocking) {
this.prefix = prefix;
this.priority = priority;
this.nonBlocking = nonBlocking;
}
@Override
public Thread newThread(Runnable r) {
StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());
String name = nameBuilder.toString();
Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name);
t.setPriority(priority);
t.setDaemon(true);
return t;
}
...
}
复制代码
observeOn
在subscribe
方法中,新建一个worker
对象。这个worker
对象是根据设置的scheduler
建立的。而后在新建一个ObserveOnObserver
对象,将上游与之订阅。ObserveOnObserver
的onNext
方法中,会调用worker.schedule(this)
,将自己做为runnable
传入到worker
中。newThreadScheduler
为例,他建立的worker
是一个NewThreadWorker
实例。在实例构造方法中,会根据传入的threadFactory
新建一个ScheduledThreadPool
线程池。NewThreadWorker
的shedule
方法,就是将ObserveOnObserver
做为一个runnable
放在一个新的线程池中执行。ObserveOnObserver
的run
方法,就是用来执行下游的onNext
,将数据传输下去。从而达到了,切换下游onNext
线程的目的。subscribeOn
是用来切换上游发射器线程。切换原理上一篇有说过,其中线程池相关跟上面observeOn
差很少,这里就不赘述了。
上面就是rxjava2
线程切换原理分析了,后面再有人面试问你rxjava2
里面的线程池是哪种,你就能够自信的说出:ScheduledThreadPool
。