以前就写过一篇关于Rxjava最友好的文章,反响很不错,因为那篇文章的定位就是简单友好,所以尽量的摒弃复杂的概念,只抓住关键的东西来说,以保证你们都能看懂。javascript
不过那篇文章写完以后,我就以为应该还得有一篇文章给RxJava作一个深刻的讲解才算完美,因而就有了今天的进阶篇。由于一个团队里可能你们都会用RxJava,可是必需要有一我的很懂这个,否则碰到问题可就麻烦了。java
在前一篇文章中的最后,咱们得出结论:RxJava就是在观察者模式的骨架下,经过丰富的操做符和便捷的异步操做来完成对于复杂业务的处理。今天咱们仍是就结论中的观察者模式和操做符来作深刻的拓展。git
在进入正题以前,仍是但愿你们先去看看关于Rxjava最友好的文章。github
前一篇文章首先就重点谈到了观察者模式,咱们认为观察者模式RxJava的骨架*。在这里不是要推翻以前的结论,而是但愿从深刻它的内部的去了解它的实现。架构
依然使用以前文章中关于开关和台灯的代码app
//建立一个被观察者(开关)
Observable switcher=Observable.create(new Observable.OnSubscribe<String>(){
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("On");
subscriber.onNext("Off");
subscriber.onNext("On");
subscriber.onNext("On");
subscriber.onCompleted();
}
});
//建立一个观察者(台灯)
Subscriber light=new Subscriber<String>() {
@Override
public void onCompleted() {
//被观察者的onCompleted()事件会走到这里;
Log.d("DDDDDD","结束观察...\n");
}
@Override
public void onError(Throwable e) {
//出现错误会调用这个方法
}
@Override
public void onNext(String s) {
//处理传过来的onNext事件
Log.d("DDDDD","handle this---"+s)
}
//订阅
switcher.subscribe(light);复制代码
以上就是一个RxJava观察者架构,
看到这样的代码不知道你会不会有一些疑惑:框架
其实,这些问题均可以经过了解OnSubscribe来解决。异步
那咱们先来看看关于OnSubscribe的定义ide
//上一篇文章也提到Acton1这个接口,内部只有一个待实现call()方法
//没啥特别,人畜无害
public interface Action1<T> extends Action {
void call(T t);
}
//OnSubscribe继承了这个Action1接口
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
// OnSubscribe仍然是个接口
}复制代码
那么也就是说,OnSubscribe本质上也是和 Action1同样的接口,只不过它专门用于Observable内部。函数
而在Observable观察者的类中,OnSubscribe是它惟一的属性,同时也是Observable构造函数中惟一必须传入的参数,也就是说,只要建立了Observable,那么内部也必定有一个OnSubscribe对象。
固然,Observable是没有办法直接new的,咱们只能经过create(),just()等等方法建立,固然,这些方法背后去调用了new Observable(onSubscribe)
public class Observable<T> {
//惟一的属性
final OnSubscribe<T> onSubscribe;
//构造函数,由于protected,咱们只能使用create函数
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
//create(onSubscribe) 内部调用构造函数。
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
....
....
}复制代码
当建立了Observable和Subscribe以后,调用subscribe(subscriber)方法时,发生了什么呢?
//传入了观察者对象
public final Subscription subscribe(final Observer<? super T> observer) {
....
//往下调用
return subscribe(new ObserverSubscriber<T>(observer));
}
public final Subscription subscribe(Subscriber<? super T> subscriber) {
//往下调用
return Observable.subscribe(subscriber, this);
}
//调用到这个函数
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// new Subscriber so onStart it
subscriber.onStart();
// add a significant depth to already huge call stacks.
try {
// 在这里简单讲,对onSubscribe进行封装,没必要紧张。
OnSubscribe onSubscribe=RxJavaHooks.onObservableStart(observable, observable.onSubscribe);
//这个才是重点!!!
//这个调用的具体实现方法就是咱们建立观察者时
//写在Observable.create()中的call()呀
//而调用了那个call(),就意味着事件开始发送了
onSubscribe.call(subscriber);
//不信你往回看
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
....
....
}
return Subscriptions.unsubscribed();
}
}复制代码
代码看到这里,咱们就能够对上面三个问题作统一的回答了:
到这里,你是否是对于RxJava的观察者模式了解更加清晰了呢?咱们用流程图复习一下刚才的过程。
了解了上面这些,咱们就能够更进一步作如下总结:
以上的结论对于下面咱们理解操做符的原理十分有帮助,所以必定要看明白。
观察者模式介绍到这里,才敢说讲完了。
上一篇文章讲了一些操做符,而且在github上放了不少其余的操做符使用范例给你们,所以在这里不会介绍更多操做符的用法,而是讲解操做符的实现原理。他是如何拦截事件,而后变换处理以后,最后传递到观察者手中的呢?
相信了解相关内容的人可能会想到lift()操做符,它原本是其余操做符作变换的基础,不过那已是好几个版本之前的事情了。可是目前版本中RxJava已经不同了了,直接把lift()的工做下放到每一个操做符中,把lift的弱化了(可是依然保留了lift()操做符)。
所以,咱们在这里没必要讲解lift,直接拿一个操做符作例子,来了解它的原理便可,由于基本上操做符的实现原理都是同样的。
以map()为例,依然拿以前文章里面的例子:
Observable.just(getFilePath())
//使用map操做来完成类型转换
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String s) {
//显然自定义的createBitmapFromPath(s)方法,是一个极其耗时的操做
return createBitmapFromPath(s);
}
})
.subscribe(
//建立观察者,做为事件传递的终点处理事件
new Subscriber<Bitmap>() {
@Override
public void onCompleted() {
Log.d("DDDDDD","结束观察...\n");
}
@Override
public void onError(Throwable e) {
//出现错误会调用这个方法
}
@Override
public void onNext(Bitmap s) {
//处理事件
showBitmap(s)
}
);复制代码
看看map背后到底作了什么
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
//建立了全新代理的的Observable,构造函数传入的参数是OnSubscribe
//OnSubscribeMap显然是OnSubscribe的一个实现类,
//也就是说,OnSubscribeMap须要实现call()方法
//构造函数传入了真实的Observable对象
//和一个开发者本身实现的Func1的实例
return create(new OnSubscribeMap<T, R>(this, func));
}复制代码
看OnSubscribeMap的具体实现:
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
//用于保存真实的Observable对象
final Observable<T> source;
//还有咱们传入的那个Func1的实例
final Func1<? super T, ? extends R> transformer;
public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
this.source = source;
this.transformer = transformer;
}
//实现了call方法,咱们知道call方法传入的Subscriber
//就是订阅以后,外部传入真实的的观察者
@Override
public void call(final Subscriber<? super R> o) {
//把外部传入的真实观察者传入到MapSubscriber,构造一个代理的观察者
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
//让外部的Observable去订阅这个代理的观察者
source.unsafeSubscribe(parent);
}
//Subscriber的子类,用于构造一个代理的观察者
static final class MapSubscriber<T, R> extends Subscriber<T> {
//这个Subscriber保存了真实的观察者
final Subscriber<? super R> actual;
//咱们本身在外部本身定义的Func1
final Func1<? super T, ? extends R> mapper;
boolean done;
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
//外部的Observable发送的onNext()等事件
//都会首先传递到代理观察者这里
@Override
public void onNext(T t) {
R result;
try {
//mapper其实就是开发者本身建立的Func1,
//call()开始变换数据
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
//调用真实的观察者的onNext()
//从而在变换数据以后,把数据送到真实的观察者手中
actual.onNext(result);
}
//onError()方法也是同样
@Override
public void onError(Throwable e) {
if (done) {
RxJavaHooks.onError(e);
return;
}
done = true;
actual.onError(e);
}
@Override
public void onCompleted() {
if (done) {
return;
}
actual.onCompleted();
}
@Override
public void setProducer(Producer p) {
actual.setProducer(p);
}
}
}复制代码
map操做符的原理基本上就讲完了,其余的操做符和map在原理上是一致的。
假如你想建立自定义的操做符(这实际上是不建议的),你应该按照上面的步骤
我知道你会有点晕,不要紧,我后面会写一个自定义操做符放在个人github上,能够关注下。
下面,咱们先经过一个流程图巩固一下前面学习的成果。
下次你使用操做符时,内心应该清楚,每使用一个操做符,都会建立一个代理观察者和一个代理被观察者,以及他们之间是如何相互调用的。相信有了这一层的理解,从此使用RxJava应该会更加驾轻就熟。
不过最后,我想给你们留一个思考题:使用一个操做符时,流程图是这样的,那么使用多个操做符呢?
暂无
到这里,关于RxJava的讲解就基本能够告一段落了,
我相信,两篇文章读下来,对于RxJava的理解应该已经到了比较高的一个层次了,个人目标也就达到了。
接下来....
由于RxJava是一个事件的异步处理框架,理论上,他能够封装任何其余的库,那么.....