Author: Dorae
Date: 2018年12月3日17:10:31 转载请注明出处html
首先问本身几个问题,若是很是清楚这几个问题的目的与答案,那么恭喜你,不用继续往下看了-_-。java
如图1-1所示react
参见这里api
一款为了简化异步调用,且功能比较全面的框架。网络
参见Java8中的sink链,在RxJava中一样实现了链式处理。如代码片断code1-1所示,咱们对其结构进行分析:框架
code1-1异步
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Dorae");
}
})
.filter(e -> e.contains("o"))
.map(e -> "AfterMap: " + e)
.filter(e -> e.contains("D"))
.subscribe(new Observer<String>() {
@Override
public void onNext(@NonNull String o) {
System.out.println("观察者 onNext: " + o);
}
@Override
public void onSubscribe(Disposable d) {
System.out.println("观察者onSubscribe: " + d + "### " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
复制代码
观察者onSubscribe: io.reactivex.internal.operators.observable.ObservableFilter$FilterObserver@52d455b8### mainide
观察者 onNext: AfterMap: Dorae源码分析
首先了解下RxJava中的几种基本角色:spa
是否是感受上边的一堆废话很是枯燥?先上一张RxJava的核心结构,如图3-1所示。
如今咱们再来看看code1-1,其最终造成的Observable链如图3-2所示,每次调用map、filter等操做,都会生成一个新对象,而且保持了一个对上游的引用(用于生成Observer链)。
Observer链如图3-3所示,整个事件流程由CreateEmitter触发,最终交由咱们的实现Observer$1处理。
看了上边几张图以后,是否是感受清晰了不少?那么让咱们进一步看下Rxjava如何完成了一键线程切换。
一般咱们使用RxJava的线程切换功能时,只须要在调用链中加上一句subscribeOn()或observeOn(),其中Scheduler如上所述,其实就是一个包装了ThreadPool的调度器。那么咱们先来看下相关源码。
一、subscribeOn
如代码code4-1所示,为subscribeOn的核心代码。很明显,其中在新线程中只是简单的直接调用了source,也就是说这里以后的全部操做均在一个新线程中进行,和单线程并无什么区别。
code 4-1
public final Observable<T> subscribeOn(Scheduler scheduler) {
return new ObservableSubscribeOn<T>() {
@Override
public void subscribeActual(final Observer<? super T> observer) {
scheduler.createWorker().schedule(new SubscribeTask() {
@Override
public void run() {
source.subscribe(e);
}
});
}
};
}
复制代码
二、observeOn
如代码段code4-2所示,为observeOn的核心逻辑,能够看出其在订阅阶段(生成Observer链的阶段)仍是在当前线程执行,只有触发以后,到了ObserverOn的Observer的节点时才会真正的切换到新线程。
code 4-2
public final Observable<T> observeOn(Scheduler scheduler) {
return new ObservableOnSubscribe<T>() {
@Override
public void subscribeActual(@NonNull Observer<Object> e) {
source.subscribe(new Observer<T>() {
@Override
public void onNext(T var1) {
scheduler.createWorker().schedule(new Runnable() {
@Override
public void run() {
e.onNext(var1);
}
});
}
});
}
};
}
复制代码
经过上述code4-一、code4-2的分析,是否是能够推断出当屡次subscribeOn时会发生什么?没错,虽然每次subscribeOn都会产生一次线程切换,可是真正起做用的只有最开始的一次subscribeOn,也就至关于只在最初的位置调用了subscribeOn;对于observeOn也是相似,每次都会产生新线程,可是每次都会产生必定的影响,也就是每一个线程都承担了一部分工做。
经过本文,咱们能够简要了解到RxJava的基本原理,可是对于其丰富的api还须要在实践中进行磨合。可是,RxJava既然做为一个异步框架,其必然有必定的局限,好比其切换线程时没法阻塞当前线程(这种对于Android等须要渲染或者网络IO的需求来讲很是适用),可是对于常见的服务端业务来讲,还须要额外引入阻塞当前线程的操做(由于大部分的server代码仍是单线程模型),假若彻底不用线程切换在服务端强行引入,可能会得不偿失。我的更推荐Java8的CompletableFuture。