本文由
玉刚说写做平台
提供写做赞助java原做者:
四月葡萄
git版权声明:本文版权归微信公众号 玉刚说 全部,未经许可,不得以任何形式转载github
本文主要是对RxJava的消息订阅和线程切换进行源码分析,相关的使用方式等不做详细介绍。数据库
本文源码基于rxjava:2.1.14
。json
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.缓存
It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.安全
上面这段话来自于RxJava在github上面的官方介绍。翻译成中文的大概意思就是:微信
RxJava是一个在Java虚拟机上的响应式扩展,经过使用可观察的序列将异步和基于事件的程序组合起来的一个库。网络
它扩展了观察者模式来支持数据/事件序列,而且添加了操做符,这些操做符容许你声明性地组合序列,同时抽象出要关注的问题:好比低级线程、同步、线程安全和并发数据结构等。数据结构
简单点来讲, RxJava就是一个使用了观察者模式,可以异步的库。
上面说到,RxJava扩展了观察者模式,那么什么是观察模式呢?咱们先来了解一下。
举个例子,以微信公众号为例,一个微信公众号会不断产生新的内容,若是咱们读者对这个微信公众号的内容感兴趣,就会订阅这个公众号,当公众号有新内容时,就会推送给咱们。咱们收到新内容时,若是是咱们感兴趣的,就会点进去看下;若是是广告的话,就可能直接忽略掉。这就是咱们生活中遇到的典型的观察者模式。
在上面的例子中,微信公众号就是一个被观察者(Observable
),不断的产生内容(事件),而咱们读者就是一个观察者(Observer
) ,经过订阅(subscribe
)就可以接受到微信公众号(被观察者)推送的内容(事件),根据不一样的内容(事件)作出不一样的操做。
RxJava的扩展观察者模式中就是存在这么4种角色:
角色 | 角色功能 |
---|---|
被观察者(Observable ) |
产生事件 |
观察者(Observer ) |
响应事件并作出处理 |
事件(Event ) |
被观察者和观察者的消息载体 |
订阅(Subscribe ) |
链接被观察者和观察者 |
RxJava中的事件分为三种类型:Next
事件、Complete
事件和Error
事件。具体以下:
事件类型 | 含义 | 说明 |
---|---|---|
Next |
常规事件 | 被观察者能够发送无数个Next事件,观察者也能够接受无数个Next事件 |
Complete |
结束事件 | 被观察者发送Complete事件后能够继续发送事件,观察者收到Complete事件后将不会接受其余任何事件 |
Error |
异常事件 | 被观察者发送Error事件后,其余事件将被终止发送,观察者收到Error事件后将不会接受其余任何事件 |
在分析RxJava消息订阅原理前,咱们仍是先来看下它的简单使用步骤。这里为了方便讲解,就不用链式代码来举例了,而是采用分步骤的方式来逐一说明(平时写代码的话仍是建议使用链式代码来调用,由于更加简洁)。其使用步骤以下:
- 建立被观察者(
Observable
),定义要发送的事件。- 建立观察者(
Observer
),接受事件并作出响应操做。- 观察者经过订阅(
subscribe
)被观察者把它们链接到一块儿。
这里咱们就根据上面的步骤来实现这个例子,以下:
//步骤1. 建立被观察者(Observable),定义要发送的事件。
Observable observable = Observable.create(
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("文章1");
emitter.onNext("文章2");
emitter.onNext("文章3");
emitter.onComplete();
}
});
//步骤2. 建立观察者(Observer),接受事件并作出响应操做。
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext : " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError : " + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
//步骤3. 观察者经过订阅(subscribe)被观察者把它们链接到一块儿。
observable.subscribe(observer);
复制代码
其输出结果为:
onSubscribe
onNext : 文章1
onNext : 文章2
onNext : 文章3
onComplete
复制代码
下面咱们对消息订阅过程当中的源码进行分析,分为两部分:建立被观察者过程和订阅过程。
首先来看下建立被观察者(Observable
)的过程,上面的例子中咱们是直接使用Observable.create()
来建立Observable
,咱们点进去这个方法看下。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
复制代码
能够看到,create()
方法中也没作什么,就是建立一个ObservableCreate
对象出来,而后把咱们自定义的ObservableOnSubscribe
做为参数传到ObservableCreate
中去,最后就是调用 RxJavaPlugins.onAssembly()
方法。
咱们先来看看ObservableCreate
类:
public final class ObservableCreate<T> extends Observable<T> {//继承自Observable
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;//把咱们建立的ObservableOnSubscribe对象赋值给source。
}
}
复制代码
能够看到,ObservableCreate
是继承自Observable
的,而且会把ObservableOnSubscribe
对象给存起来。
再看下RxJavaPlugins.onAssembly()
方法
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
//省略无关代码
return source;
}
复制代码
很简单,就是把上面建立的ObservableCreate
给返回。
因此Observable.create()
中就是把咱们自定义的ObservableOnSubscribe
对象从新包装成一个ObservableCreate
对象,而后返回这个ObservableCreate
对象。 注意,这种从新包装新对象的用法在RxJava中会频繁用到,后面的分析中咱们还会屡次遇到。 放个图好理解,包起来哈~
Observable.create()
的时序图以下所示:
接下来咱们就看下订阅过程的代码,一样,点进去Observable.subscribe()
:
public final void subscribe(Observer<? super T> observer) {
//省略无关代码
observer = RxJavaPlugins.onSubscribe(this, observer);
subscribeActual(observer);
//省略无关代码
}
复制代码
能够看到,实际上其核心的代码也就两句,咱们分开来看下:
public static <T> Observer<? super T> onSubscribe(
@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
//省略无关代码
return observer;
}
复制代码
跟以前代码同样,这里一样也是把原来的observer
返回而已。 再来看下subscribeActual()
方法。
protected abstract void subscribeActual(Observer<? super T> observer);
复制代码
Observable
类的subscribeActual()
中的方法是一个抽象方法,那么其具体实如今哪呢?还记得咱们前面建立被观察者的过程吗,最终会返回一个ObservableCreate
对象,这个ObservableCreate
就是Observable
的子类,咱们点进去看下:
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//触发咱们自定义的Observer的onSubscribe(Disposable)方法
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
复制代码
能够看到,subscribeActual()
方法中首先会建立一个CreateEmitter
对象,而后把咱们自定义的观察者observer
做为参数给传进去。这里一样也是包装起来,放个图:
CreateEmitter
实现了
ObservableEmitter
接口和
Disposable
接口,以下:
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
//代码省略
}
复制代码
而后就是调用了observer.onSubscribe(parent)
,实际上就是调用观察者的onSubscribe()
方法,即告诉观察者已经成功订阅到了被观察者。
继续往下看,subscribeActual()
方法中会继续调用source.subscribe(parent)
,这里的source
就是ObservableOnSubscribe
对象,即这里会调用ObservableOnSubscribe
的subscribe()
方法。 咱们具体定义的subscribe()
方法以下:
Observable observable = Observable.create(
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("文章1");
emitter.onNext("文章2");
emitter.onNext("文章3");
emitter.onComplete();
}
});
复制代码
ObservableEmitter
,顾名思义,就是被观察者发射器。 因此,subscribe()
里面的三个onNext()
方法和一个onComplete()
会逐一被调用。 这里的ObservableEmitter
接口其具体实现为CreateEmitter
,咱们看看CreateEmitte
类的onNext()
方法和onComplete()
的实现:
//省略其余代码
@Override
public void onNext(T t) {
//省略无关代码
if (!isDisposed()) {
//调用观察者的onNext()
observer.onNext(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
//调用观察者的onComplete()
observer.onComplete();
} finally {
dispose();
}
}
}
复制代码
能够看到,最终就是会调用到观察者的onNext()
和onComplete()
方法。至此,一个完整的消息订阅流程就完成了。 另外,能够看到,上面有个isDisposed()
方法能控制消息的走向,即可以切断消息的传递,这个后面再来讲。
Observable
(被观察者)和Observer
(观察者)创建链接(订阅)以后,会建立出一个发射器CreateEmitter
,发射器会把被观察者中产生的事件发送到观察者中去,观察者对发射器中发出的事件作出响应处理。能够看到,是订阅以后,Observable
(被观察者)才会开始发送事件。
放张事件流的传递图:
再来看下订阅过程的时序流程图:
以前有提到过切断消息的传递,咱们先来看下如何使用:
Observable observable = Observable.create(
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("文章1");
emitter.onNext("文章2");
emitter.onNext("文章3");
emitter.onComplete();
}
});
Observer<String> observer = new Observer<String>() {
private Disposable mDisposable;
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe : " + d);
mDisposable=d;
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext : " + s);
mDisposable.dispose();
Log.d(TAG, "切断观察者与被观察者的链接");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError : " + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
observable.subscribe(observer);
复制代码
输出结果为:
onSubscribe : null
onNext : 文章1
切断观察者与被观察者的链接
复制代码
能够看到,要切断消息的传递很简单,调用下Disposable
的dispose()
方法便可。调用dispose()
以后,被观察者虽然能继续发送消息,可是观察者却收不到消息了。 另外有一点须要注意,上面onSubscribe
输出的Disposable
值是"null"
,并非空引用null
。
咱们这里来看看下dispose()
的实现。Disposable
是一个接口,能够理解Disposable
为一个链接器,调用dispose()
后,这个链接器将会中断。其具体实如今CreateEmitter
类,以前也有提到过。咱们来看下CreateEmitter
的dispose()
方法:
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
复制代码
就是调用DisposableHelper.dispose(this)
而已。
public enum DisposableHelper implements Disposable {
DISPOSED
;
//其余代码省略
public static boolean isDisposed(Disposable d) {
//判断Disposable类型的变量的引用是否等于DISPOSED
//即判断该链接器是否被中断
return d == DISPOSED;
}
public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();
Disposable d = DISPOSED;
if (current != d) {
//这里会把field给设为DISPOSED
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
}
复制代码
能够看到DisposableHelper
是一个枚举类,而且只有一个值:DISPOSED
。dispose()
方法中会把一个原子引用field
设为DISPOSED
,即标记为中断状态。所以后面经过isDisposed()
方法便可以判断链接器是否被中断。
再回头看看CreateEmitter
类中的方法:
@Override
public void onNext(T t) {
//省略无关代码
if (!isDisposed()) {
//若是没有dispose(),才会调用onNext()
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
//若是dispose()了,会调用到这里,即最终会崩溃
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
//省略无关代码
if (!isDisposed()) {
try {
//若是没有dispose(),才会调用onError()
observer.onError(t);
} finally {
//onError()以后会dispose()
dispose();
}
//若是没有dispose(),返回true
return true;
}
//若是dispose()了,返回false
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
//若是没有dispose(),才会调用onComplete()
observer.onComplete();
} finally {
//onComplete()以后会dispose()
dispose();
}
}
}
复制代码
从上面的代码能够看到:
- 若是没有
dispose
,observer.onNext()
才会被调用到。onError()
和onComplete()
互斥,只能其中一个被调用到,由于调用了他们的任意一个以后都会调用dispose()
。- 先
onError()
后onComplete()
,onComplete()
不会被调用到。反过来,则会崩溃,由于onError()
中抛出了异常:RxJavaPlugins.onError(t)
。其实是dispose
后继续调用onError()
都会炸。
上面的例子和分析都是在同一个线程中进行,这中间也没涉及到线程切换的相关问题。可是在实际开发中,咱们一般须要在一个子线程中去进行一些数据获取操做,而后要在主线程中去更新UI,这就涉及到线程切换的问题了,经过RxJava咱们也能够把线程切换写得还简洁。
关于RxJava如何使用线程切换,这里就不详细讲了。 咱们直接来看一个例子,并分别打印RxJava在运行过程当中各个角色所在的线程。
new Thread() {
@Override
public void run() {
Log.d(TAG, "Thread run() 所在线程为 :" + Thread.currentThread().getName());
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "Observable subscribe() 所在线程为 :" + Thread.currentThread().getName());
emitter.onNext("文章1");
emitter.onNext("文章2");
emitter.onComplete();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "Observer onSubscribe() 所在线程为 :" + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
Log.d(TAG, "Observer onNext() 所在线程为 :" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "Observer onError() 所在线程为 :" + Thread.currentThread().getName());
}
@Override
public void onComplete() {
Log.d(TAG, "Observer onComplete() 所在线程为 :" + Thread.currentThread().getName());
}
});
}
}.start();
复制代码
输出结果为:
Thread run() 所在线程为 :Thread-2 Observer onSubscribe() 所在线程为 :Thread-2 Observable subscribe() 所在线程为 :RxCachedThreadScheduler-1 Observer onNext() 所在线程为 :main Observer onNext() 所在线程为 :main Observer onComplete() 所在线程为 :main 复制代码
从上面的例子能够看到:
Observer
(观察者)的onSubscribe()
方法运行在当前线程中。Observable
(被观察者)中的subscribe()
运行在subscribeOn()
指定的线程中。Observer
(观察者)的onNext()
和onComplete()
等方法运行在observeOn()
指定的线程中。
下面咱们对线程切换的源码进行一下分析,分为两部分:subscribeOn()
和observeOn()
。
首先来看下subscribeOn()
,咱们的例子中是这么个使用的:
.subscribeOn(Schedulers.io())
复制代码
subscribeOn()
方法要传入一个Scheduler
类对象做为参数,Scheduler
是一个调度类,可以延时或周期性地去执行一个任务。
经过Schedulers
类咱们能够获取到各类Scheduler
的子类。RxJava提供了如下这些线程调度类供咱们使用:
Scheduler类型 | 使用方式 | 含义 | 使用场景 |
---|---|---|---|
IoScheduler | Schedulers.io() |
io操做线程 | 读写SD卡文件,查询数据库,访问网络等IO密集型操做 |
NewThreadScheduler | Schedulers.newThread() |
建立新线程 | 耗时操做等 |
SingleScheduler | Schedulers.single() |
单例线程 | 只需一个单例线程时 |
ComputationScheduler | Schedulers.computation() |
CPU计算操做线程 | 图片压缩取样、xml,json解析等CPU密集型计算 |
TrampolineScheduler | Schedulers.trampoline() |
当前线程 | 须要在当前线程当即执行任务时 |
HandlerScheduler | AndroidSchedulers.mainThread() |
Android主线程 | 更新UI等 |
下面咱们来看下Schedulers.io()
的代码,其余的Scheduler
子类都差很少,就不逐以分析了,有兴趣的请自行查看哈~
@NonNull
static final Scheduler IO;
@NonNull
public static Scheduler io() {
//1.直接返回一个名为IO的Scheduler对象
return RxJavaPlugins.onIoScheduler(IO);
}
static {
//省略无关代码
//2.IO对象是在静态代码块中实例化的,这里会建立按一个IOTask()
IO = RxJavaPlugins.initIoScheduler(new IOTask());
}
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
//3.IOTask中会返回一个IoHolder对象
return IoHolder.DEFAULT;
}
}
static final class IoHolder {
//4.IoHolder中会就是new一个IoScheduler对象出来
static final Scheduler DEFAULT = new IoScheduler();
}
复制代码
能够看到,Schedulers.io()
中使用了静态内部类的方式来建立出了一个单例IoScheduler
对象出来,这个IoScheduler
是继承自Scheduler的。这里mark一发,后面会用到这个IoScheduler
的。
而后,咱们就来看下subscribeOn()的代码:
public final Observable<T> subscribeOn(Scheduler scheduler) {
//省略无关代码
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
复制代码
能够看到,首先会将当前的Observable
(其具体实现为ObservableCreate
)包装成一个新的ObservableSubscribeOn
对象。 放个图:
跟前面同样,RxJavaPlugins.onAssembly()
也是将ObservableSubscribeOn
对象原样返回而已,这里就不看了。 能够看下ObservableSubscribeOn
的构造方法:
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
复制代码
也就是把source
和scheduler
这两个保存一下,后面会用到。
而后subscribeOn()
方法就完了。好像也没作什么,就是从新包装一下对象而已,而后将新对象返回。即将一个旧的被观察者包装成一个新的被观察者。
接下来咱们回到订阅过程,为何要回到订阅过程呢?由于事件的发送是从订阅过程开始的啊。 虽然咱们这里用到了线程切换,可是呢,其订阅过程前面的内容跟上一节分析的是同样的,咱们这里就不重复了,直接从不同的地方开始。还记得订阅过程当中Observable
类的subscribeActual()
是个抽象方法吗?所以要看其子类的具体实现。在上一节订阅过程当中,其具体实现是在ObservableCreate
类。可是因为咱们调用subscribeOn()
以后,ObservableCreate
对象被包装成了一个新的ObservableSubscribeOn
对象了。所以咱们就来看看ObservableSubscribeOn
类中的subscribeActual()
方法:
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
复制代码
subscribeActual()
中一样也将咱们自定义的Observer
给包装成了一个新的SubscribeOnObserver
对象。一样,放张图:
Observer
的
onSubscribe()
方法,能够看到,到目前为止,还没出现过任何线程相关的东西,因此
Observer
的
onSubscribe()
方法就是运行在当前线程中。 而后咱们重点看下最后一行代码,首先建立一个
SubscribeTask
对象,而后就是调用
scheduler.scheduleDirect()
.。 咱们先来看下
SubscribeTask
类:
//SubscribeTask是ObservableSubscribeOn的内部类
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//这里的source就是咱们自定义的Observable对象,即ObservableCreate
source.subscribe(parent);
}
}
复制代码
很简单的一个类,就是实现了Runnable
接口,而后run()
中调用Observer.subscribe()
。
再来看下scheduler.scheduleDirect()
方法
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
复制代码
往下看:
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//createWorker()在Scheduler类中是个抽象方法,因此其具体实如今其子类中
//所以这里的createWorker()应当是在IoScheduler中实现的。
//Worker中能够执行Runnable
final Worker w = createWorker();
//实际上decoratedRun仍是这个run对象,即SubscribeTask
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//将Runnable和Worker包装成一个DisposeTask
DisposeTask task = new DisposeTask(decoratedRun, w);
//Worker执行这个task
w.schedule(task, delay, unit);
return task;
}
复制代码
咱们来看下建立Worker
和Worker
执行任务的过程。
final AtomicReference<CachedWorkerPool> pool;
public Worker createWorker() {
//就是new一个EventLoopWorker,而且传一个Worker缓存池进去
return new EventLoopWorker(pool.get());
}
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once = new AtomicBoolean();
//构造方法
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
//从缓存Worker池中取一个Worker出来
this.threadWorker = pool.get();
}
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
//省略无关代码
//Runnable交给threadWorker去执行
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
复制代码
注意,不一样的Scheduler
类会有不一样的Worker
实现,由于Scheduler
类最终是交到Worker
中去执行调度的。
咱们来看下Worker
缓存池的操做:
static final class CachedWorkerPool implements Runnable {
ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
//若是缓冲池不为空,就从缓存池中取threadWorker
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
//若是缓冲池中为空,就建立一个并返回。
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
}
复制代码
咱们再来看下threadWorker.scheduleActual()
。 ThreadWorker
类没有实现scheduleActual()
方法,其父类NewThreadWorker
实现了该方法,咱们点进去看下:
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
//构造方法中建立一个ScheduledExecutorService对象,能够经过ScheduledExecutorService来使用线程池
executor = SchedulerPoolFactory.create(threadFactory);
}
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
//这里的decoratedRun实际仍是run对象
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//将decoratedRun包装成一个新对象ScheduledRunnable
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
//省略无关代码
if (delayTime <= 0) {
//线程池中当即执行ScheduledRunnable
f = executor.submit((Callable<Object>)sr);
} else {
//线程池中延迟执行ScheduledRunnable
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
//省略无关代码
return sr;
}
}
复制代码
这里的executor
就是使用线程池去执行任务,最终SubscribeTask
的run()
方法会在线程池中被执行,即Observable
的subscribe()
方法会在IO线程中被调用。这与上面例子中的输出结果符合:
Observable subscribe() 所在线程为 :RxCachedThreadScheduler-1 复制代码
Observer
(观察者)的onSubscribe()
方法运行在当前线程中,由于在这以前都没涉及到线程切换。- 若是设置了
subscribeOn(指定线程)
,那么Observable
(被观察者)中subscribe()
方法将会运行在这个指定线程中去。
来张总的subscribeOn()
切换线程时序图
若是咱们屡次设置subscribeOn()
,那么其执行线程是在哪个呢?先来看下例子
//省略先后代码,看重点部分
.subscribeOn(Schedulers.io())//第一次
.subscribeOn(Schedulers.newThread())//第二次
.subscribeOn(AndroidSchedulers.mainThread())//第三次
复制代码
其输出结果为:
Observable subscribe() 所在线程为 :RxCachedThreadScheduler-1 复制代码
即只有第一次的subscribeOn()
起做用了。这是为何呢? 咱们知道,每调用一次subscribeOn()
就会把旧的被观察者包装成一个新的被观察者,通过了三次调用以后,就变成了下面这个样子:
ObservableSubscribeOn
(第一次)那一层时,管你以前是在哪一个线程,
subscribeOn(Schedulers.io())
都会把线程切到IO线程中去执行,因此屡次设置
subscribeOn()
时,只有第一次生效。
咱们再来看下observeOn()
,仍是先来回顾一下咱们例子中的设置:
//指定在Android主线程中执行
.observeOn(AndroidSchedulers.mainThread())
复制代码
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
//省略无关代码
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
复制代码
一样,这里也是新包装一个ObservableObserveOn
对象,注意,这里包装的旧被观察者是ObservableSubscribeOn
对象了,由于以前调用过subscribeOn()
包装了一层了,因此如今是以下图所示:
RxJavaPlugins.onAssembly()
也是原样返回。
咱们看看ObservableObserveOn
的构造方法。
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
复制代码
里面就是一些变量赋值而已。
和subscribeOn()
差很少,咱们就直接来看ObservableObserveOn
的subscribeActual()
方法了。
@Override
protected void subscribeActual(Observer<? super T> observer) {
//判断是否当前线程
if (scheduler instanceof TrampolineScheduler) {
//是当前线程的话,直接调用里面一层的subscribe()方法
//即调用ObservableSubscribeOn的subscribe()方法
source.subscribe(observer);
} else {
//建立Worker
//本例子中的scheduler为AndroidSchedulers.mainThread()
Scheduler.Worker w = scheduler.createWorker();
//这里会将Worker包装到ObserveOnObserver对象中去
//注意:source.subscribe没有涉及到Worker,因此仍是在以前设置的线程中去执行
//本例子中source.subscribe就是在IO线程中执行。
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
复制代码
一样,这里也将observer
给包装了一层,以下图所示:
source.subscribe()
中将会把事件逐一发送出去,咱们这里只看下ObserveOnObserver
中的onNext()
方法的处理,onComplete()
等就不看了,实际上都差很少。
@Override
public void onNext(T t) {
//省略无关代码
if (sourceMode != QueueDisposable.ASYNC) {
//将信息存入队列中
queue.offer(t);
}
schedule();
}
复制代码
就是调用schedule()
而已。
void schedule() {
if (getAndIncrement() == 0) {
//ObserveOnObserver一样实现了Runnable接口,因此就把它本身交给worker去调度了
worker.schedule(this);
}
}
复制代码
Android主线程调度器里面的代码就不分析了,里面其实是用handler
来发送Message
去实现的,感兴趣的能够看下。 既然ObserveOnObserver
实现了Runnable
接口,那么就是其run()
方法会在主线程中被调用。 咱们来看下ObserveOnObserver
的run()
方法:
@Override
public void run() {
//outputFused默认是false
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
复制代码
这里会走到drainNormal()
方法。
void drainNormal() {
int missed = 1;
//存储消息的队列
final SimpleQueue<T> q = queue;
//这里的actual其实是SubscribeOnObserver
final Observer<? super T> a = actual;
//省略无关代码
//从队列中取出消息
v = q.poll();
//...
//这里调用的是里面一层的onNext()方法
//在本例子中,就是调用SubscribeOnObserver.onNext()
a.onNext(v);
//...
}
复制代码
至于SubscribeOnObserver.onNext()
,里面也没切换线程的逻辑,就是调用里面一层的onNext()
,因此最终会调用到咱们自定义的Observer
中的onNext()
方法。所以,Observer
的onNext()
方法就在observeOn()
中指定的线程中给调用了,在本例中,就是在Android主线程中给调用。
- 若是设置了
observeOn(指定线程)
,那么Observer
(观察者)中的onNext()
、onComplete()
等方法将会运行在这个指定线程中去。subscribeOn()
设置的线程不会影响到observeOn()
。
最后,来张observeOn()时序图:
因本人水平有限,若有错误,欢迎指出并交流~四月葡萄的博客