先贴上系列的连接:
rxjava2源码解析(一)基本流程分析
rxjava2源码解析(二)线程切换分析
rxjava2源码解析(三)线程池原理分析java
上一篇介绍了rxjava2源码解析(一)基本流程分析,这篇准备说说线程切换。bash
仍是先从最基本的使用开始看:异步
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onNext("4");
}
}).subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG, "s = " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
复制代码
相较于前面的实例,多了两段代码,subscribeOn
,observeOn
。咱们知道,subscribeOn
是用来调整被观察者(发射源)的线程,而observeOn
是调整观察者(处理器)的线程。ide
咱们先从observeOn
的源码来看它是如何控制处理器的线程:post
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
//判空代码,和hock相关机制,咱们能够忽略,直接看ObservableObserveOn
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
复制代码
首先咱们知道,observeOn
、subscribeOn
都是observable
这个装饰器的方法,他们的返回值也都是observable
(前面讲过这是装饰器模式)。这里仍是老样子,直接看ObservableObserveOn
这个对象就好了。ui
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
//将上游source存在本地
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
//若是是当前线程,就不作处理
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
复制代码
重点仍是subscribeActual
这个方法。咱们看到,初始化ObservableObserveOn
的时候传入了咱们设置的scheduler
。因此在subscribeActual
里,先判断scheduler
是不是TrampolineScheduler
。TrampolineScheduler
是什么东西呢?咱们看官方注释:this
/**
* Schedules work on the current thread but does not execute immediately. Work is put in a queue and executed
* after the current unit of work is completed.
*计划在当前线程上工做,但不会当即执行。 将工做放入队列并在当前工做单元完成后执行。
*/
复制代码
OK,一目了然,若是是当前线程,不作任何处理,直接用绑定起来。不然,新建一个ObserveOnObserver
对象,将上游装饰器(这里的上游是代码流程上的上游,即调用observeOn
的装饰器)先于这个对象绑定。这个看起来是否是很眼熟?咱们回看上一篇里面说的ObservableCreate
里的subscribeActual
方法,对比一下有什么不一样。spa
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
复制代码
能够看到,二者都有在内部新建一个对象,并将上游装饰器(或者是发射源)与之绑定。区别在于,ObservableCreate
内部是新建一个发射器CreateEmitter
对象,而ObservableObserveOn
内部是新建一个处理器ObserveOnObserver
对象。ObservableCreate
是将以前存储的上游发射源与发射器绑定,ObservableObserveOn
是将上游装饰器与处理器绑定。
根据这一点,咱们能够将装饰器分为两种样式:线程
ObservableCreate
相似,属于每一条流水线的开端,自己是装饰器,上游是发射源,内部生成一个发射器,处理最开始的发射流程。咱们称之为起始装饰器ObservableObserveOn
相似,属于流水线中间流程,自己是装饰器,上游是装饰器,内部新建一个处理器来处理上游事件,下游是处理器或者其余装饰器。咱们称之为流程装饰器。下面咱们能够看看ObserveOnObserver
的源码。code
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.downstream = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
//同步异步相关,暂时无论
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
//重点是这个
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
//ObserveOnObserver继承了runnable接口,意味着能够当作是线程任务来执行。这里表明着在新线程中执行run方法。
worker.schedule(this);
}
}
//ObserveOnObserver继承了runnable接口
@Override
public void run() {
//同步异步相关,咱们直接看drainNormal()
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = downstream;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
····//省略一些判断的代码
v = q.poll();
//这里就能够看到,将下游的onNext方法,切换到新线程执行。
a.onNext(v);
}
···
}
}
}
复制代码
这里咱们就不深刻细究了(会在下一篇中详细来讲),只须要知道,这是上游的处理器执行onNext
,传到这里,使用以前设置的线程执行下游的onNext
方法。因此这里就完成了线程切换功能。而且这个切换,延续到下游全部处理的onNext
。若是在下游再次调用ObserverOn
,就会将后面的处理器切换到另一个线程。
因此,咱们能够获得结论,ObserverOn
能够屡次调用,每次调用会做用于下游的全部处理器,直到遇到新的ObserverOn
。
接下来能够看看,SubscribeOn
的源码内容。
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
//判空和hock机制代码,忽略。直接看ObservableSubscribeOn
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
复制代码
同样的配方,同样的味道,咱们能够直接看ObservableSubscribeOn
类的subscribeActual
方法。
@Override
public void subscribeActual(final Observer<? super T> observer) {
//建立SubscribeOnObserver内部类对象
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
//这里调用了下游的onSubscribe
observer.onSubscribe(parent);
//scheduler.scheduleDirect方法返回一个disposable
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
复制代码
这里能够看到,ObservableSubscribeOn
一样是一个流程装饰器,在调用subscribe
的时候,内部新建一个处理器,这个处理器与下游处理器相互持有。这里与ObservableObserveOn
有所不一样,并无当即执行subscribe
将上游装饰器与内部处理器链接起来,而是执行了:
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
复制代码
咱们看看SubscribeTask
是啥:
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//这里的source是上游装饰器,parent是内部处理器
source.subscribe(parent);
}
}
复制代码
很简单,SubscribeTask
继承了Runnable
,其中run
方法是执行上游装饰器的subscribe
方法。咱们再看scheduleDirect
方法。
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
复制代码
这里createWorker
方法,是每一个不一样的scheduler
自行重写的。咱们不深究这块的代码,只须要理解为,在subscribeActual
中,根据前面设置的scheduler
,新建一个线程,在线程中当即执行上游source
的subscribe
方法,与内部处理器SubscribeOnObserver
绑定。SubscribeOnObserver
处理器与通常处理器没什么区别,就不贴代码了。
那么上面就实现了,在另一个线程执行subscribe
方法。看过前面基本流程的都知道,这里基本上就肯定了,上游全部subscribe
方法执行的线程。因此咱们知道了,subscribeOn()
方法会切换上游全部的subscribe
方法,至于发射源所在的线程,只跟离它最近的subscribeOn()
方法中所切换的线程有关。因此说,subscribeOn()
方法只须要执行一次,且只有第一次是生效的。
咱们看到,终端Observer
这里有一个onSubscribe
方法,咱们通常在这里进行一些初始化的操做,而前面的源码中也有不少地方有onSubscribe
方法。那这个方法究竟是执行在哪一个线程呢?咱们从源码中寻找答案。
想要知道onSubscribe
方法在哪一个线程,只须要看,在离终端处理器最近的上游装饰器中,是在哪一个线程调用onSubscribe()
的。咱们来看看subscribeOn
和observeOn
有没有更改它所运行的线程。 先看ObserveOn
,毕竟这是做用于下游处理器的线程切换方法。
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
复制代码
从上面的源码能够看到,ObservableObserveOn
类中的subscribeActual
并无onSubscribe
相关的内容。咱们看看ObserveOnObserver
的源码:
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.downstream = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
·····//不相干代码
//这里的downstream是指下游处理器
downstream.onSubscribe(this);
}
}
复制代码
咱们看到,在ObserveOnObserver
被调用onSubscribe
的时候,会调用下游的onSubscribe
,参数是自己。也就是说,ObserveOn
并无切换onSubscribe
方法的线程。
再看subscribeOn
方法,很明显是直接在subscribeActual
中执行下游处理器的onSubscribe
方法。
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
复制代码
这两个线程切换的方法,都没有更改onSubscribe()
方法的线程。因此咱们能肯定,在终端的处理器Observer
里面的onSubscribe()
方法,是跟外部在同一个线程上。
observeOn
做用于下游的全部处理器,能够屡次调用。每个处理器所运行的线程,决定于它最近的上游observeOn
方法中指定的线程。subscribeOn
做用于上游的发射源,主要是用来指定subscribe
方法所在的线程。针对于发射源,只有离它最近的下游subscribeOn
方法中所指定的线程才生效。因此subscribeOn
方法屡次调用并无效果。onSubscribe()
方法并不会跟随内部线程切换而切换线程。运行在哪一个线程,只跟外部建立这一整套观察者模式的线程一致。OK,上面就是本篇的所有内容了,下一篇咱们深刻看一看,rxjava2 里面的线程池和线程切换究竟是怎么实现的!
每周更新,敬请期待~