RxJava2 发布已经有一段时间了,是对 RxJava 的一次重大的升级,因为个人一个库cv4j使用了 RxJava2 来尝鲜,可是 RxJava2 跟 RxJava1 是不能同时存在于一个项目中的,逼不得已我得把本身全部框架中使用 RxJava 的地方以及
App 中使用 RxJava 的地方都升级到最新版本。因此我整理并记录了一些已经填好的坑。html
若是,在同一个module中同时使用RxJava1和RxJava2,相似以下:java
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.0.7'
compile 'io.reactivex:rxandroid:1.2.0'
compile 'io.reactivex:rxjava:1.1.5'复制代码
那么,很不幸你会遇到这样的错误react
同理,在 App 中若是使用了 Rxjava2,可是某个第三方的 library 还在使用 Rxjava1 也会遇到一样的错误。android
上面的错误是由于 RxAndroid 2.0.1 自己依赖了 RxJava 2.0.1。咱们尝试去掉对 RxJava 的依赖,只留下 RxAndroid 。仍是会遇到问题。git
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
//compile 'io.reactivex.rxjava2:rxjava:2.0.7'
compile 'io.reactivex:rxandroid:1.2.0'
//compile 'io.reactivex:rxjava:1.1.5'复制代码
因此使用RxAndroid不能去掉对RxJava的依赖,我是这样使用的。github
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.0.7'复制代码
官方也是这样解释的app
Because RxAndroid releases are few and far between, it is recommended you also
explicitly depend on RxJava's latest version for bug fixes and new features.框架
最后,我建议要升级到 RxJava2 的时候必须全部使用的地方都要升级,而且用最新的版本。ide
RxJava1 中 Observable 不能很好地支持 backpressure ,会抛出MissingBackpressureException。因此在 RxJava2 中 Oberservable 再也不支持 backpressure ,而使用新增的 Flowable 来支持 backpressure 。关于backpressure 的理解,能够看这篇文章。this
Flowable的用法跟原先的Observable是同样的。
ActionN 和 FuncN 遵循Java 8的命名规则。
其中,Action0 更名成Action,Action1更名成Consumer,而Action2更名成了BiConsumer,而Action3 - Action9都再也不使用了,ActionN变成了Consumer
一样,Func更名成Function,Func2更名成BiFunction,Func3 - Func9 更名成 Function3 - Function9,FuncN 由 Function
原先RxJava1的写法:
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello");
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});复制代码
如今的写法:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("hello");
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
});复制代码
结合上一条,ObservableOnSubscribe 再也不使用 Subscriber 而是用 ObservableEmitter 替代。
ObservableEmitter 能够理解为发射器,是用来发出事件的,它能够发出三种类型的事件,经过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)能够分别发出next事件、complete事件和error事件。 若是只关心next事件的话,只需单独使用onNext()便可。
须要特别注意,emitter的onComplete()调用后,Consumer再也不接收任何next事件。
原先RxJava1的写法:
/** * 跟compose()配合使用,好比ObservableUtils.wrap(obj).compose(toMain()) * @param <T> * @return */
public static <T> Observable.Transformer<T, T> toMain() {
return new Observable.Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> tObservable) {
return tObservable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
}复制代码
如今的写法:
/** * 跟compose()配合使用,好比ObservableUtils.wrap(obj).compose(toMain()) * @param <T> * @return */
public static <T> ObservableTransformer<T, T> toMain() {
return new ObservableTransformer<T, T>() {
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
return upstream.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
}复制代码
因为新增了Flowable,同理也增长了FlowableTransformer
public static <T> FlowableTransformer<T, T> toMain() {
return new FlowableTransformer<T, T>() {
@Override
public Publisher<T> apply(Flowable<T> upstream) {
return upstream.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
}复制代码
在 RxJava2 中,因为已经存在了 org.reactivestreams.subscription 这个类,为了不名字冲突将原先的 rx.Subscription 更名为 io.reactivex.disposables.Disposable。
刚开始不知道,在升级 RxJava2 时发现 org.reactivestreams.subscription 这个类彻底无法作原先 rx.Subscription 的事情:(
顺便说下,Disposable必须单次使用,用完就要销毁。
官方文档是这么描述的first()的用法
1.x | 2.x |
---|---|
first() | RC3 renamed to firstElement and returns Maybe
|
first(Func1) | dropped, use filter(predicate).first() |
firstOrDefault(T) | renamed to first(T) and RC3 returns Single
|
firstOrDefault(Func1, T) | renamed to first(T) and RC3 returns Single
|
以first(Func1)为例,first(Func1)后面还使用了push(),原先 Rxjava1会这样写
ConnectableObservable<Data> connectableObservable = Observable.concat(Observable.from(list)).first(new Func1<Data, Boolean>() {
@Override
public Boolean call(Data data) {
return DataUtils.isAvailable(data);
}
}).publish();复制代码
RxJava2 改为这样
ConnectableObservable<Data> connectableObservable = Observable.concat(Observable.fromIterable(list)).filter(new Predicate<Data>() {
@Override
public boolean test(@NonNull Data data) throws Exception {
return DataUtils.isAvailable(data);
}
}).firstElement().toObservable().publish();复制代码
在个人框架中存在着一个Optional类,它跟Java 8的Optional做用差很少,原先是使用RxJava1来编写的。
import rx.Observable;
/** * 使用方法: * String s = null; * Optional.ofNullable(s).orElse("default")); // 若是s为null,则显示default,不然显示s的值 * @author Tony Shen * */
public class Optional<T> {
Observable<T> obs;
public Optional(Observable<T> obs) {
this.obs = obs;
}
public static <T> Optional<T> of(T value) {
if (value == null) {
throw new NullPointerException();
} else {
return new Optional<T>(Observable.just(value));
}
}
public static <T> Optional<T> ofNullable(T value) {
if (value == null) {
return new Optional<T>(Observable.<T>empty());
} else {
return new Optional<T>(Observable.just(value));
}
}
public T get() {
return obs.toBlocking().single();
}
public T orElse(T defaultValue) {
return obs.defaultIfEmpty(defaultValue).toBlocking().single();
}
}复制代码
升级到RxJava2以后,get() 和 orElse() 方法都会报错,修改以后是这样的。
import io.reactivex.Observable;
/** * 使用方法: * String s = null; * Optional.ofNullable(s).orElse("default"); // 若是s为null,则显示default,不然显示s的值 * @author Tony Shen * */
public class Optional<T> {
Observable<T> obs;
public Optional(Observable<T> obs) {
this.obs = obs;
}
public static <T> Optional<T> of(T value) {
if (value == null) {
throw new NullPointerException();
} else {
return new Optional<T>(Observable.just(value));
}
}
public static <T> Optional<T> ofNullable(T value) {
if (value == null) {
return new Optional<T>(Observable.<T>empty());
} else {
return new Optional<T>(Observable.just(value));
}
}
public T get() {
return obs.blockingSingle();
}
public T orElse(T defaultValue) {
return obs.defaultIfEmpty(defaultValue).blockingSingle();
}
}复制代码
包括 PublishSubject 以及各类 Subject(ReplaySubject、BehaviorSubject、AsyncSubject) 都再也不支持backpressure。
RxJava2 所带来的变化远远不止这些,之后遇到的话还会继续整理和总结,毕竟我使用的 RxJava2 仍是不多的一部份内容。
RxJava2 最好到文档依然是官方文档。若是是新项目到话,能够坚决果断地使用RxJava2,若是是在线上已经成熟稳定的项目,能够再等等。对于新手的话,能够直接从 RxJava2 学起,RxJava1 就直接略过吧。对于老手,RxJava2 仍是使用原来的思想,区别不大,从 RxJava1 迁移到 Rxjava2 也花不了多少工夫。
参考资料: