RxJava的一个重要优势就在于能够方便的切换线程,因此就想从源码的角度探索下其切换线程的原理java
ObserveOn用于切换下游执行线程,能够屡次调用,每调用一次会切换一次,先来看一个例子bash
fun threadName(desc: String) {
println("$desc ${Thread.currentThread().name}")
}
fun main() {
Observable.create<Int> {
threadName("subscribe")
it.onNext(1)
it.onNext(2)
it.onComplete()
}.observeOn(Schedulers.io())
.subscribe(object : Observer<Int> {
override fun onComplete() {
threadName("onComplete")
}
override fun onSubscribe(d: Disposable) {
threadName("onSubscribe")
}
override fun onError(e: Throwable) {
threadName("onError")
}
override fun onNext(t: Int) {
threadName("onNext")
}
})
}
复制代码
输出结果网络
onSubscribe main
subscribe main
onNext RxCachedThreadScheduler-1
onNext RxCachedThreadScheduler-1
onComplete RxCachedThreadScheduler-1
复制代码
说好的observeOn
切换下游执行线程,怎么onSubscribe
方法会在主线程中调用?缘由是observeOn
方法生成的ObserveOnObserver实例并不会对onSubscribe
事件作切换线程的操做,这个等下看了源码就理解了。那么observeOn
是怎么把下游的onNext
、onComplete
切换到子线程执行的呢?来看看observeOn
的源码实现ide
// Observable.java
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError) {
return observeOn(scheduler, delayError, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
复制代码
observeOn
方法调用后会返回一个ObservableObserveOn实例,通过上篇文章的分析主要关注其subscribeActual
方法就行oop
protected void subscribeActual(Observer<? super T> observer) {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
复制代码
包装了一下下游Observer,能够猜想这个Observer内部会将onNext
等事件转到其它线程进行执行ui
public void onSubscribe(Disposable d) {
...
downstream.onSubscribe(this);
}
public void onNext(T t) {
queue.offer(t);
schedule();
}
public void onError(Throwable t) {
schedule();
}
public void onComplete() {
schedule();
}
void schedule() {
worker.schedule(this);
}
复制代码
能够看到onSubscribe
直接在当前线程执行了没有进行线程切换,onNext
、onError
、onComplete
则是都调用了schedule
方法this
public Disposable schedule(@NonNull Runnable run) {
return schedule(run, 0L, TimeUnit.NANOSECONDS);
}
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
// EventLoopWorker.java
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
// NewThreadWorker.java
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
复制代码
executor是一个SchedulerThreadPoolExecutor实例,最终都会在线程池中运行那个Runnable实例也就是ObserveOnObserver实例,因此就看看其run
方法spa
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
void drainNormal() {
for (;;) {
for (;;) {
try {
v = q.poll();
} catch (Throwable ex) {
a.onError(ex);
return;
}
a.onNext(v);
}
}
}
void drainFused() {
for (;;) {
if (d) {
disposed = true;
ex = error;
if (ex != null) {
downstream.onError(ex);
} else {
downstream.onComplete();
}
worker.dispose();
return;
}
}
}
复制代码
能够看到onNext
、onError
、onComplete
都在这个方法里面调用,所以这些方法就运行在了线程池中了,这样就成功的切换了线程,那么屡次调用observeOn有效果吗?屡次调用其实就是在一个线程池中的某个线程中再次开启了一个线程,因此是有效果的。接着看看subscribeOn
这个方法线程
subscribeOn用于上游执行线程,而且屡次调用只有第一次会生效,先来看一个例子code
fun main() {
Observable.create<Int> {
threadName("subscribe")
it.onNext(1)
it.onNext(2)
it.onComplete()
}.subscribeOn(Schedulers.io())
.subscribe(object : Observer<Int> {
override fun onComplete() {
threadName("onComplete")
}
override fun onSubscribe(d: Disposable) {
threadName("onSubscribe")
}
override fun onError(e: Throwable) {
threadName("onError")
}
override fun onNext(t: Int) {
threadName("onNext")
}
})
}
复制代码
输出结果
onSubscribe main
subscribe RxCachedThreadScheduler-1
onNext RxCachedThreadScheduler-1
onNext RxCachedThreadScheduler-1
onComplete RxCachedThreadScheduler-1
复制代码
咦!不是说subscribeOn切换的只是上游的执行线程,为何onNext
、onComplete
也会在子线程中执行?其实答案很简单该段代码中没有调用observeOn因此下游执行线程并无发生改变,所以上游在子线程中发送一个onNext
事件过来,下游的onNext
方法天然也会在子线程中执行,那么subscribeOn
内部到底作了什么才会致使上游会在子线程中执行呢,来看看其源码实现
public final Observable<T> subscribeOn(Scheduler scheduler) {
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
复制代码
建立了一个ObservableSubscribeOn
实例并将Scheduler
实例传入,接着看看其subscribeActual
实现
// ObservableSubscribeOn.java
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
复制代码
这里直接回调了下游Observer
实例的onSubscribe
方法,接着执行scheduleDirect
,咱们继续跟进
// Schedule.java
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
// createWorker是抽象方法,IoScheduler会返回一个EventLoopWorker实例
final Worker w = createWorker();
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
复制代码
调用到了scheduleActual
方法,这个方法上面在分析observeOn
时已经分析过,内部会在线程池中执行action
这个Runnable
实例,那么主要看看SubscribeTask
的run
方法
// SubscribeTask.java
public void run() {
source.subscribe(parent);
}
复制代码
里面的逻辑很简单就是在线程池中执行上游的subscribe
方法,所以上游的全部事件都将在该线程池中执行。那么为何说subscribeOn
只能生效一次呢?其实真正的来讲subscribeOn
也能够生效屡次,只不过最上游发送事件的线程是由第一次subscribeOn
调用时肯定的,举个例子
fun main() {
Observable.create<Int> {
threadName("subscribe")
it.onNext(1)
it.onNext(2)
it.onComplete()
}.subscribeOn(Schedulers.io())
.map { threadName("map"); it + 1 }
.subscribeOn(Schedulers.computation())
.subscribe(object : Observer<Int> {
override fun onComplete() {
threadName("onComplete")
}
override fun onSubscribe(d: Disposable) {
threadName("onSubscribe")
}
override fun onError(e: Throwable) {
threadName("onError")
}
override fun onNext(t: Int) {
threadName("onNext")
}
})
// 主线程睡眠下,防止RxJava生成的daemon线程自动退出
Thread.sleep(200)
}
复制代码
输出结果
onSubscribe main
subscribe RxCachedThreadScheduler-1
map RxCachedThreadScheduler-1
onNext RxCachedThreadScheduler-1
map RxCachedThreadScheduler-1
onNext RxCachedThreadScheduler-1
onComplete RxCachedThreadScheduler-1
复制代码
在这里例子中调用了两次subscribeOn
可是看起来只有第一次才生效,其实map
方法生成的ObservableMap
实例的subscribe
方法是在计算线程池中执行的,下面来看一下observeOn
和subscribeOn
进行组合的状况
假设有这么一个需求: 先注册而后登陆,须要知足:1. 注册成功后要能更新UI,2. 注册失败将再也不进行登陆,3. 登陆成功或者失败也须要更新UI。因为网络请求不能在主线程执行,所以咱们就须要用到线程切换,下面是示例代码
private var disposable: Disposable? = null
private fun registerAndLogin(listener: Listener) {
getRegisterObservable()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext {
threadName("doOnNext")
if (!it.success) {
listener.registerFail()
disposable?.dispose()
} else {
listener.registerSuccess()
}
}
.observeOn(Schedulers.io())
.flatMap {
getLoginObservable()
}
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object : Observer<LoginModel> {
override fun onComplete() {
threadName("onComplete")
}
override fun onSubscribe(d: Disposable) {
disposable = d
threadName("onSubscribe")
}
override fun onNext(model: LoginModel) {
if (model.success) listener.loginSuccess(model.token)
else listener.loginFail()
threadName("onNext")
}
override fun onError(e: Throwable) {}
})
}
private fun getRegisterObservable(): Observable<RegisterModel> {
return Observable.create {
// 模拟网络耗时
Thread.sleep(500)
threadName("register")
it.onNext(RegisterModel(true))
it.onComplete()
}
}
private fun getLoginObservable(): Observable<LoginModel> {
return Observable.create {
// 模拟网络耗时
Thread.sleep(500)
threadName("login")
it.onNext(LoginModel(true, "token"))
it.onComplete()
}
}
data class RegisterModel(val success: Boolean)
data class LoginModel(val success: Boolean, val token: String)
interface Listener {
fun registerSuccess()
fun registerFail()
fun loginSuccess(token: String)
fun loginFail()
}
private fun threadName(desc: String) {
Log.d("Thread", "$desc ${Thread.currentThread().name}")
}
复制代码
输出结果以下
onSubscribe main
register RxCachedThreadScheduler-1
doOnNext main
login RxCachedThreadScheduler-2
onNext main
onComplete main
复制代码
这个输出彻底知足了咱们的需求,网络请求在子线程,UI更新在主线程,如今来分析下为何会是这个结果。
subscribeOn
这个方法在这条链上哪一个地方调用都不要紧其并不会影响结果,由于其只是决定了注册操做所在的线程doOnNext
中,咱们知道登陆操做是在子线程,因此咱们这里要使用observeOn
将线程切换到主线程observeOn
将线程再次切换到子线程observeOn
将线程切换到主线程