GitHub关于RxJava的介绍:编程
a library for composing asynchronous and event-based programs by using observable sequences安全
他的意思就是 一个经过可观测的序列来组成异步和基于事件的库。bash
RxJava的出现消除同步问题、线程安全等问题app
总的来讲就是方便咱们异步编程。异步
异步async
链式调用结构ide
使用复杂的异步调用方式的时候依旧能够保持简洁异步编程
学习成本比较高,入门的门槛比较高学习
难以理解的API,须要查看源码才能理解API的具体效果ui
首先明白他的基础使用步骤:
正常建立被观察者:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("ONE");
emitter.onNext("TWO");
emitter.onNext("THREE");
emitter.onComplete();
}
});
复制代码
在这里面一共产生了四个事件:One、Two、Three、结束。
PS:
非正常建立第一弹:
Observable observable = Observable.just("ONE","TWO","THREE");
非正常建立第二弹:
String[] values = {"ONE", "TWO", "THREE"};
Observable observable = Observable.fromArray(values);
复制代码
其实这样的非正常建立是内部将这些信息包装成onNext()这样的事件发送给观察者。
正常建立:
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i("z", "onSubscribe: ");
}
@Override
public void onNext(String s) {
Log.i("z", "onNext: s = " + s);
}
@Override
public void onError(Throwable e) {
Log.i("z", "onError: ");
}
@Override
public void onComplete() {
Log.i("z", "onComplete: ");
}
};
复制代码
非正常建立:
Consumer<String> observer = new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i("z", "accept: s = " + s);
}
};
复制代码
observable.subscribe(observer);
你已经注意到不同的地方,为何被观察者订阅了观察者?
之因此会这样,是由于RxJava为了保持链式调用的流畅性。
RxJava既然是异步库,固然对于异步的处理会更好
在咱们看RxJava的异步调用以前,咱们先来学习下其中比较重要的两个点
这个表示Observable在一个指定的环境下建立,只能使用一次,屡次建立的话会以第一次为准。
表示 事件传递和 最终处理发生在哪一个环境下,能够屡次调用,每次指定以后,下一步就生效。
好比:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("ONE");
emitter.onNext("TWO");
emitter.onNext("THREE");
emitter.onComplete();
}
}) // 被观察者在一个新的线程中建立
.subscribeOn(Schedulers.newThread())
// 下面这个操做是在io线程中
.observeOn(Schedulers.io())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return s.toLowerCase();
}
})
// 切换,观察者是在主线程中
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
}
});
复制代码
先看一下基础的调用方式:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.i(TAG, "subscribe: ");
emitter.onNext("ONE");
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe: ");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: s = " + s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError: e = " + e.getMessage());
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete: ");
}
});
}
});
复制代码
结果:
onSubscribe:
subscribe:
onNext: s = ONE
复制代码
咱们先从订阅开始看,也就是subscribe
方法
public final void subscribe(Observer<? super T> observer) {
... // 忽略部分源码
subscribeActual(observer);
... // 忽略部分源码
}
复制代码
直接找到主要的方法subscribeActual(observer)
,这个是抽象的方法,会被实如今子类中。
因此咱们接着看看Observable
的子类实现:
咱们进入到create
方法中:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
复制代码
其实 返回的就是 ObservableCreate
的对象,
须要注意的是: ObservableCreate
是 Observable
的一个子类 ObservableCreate
被建立都会传入一个source
的字段,这个source
就是 ObservableOnSubscribe
。
在 ObservableCreate
具体实现了 subscribeActual
方法
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 在这里触发 observer#onSubscribe()
observer.onSubscribe(parent);
try {
// 在这里回调
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
复制代码
在这里方法能够看到 观察者observer
的 onSubscribe
会先于回调发生。
而后调用 ObservableOnSubscribe
的方法 subscribe
具体的事件后由开发者去作, 能够看到在案例中调用了 CreateEmitter
,能够进入到 CreateEmitter
看看onNext()
的实现
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
复制代码
能够看到 在CreateEmitter
的onNext()
中调用了 观察者observer
的onNext()
方法.
而后能够看到案例中的调用:
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: s = " + s);
}
复制代码