转载请标明出处:
juejin.im/post/58ce8c…
本文出自:【张旭童的稀土掘金】(gold.xitu.io/user/56de21…)javascript
承接上一篇RxJava2 源码解析(一),
本系列咱们的目的:java
Observable
)是如何将数据发送出去的。Observer
)是如何接收到数据的。本篇计划讲解一下4,5.app
RxJava最强大的莫过于它的线程调度 和 花式操做符。异步
map是一个高频的操做符,咱们首先拿他开刀。
例子以下,源头Observable
发送的是String类型的数字,利用map转换成int型,最终在终点Observer
接受到的也是int类型数据。:ide
final Observable<String> testCreateObservable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("1");
e.onComplete()
}
});复制代码
Observable<Integer> map = testCreateObservable.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return Integer.parseInt(s);
}
});
map.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe() called with: d = [" + d + "]");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "onNext() called with: value = [" + value + "]");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError() called with: e = [" + e + "]");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete() called");
}
});复制代码
咱们看一下map
函数的源码:函数
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
//判空略过
ObjectHelper.requireNonNull(mapper, "mapper is null");
//RxJavaPlugins.onAssembly()是hook 上文提到过
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}复制代码
RxJavaPlugins.onAssembly()
是hook 上文提到过,因此咱们只要看ObservableMap
,它就是返回到咱们手里的Observable
:post
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
//将function变换函数类保存起来
final Function<? super T, ? extends U> function; public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
//super()将上游的Observable保存起来 ,用于subscribeActual()中用。
super(source);
this.function = function; } @Override public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function)); }复制代码
它继承自AbstractObservableWithUpstream
,该类继承自Observable
,很简单,就是将上游的ObservableSource
保存起来,作一次wrapper,因此它也算是装饰者模式的提现,以下:ui
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
//将上游的`ObservableSource`保存起来
protected final ObservableSource<T> source;
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
@Override
public final ObservableSource<T> source() {
return source;
}
}复制代码
关于ObservableSource
,表明了一个标准的无背压的 源数据接口,能够被Observer
消费(订阅),以下:this
public interface ObservableSource<T> {
void subscribe(Observer<? super T> observer);
}复制代码
全部的Observable
都已经实现了它,因此咱们能够认为Observable
和ObservableSource
在本文中是相等的:spa
public abstract class Observable<T> implements ObservableSource<T> {复制代码
因此咱们获得的ObservableMap
对象也很简单,就是将上游的Observable
和变换函数类Function
保存起来。Function
的定义超级简单,就是一个接口,给我一个T,还你一个R.
public interface Function<T, R> {
R apply(T t) throws Exception;
}复制代码
本例写的是将String->int.
重头戏,subscribeActual()
是订阅真正发生的地方,ObservableMap
以下编写,就一句话,用MapObserver订阅上游Observable。:
@Override
public void subscribeActual(Observer<? super U> t) {
//用MapObserver订阅上游Observable。
source.subscribe(new MapObserver<T, U>(t, function)); }复制代码
MapObserver
也是装饰者模式,对终点(下游)Observer
修饰。
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
//super()将actual保存起来
super(actual);
//保存Function变量
this.mapper = mapper;
}
@Override
public void onNext(T t) {
//done在onError 和 onComplete之后才会是true,默认这里是false,因此跳过
if (done) {
return;
}
//默认sourceMode是0,因此跳过
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
//下游Observer接受的值
U v;
//这一步执行变换,将上游传过来的T,利用Function转换成下游须要的U。
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
//变换后传递给下游Observer
actual.onNext(v);
}复制代码
到此咱们梳理一下流程:
订阅的过程,是从下游到上游依次订阅的。
Observer
订阅了 map
返回的ObservableMap
。map
的Observable
(ObservableMap
)在被订阅时,会订阅其内部保存上游Observable
,用于订阅上游的Observer
是一个装饰者(MapObserver
),内部保存了下游(本例是终点)Observer
,以便上游发送数据过来时,能传递给下游。Observable
被订阅,根据上节课内容,它开始向Observer发送数据。数据传递的过程,固然是从上游push到下游的,
Observable
传递数据给下游Observer
(本例就是MapObserver
)MapObserver
接收到数据,对其变换操做后(实际的function在这一步执行),再调用内部保存的下游Observer
的onNext()
发送数据给下游Observer
。简化问题,代码以下:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.d(TAG, "subscribe() called with: e = [" + e + "]" + Thread.currentThread());
e.onNext("1");
e.onComplete();
}
//只是在Observable和Observer之间增长了一句线程调度代码
}).subscribeOn(Schedulers.io())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe() called with: d = [" + d + "]");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext() called with: value = [" + value + "]");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError() called with: e = [" + e + "]");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete() called");
}
});复制代码
只是在Observable
和Observer
之间增长了一句线程调度代码:.subscribeOn(Schedulers.io())
.
查看subscribeOn()
源码:
public final Observable<T> subscribeOn(Scheduler scheduler) {
//判空略过
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
//抛开Hook,重点仍是ObservableSubscribeOn
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}复制代码
等等,怎么有种似曾相识的感受,你们能够把文章向上翻,看看map()
的源码。
和subscribeOn()
的套路一模一样,那么咱们根据上面的结论,
先猜想ObservableSubscribeOn
类也是一个包装类(装饰者),点进去查看:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
//保存线程调度器
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
//map的源码中咱们分析过,super()只是简单的保存ObservableSource
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
//1 建立一个包装Observer
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//2 手动调用 下游(终点)Observer.onSubscribe()方法,因此onSubscribe()方法执行在 订阅处所在的线程
s.onSubscribe(parent);
//3 setDisposable()是为了将子线程的操做加入Disposable管理中
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
//4 此时已经运行在相应的Scheduler 的线程中
source.subscribe(parent);
}
}));
}复制代码
和map套路大致一致,ObservableSubscribeOn
自身一样是个包装类,同样继承AbstractObservableWithUpstream
。
建立了一个SubscribeOnObserver
类,该类按照套路,应该也是实现了Observer
、Disposable
接口的包装类,让咱们看一下:
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
//真正的下游(终点)观察者
final Observer<? super T> actual;
//用于保存上游的Disposable,以便在自身dispose时,连同上游一块儿dispose
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
//onSubscribe()方法由上游调用,传入Disposable。在本类中赋值给this.s,加入管理。
DisposableHelper.setOnce(this.s, s);
}
//直接调用下游观察者的对应方法
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
//取消订阅时,连同上游Disposable一块儿取消
@Override
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
//这个方法在subscribeActual()中被手动调用,为了将Schedulers返回的Worker加入管理
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}复制代码
这两个类根据上一节的铺垫加上注释,其余都好理解,稍微很差理解的应该是下面两句代码:
//ObservableSubscribeOn类
//3 setDisposable()是为了将子线程的操做加入Disposable管理中
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
//4 此时已经运行在相应的Scheduler 的线程中
source.subscribe(parent);
}
}));
//SubscribeOnObserver类
//这个方法在subscribeActual()中被手动调用,为了将Schedulers返回的Worker加入管理
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}复制代码
其中scheduler.scheduleDirect(new Runnable()..)
方法源码以下:
/** * Schedules the given task on this scheduler non-delayed execution. * ..... */
public Disposable scheduleDirect(Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}复制代码
从注释和方法名咱们能够看出,这个传入的Runnable
会马上执行。
再继续往里面看:
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
//class Worker implements Disposable ,Worker自己是实现了Disposable
final Worker w = createWorker();
//hook略过
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//开始在Worker的线程执行任务,
w.schedule(new Runnable() {
@Override
public void run() {
try {
//调用的是 run()不是 start()方法执行的线程的方法。
decoratedRun.run();
} finally {
//执行完毕会 dispose()
w.dispose();
}
}
}, delay, unit);
//返回Worker对象
return w;
}复制代码
createWorker()
是一个抽象方法,由具体的Scheduler
类实现,例如IoScheduler
对应的Schedulers.io()
.
public abstract Worker createWorker();复制代码
初看源码,为了了解大体流程,不宜过入深刻,先点到为止。
OK,如今咱们总结一下scheduler.scheduleDirect(new Runnable()..)
的重点:
Runnable
是马上执行的。Worker
对象就是一个Disposable
对象,Runnable
执行时,是直接手动调用的 run()
,而不是 start()
方法.run()
结束后(包括异常终止),都会自动执行Worker
.dispose()
.而返回的Worker
对象也会被parent.setDisposable(...)
加入管理中,以便在手动dispose()
时能取消线程里的工做。
咱们总结一下subscribeOn(Schedulers.xxx())
的过程:
ObservableSubscribeOn
包装类对象subscribeActual()
方法,在其中会马上将线程切换到对应的Schedulers.xxx()
线程。source.subscribe(parent);
,对上游(终点)Observable
订阅Observable
开始发送数据,根据RxJava2 源码解析(一),上游发送数据仅仅是调用下游观察者对应的onXXX()
方法而已,因此此时操做是在切换后的线程中进行。一点扩展,
你们可能看过一个结论:subscribeOn(Schedulers.xxx())
切换线程N次,老是以第一次为准,或者说离源Observable最近的那次为准,而且对其上面的代码生效(这一点对比的ObserveOn()
)。
为何?
subscribeActual()
里开启了Scheduler的工做,source.subscribe(parent);
,从这一句开始切换了线程,因此在这之上的代码都是在切换后的线程里的了。subscribeOn(xxxx)
指定的线程,subscribeOn(xxxx)
的线程里push数据(onXXX()
)给下游。可写以下代码验证:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.d(TAG, "subscribe() called with: e = [" + e + "]" + Thread.currentThread());
e.onNext("1");
e.onComplete();
}
}).subscribeOn(Schedulers.io())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
//依然是io线程
Log.d(TAG, "apply() called with: s = [" + s + "]" + Thread.currentThread());
return s;
}
})
.subscribeOn(Schedulers.computation())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe() called with: d = [" + d + "]");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext() called with: value = [" + value + "]");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError() called with: e = [" + e + "]");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete() called");
}
});复制代码
在上一节的基础上,增长一个observeOn(AndroidSchedulers.mainThread())
,就完成了观察者线程的切换。
.subscribeOn(Schedulers.computation())
//在上一节的基础上,增长一个ObserveOn
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {复制代码
继续看源码吧,我已经能猜出来了,hook+new XXXObservable();
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
....
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}复制代码
果真,查看ObservableObserveOn
,:
高能预警,这部分的代码 有些略多,建议读者打开源码边看边读。
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
//本例是 AndroidSchedulers.mainThread()
final Scheduler scheduler;
//默认false
final boolean delayError;
//默认128
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
// false
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//1 建立出一个 主线程的Worker
Scheduler.Worker w = scheduler.createWorker();
//2 订阅上游数据源,
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}复制代码
本例中,就是两步:
AndroidSchedulers.mainThread()
对应的Worker
ObserveOnObserver
订阅上游数据源。这样当数据从上游push下来,会由ObserveOnObserver
对应的onXXX()
处理。static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
//下游的观察者
final Observer<? super T> actual;
//对应Scheduler里的Worker
final Scheduler.Worker worker;
//上游被观察者 push 过来的数据都存在这里
SimpleQueue<T> queue;
Disposable s;
//若是onError了,保存对应的异常
Throwable error;
//是否完成
volatile boolean done;
//是否取消
volatile boolean cancelled;
// 表明同步发送 异步发送
int sourceMode;
....
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
//省略大量无关代码
//建立一个queue 用于保存上游 onNext() push的数据
queue = new SpscLinkedArrayQueue<T>(bufferSize);
//回调下游观察者onSubscribe方法
actual.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
//1 执行过error / complete 会是true
if (done) {
return;
}
//2 若是数据源类型不是异步的, 默认不是
if (sourceMode != QueueDisposable.ASYNC) {
//3 将上游push过来的数据 加入 queue里
queue.offer(t);
}
//4 开始进入对应Workder线程,在线程里 将queue里的t 取出 发送给下游Observer
schedule();
}
@Override
public void onError(Throwable t) {
//已经done 会 抛异常 和 上一篇文章里提到的同样
if (done) {
RxJavaPlugins.onError(t);
return;
}
//给error存个值
error = t;
done = true;
//开始调度
schedule();
}
@Override
public void onComplete() {
//已经done 会 返回 不会crash 和上一篇文章里提到的同样
if (done) {
return;
}
done = true;
//开始调度
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
//该方法须要传入一个线程, 注意看本类实现了Runnable的接口,因此查看对应的run()方法
worker.schedule(this);
}
}
//从这里开始,这个方法已是在Workder对应的线程里执行的了
@Override
public void run() {
//默认是false
if (outputFused) {
drainFused();
} else {
//取出queue里的数据 发送
drainNormal();
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
// 1 若是已经 终止 或者queue空,则跳出函数,
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
//2 从queue里取出一个值
v = q.poll();
} catch (Throwable ex) {
//3 异常处理 并跳出函数
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
return;
}
boolean empty = v == null;
//4 再次检查 是否 终止 若是知足条件 跳出函数
if (checkTerminated(d, empty, a)) {
return;
}
//5 上游还没结束数据发送,可是这边处理的队列已是空的,不会push给下游 Observer
if (empty) {
//仅仅是结束此次循环,不发送这个数据而已,并不会跳出函数
break;
}
//6 发送给下游了
a.onNext(v);
}
//7 对不起这里我也不是很明白,大体猜想是用于 同步原子操做 若有人知道 烦请告知
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
//检查 是否 已经 结束(error complete), 是否没数据要发送了(empty 空),
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
//若是已经disposed
if (cancelled) {
queue.clear();
return true;
}
// 若是已经结束
if (d) {
Throwable e = error;
//若是是延迟发送错误
if (delayError) {
//若是空
if (empty) {
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
//中止worker(线程)
worker.dispose();
return true;
}
} else {
//发送错误
if (e != null) {
queue.clear();
a.onError(e);
worker.dispose();
return true;
} else
//发送complete
if (empty) {
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}
}复制代码
核心处都加了注释,总结起来就是,
ObserveOnObserver
实现了Observer
和Runnable
接口。onNext()
里,先不切换线程,将数据加入队列queue
。而后开始切换线程,在另外一线程中,从queue
里取出数据,push
给下游Observer
onError()
onComplete()
除了和RxJava2 源码解析(一)提到的同样特性以外,也是将错误/完成信息先保存,切换线程后再发送。observeOn()
影响的是其下游的代码,且屡次调用仍然生效。 Observer
里onXXX()
作的,这是一个主动的push行为(影响下游)。subscribeOn()
切换线程是在subscribeActual()
里作的,只是主动切换了上游的订阅线程,从而影响其发射数据时所在的线程。而直到真正发射数据以前,任何改变线程的行为,都会生效(影响发射数据的线程)。因此subscribeOn()
只生效一次。observeOn()
是一个主动的行为,而且切换线程后会马上发送数据,因此会生效屡次.转载请标明出处:
juejin.im/post/58ce8c…
本文出自:【张旭童的稀土掘金】(gold.xitu.io/user/56de21…)
本文带你们走读分析了三个东西:
map操做符原理:
Observable
进行订阅Observer
.Observable
和其内部订阅者、是装饰者模式的体现。线程调度subscribeOn()
:
Observable
进行订阅,这样上游发送数据时就是处于被切换后的线程里了。Observer
.Observable
中。线程调度observeOn()
:
Observer
对上游Observable
进行订阅Observer
中onXXX()
方法里,将待发送数据存入队列,同时请求切换线程处理真正push数据给下游。源码里那些实现了Runnable
的类或者匿名内部类,最终并无像往常那样被丢给Thread
类执行。
而是先切换线程,再直接执行Runnable
的run()
方法。
这也加深了我对面向对象,对抽象、Runnable
的理解,它就是一个简简单单的接口,里面就一个简简单单的run()
,
我认为,之因此有Runnable
,只是抽象出 一个可运行的任务的概念。也许这句话很平淡,书上也会提到,各位大佬早就知道,可是现在我顺着RxJava2的源码这么走读了一遍,确真真切切的感觉到了这些设计思想的美妙。