Android技能树系列:java
Android基础知识git
Android技能树 — 动画小结github
Android技能树 — Android存储路径及IO操做小结网络
数据结构基础知识ide
算法基础知识
Rx系列相关
Android技能树 — Rxjava取消订阅小结(1):自带方式
Android技能树 — Rxjava取消订阅小结(2):RxLifeCycle
如今不少项目都在使用Rxjava了,对于RxJava的使用,估计都很熟悉了,可是不少人在使用RxJava的时候容易产生内存泄漏问题,好比咱们在用RxJava配合Retrofit的时候,发出请求出去,拿到数据后咱们可能会去刷新界面,可是若是这时候网络比较差,返回比较慢,而咱们的Activity这时候关闭了,那RxJava当拿到返回的数据的时候去刷新界面就会报空指针异常了。因此咱们当Activity关闭的时候,咱们这时候若是RxJava还没执行完,咱们应该取消订阅。
经常使用的主要三种方式:(按照⭐️推荐从低到高来介绍)
本文主要讲解RxLifeCycle方式。
这里确定不会简单的介绍如何使用RxLifeCycle,github上面已经写得很清楚了,RxLifecycle github连接,咱们主要是看具体的实现原理。
简单使用:
假设咱们的Activity是继承RxActivity (PS: 不是必定要继承的,只是说明一种使用状况,具体能够看GitHub)
//手动设定解除订阅的时间:(ps:这里设为onPause的时候解除订阅)
myObservable
.compose(this.bindUntilEvent(ActivityEvent.PAUSE))
.subscrbe();
/**
自动设定解除订阅的时间:
(ps:好比你是在onStart时候订阅,则自动会在onPause时候解除,
若是在onCreate时候订阅,则会自动在onDestory时候解除)
*/
myObservable
.compose(this.bindToLifecycle())
.subscribe();
复制代码
在介绍RxLifeCycle以前,先介绍一些基础知识,加深你们的理解。
咱们知道在RxBus中咱们使用的是Subject ,由于它既能够是观察者又是被观察者。而Subject有不少种类:子类有PublishSubject、BehaviorSubject、ReplaySubject、AsyncSubject、SerializedSubject。
具体每种的区别能够看:RxJava中常见的几种Subject
这里咱们主要讲解BehaviorSubject。
Subject that emits the most recent item it has observed and all subsequent observed items to each subscribed Observer.
大意是BehaviorSubject会发送离订阅最近的上一个值,没有上一个值的时候会发送默认值(若是有的话)。
正好上面讲到了Subject,顺带提一下冷热Observable。和RxLifeCycle关系不大,可是能够当了解,不想看的能够跳过 1. 2 基础知识。
所谓的冷热和咱们单例模式中的饿汉式和饱汉式有一点点像,冷Observable须要有订阅者的时候才开始发射数据(有点像饱汉式),热Observable并非必定须要订阅者才开始发射数据(有点像饿汉式)。
PS: 你们也能够具体参考文章拥抱 RxJava(三):关于 Observable 的冷热,常见的封装方式以及误区,一些图片及说明我这边也直接引用该文章。
咱们常见的工厂方法提供的都是Cold Observable,包括just(),fromXX,create(),interval(),defer()。 他们有订阅者的时候才会发射数据,而且他们的共同点是当你有多个Subscriber的时候,他们的事件是独立的。
Observable interval = Observable.interval(1,TimeUnit.SECONDS);
复制代码
不一样于Cold Observable, Hot Observable是共享数据的。对于Hot Observable的全部subscriber,他们会在同一时刻收到相同的数据。咱们一般使用publish()操做符来将Cold Observable变为Hot。或者咱们在RxBus中经常用到的Subjects 也是Hot Observable。
而Hot Observable不须要有订阅者,只须要调用connect()
方法就会开始发送数据,这时候当其余订阅这个Observable的时候,并不会从头开始接受数据。
而经常使用的Hot Observable 是 ConnectableObservable。
咱们能够看到takeUtil操做符的功能: 在第二个Observable发射一个内容或终止后放弃第一个Observable发射的内容。
因此咱们立刻就能够想到假设第一个是咱们的网络请求接口的Observable , 而后经过takeUntil绑定了一个其余的Observable , 好比咱们是要在onDestory时候取消订阅,那只须要在onDestory方法里面使第二个Observable发送一个内容便可。
就如同字面意思,起到过滤做用,你写一个条件,只有符合条件的发送信息才会被接收到。
observable.filter(new Predicate<R>() {
@Override
public boolean test(R r) throws Exception {
//根据过滤条件,来决定返回是false/true
return false/true;
}
});
复制代码
这个方法在Rxjava 1 里面叫作asObservable() 。可能不少人没用过,主要仍是用在Subject。
好比你写了一个Subject,你想暴露出去一个接口让别人使用,你可能会这么写:
public class Test {
//假设是BehaviorSubject
private BehaviorSubject subject = BehaviorSubject.create();
//把Observable这块方面经过方法分享出去,可是又不想整个Subject都分享出去。
public Observable getObservable(){
return ((Observable) subject);
}
//好比你调用play方法,按照要求只能发送1,2,3
public void play(){
subject.onNext(1);
subject.onNext(2);
subject.onNext(3);
}
}
复制代码
可是这么写没啥卵用,只要获取后强制转换就能够:
//又能够发送相关数据
((BehaviorSubject) getObservable()).onNext(99999);
复制代码
因此这时候须要使用asObservable方法了:这实际上只是将您的主题封装在一个可观察的对象中,这使得消费代码没法将其转换回主题,asObservable是隐藏实现细节的一种防护机制。
//改变暴露的方法:
public Observable getObservable(){
return subject.asObservable();
}
//这时候就算你强转也没用,会报错,由于这时候经过asObservable获取到的对象已经不是Subject对象了。
((BehaviorSubject) getObservable()).onNext(99999);
复制代码
而在Rxjava 2 中只是把这个asObservable 方法改为了 hide方法而已。用法相同。
Tramsformer有不少种:ObservableTransformer,FlowableTransformer,SingleTransformer,MaybeTransformer,CompletableTransformer。
咱们这里已ObservableTransformer为例:
ObservableTransformer其实能够理解为Observable 转换器:能够经过它将一种类型的Observable转换成另外一种类型的Observable。
好比日常时候每一个observable咱们都须要写上这段代码::
observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(XXXXX);
复制代码
明明知道大部分的observable要使用的是
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
复制代码
因此有些人就会想到我写一个方法:
public Observable defaultSet(Observable ob){
return ob.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
复制代码
固然这么写是没问题,可是你每一个请求都要求用defaultSet方法包住了:
defaultSet(observable)
.subScribe(XXXXXX);
复制代码
没有了链式调用的清爽了,因此这时候ObservableTransformer 就出现了:
public class Transformer {
public static <T> ObservableTransformer<T, T> switchSchedulers() {
return new ObservableTransformer<T, T>() {
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
return upstream.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
}
}
复制代码
这时候咱们只须要调用:
observable.compose(Transformer.<Object>switchSchedulers()).subScribe(XXXX);
复制代码
因此咱们知道了,咱们想把一个Observable转变成另一个Observable可使用ObservableTransformer。
两个Observable发射,合并每一个Observable发射的最新内容,而后发出去,看下面的图片就很清楚。
take操做符:
只发出Observable发出的前n个item。
skip操做符: 压制Observable发出的前n个item。
经过对每一个item应用函数来转换Observable发出的item
在Observable发射数据时,有时发送onError通知,致使观察者不能正常接收数据。但是,有时咱们但愿对Observable发射的onError通知作出响应或者从错误中恢复。
具体主要有三种不一样操做符来实现:
具体描述能够参考:RxJava之错误处理
咱们已Activity中取消订阅为例:
RxActivity.java(代码说明具体查看源码里面的备注):
public abstract class RxActivity extends Activity implements LifecycleProvider<ActivityEvent> {
//建立一个BehaviorSubject,用来作takeUntil中的第二个Observable,让其在核实的生命周期发送信息。
private final BehaviorSubject<ActivityEvent> lifecycleSubject = BehaviorSubject.create();
@Override
@NonNull
@CheckResult
public final Observable<ActivityEvent> lifecycle() {
//使用hide()方法把这个subject暴露出去
return lifecycleSubject.hide();
}
@Override
@NonNull
@CheckResult
public final <T> LifecycleTransformer<T> bindUntilEvent(@NonNull ActivityEvent event) {
return RxLifecycle.bindUntilEvent(lifecycleSubject, event);
}
@Override
@NonNull
@CheckResult
public final <T> LifecycleTransformer<T> bindToLifecycle() {
return RxLifecycleAndroid.bindActivity(lifecycleSubject);
}
@Override
@CallSuper
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
lifecycleSubject.onNext(ActivityEvent.CREATE);
}
@Override
@CallSuper
protected void onStart() {
super.onStart();
//在onStart时候发送信息
lifecycleSubject.onNext(ActivityEvent.START);
}
@Override
@CallSuper
protected void onResume() {
super.onResume();
//在onResume时候发送信息
lifecycleSubject.onNext(ActivityEvent.RESUME);
}
@Override
@CallSuper
protected void onPause() {
//在onPause时候发送信息
lifecycleSubject.onNext(ActivityEvent.PAUSE);
super.onPause();
}
@Override
@CallSuper
protected void onStop() {
//在onStop时候发送信息
lifecycleSubject.onNext(ActivityEvent.STOP);
super.onStop();
}
@Override
@CallSuper
protected void onDestroy() {
//在onDestroy时候发送信息
lifecycleSubject.onNext(ActivityEvent.DESTROY);
super.onDestroy();
}
}
复制代码
同时咱们也注意到一个小细节: 在onCreate , onStart , onResume的时候,都是先调用super.XXX, 而后再用subject 发送相关Event;可是在 onPause , onStop , onDestory 里面倒是先用subject 发送相关Event,而后再调用super.XXXX。为啥会有这个区别。由于通常取消订阅都是在onPause,onStop,onDestory情形下,因此优先先取消订阅,再去执行系统本身的操做。好比onDestory,先去取消订阅,再去执行super.onDestory方法。
咱们先来说解手动设定某个生命周期做为取消订阅,咱们知道主要是使用:
@Override
@NonNull
@CheckResult
public final <T> LifecycleTransformer<T> bindUntilEvent(@NonNull ActivityEvent event) {
return RxLifecycle.bindUntilEvent(lifecycleSubject, event);
}
复制代码
咱们经过上面的基础知识,应该知道咱们的目的是把咱们本身的Obsevable和RxActivity里面的BehaviorSubject经过takeUntil绑定在一块儿,由于RxActivity里面全部的生命周期都发送了相应的ActivityEvent事件,因此咱们须要使用filter来过滤掉不是咱们关心的生命周期事件 ,最后经过ObservableTransformer来把咱们的Observable进行转换成这个合成好的《Observable & BehaviorSubject》。
因此咱们总体思路知道了,咱们来具体看bindUntilEvent源码:
@Nonnull
@CheckReturnValue
public static <T, R> LifecycleTransformer<T> bindUntilEvent(@Nonnull final Observable<R> lifecycle, @Nonnull final R event) {
checkNotNull(lifecycle, "lifecycle == null");
checkNotNull(event, "event == null");
return bind(takeUntilEvent(lifecycle, event));
}
复制代码
能够看到主要是bind和 takeUntilEvent二个方法,咱们先看takeUntilEvent方法:
private static <R> Observable<R> takeUntilEvent(final Observable<R> lifecycle, final R event) {
return lifecycle.filter(new Predicate<R>() {
@Override
public boolean test(R lifecycleEvent) throws Exception {
return lifecycleEvent.equals(event);
}
});
}
复制代码
由于咱们前面提过,咱们在生命周期中都会让subject发送相应的ActivityEvent事件,因此咱们这里只是把这个subject经过filter过滤,而后只发送咱们指定的生命周期。
咱们再来看bind方法,这时候就知道bind方法的目的是为了帮咱们的Observable和这个已经使用过filter的subject进行绑定并返回:
@Nonnull
@CheckReturnValue
public static <T, R> LifecycleTransformer<T> bind(@Nonnull final Observable<R> lifecycle) {
return new LifecycleTransformer<>(lifecycle);
}
复制代码
返回了LifecycleTransformer:
public final class LifecycleTransformer<T> implements
ObservableTransformer<T, T>,
FlowableTransformer<T, T>,
SingleTransformer<T, T>,
MaybeTransformer<T, T>,
CompletableTransformer
复制代码
咱们能够看到LifecycleTransformer实现了不少Transformer,由于这样咱们使用Observable或者Single等均可以来进行转换。好比咱们是Observable,那咱们就会调用LifecycleTransformer里面实现的的ObservableTransformer对应的apply方法:
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
return upstream.takeUntil(observable);
}
复制代码
咱们看到果真调用了takeUntil,把咱们的Observable经过takeUntil与已经处理好指定ActivityEvent的subject进行绑定。
最终咱们只须要:
myObservable.compose(bindUntilEvent(ActivityEvent.PAUSE));
复制代码
自动取消订阅代码:
@Override
@NonNull
@CheckResult
public final <T> LifecycleTransformer<T> bindToLifecycle() {
return RxLifecycleAndroid.bindActivity(lifecycleSubject);
}
复制代码
咱们能够看到,大体其实和手动指定生命周期的是同样的,惟一的区别就是咱们要根据咱们设置订阅事件的生命周期推算出相对于的取消订阅生命周期。
咱们来看bindActivity源码:
@NonNull
@CheckResult
public static <T> LifecycleTransformer<T> bindActivity(@NonNull final Observable<ActivityEvent> lifecycle) {
return bind(lifecycle, ACTIVITY_LIFECYCLE);
}
复制代码
仍是老样子,bind最后确定是返回一个LifecycleTransformer:
@Nonnull
@CheckReturnValue
public static <T, R> LifecycleTransformer<T> bind(@Nonnull Observable<R> lifecycle,
@Nonnull final Function<R, R> correspondingEvents) {
checkNotNull(lifecycle, "lifecycle == null");
checkNotNull(correspondingEvents, "correspondingEvents == null");
return bind(takeUntilCorrespondingEvent(lifecycle.share(), correspondingEvents));
}
复制代码
先看takeUntilCorrespondingEvent方法:
private static <R> Observable<Boolean> takeUntilCorrespondingEvent(final Observable<R> lifecycle,
final Function<R, R> correspondingEvents) {
return Observable.combineLatest(
lifecycle.take(1).map(correspondingEvents),
lifecycle.skip(1),
new BiFunction<R, R, Boolean>() {
@Override
public Boolean apply(R bindUntilEvent, R lifecycleEvent) throws Exception {
return lifecycleEvent.equals(bindUntilEvent);
}
})
.onErrorReturn(Functions.RESUME_FUNCTION)
.filter(Functions.SHOULD_COMPLETE);
}
复制代码
咱们先来看combineLatest里面的二个Observable:
lifecycle.take(1).map(correspondingEvents):
好比咱们在oncreate里面注册了订阅,咱们这时候就要告诉系统咱们要在onDestory里面进行取消订阅,因此咱们要先take(1)获取第一个(由于onstart,onresume等都会发送相应的ActivityEvent),而后经过map操做符来转换成相对的ActivityEvent:private static final Function<ActivityEvent, ActivityEvent> ACTIVITY_LIFECYCLE =
new Function<ActivityEvent, ActivityEvent>() {
@Override
public ActivityEvent apply(ActivityEvent lastEvent) throws Exception {
switch (lastEvent) {
case CREATE:
return ActivityEvent.DESTROY;
case START:
return ActivityEvent.STOP;
case RESUME:
return ActivityEvent.PAUSE;
case PAUSE:
return ActivityEvent.STOP;
case STOP:
return ActivityEvent.DESTROY;
case DESTROY:
throw new OutsideLifecycleException("Cannot bind to Activity lifecycle when outside of it.");
default:
throw new UnsupportedOperationException("Binding to " + lastEvent + " not yet implemented");
}
}
};
复制代码
因此总结就是第一个Observable用来记录咱们等会要在那个生命周期去取消订阅。
因此总结第二个Observable用来实时发送生命周期的事件。
而后经过combineLatest把二个绑定一块儿,这时候就会在指定的生命周期时候就会发送true,其他时候发送false,最后配合filter操做符,只有在true的时候才能发送便可。这样最终经过takeUntil再把咱们的Observable绑定在一块儿,而后这时候这里发送true的时候,咱们的Observable就会取消订阅了。
有些人会问,为何我使用了RxLifeCycle,就算到了相应生命周期了,仍是会调用onComplete方法,由于有些人可能在这个方法里面有相应逻辑处理代码。由于RxLifeCycle主要使用的是takeUntil,因此最后仍是会执行onComplete,若是想取消订阅的时候不调用这个,仍是能够直接使用原生的Disposable来进行取消订阅。
Why Not RxLifecycle?。这文章的做者就是 RxLifeCycle 的做者 ,说了使用RxLifeCycle会遇到一些窘境 ,而是推荐了AutoDispose: Automatic binding+disposal of RxJava 2 streams.,这是Uber公司的开源Rxjava取消订阅。而RxLifeCycle做者也参与其中,因此一些设计方式也很像,AutoDipose主要是配合了Android的LifeCycle组件。
emmmmmm.......请多多指教。