RxLife是一款轻量级别的RxJava生命周期管理库,代码侵入性极低,随用随取,不须要作任何准备工做,支持在Activity/Fragment 的任意生命周期方法断开管道。java
RxLife经过Jetpack 下的 Lifecycle 获取 Activity/Fragment 的生命周期变化,并经过Observable.lift(ObservableOperator)
操做符,注入本身实现的Observer对象(该对象能感知 Activity/Fragment的生命周期变化),从而在onSubscribe(Disposable d)
方法中拿到Disposable对象,随后在相应的生命周期回调里执行Disposable.dispose()
方法断开管道,这样就能将lift
操做符上面的全部Disposable对象所有断开。git
熟悉RxJava的同窗应该都知道trello/RxLifecycle 项目,它在目前的3.0.0版本中经过Lifecycle
感知Activity/Fragment 的生命周期变化,并经过BehaviorSubject
类及compose
、takeUntil
操做符来实现管道的中断,这种实现原理有一点不足的是,它在管道断开后,始终会往下游发送一个onComplete
事件,这对于在onComplete
事件中有业务逻辑的同窗来讲,无疑是致命的。那为何会这样呢?由于takeUntil
操做符内部实现机制就是这样的,有兴趣的同窗能够去阅读takeUntil
操做符的源码,这里不展开。而RxLife就不会有这样问题,由于在原理上RxLife
就与trello/RxLifecycle
不一样,而且RxLife还在lift
操做都的基础上提供了一些额外的api,能有效的避免因RxJava内部类持有Activity/Fragment的引用,而形成的内存泄漏问题,下面开始讲解。github
gradle依赖segmentfault
implementation 'com.rxjava.rxlife:rxlife:1.0.4'
源码下载api
Observable.timer(10, TimeUnit.SECONDS) //默认在onDestroy时中断管道 .lift(RxLife.lift(this)) .subscribe(aLong -> { Log.e("LJX", "accept =" + aLong); }); //或者 Observable.timer(10, TimeUnit.SECONDS) //指定在onStop时中断管道 .lift(RxLife.lift(this,Event.ON_STOP)) .subscribe(aLong -> { Log.e("LJX", "accept =" + aLong); });
在Activity/Fragment 中,使用Observable的lift()
操做符,方法中传入RxLife.lift(this)
,若是须要指定生命周期方法,额外再传一个Event对象便可。怎么样??是否是极其简单,根本不须要作任何准备工做,代码侵入性极低。安全
咱们来看一个案例app
public void leakcanary(View view) { Observable.timer(100, TimeUnit.MILLISECONDS) .map(new MyFunction<>()) //阻塞操做 .lift(RxLife.lift(this)) .subscribe(new Consumer<Long>() { //这里使用匿名内部类,持有Activity的引用 //注意这里不能使用Lambda表达式,不然leakcanary检测不到内存泄漏 @Override public void accept(Long aLong) throws Exception { Log.e("LJX", "accept =" + aLong); } }); } //这里使用静态内部类,不会持有外部类的引用 static class MyFunction<T> implements Function<T, T> { @Override public T apply(T t) throws Exception { //当dispose时,第一次睡眠会被吵醒,接着便会进入第二次睡眠 try { Thread.sleep(3000); } catch (Exception e) { } try { Thread.sleep(30000); } catch (Exception e) { } return t; } }
上面的代码会形成Activity没法回收,致使内存泄漏,咱们用Leakcannry工具来检测一下,发现确实形成来内存泄漏,以下
咱们已经使用RxLife
库,会自动中断管道,那为何还会形成内存泄漏呢?其实缘由很简单,咱们只是中断了管道,而没有中断上游对下游引用。看上面的截图就能知道,上游始终持有下游的引用,而最下游的匿名内部类Consumer
又持有了Activity的引用,因此就致使了Activity没法回收。框架
那为何中断管道时,不会中断上下游的引用呢?ide
首先有一点咱们须要明确,调用Disposable.dispose()
方法来断开管道,并非真正意义上的将上游与下游断开,它只是改变了管道上各个Observer对象的一个标志位的值,咱们来看一下LambdaObserver
类的源码就会知道工具
@Override public void dispose() { DisposableHelper.dispose(this); }
呃呃,只有一行代码,咱们继续
public static boolean dispose(AtomicReference<Disposable> field) { Disposable current = field.get(); //此处获得上游的Disposable对象 Disposable d = DISPOSED; if (current != d) { current = field.getAndSet(d); //更改本身的标志位为DISPOSED if (current != d) { if (current != null) { current.dispose();//关闭上游的Disposable对象 } return true; } } return false; }
能够看到,这里只作了两件事,一是更改本身的标志位,二是调用上游的dispose()
方法,其实你只要多看看,你就发现,RxJava内部大多数Observer在dispose()
方法都会干这两件事。
到这,咱们该如何解决这个内存泄漏问题呢?其实,RxJava早就想到了这一点,它给咱们提供了一个onTerminateDetach()
操做符,这个操做符会在onError(Throwable t)
、onComplete()
、dispose()
这个3个时刻,断开上游对下游的引用,咱们来看看源码,源码在ObservableDetach
类中
@Override public void dispose() { Disposable d = this.upstream; this.upstream = EmptyComponent.INSTANCE;//上游从新赋值 this.downstream = EmptyComponent.asObserver();//下游从新赋值 d.dispose();//调用上游的dispose()方法 } @Override public void onError(Throwable t) { Observer<? super T> a = downstream; this.upstream = EmptyComponent.INSTANCE;//上游从新赋值 this.downstream = EmptyComponent.asObserver();//下游从新赋值 a.onError(t); //调用下游的onError方法 } @Override public void onComplete() { Observer<? super T> a = downstream; this.upstream = EmptyComponent.INSTANCE;//上游从新赋值 this.downstream = EmptyComponent.asObserver();//下游从新赋值 a.onComplete();//调用下游的onComplete方法 }
到这,咱们就知道该怎么作了,下面这样写就安全了
Observable.timer(100, TimeUnit.MILLISECONDS) .map(new MyFunction<>())//阻塞操做 .onTerminateDetach() //管道断开时,中断上游对下游的引用 .lift(RxLife.lift(this)) //默认在onDestroy时断开管道 .subscribe(aLong -> { Log.e("LJX", "accept =" + aLong); });
但是,每次都要这样写吗?有没有更简单的,有,RxLife提供了RxLife.compose(LifecycleOwner)
方法,内部就是将onTerminateDetach
、lift
这两个操做符整合在了一块儿,接下来,看看如何使用
Observable.timer(100, TimeUnit.MILLISECONDS) .map(new MyFunction<>())//阻塞操做 //注意这里使用compose操做符 .compose(RxLife.compose(this))//默认在onDestroy时中断管道,并中断下下游之间的引用 .subscribe(aLong -> { Log.e("LJX", "accept =" + aLong); });
若是须要指定生命周期的方法,也能够
Observable.timer(100, TimeUnit.MILLISECONDS) .map(new MyFunction<>())//阻塞操做 //注意这里使用compose操做符 .compose(RxLife.compose(this, Event.ON_STOP))//指定在onStop时断开管道 .subscribe(aLong -> { Log.e("LJX", "accept =" + aLong); }); }
大多数状况下,咱们但愿观察者能主线程进行回调,也许你会这样写
Observable.timer(100, TimeUnit.MILLISECONDS) .map(new MyFunction<>())//阻塞操做 .observeOn(AndroidSchedulers.mainThread()) //在主线程回调 .compose(RxLife.compose(this, Event.ON_STOP))//指定在onStop回调时中断管道,并中断上下游引用 .subscribe(aLong -> { Log.e("LJX", "accept =" + aLong); });
若是你是用RxLife的话,就能够这样写,使用RxLife.composeOnMain
方法
Observable.timer(100, TimeUnit.MILLISECONDS) .map(new MyFunction<>())//阻塞操做 //在主线程进程回调,在onStop回调时中断管道,并中断上下游引用 .compose(RxLife.composeOnMain(this, Event.ON_STOP)) .subscribe(aLong -> { Log.e("LJX", "accept =" + aLong); });
RxLife类就只有6个静态方法,以下
结合RxLife使用Observable的lift
、compose
操做符时,下游除了subscribe
操做符外最好不要有其它的操做符,前面讲过,当调用Disposable.dispose()
时,它会往上一层一层的调用上游的dispose()
方法,若是下游有Disposable
对象,是调用不到的,若是此时下游有本身的事件须要发送,那么就没法拦截了。
如:
Observable.just(1) .compose(RxLife.compose(this)) .flatMap((Function<Integer, ObservableSource<Long>>) integer -> { //每隔一秒发送一个数据,共10个 return Observable.intervalRange(0, 10, 0, 1, TimeUnit.SECONDS); }) .subscribe(aLong -> { Log.e("LJX", "accept =" + aLong); });
这样,即便Activity关闭了,观察者每隔一秒后,依然能收到来自上游的事件,由于compose
没法切断下游的管道,咱们改一下上面的代码
Observable.just(1) .flatMap((Function<Integer, ObservableSource<Long>>) integer -> { //每隔一秒发送一个数据,共10个 return Observable.intervalRange(0, 10, 0, 1, TimeUnit.SECONDS); }) .compose(RxLife.compose(this)) .subscribe(aLong -> { Log.e("LJX", "accept =" + aLong); });
这样ok了,其实这不是RxLife
的问题,使用鼎鼎大名的trello/RxLifecycle
库也是同样的,由于RxJava的设计就是如此,上游拿不到下游的Disposable
对象,因此,咱们在使用RxLife
时,必定要注意在lift
或者compose
操做符的下游,除了subscribe
操做符外最好不要有其它的操做符,这一点必定须要注意。
RxLife最新版本已经使用as操做符规避这个问题,详情查看Android RxLife 一款轻量级别的RxJava生命周期管理库(二)
RxLife类里面的life、compose系列方法,皆适用于Flowable、Observable、Single、Maybe、Completable这5个被观察者对象,道理都同样,这里不在一一讲解。
Ok,RxLife的使用基本就介绍完了,到这咱们会发现,使用RxLife库,咱们只须要关注一个类便可,那便是RxLife类,api简单功能却强大。敢兴趣的同窗,能够去阅读RxLife
的源码,有疑问,请留言,我会在第一时间做答。
RxLife结合HttpSender发送请求,简直不要太爽。
HttpSender详情请点击HttpSender OkHttp+RxJava超好用、功能超级强大的Http请求框架