本文由
玉刚说写做平台
提供写做赞助java原做者:
四月葡萄
git版权声明:本文版权归微信公众号 玉刚说 全部,未经许可,不得以任何形式转载github
本文主要是对RxJava的消息订阅和线程切换进行源码分析,相关的使用方式等不做详细介绍。数据库
本文源码基于rxjava:2.1.14
。json
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.缓存
It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.安全
上面这段话来自于RxJava在github上面的官方介绍。翻译成中文的大概意思就是:微信
RxJava是一个在Java虚拟机上的响应式扩展,经过使用可观察的序列将异步和基于事件的程序组合起来的一个库。markdown
它扩展了观察者模式来支持数据/事件序列,而且添加了操做符,这些操做符容许你声明性地组合序列,同时抽象出要关注的问题:好比低级线程、同步、线程安全和并发数据结构等。网络
简单点来讲, RxJava就是一个使用了观察者模式,可以异步的库。
上面说到,RxJava扩展了观察者模式,那么什么是观察模式呢?咱们先来了解一下。
举个例子,以微信公众号为例,一个微信公众号会不断产生新的内容,若是咱们读者对这个微信公众号的内容感兴趣,就会订阅这个公众号,当公众号有新内容时,就会推送给咱们。咱们收到新内容时,若是是咱们感兴趣的,就会点进去看下;若是是广告的话,就可能直接忽略掉。这就是咱们生活中遇到的典型的观察者模式。
在上面的例子中,微信公众号就是一个被观察者(Observable
),不断的产生内容(事件),而咱们读者就是一个观察者(Observer
) ,经过订阅(subscribe
)就可以接受到微信公众号(被观察者)推送的内容(事件),根据不一样的内容(事件)作出不一样的操做。
RxJava的扩展观察者模式中就是存在这么4种角色:
角色 | 角色功能 |
---|---|
被观察者(Observable ) |
产生事件 |
观察者(Observer ) |
响应事件并作出处理 |
事件(Event ) |
被观察者和观察者的消息载体 |
订阅(Subscribe ) |
链接被观察者和观察者 |
RxJava中的事件分为三种类型:Next
事件、Complete
事件和Error
事件。具体以下:
事件类型 | 含义 | 说明 |
---|---|---|
Next |
常规事件 | 被观察者能够发送无数个Next事件,观察者也能够接受无数个Next事件 |
Complete |
结束事件 | 被观察者发送Complete事件后能够继续发送事件,观察者收到Complete事件后将不会接受其余任何事件 |
Error |
异常事件 | 被观察者发送Error事件后,其余事件将被终止发送,观察者收到Error事件后将不会接受其余任何事件 |
在分析RxJava消息订阅原理前,咱们仍是先来看下它的简单使用步骤。这里为了方便讲解,就不用链式代码来举例了,而是采用分步骤的方式来逐一说明(平时写代码的话仍是建议使用链式代码来调用,由于更加简洁)。其使用步骤以下:
- 建立被观察者(
Observable
),定义要发送的事件。- 建立观察者(
Observer
),接受事件并作出响应操做。- 观察者经过订阅(
subscribe
)被观察者把它们链接到一块儿。
这里咱们就根据上面的步骤来实现这个例子,以下:
//步骤1. 建立被观察者(Observable),定义要发送的事件。 Observable observable = Observable.create( new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("文章1"); emitter.onNext("文章2"); emitter.onNext("文章3"); emitter.onComplete(); } }); //步骤2. 建立观察者(Observer),接受事件并作出响应操做。 Observer<String> observer = new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(String s) { Log.d(TAG, "onNext : " + s); } @Override public void onError(Throwable e) { Log.d(TAG, "onError : " + e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }; //步骤3. 观察者经过订阅(subscribe)被观察者把它们链接到一块儿。 observable.subscribe(observer); 复制代码
其输出结果为:
onSubscribe onNext : 文章1 onNext : 文章2 onNext : 文章3 onComplete 复制代码
下面咱们对消息订阅过程当中的源码进行分析,分为两部分:建立被观察者过程和订阅过程。
首先来看下建立被观察者(Observable
)的过程,上面的例子中咱们是直接使用Observable.create()
来建立Observable
,咱们点进去这个方法看下。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); } 复制代码
能够看到,create()
方法中也没作什么,就是建立一个ObservableCreate
对象出来,而后把咱们自定义的ObservableOnSubscribe
做为参数传到ObservableCreate
中去,最后就是调用 RxJavaPlugins.onAssembly()
方法。
咱们先来看看ObservableCreate
类:
public final class ObservableCreate<T> extends Observable<T> {//继承自Observable public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source;//把咱们建立的ObservableOnSubscribe对象赋值给source。 } } 复制代码
能够看到,ObservableCreate
是继承自Observable
的,而且会把ObservableOnSubscribe
对象给存起来。
再看下RxJavaPlugins.onAssembly()
方法
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { //省略无关代码 return source; } 复制代码
很简单,就是把上面建立的ObservableCreate
给返回。
因此Observable.create()
中就是把咱们自定义的ObservableOnSubscribe
对象从新包装成一个ObservableCreate
对象,而后返回这个ObservableCreate
对象。 注意,这种从新包装新对象的用法在RxJava中会频繁用到,后面的分析中咱们还会屡次遇到。 放个图好理解,包起来哈~
Observable.create()
的时序图以下所示:
接下来咱们就看下订阅过程的代码,一样,点进去Observable.subscribe()
:
public final void subscribe(Observer<? super T> observer) { //省略无关代码 observer = RxJavaPlugins.onSubscribe(this, observer); subscribeActual(observer); //省略无关代码 } 复制代码
能够看到,实际上其核心的代码也就两句,咱们分开来看下:
public static <T> Observer<? super T> onSubscribe( @NonNull Observable<T> source, @NonNull Observer<? super T> observer) { //省略无关代码 return observer; } 复制代码
跟以前代码同样,这里一样也是把原来的observer
返回而已。 再来看下subscribeActual()
方法。
protected abstract void subscribeActual(Observer<? super T> observer); 复制代码
Observable
类的subscribeActual()
中的方法是一个抽象方法,那么其具体实如今哪呢?还记得咱们前面建立被观察者的过程吗,最终会返回一个ObservableCreate
对象,这个ObservableCreate
就是Observable
的子类,咱们点进去看下:
@Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); //触发咱们自定义的Observer的onSubscribe(Disposable)方法 observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } 复制代码
能够看到,subscribeActual()
方法中首先会建立一个CreateEmitter
对象,而后把咱们自定义的观察者observer
做为参数给传进去。这里一样也是包装起来,放个图:
CreateEmitter
实现了
ObservableEmitter
接口和
Disposable
接口,以下:
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { //代码省略 } 复制代码
而后就是调用了observer.onSubscribe(parent)
,实际上就是调用观察者的onSubscribe()
方法,即告诉观察者已经成功订阅到了被观察者。
继续往下看,subscribeActual()
方法中会继续调用source.subscribe(parent)
,这里的source
就是ObservableOnSubscribe
对象,即这里会调用ObservableOnSubscribe
的subscribe()
方法。 咱们具体定义的subscribe()
方法以下:
Observable observable = Observable.create( new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("文章1"); emitter.onNext("文章2"); emitter.onNext("文章3"); emitter.onComplete(); } }); 复制代码
ObservableEmitter
,顾名思义,就是被观察者发射器。 因此,subscribe()
里面的三个onNext()
方法和一个onComplete()
会逐一被调用。 这里的ObservableEmitter
接口其具体实现为CreateEmitter
,咱们看看CreateEmitte
类的onNext()
方法和onComplete()
的实现:
//省略其余代码 @Override public void onNext(T t) { //省略无关代码 if (!isDisposed()) { //调用观察者的onNext() observer.onNext(t); } } @Override public void onComplete() { if (!isDisposed()) { try { //调用观察者的onComplete() observer.onComplete(); } finally { dispose(); } } } 复制代码
能够看到,最终就是会调用到观察者的onNext()
和onComplete()
方法。至此,一个完整的消息订阅流程就完成了。 另外,能够看到,上面有个isDisposed()
方法能控制消息的走向,即可以切断消息的传递,这个后面再来讲。
Observable
(被观察者)和Observer
(观察者)创建链接(订阅)以后,会建立出一个发射器CreateEmitter
,发射器会把被观察者中产生的事件发送到观察者中去,观察者对发射器中发出的事件作出响应处理。能够看到,是订阅以后,Observable
(被观察者)才会开始发送事件。
放张事件流的传递图:
再来看下订阅过程的时序流程图:
以前有提到过切断消息的传递,咱们先来看下如何使用:
Observable observable = Observable.create( new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("文章1"); emitter.onNext("文章2"); emitter.onNext("文章3"); emitter.onComplete(); } }); Observer<String> observer = new Observer<String>() { private Disposable mDisposable; @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe : " + d); mDisposable=d; } @Override public void onNext(String s) { Log.d(TAG, "onNext : " + s); mDisposable.dispose(); Log.d(TAG, "切断观察者与被观察者的链接"); } @Override public void onError(Throwable e) { Log.d(TAG, "onError : " + e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }; observable.subscribe(observer); 复制代码
输出结果为:
onSubscribe : null onNext : 文章1 切断观察者与被观察者的链接 复制代码
能够看到,要切断消息的传递很简单,调用下Disposable
的dispose()
方法便可。调用dispose()
以后,被观察者虽然能继续发送消息,可是观察者却收不到消息了。 另外有一点须要注意,上面onSubscribe
输出的Disposable
值是"null"
,并非空引用null
。
咱们这里来看看下dispose()
的实现。Disposable
是一个接口,能够理解Disposable
为一个链接器,调用dispose()
后,这个链接器将会中断。其具体实如今CreateEmitter
类,以前也有提到过。咱们来看下CreateEmitter
的dispose()
方法:
@Override public void dispose() { DisposableHelper.dispose(this); } 复制代码
就是调用DisposableHelper.dispose(this)
而已。
public enum DisposableHelper implements Disposable { DISPOSED ; //其余代码省略 public static boolean isDisposed(Disposable d) { //判断Disposable类型的变量的引用是否等于DISPOSED //即判断该链接器是否被中断 return d == DISPOSED; } public static boolean dispose(AtomicReference<Disposable> field) { Disposable current = field.get(); Disposable d = DISPOSED; if (current != d) { //这里会把field给设为DISPOSED current = field.getAndSet(d); if (current != d) { if (current != null) { current.dispose(); } return true; } } return false; } } 复制代码
能够看到DisposableHelper
是一个枚举类,而且只有一个值:DISPOSED
。dispose()
方法中会把一个原子引用field
设为DISPOSED
,即标记为中断状态。所以后面经过isDisposed()
方法便可以判断链接器是否被中断。
再回头看看CreateEmitter
类中的方法:
@Override public void onNext(T t) { //省略无关代码 if (!isDisposed()) { //若是没有dispose(),才会调用onNext() observer.onNext(t); } } @Override public void onError(Throwable t) { if (!tryOnError(t)) { //若是dispose()了,会调用到这里,即最终会崩溃 RxJavaPlugins.onError(t); } } @Override public boolean tryOnError(Throwable t) { //省略无关代码 if (!isDisposed()) { try { //若是没有dispose(),才会调用onError() observer.onError(t); } finally { //onError()以后会dispose() dispose(); } //若是没有dispose(),返回true return true; } //若是dispose()了,返回false return false; } @Override public void onComplete() { if (!isDisposed()) { try { //若是没有dispose(),才会调用onComplete() observer.onComplete(); } finally { //onComplete()以后会dispose() dispose(); } } } 复制代码
从上面的代码能够看到:
- 若是没有
dispose
,observer.onNext()
才会被调用到。onError()
和onComplete()
互斥,只能其中一个被调用到,由于调用了他们的任意一个以后都会调用dispose()
。- 先
onError()
后onComplete()
,onComplete()
不会被调用到。反过来,则会崩溃,由于onError()
中抛出了异常:RxJavaPlugins.onError(t)
。其实是dispose
后继续调用onError()
都会炸。
上面的例子和分析都是在同一个线程中进行,这中间也没涉及到线程切换的相关问题。可是在实际开发中,咱们一般须要在一个子线程中去进行一些数据获取操做,而后要在主线程中去更新UI,这就涉及到线程切换的问题了,经过RxJava咱们也能够把线程切换写得还简洁。
关于RxJava如何使用线程切换,这里就不详细讲了。 咱们直接来看一个例子,并分别打印RxJava在运行过程当中各个角色所在的线程。
new Thread() { @Override public void run() { Log.d(TAG, "Thread run() 所在线程为 :" + Thread.currentThread().getName()); Observable .create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { Log.d(TAG, "Observable subscribe() 所在线程为 :" + Thread.currentThread().getName()); emitter.onNext("文章1"); emitter.onNext("文章2"); emitter.onComplete(); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "Observer onSubscribe() 所在线程为 :" + Thread.currentThread().getName()); } @Override public void onNext(String s) { Log.d(TAG, "Observer onNext() 所在线程为 :" + Thread.currentThread().getName()); } @Override public void onError(Throwable e) { Log.d(TAG, "Observer onError() 所在线程为 :" + Thread.currentThread().getName()); } @Override public void onComplete() { Log.d(TAG, "Observer onComplete() 所在线程为 :" + Thread.currentThread().getName()); } }); } }.start(); 复制代码
输出结果为:
Thread run() 所在线程为 :Thread-2 Observer onSubscribe() 所在线程为 :Thread-2 Observable subscribe() 所在线程为 :RxCachedThreadScheduler-1 Observer onNext() 所在线程为 :main Observer onNext() 所在线程为 :main Observer onComplete() 所在线程为 :main 复制代码
从上面的例子能够看到:
Observer
(观察者)的onSubscribe()
方法运行在当前线程中。Observable
(被观察者)中的subscribe()
运行在subscribeOn()
指定的线程中。Observer
(观察者)的onNext()
和onComplete()
等方法运行在observeOn()
指定的线程中。
下面咱们对线程切换的源码进行一下分析,分为两部分:subscribeOn()
和observeOn()
。
首先来看下subscribeOn()
,咱们的例子中是这么个使用的:
.subscribeOn(Schedulers.io())
复制代码
subscribeOn()
方法要传入一个Scheduler
类对象做为参数,Scheduler
是一个调度类,可以延时或周期性地去执行一个任务。
经过Schedulers
类咱们能够获取到各类Scheduler
的子类。RxJava提供了如下这些线程调度类供咱们使用:
Scheduler类型 | 使用方式 | 含义 | 使用场景 |
---|---|---|---|
IoScheduler | Schedulers.io() |
io操做线程 | 读写SD卡文件,查询数据库,访问网络等IO密集型操做 |
NewThreadScheduler | Schedulers.newThread() |
建立新线程 | 耗时操做等 |
SingleScheduler | Schedulers.single() |
单例线程 | 只需一个单例线程时 |
ComputationScheduler | Schedulers.computation() |
CPU计算操做线程 | 图片压缩取样、xml,json解析等CPU密集型计算 |
TrampolineScheduler | Schedulers.trampoline() |
当前线程 | 须要在当前线程当即执行任务时 |
HandlerScheduler | AndroidSchedulers.mainThread() |
Android主线程 | 更新UI等 |
下面咱们来看下Schedulers.io()
的代码,其余的Scheduler
子类都差很少,就不逐以分析了,有兴趣的请自行查看哈~
@NonNull static final Scheduler IO; @NonNull public static Scheduler io() { //1.直接返回一个名为IO的Scheduler对象 return RxJavaPlugins.onIoScheduler(IO); } static { //省略无关代码 //2.IO对象是在静态代码块中实例化的,这里会建立按一个IOTask() IO = RxJavaPlugins.initIoScheduler(new IOTask()); } static final class IOTask implements Callable<Scheduler> { @Override public Scheduler call() throws Exception { //3.IOTask中会返回一个IoHolder对象 return IoHolder.DEFAULT; } } static final class IoHolder { //4.IoHolder中会就是new一个IoScheduler对象出来 static final Scheduler DEFAULT = new IoScheduler(); } 复制代码
能够看到,Schedulers.io()
中使用了静态内部类的方式来建立出了一个单例IoScheduler
对象出来,这个IoScheduler
是继承自Scheduler的。这里mark一发,后面会用到这个IoScheduler
的。
而后,咱们就来看下subscribeOn()的代码:
public final Observable<T> subscribeOn(Scheduler scheduler) { //省略无关代码 return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); } 复制代码
能够看到,首先会将当前的Observable
(其具体实现为ObservableCreate
)包装成一个新的ObservableSubscribeOn
对象。 放个图:
跟前面同样,RxJavaPlugins.onAssembly()
也是将ObservableSubscribeOn
对象原样返回而已,这里就不看了。 能够看下ObservableSubscribeOn
的构造方法:
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) { super(source); this.scheduler = scheduler; } 复制代码
也就是把source
和scheduler
这两个保存一下,后面会用到。
而后subscribeOn()
方法就完了。好像也没作什么,就是从新包装一下对象而已,而后将新对象返回。即将一个旧的被观察者包装成一个新的被观察者。
接下来咱们回到订阅过程,为何要回到订阅过程呢?由于事件的发送是从订阅过程开始的啊。 虽然咱们这里用到了线程切换,可是呢,其订阅过程前面的内容跟上一节分析的是同样的,咱们这里就不重复了,直接从不同的地方开始。还记得订阅过程当中Observable
类的subscribeActual()
是个抽象方法吗?所以要看其子类的具体实现。在上一节订阅过程当中,其具体实现是在ObservableCreate
类。可是因为咱们调用subscribeOn()
以后,ObservableCreate
对象被包装成了一个新的ObservableSubscribeOn
对象了。所以咱们就来看看ObservableSubscribeOn
类中的subscribeActual()
方法:
@Override public void subscribeActual(final Observer<? super T> s) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); s.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); } 复制代码
subscribeActual()
中一样也将咱们自定义的Observer
给包装成了一个新的SubscribeOnObserver
对象。一样,放张图:
Observer
的
onSubscribe()
方法,能够看到,到目前为止,还没出现过任何线程相关的东西,因此
Observer
的
onSubscribe()
方法就是运行在当前线程中。 而后咱们重点看下最后一行代码,首先建立一个
SubscribeTask
对象,而后就是调用
scheduler.scheduleDirect()
.。 咱们先来看下
SubscribeTask
类:
//SubscribeTask是ObservableSubscribeOn的内部类 final class SubscribeTask implements Runnable { private final SubscribeOnObserver<T> parent; SubscribeTask(SubscribeOnObserver<T> parent) { this.parent = parent; } @Override public void run() { //这里的source就是咱们自定义的Observable对象,即ObservableCreate source.subscribe(parent); } } 复制代码
很简单的一个类,就是实现了Runnable
接口,而后run()
中调用Observer.subscribe()
。
再来看下scheduler.scheduleDirect()
方法
public Disposable scheduleDirect(@NonNull Runnable run) { return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS); } 复制代码
往下看:
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { //createWorker()在Scheduler类中是个抽象方法,因此其具体实如今其子类中 //所以这里的createWorker()应当是在IoScheduler中实现的。 //Worker中能够执行Runnable final Worker w = createWorker(); //实际上decoratedRun仍是这个run对象,即SubscribeTask final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); //将Runnable和Worker包装成一个DisposeTask DisposeTask task = new DisposeTask(decoratedRun, w); //Worker执行这个task w.schedule(task, delay, unit); return task; } 复制代码
咱们来看下建立Worker
和Worker
执行任务的过程。
final AtomicReference<CachedWorkerPool> pool; public Worker createWorker() { //就是new一个EventLoopWorker,而且传一个Worker缓存池进去 return new EventLoopWorker(pool.get()); } static final class EventLoopWorker extends Scheduler.Worker { private final CompositeDisposable tasks; private final CachedWorkerPool pool; private final ThreadWorker threadWorker; final AtomicBoolean once = new AtomicBoolean(); //构造方法 EventLoopWorker(CachedWorkerPool pool) { this.pool = pool; this.tasks = new CompositeDisposable(); //从缓存Worker池中取一个Worker出来 this.threadWorker = pool.get(); } @NonNull @Override public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { //省略无关代码 //Runnable交给threadWorker去执行 return threadWorker.scheduleActual(action, delayTime, unit, tasks); } } 复制代码
注意,不一样的Scheduler
类会有不一样的Worker
实现,由于Scheduler
类最终是交到Worker
中去执行调度的。
咱们来看下Worker
缓存池的操做:
static final class CachedWorkerPool implements Runnable { ThreadWorker get() { if (allWorkers.isDisposed()) { return SHUTDOWN_THREAD_WORKER; } while (!expiringWorkerQueue.isEmpty()) { //若是缓冲池不为空,就从缓存池中取threadWorker ThreadWorker threadWorker = expiringWorkerQueue.poll(); if (threadWorker != null) { return threadWorker; } } //若是缓冲池中为空,就建立一个并返回。 ThreadWorker w = new ThreadWorker(threadFactory); allWorkers.add(w); return w; } } 复制代码
咱们再来看下threadWorker.scheduleActual()
。 ThreadWorker
类没有实现scheduleActual()
方法,其父类NewThreadWorker
实现了该方法,咱们点进去看下:
public class NewThreadWorker extends Scheduler.Worker implements Disposable { private final ScheduledExecutorService executor; volatile boolean disposed; public NewThreadWorker(ThreadFactory threadFactory) { //构造方法中建立一个ScheduledExecutorService对象,能够经过ScheduledExecutorService来使用线程池 executor = SchedulerPoolFactory.create(threadFactory); } public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { //这里的decoratedRun实际仍是run对象 Runnable decoratedRun = RxJavaPlugins.onSchedule(run); //将decoratedRun包装成一个新对象ScheduledRunnable ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent); //省略无关代码 if (delayTime <= 0) { //线程池中当即执行ScheduledRunnable f = executor.submit((Callable<Object>)sr); } else { //线程池中延迟执行ScheduledRunnable f = executor.schedule((Callable<Object>)sr, delayTime, unit); } //省略无关代码 return sr; } } 复制代码
这里的executor
就是使用线程池去执行任务,最终SubscribeTask
的run()
方法会在线程池中被执行,即Observable
的subscribe()
方法会在IO线程中被调用。这与上面例子中的输出结果符合:
Observable subscribe() 所在线程为 :RxCachedThreadScheduler-1 复制代码
Observer
(观察者)的onSubscribe()
方法运行在当前线程中,由于在这以前都没涉及到线程切换。- 若是设置了
subscribeOn(指定线程)
,那么Observable
(被观察者)中subscribe()
方法将会运行在这个指定线程中去。
来张总的subscribeOn()
切换线程时序图
若是咱们屡次设置subscribeOn()
,那么其执行线程是在哪个呢?先来看下例子
//省略先后代码,看重点部分 .subscribeOn(Schedulers.io())//第一次 .subscribeOn(Schedulers.newThread())//第二次 .subscribeOn(AndroidSchedulers.mainThread())//第三次 复制代码
其输出结果为:
Observable subscribe() 所在线程为 :RxCachedThreadScheduler-1 复制代码
即只有第一次的subscribeOn()
起做用了。这是为何呢? 咱们知道,每调用一次subscribeOn()
就会把旧的被观察者包装成一个新的被观察者,通过了三次调用以后,就变成了下面这个样子:
ObservableSubscribeOn
(第一次)那一层时,管你以前是在哪一个线程,
subscribeOn(Schedulers.io())
都会把线程切到IO线程中去执行,因此屡次设置
subscribeOn()
时,只有第一次生效。
咱们再来看下observeOn()
,仍是先来回顾一下咱们例子中的设置:
//指定在Android主线程中执行 .observeOn(AndroidSchedulers.mainThread()) 复制代码
public final Observable<T> observeOn(Scheduler scheduler) { return observeOn(scheduler, false, bufferSize()); } public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { //省略无关代码 return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)); } 复制代码
一样,这里也是新包装一个ObservableObserveOn
对象,注意,这里包装的旧被观察者是ObservableSubscribeOn
对象了,由于以前调用过subscribeOn()
包装了一层了,因此如今是以下图所示:
RxJavaPlugins.onAssembly()
也是原样返回。
咱们看看ObservableObserveOn
的构造方法。
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) { super(source); this.scheduler = scheduler; this.delayError = delayError; this.bufferSize = bufferSize; } 复制代码
里面就是一些变量赋值而已。
和subscribeOn()
差很少,咱们就直接来看ObservableObserveOn
的subscribeActual()
方法了。
@Override protected void subscribeActual(Observer<? super T> observer) { //判断是否当前线程 if (scheduler instanceof TrampolineScheduler) { //是当前线程的话,直接调用里面一层的subscribe()方法 //即调用ObservableSubscribeOn的subscribe()方法 source.subscribe(observer); } else { //建立Worker //本例子中的scheduler为AndroidSchedulers.mainThread() Scheduler.Worker w = scheduler.createWorker(); //这里会将Worker包装到ObserveOnObserver对象中去 //注意:source.subscribe没有涉及到Worker,因此仍是在以前设置的线程中去执行 //本例子中source.subscribe就是在IO线程中执行。 source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } } 复制代码
一样,这里也将observer
给包装了一层,以下图所示:
source.subscribe()
中将会把事件逐一发送出去,咱们这里只看下ObserveOnObserver
中的onNext()
方法的处理,onComplete()
等就不看了,实际上都差很少。
@Override public void onNext(T t) { //省略无关代码 if (sourceMode != QueueDisposable.ASYNC) { //将信息存入队列中 queue.offer(t); } schedule(); } 复制代码
就是调用schedule()
而已。
void schedule() { if (getAndIncrement() == 0) { //ObserveOnObserver一样实现了Runnable接口,因此就把它本身交给worker去调度了 worker.schedule(this); } } 复制代码
Android主线程调度器里面的代码就不分析了,里面其实是用handler
来发送Message
去实现的,感兴趣的能够看下。 既然ObserveOnObserver
实现了Runnable
接口,那么就是其run()
方法会在主线程中被调用。 咱们来看下ObserveOnObserver
的run()
方法:
@Override public void run() { //outputFused默认是false if (outputFused) { drainFused(); } else { drainNormal(); } } 复制代码
这里会走到drainNormal()
方法。
void drainNormal() { int missed = 1; //存储消息的队列 final SimpleQueue<T> q = queue; //这里的actual其实是SubscribeOnObserver final Observer<? super T> a = actual; //省略无关代码 //从队列中取出消息 v = q.poll(); //... //这里调用的是里面一层的onNext()方法 //在本例子中,就是调用SubscribeOnObserver.onNext() a.onNext(v); //... } 复制代码
至于SubscribeOnObserver.onNext()
,里面也没切换线程的逻辑,就是调用里面一层的onNext()
,因此最终会调用到咱们自定义的Observer
中的onNext()
方法。所以,Observer
的onNext()
方法就在observeOn()
中指定的线程中给调用了,在本例中,就是在Android主线程中给调用。
- 若是设置了
observeOn(指定线程)
,那么Observer
(观察者)中的onNext()
、onComplete()
等方法将会运行在这个指定线程中去。subscribeOn()
设置的线程不会影响到observeOn()
。
最后,来张observeOn()时序图:
因本人水平有限,若有错误,欢迎指出并交流~四月葡萄的博客