推荐几篇在学习Rxjava中的阅读的文章。尤为是大神W_BinaryTree的文章,给学习过程当中带来了很多启发。html
函数响应式编程介绍java
Rxjava2.0 Api介绍和使用android
Github地址github
背压策略编程
背压的理解缓存
Android RxJava 实际应用讲解:(无条件)网络请求轮询安全
Android RxJava 实际应用讲解:(有条件)网络请求轮询bash
Android RxJava 实际应用讲解:网络请求嵌套回调网络
Android RxJava 实际应用讲解:从磁盘 / 内存缓存中 获取缓存数据
Android RxJava:细说 线程控制(切换 / 调度 )(含Retrofit实例讲解)
Android RxJava 实际应用讲解:网络请求出错重连(结合Retrofit)
什么是 Monad (Functional Programming)?
Functors, Applicatives, And Monads In Pictures
观察者模式(Observer Mode)是定义对象间的一对多的依赖关系,当被观察者的状态发生改变时,全部依赖于它的对象都获得通知并自动刷新。
在观察者模式中有如下四个主要角色:
抽象主题[抽象被观察者](Subject):定义添加和删除观察者的方法,内部经过集合维护观察者序列。
具体主题[具体被观察者](Concrete Subject):抽象主题的实现对象,在具体主题内部状态发生变化时,通知全部的观察者更新状态。
抽象观察者(Observer):定义观察者的统一接口和方法。
具体观察者(Concrete Observer):抽象观察者的具体实现类,实现抽象观察者定义的统一接口,以便使自己的状态与主题状态协调。 经典的观察者模式UML类图:
为了方便分析问题,下面给出Rxjava实现的最简单的被观察者(主题)发送数据观察者打印数据的代码。从代码中分析Rxjava中是如何定义而且实现观察者模式中不一样的角色。为了方便说明问题,把Rxjava中的链式(Chain)拆分红最基本的3段。
(1)Observable对象建立,抽象类Observable是接口ObservableSource下的一个抽象实现,经过Observable建立一个可观察对象发射事件流。
(2)Observer对象建立,建立一个观察者Observer来接受并响应可观察对象发射的事件。
(3)Observer订阅Observable,经过subscribe方法,使Observer与Observable创建订阅关系,Observer与Observable便成为了一个总体,Observer即可对Observable中的行为做出响应。
PS: 虽然从代码上看上去像是Observable订阅了Observer,可是其实仍是观察者订阅了被观察者,Rxjava这么设计是为了保持链式调用(Chain)。 这里问了说明问题,没有采用极简的代码实现。 java实现:
private void rxjavaDemo() {
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
e.onNext("R");
e.onNext("X");
e.onComplete();
}
});
Observer observer = new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object s) {
Log.e(RxjavaDemoActivity.class.getSimpleName(), "object : " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
observable.subscribe(observer);
}
复制代码
Kotlin实现:
private fun rxjavaDemo() {
val mObservable = Observable.create<String>{
it.onNext("R")
it.onNext("X")
it.onComplete()
}
val mObserver = object : Observer<String>{
override fun onComplete() {
}
override fun onSubscribe(d: Disposable?) {
}
override fun onNext(value: String?) {
Log.e(RxjavaDemoActivity::class.java.simpleName, "object : $value")
}
override fun onError(e: Throwable?) {
}
}
mObservable.subscribe(mObserver)
}
复制代码
在上面代码中咱们调用了Observable的create方法来建立被观察者。
(1)在Observable类内部提供了众多的静态方法来建立被观察者。诸如:create、just、interval、from、zip、contact、merge等方法。
(2)requireNonNull方法,是Rxjava的判空实现,防止出现空指针异常。
(3)在create方法中会建立ObservableCreate的被观察者。
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
复制代码
被观察者ObservableCreate类继承自抽象类Observable,内部实现了父类的subscribeActual方法。
public final class ObservableCreate<T> extends Observable<T>{
..........
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
.................
}
复制代码
被观察者的抽象类Observable,Observable又是接口ObservableSource下的一个抽象实现。
(1)内部实现了ObservableSource接口定义的subscribe方法。subscribe方法内部主要是调用了subscribeActual方法,全部判定订阅关系是在subscribeActual方法内部实现的。
(2)因此要实现订阅关系,观察者真正须要复写的是subscribeActual方法。好比ObservableCreate类就复写了该方法。
public abstract class Observable<T> implements ObservableSource<T> {
..........
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
..............
}
复制代码
Observable实现了ObservableSource接口,在ObservableSource内部定义了subscribe方法用来实现订阅观察者(Observer)。
public interface ObservableSource<T> {
/**
* Subscribes the given Observer to this ObservableSource instance.
* @param observer the Observer, not null
* @throws NullPointerException if {@code observer} is null
*/
void subscribe(Observer<? super T> observer);
}
复制代码
总结:
(1)ObservableSource就是扮演着抽象被观察者的角色。
(2)在ObservableSource 接口中定义了subscribe方法用来用来实现订阅观察者(Observer)。
(3)Observable类实现了ObservableSource接口而且实现了其subscribe方法,可是它并无真正的去完成主题和观察者之间的订阅关系,而是内部调用了另外一个抽象方法subscribeActual。
(4)在Observable内部提供了一系列建立型操做符, 用来建立不一样场景的Observable。
通过上面的介绍,咱们已经明白了在Rxjava中被观察者(Observable)是如何建立的,以及是谁扮演者抽象观察者的角色。可是咱们并无在ObservableCreate类中发现具体发送事件的实现。那么这里就有一个问题:
问题: ObservableCreate等内部是如何发送事件到观察者(Observer)的?
经过上面的代码和分析,咱们知道Observer扮演着抽象观察者的角色。下面分别解释一下Observer类内部定义的四个主要的方法:
(1)onSubscribe(Disposable d)里面的Disposable对象,Disposable翻译过来是可随意使用的。至关于观察者和被观察者之间的订阅关系,若是观察者不想订阅被观察者了,能够调用 mDisposable.dispose()取消订阅关系。
(2)onCompleted(): 事件队列完成。RxJava 不只把每一个事件单独处理,还会把它们看作一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,须要触发 onCompleted() 方法做为标志。
(3)onError(): 事件队列异常。在事件处理过程当中出异常时,onError() 会被触发,同时队列自动终止,不容许再有事件发出。
(4)onNext():接收数据。
(5)在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,而且是事件序列中的最后一个。并且onCompleted() 和 onError() 两者也是互斥的,即在队列中调用了其中一个,就不该该再调用另外一个。
public interface Observer<T> {
/**
* Provides the Observer with the means of cancelling (disposing) the
* connection (channel) with the Observable in both
* synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
* @param d the Disposable instance whose {@link Disposable#dispose()} can
* be called anytime to cancel the connection
* @since 2.0
*/
void onSubscribe(Disposable d);
/**
* Provides the Observer with a new item to observe.
* <p>
* The {@link Observable} may call this method 0 or more times.
* <p>
* The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
* {@link #onError}.
*
* @param value
* the item emitted by the Observable
*/
void onNext(T value);
/**
* Notifies the Observer that the {@link Observable} has experienced an error condition.
* <p>
* If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
* {@link #onComplete}.
*
* @param e
* the exception encountered by the Observable
*/
void onError(Throwable e);
/**
* Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
* <p>
* The {@link Observable} will not call this method if it calls {@link #onError}.
*/
void onComplete();
}
复制代码
既然Observer是个接口,那么就应该是个抽象观察者,具体的观察者是咱们在实际运用的时候直接new一个实例对象。
通过上面的介绍,咱们已经明白了在Rxjava中观察者是如何建立的以及各个方法的做用。那么观察者是如何接受被观察者发送的事件的呢?
问题: Observer是如何接受数据到被观察者发送的数据?
前提:观察者订阅被观察者后,被观察者才会开始发送事件。 PS:可是并非全部的被观察者都须要被订阅才会发送数据,好比Observable.just的方法返回的ObservableJust被观察者者。 示例:执行下面的代码会输出JustObservable,可是将just换成create方法就不会输出,全部得出结论:并非全部的被观察者都须要被订阅才会发送数据
Observable.just(new JustObservable());
class JustObservable{
JustObservable(){
Log.e("rx","JustObservable");
}
}
复制代码
下面咱们来分析上面遗留的两个问题:即观察者和被观察者是如何发送和接受事件的
Observable.create生成的被观察者须要被订阅后才会发送数据到观察者。 根据上面的分析咱们知道:这里的observable对象是ObservableCreate类的实例。
observable.subscribe(observer);
复制代码
这里的subscribe是父类Observable的方法,在里面又会调用subscribeActual方法,Observable的子类ObservableCreate会复写subscribeActual方法。
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
复制代码
下面来分析一下ObservableCreate类。
(1)在ObservableCreate的构造方法中有个ObservableOnSubscribe类型的形参。
(2)而且正如咱们上面所说内部实现了subscribeActual方法。
(3)因此真正处理被观察者和观察者之间实现订阅的逻辑在Observable的subscribeActual方法中。
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
.............
}
复制代码
那么在ObservableCreate的构造方法的形参的赋值确定是在ObservableCreate对象初始化的时候,然而ObservableCreate的初始化,是经过Observable的create方法,下面咱们回到Observable的create的方法。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
复制代码
在调用create方法的时候须要传递ObservableOnSubscribe的对象做为参数,而这个对象最终会传入到ObservableCreate的构造方法中。 下面来看一下ObservableOnSubscribe的做用:
(1)内部声明了一个subscribe方法,subscribe 接收到一个ObservableEmitter对象。
public interface ObservableOnSubscribe<T> {
/**
* Called for each Observer that subscribes.
* @param e the safe emitter instance, never null
* @throws Exception on error
*/
void subscribe(ObservableEmitter<T> e) throws Exception;
}
复制代码
下面再来看一下ObservableEmitter类的做用:
(1)ObservableEmitter以一种能够安全取消的形式发送事件到观察者,经过调用setDisposable方法。
(2)继承了Emitter接口。
public interface ObservableEmitter<T> extends Emitter<T> {
/**
* Sets a Disposable on this emitter; any previous Disposable
* or Cancellation will be unsubscribed/cancelled.
* @param d the disposable, null is allowed
*/
void setDisposable(Disposable d);
/**
* Sets a Cancellable on this emitter; any previous Disposable
* or Cancellation will be unsubscribed/cancelled.
* @param c the cancellable resource, null is allowed
*/
void setCancellable(Cancellable c);
/**
* Returns true if the downstream disposed the sequence.
* @return true if the downstream disposed the sequence
*/
boolean isDisposed();
/**
* Ensures that calls to onNext, onError and onComplete are properly serialized.
* @return the serialized ObservableEmitter
*/
ObservableEmitter<T> serialize();
}
复制代码
下面咱们来看一下Emitter类的做用:
(1)Emitter翻译过来就是发射器的意识,到这里咱们能够想到,该类内部应该定义了一些跟事件发送相关的方法。
(2)而且在实例化ObservableOnSubscribe的时候,咱们正好是调用了ObservableEmitter的onNext向观察者发送数据的。
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
e.onNext("R");
e.onNext("X");
e.onComplete();
}
});
复制代码
public interface Emitter<T> {
/**
* Signal a normal value.
* @param value the value to signal, not null
*/
void onNext(T value);
/**
* Signal a Throwable exception.
* @param error the Throwable to signal, not null
*/
void onError(Throwable error);
/**
* Signal a completion.
*/
void onComplete();
}
复制代码
由此咱们能够知道Emitter内部声明了三种事件类型,而ObservableEmitter 扩展了Emiiter的功能,添加了Disposable相关的方法,能够用来安全取消事件的发送即取消观察者和被观察者之间的订阅关系。 由上诉分析咱们已经知道了ObservableCreate的ObservableOnSubscribe变量的来历和基本做用以及被观察者的建立过程。 下面继续回到ObservableCreate类的subscribeActual方法来看看事件是如何从被观察者发送到观察者的。
(1)在subscribeActual方法内部建立了CreateEmitter类对象,而且接受Observer做为参数,CreateEmitter实现了ObservableEmitter接口。因此该类是负责事件发送,到这里咱们已经明确了事件的发送类即ObservableEmitter。
(2)在subscribeActual方法内部,调用了ObservableOnSubscribe的subscribe方法而且传递ObservableEmitter对象实例做为参数,而后就能够调用ObservableEmitter的方法发送事件了。
(3)在subscribeActual方法内部,调用了Observer的onSubscribe方法而且传递CreateEmitter做为参数,这样观察者就持有了发送事件(被观察者)的直接引用,方便观察者取消订阅关系。
到这里咱们已经肯定到了:观察者是如何和被观察者订阅的。以及事件是如何发送到被观察者的。而且确认了只有发生了订阅关系,事件才能够发送。
下面在来看一下CreateEmitter类的实现逻辑。
(1)CreateEmitter的构造函数,传递一个Observer的对象做为形参。这样就能够将事件发送到对应的观察者了。
(2)CreateEmitter实现了ObservableEmitter接口,做为事件发送器。
(3)onNext事件中,不能够发送参数为null的类型,在事件序列没有中断的状况下把事件从被观察者传递给观察者。
(4)onComplete事件,用于通知观察者事件队列已经没有事件发送了。
(5)onError 事件,事件队列异常。在事件处理过程当中出异常时,onError() 会被触发,同时队列自动终止,不容许再有事件发出。
(6)setDisposable、setCancellable方法,观察者根据获取到的Emitter的实例对象,能够取消被观察者和观察者之间的订阅关系。
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
} else {
RxJavaPlugins.onError(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
@Override
public void setDisposable(Disposable d) {
DisposableHelper.set(this, d);
}
@Override
public void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}
@Override
public ObservableEmitter<T> serialize() {
return new SerializedEmitter<T>(this);
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
复制代码
小结:
(1)在RxJava中Observer经过onSubscribe方法获取了发送事件中的Disposable对象,这样他就能够控制观察者和被观察者之间的订阅关系。
(2)被观察者并无直接控制事件的发送,而是将事件的发送给Disposable对象的发送。
(3)订阅关系并无发生在subscribe方法中,而是在subscribeActual方法中实现了订阅关系。
下面的这段代码实现了最简单的Rxjava线程切换。发送事件就能够在非UI线程(RxNewThreadScheduler 的线程执行,将耗时操做放在子线程中,避免阻塞UI线程。接受事件又切换回了Android UI线程,Android禁止在非UI线程操做UI。这样就简单的实现了在自线程处理耗时操做而后在UI线程刷新UI的逻辑。
为了说明问题,把代码拆分红五段,能够看出,其实每次的链式调用都会生成不一样的Observable对象,因此咱们在平时开发的时候,应该尽量避免长链式的调用,规避掉不须要的中间操做。
private void rxjavaDemo() {
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
Log.e("rxjava","ObservableEmitter current thread :"+ Thread.currentThread().getName());
e.onNext("R");
e.onNext("X");
e.onComplete();
}
});
Observable observableSubscribeOn = observable.subscribeOn(Schedulers.newThread());
Observer observer = new Observer() {
@Override
public void onSubscribe(Disposable d) {
Log.e("rxjava","onSubscribe current thread :"+ Thread.currentThread().getName());
}
@Override
public void onNext(Object s) {
Log.e("rxjava","onNext current thread :"+ Thread.currentThread().getName());
Log.e(RxjavaDemoActivity.class.getSimpleName(), "object : " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
Observable observableObserveOn = observableSubscribeOn.observeOn(AndroidSchedulers.mainThread());
observableObserveOn.subscribe(observer);
}
复制代码
下面咱们将结合上面的分析和代码,分析一下在observable.subscribeOn内部都作了那些操做。
(1)实例化了ObservableSubscribeOn对象。而且传入的两个参数分别是ObservableCreate对象和Scheduler对象。
(2)ObservableCreate对象是咱们上面分析的Observable.create生成的一个被观察者。
(3)Scheduler对象,如今猜想应该是和线程调度有关的类。接下来会分析到。
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
复制代码
下面在来分析ObservableSubscribeOn类。
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}));
}
..........
}
复制代码
(1)继承了AbstractObservableWithUpstream类。AbstractObservableWithUpstream类内部存储了ObservableSource类对象,根据上下文,这里的source就是ObservableCreate类对象。
(2)实现了subscribeActual方法,而且在在subscribeActual方法内部建立了SubscribeOnObserver对象,这里的SubscribeOnObserver其实也是个观察者,能够理解成事件通过SubscribeOnObserver观察者中转了才最终到达咱们建立的观察者。SubscribeOnObserver 是AtomicReference的子类(保证原子性),实现了 Observer接口 和 Disposable 接口。
(3)内部调用onSubscribe方法将SubscribeOnObserver传递给观察者,这样观察者就能够控制事件的接受了,即获取了事件发送(被观察者发送数据)的控制权。
(3)ObservableSubscribeOn和ObservableCreate同样,也是Observable的一个子类。而且在ObservableSubscribeOn内部持有它上一步的被观察者Observable的引用(这里就是ObservableCreate)。
(4) source.subscribe(parent)实现了观察者和被观察者之间的订阅关系,并将经过SubscribeOnObserver类对象传递给ObservableOnSubscribe的subscribe方法。SubscribeOnObserver类对象就能够实现事件的发送了。
下面在来看一下SubscribeOnObserver类:做用和代码基本同于CreateEmitter类,看上面的CreateEmitter类分析便可。
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
复制代码
通过上面的分析,咱们明确了observable.subscribeOn(Schedulers.newThread())建立的被观察者(Observable)和观察者(Observer)之间的订阅关系。下面来分析一下subscribeOn是如何实现线程切换的。首先咱们思考一下,若是要实现线程切换,确定要建立子线程。 问题: 子线程是如何建立 在调用subscribeOn的时候,传入了Scheduler参数,Scheduler翻译过来就是调度者的意识。经过调用Schedulers的newThread方法,建立子线程(RxNewThreadScheduler)。下面咱们以RxNewThreadScheduler线程为例。看看线程是如何被建立的。
Observable observableSubscribeOn = observable.subscribeOn(Schedulers.newThread());
复制代码
在Schedulers类内部定义了newThread静态方法用于生成Scheduler对象。 (1)其中NEW_THREAD为默认的生成的一个Scheduler对象。
static final Scheduler NEW_THREAD;
public static Scheduler newThread() {
return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}
static {
..........
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
return NewThreadHolder.DEFAULT;
}
});
}
复制代码
接下来咱们就来看看,initNewThreadScheduler() 是如何生成一个Scheduler实例的。
(1)在initNewThreadScheduler方法中通过一系列的条件判断,最终会执行到call方法(延迟初始化)。
(2)NewThreadHolder.DEFAULT会返回一个NewThreadScheduler对象(单例模式)
static final Scheduler DEFAULT = NewThreadScheduler.instance();
}
复制代码
下面再来看看单例模式(饿汉式)的NewThreadScheduler类,看名字就能够猜想到是线程调度者。
(1)NewThreadScheduler 继承自Scheduler抽象类。
(2)经过静态代码块中建立了RxThreadFactory线程工厂对象,该类实现了ThreadFactory接口,而且在RxThreadFactory类的newThread方法中建立了优先级为5的线程Thread。
(3)在NewThreadScheduler的createWorker()方法中,建立了NewThreadWorker 对象。
public final class NewThreadScheduler extends Scheduler {
private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
private static final RxThreadFactory THREAD_FACTORY;
private static final NewThreadScheduler INSTANCE = new NewThreadScheduler();
/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";
static {
int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));
THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
}
public static NewThreadScheduler instance() {
return INSTANCE;
}
private NewThreadScheduler() {
}
@Override
public Worker createWorker() {
return new NewThreadWorker(THREAD_FACTORY);
}
}
复制代码
接下来咱们就来看看NewThreadWorker 都作了写什么。
(1)在NewThreadWorker的构造函数中,经过调用SchedulerPoolFactory.create的方法而且传入NewThreadScheduler中提供的线程工厂RxThreadFactory建立了一个ScheduledExecutorService对象。
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
.........
}
复制代码
在来看一下SchedulerPoolFactory类
(1)经过create方法建立了核心线程数量为1的线程池。
/**
* Creates a ScheduledExecutorService with the given factory.
* @param factory the thread factory
* @return the ScheduledExecutorService
*/
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
if (exec instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
POOLS.put(e, exec);
}
return exec;
}
复制代码
分析到这里咱们明确了,经过Schedulers.newThread()会建立一个核心线程数量为1的线程池。 建立完线程,下面就是启动和运行线程了,而且将事件的发送,放在子线程中进行处理。而且咱们都知道调用屡次 subscribeOn 指定子线程只有第一次会生效 ,下面咱们将带着这两个疑问,来分析一下。 在ObservableSubscribeOn的subscribeActual方法中,经过source.subscribe(parent)调用实现了观察者和被观察者之间的订阅关系。咱们能够看到该方法的执行是放在了Runnable里面执行的。因此线程的切换,应该就发生子此处。
(1)通过上面的分析,这里的scheduler对象是NewThreadScheduler类。而且调用了Schedule的scheduleDirect方法。
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}));
复制代码
下面来看看Scheduler类的scheduleDirect方法。
(1)内部调用了重载的scheduleDirect方法。
(2)createWorker返回的是NewThreadWorker类对象。而且调用了NewThreadWorker类的schedule方法。
public Disposable scheduleDirect(Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
w.schedule(new Runnable() {
@Override
public void run() {
try {
decoratedRun.run();
} finally {
w.dispose();
}
}
}, delay, unit);
return w;
}
复制代码
下面来看看NewThreadWorker类的schedule方法。
(1)在schedulerActual方法中,经过ScheduledExecutorService执行submit或schedule执行一个Runnable任务,即开启了线程池里面的线程任务。
@Override
public Disposable schedule(final Runnable run) {
return schedule(run, 0, null);
}
@Override
public Disposable schedule(final Runnable action, long delayTime, TimeUnit unit) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}
return scheduleActual(action, delayTime, unit, null);
}
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
parent.remove(sr);
RxJavaPlugins.onError(ex);
}
return sr;
}
复制代码
分析到这里咱们知道了线程的开启是在NewThreadWorker类中进行的。
那么还有个疑问:调用屡次 subscribeOn 指定子线程只有第一次会生效
(1)这里的生效并非指其余的subscribeOn方法建立的线程没有生效,而是会被第一次的subscribeOn建立的线程“掩盖掉”
(2)屡次调用subscribeOn,会生成若干个Observable对象,每一个新生成的对象都有切换线程的能力,可是只有第一次的subscribeOn才生效,由于后续的线程切换被第一个“掩盖掉”了。 这么说可能有点抽象,下面以一张图来讲明:
(1)每次调用subscribeOn方法,都会生成一个Observable,而且回持有上游的Observable对象。
(2)事件的发送是在第一次的subscribeOn建立的子线程中发送的,中间不会切换线程。
经过上面subscribeOn的流程梳理,咱们知道了上游事件是被如何切换都子线程的。下面咱们将分析事件是如何被切换到下游线程的,大部分状况下就是咱们的Android UI线程。 下面咱们将结合上面的分析和代码,分析一下在observable.observeOn内部都作了那些操做。
observableSubscribeOn.observeOn(AndroidSchedulers.mainThread())
复制代码
下面来看一下Observable的observeOn方法: (1) observeOn 方法返回了一个 ObservableObserveOn类对象。
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
复制代码
接下来看看 ObservableObserveOn类。
(1)该类ObservableSubscribeOn基本一致,继承了 AbstractObservableWithUpstream ,拥有ObservableSource类型对象,这里是ObservableSubscribeOn实例对象。
(2)在subscribeActual 方法内部,scheduler是HandlerScheduler类型对象,这里的scheduler就是展开分析了,基本上就是利用Android的Handler机制实现线程切换的。
(3)经过 scheduler.createWorker() 建立了 HandlerWorker的Worker对象。
(4)建立了一个ObserveOnObserver对象,该 类实现了Observer 接口,全部它是个Observer,同时实现了一个Runnable接口,这样经过Handler就能够执行到ObserveOnObserver的run方法。
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
...........
}
复制代码
下面就来看看这个 ObserveOnObserver,经过上面咱们知道,线程切换是经过Handler实现的。 (1)actual 参数是咱们建立的Observer对象。
(2)Worker参数是HandlerWorker对象。经过AndroidSchedulers.mainThread()的调用是建立的。
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
private static final long serialVersionUID = 6576896619930983584L;
final Observer<? super T> actual;
final Scheduler.Worker worker;
final boolean delayError;
final int bufferSize;
SimpleQueue<T> queue;
Disposable s;
Throwable error;
volatile boolean done;
volatile boolean cancelled;
int sourceMode;
boolean outputFused;
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
}
复制代码
下面来看一下run方法里面的操做: outputFused参数默认是false,因此接下来看看drainNormal方法。
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
复制代码
(1)queue参数是在onSubscribe方法里面建立的。而onSubscribe方法的调用,则是在上游的subscribeActual方法中调用的。
(2)内部经过轮训队列里面的事件,将事件最终发送到Observer。
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
复制代码
下面以一张图,总结一下:observeOn和subscribeOn的流程。
(1)subscribeOn 控制上游线程切换,subscribeOn屡次调用只有第一次的subscribeOn会起做用。
(2)observeOn控制下游线程切换。observeOn可使用屡次。而且observeOn 后面的全部操做都会在observeOn指定的线程中执行。
(3)subscribeOn和observeOn之间的操做,会在subscribeOn 指定的线程中执行,直到执行了observeOn操做。