1、什么是RxJava
2、为何要用RxJava
3、RxJava使用详解
4、项目源码下载
5、参考文章java
Rx(Reactive Extensions)是一个库,用来处理事件和异步任务,在不少语言上都有实现,RxJava是Rx在Java上的实现。简单来讲,RxJava就是处理异步的一个库,最基本是基于观察者模式来实现的。经过Obserable和Observer的机制,实现所谓响应式的编程体验。react
好比说一个庞大的项目,一个事件传递的整个过程可能要经历不少方法,方法套方法,每一个方法的位置七零八落,一个个方法跳进去看,跳过去跳过来很容易把脑壳弄晕,不够直观。可是Rxjava能够把全部逻辑用链式加闭包的方式呈现,作了哪些操做,谁在前谁在后很是直观,逻辑清晰,维护就会很是轻松。就算不是你写的你也能够很快的了解,你能够把它看做一条河流,整个过程就是对里面的水流作进行加工。懂了这个特性咱们才知道在复杂的逻辑中运用Rxjava是多么的重要。
假设有这样一个需求:界面上有一个自定义的视图 imageCollectorView ,它的做用是显示多张图片,并能使用 addImage(Bitmap) 方法来任意增长显示的图片。如今须要程序将一个给出的目录数组 File[] folders 中每一个目录下的 png 图片都加载出来并显示在 imageCollectorView 中。须要注意的是,因为读取图片的这一过程较为耗时,须要放在后台执行,而图片的显示则必须在 UI 线程执行。经常使用的实现方式有多种,我这里贴出其中一种:android
new Thread() { @Override public void run() { super.run(); for (File folder : folders) { File[] files = folder.listFiles(); for (File file : files) { if (file.getName().endsWith(".png")) { final Bitmap bitmap = getBitmapFromFile(file); getActivity().runOnUiThread(new Runnable() { @Override public void run() { imageCollectorView.addImage(bitmap); } }); } } } } }.start();
而若是使用 RxJava ,实现方式是这样的:数据库
Observable.from(folders)
.flatMap(new Func1<File, Observable<File>>() { @Override public Observable<File> call(File file) { return Observable.from(file.listFiles()); } }) .filter(new Func1<File, Boolean>() { @Override public Boolean call(File file) { return file.getName().endsWith(".png"); } }) .map(new Func1<File, Bitmap>() { @Override public Bitmap call(File file) { return getBitmapFromFile(file); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<Bitmap>() { @Override public void call(Bitmap bitmap) { imageCollectorView.addImage(bitmap); } });
RxJava 的异步实现,是经过一种扩展的观察者模式来实现的。
观察者模式面向的需求是:A 对象(观察者)对 B 对象(被观察者)的某种变化高度敏感,须要在 B 变化的一瞬间作出反应。举个例子,新闻里喜闻乐见的警察抓小偷,警察须要在小偷伸手做案的时候实施抓捕。在这个例子里,警察是观察者,小偷是被观察者,警察须要时刻盯着小偷的一举一动,才能保证不会漏过任何瞬间。程序的观察者模式和这种真正的『观察』略有不一样,观察者不须要时刻盯着被观察者(例如 A 不须要每过 2ms 就检查一次 B 的状态),而是采用注册(Register)或者称为订阅(Subscribe)的方式,告诉被观察者:我须要你的某某状态,你要在它变化的时候通知我。 Android 开发中一个比较典型的例子是点击监听器 OnClickListener
。对设置 OnClickListener
来讲, View
是被观察者, OnClickListener
是观察者,两者经过 setOnClickListener()
方法达成订阅关系。订阅以后用户点击按钮的瞬间,Android Framework 就会将点击事件发送给已经注册的 OnClickListener
。采起这样被动的观察方式,既省去了反复检索状态的资源消耗,也可以获得最高的反馈速度。固然,这也得益于咱们能够随意定制本身程序中的观察者和被观察者,而警察叔叔明显没法要求小偷『你在做案的时候务必通知我』。编程
OnClickListener 的模式大体以下图:设计模式
RxJava 的观察者模式
RxJava 有四个基本概念:Observable
(可观察者,即被观察者)、 Observer
(观察者)、 subscribe
(订阅)、事件。Observable
和 Observer
经过 subscribe()
方法实现订阅关系,从而 Observable
能够在须要的时候发出事件来通知 Observer
。
与传统观察者模式不一样, RxJava 的事件回调方法除了普通事件 onNext()
(至关于 onClick()
/ onEvent()
)以外,还定义了两个特殊的事件:onCompleted()
和 onError()
。数组
onCompleted()
: 事件队列完结。RxJava 不只把每一个事件单独处理,还会把它们看作一个队列。RxJava 规定,当不会再有新的 onNext()
发出时,须要触发 onCompleted()
方法做为标志。onError()
: 事件队列异常。在事件处理过程当中出异常时,onError()
会被触发,同时队列自动终止,不容许再有事件发出。onCompleted()
和 onError()
有且只有一个,而且是事件序列中的最后一个。须要注意的是,onCompleted()
和 onError()
两者也是互斥的,即在队列中调用了其中一个,就不该该再调用另外一个。RxJava 的观察者模式大体以下图:bash
开始接入RxJava之间,添加依赖网络
dependencies {
compile 'io.reactivex:rxandroid:1.2.1' compile 'io.reactivex:rxjava:1.1.6' }
方式1:简单建立Rxjava闭包
/** * 简单建立Rxjava * * Observable是被观察者,建立后传入一个OnSubscribe对象,当Observable(观察者)调用subscribe进行注册观察者时,OnSubscribe的call方法会触发。 ObservableEmitter: Emitter 是发射器的意思,它能够发出三种类型的事件,与之对应的。 Observer有三个回调方法: onNext:接受到一个事件 onCompleted:接受完事件后调用,只会调用一次 onError :发生错误时调用,并中止接受事件,调用一次 注:onCompleted和onError不会同时调用,只会调用其中之一 */ public static void createOne() { //建立被观察者 Observable observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello"); subscriber.onNext("吴"); subscriber.onNext("晓畅"); subscriber.onCompleted(); } }); Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onNext(String s) { System.out.println("Item: " + s); } ////事件队列完结,RxJava 规定,当不会再有新的 onNext() 发出时,须要触发 onCompleted() 方法做为标志。 @Override public void onCompleted() { System.out.println("Completed!"); } ////事件队列异常。在事件处理过程当中出异常时,onError() 会被触发,同时队列自动终止,不容许再有事件发出。 @Override public void onError(Throwable e) { System.out.println("Error!"); } }; observable.subscribe(subscriber); }
运行结果以下所示:
方式2:just(T...): 将传入的参数依次发送出来
public static void createTwo() { //至关于 // 将会依次调用: // onNext("Hello"); // onNext("Hi"); // onNext("Aloha"); // onCompleted(); Observable observable = Observable.just("Hello", "wu", "xiaochang"); Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onNext(String s) { System.out.println("Item: " + s); } ////事件队列完结,RxJava 规定,当不会再有新的 onNext() 发出时,须要触发 onCompleted() 方法做为标志。 @Override public void onCompleted() { System.out.println("Completed!"); } ////事件队列异常。在事件处理过程当中出异常时,onError() 会被触发,同时队列自动终止,不容许再有事件发出。 @Override public void onError(Throwable e) { System.out.println("Error!"); } }; observable.subscribe(subscriber); }
运行结果以下所示:
方式3:将传入的数组或 Iterable 拆分红具体对象后,依次发送出来
public static void createThree() { String[] words = {"Hello", "wu", "xiaochang"}; //至关于 // 将会依次调用: // onNext("Hello"); // onNext("Hi"); // onNext("Aloha"); // onCompleted(); Observable observable = Observable.from(words); Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onNext(String s) { System.out.println("Item: " + s); } ////事件队列完结,RxJava 规定,当不会再有新的 onNext() 发出时,须要触发 onCompleted() 方法做为标志。 @Override public void onCompleted() { System.out.println("Completed!"); } ////事件队列异常。在事件处理过程当中出异常时,onError() 会被触发,同时队列自动终止,不容许再有事件发出。 @Override public void onError(Throwable e) { System.out.println("Error!"); } }; observable.subscribe(subscriber); }
运行结果以下图所示:
方式4:发送多种类型参数
/** *发送多种类型参数 */ public static void createFour() { //Just相似于From,可是From会将数组或Iterable的元素具取出而后逐个发射,而Just只是简单的原样发射,将数组或Iterable当作单个数据。 //Just接受一至九个参数,返回一个按参数列表顺序发射这些数据的Observable Observable justObservable = Observable.just(1, "someThing", false, 3.256f, "NewYork"); justObservable.subscribe(new Subscriber() { @Override public void onCompleted() { System.out.println("onCompleted!"); } @Override public void onError(Throwable e) { System.out.println(e.getMessage()); } @Override public void onNext(Object o) { System.out.println(o); } }); }
运行结果以下所示:
方式5:自定义Subscriber
/** * 自定义Subscriber */ public static void createFive() { Observable observable = Observable.just("Hello", "Hi", "Aloha"); Action1<String> onNextAction = new Action1<String>() { // onNext() @Override public void call(String s) { System.out.println(s); } }; Action1<Throwable> onErrorAction = new Action1<Throwable>() { // onError() @Override public void call(Throwable throwable) { // Error handling } }; Action0 onCompletedAction = new Action0() { // onCompleted() @Override public void call() { System.out.println("completed"); } }; // 自动建立 Subscriber ,并使用 onNextAction 来定义 onNext() observable.subscribe(onNextAction); // 自动建立 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError() observable.subscribe(onNextAction, onErrorAction); // 自动建立 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted() observable.subscribe(onNextAction, onErrorAction, onCompletedAction); }
运行结果以下图所示:
建立方式以下:
Observer<String> observer = new Observer<String>() { @Override public void onNext(String s) { Log.d(tag, "Item: " + s); } @Override public void onCompleted() { Log.d(tag, "Completed!"); } @Override public void onError(Throwable e) { Log.d(tag, "Error!"); } }; //建立方式2 Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onNext(String s) { Log.d("MainActivity", "Item: " + s); } @Override public void onCompleted() { Log.d("MainActivity", "Completed!"); } @Override public void onError(Throwable e) { Log.d("MainActivity", "Error!"); } };
实质上,在 RxJava 的 subscribe 过程当中,Observer 也老是会先被转换成一个 Subscriber 再使用。因此若是你只想使用基本功能,选择 Observer 和 Subscriber 是彻底同样的。它们的区别对于使用者来讲主要有两点:
onStart(): 这是 Subscriber 增长的方法。它会在 subscribe 刚开始,而事件还未发送以前被调用,能够用于作一些准备工做,例如数据的清零或重置。这是一个可选方法,默认状况下它的实现为空。须要注意的是,若是对准备工做的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart() 就不适用了,由于它老是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来作准备工做,可使用 doOnSubscribe() 方法,具体能够在后面的文中看到。
unsubscribe(): 这是 Subscriber 所实现的另外一个接口 Subscription 的方法,用于取消订阅。在这个方法被调用后,Subscriber 将再也不接收事件。通常在这个方法调用前,可使用 isUnsubscribed() 先判断一下状态。 unsubscribe() 这个方法很重要,由于在 subscribe() 以后, Observable 会持有 Subscriber 的引用,这个引用若是不能及时被释放,将有内存泄露的风险。因此最好保持一个原则:要在再也不使用的时候尽快在合适的地方(例如 onPause() onStop() 等方法中)调用 unsubscribe() 来解除引用关系,以免内存泄露的发生。
在不指定线程的状况下, RxJava 遵循的是线程不变的原则,即:在哪一个线程调用 subscribe(),就在哪一个线程生产事件;在哪一个线程生产事件,就在哪一个线程消费事件。若是须要切换线程,就须要用到 Scheduler (调度器)。
在RxJava 中,Scheduler ——调度器,至关于线程控制器,RxJava 经过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:
Schedulers.immediate(): 直接在当前线程运行,至关于不指定线程。这是默认的 Scheduler。
Schedulers.newThread(): 老是启用新线程,并在新线程执行操做。
Schedulers.io(): I/O 操做(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差很少,区别在于 io() 的内部实现是是用一个无数量上限的线程池,能够重用空闲的线程,所以多数状况下 io() 比 newThread() 更有效率。不要把计算工做放在 io() 中,能够避免建立没必要要的线程。
Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操做限制性能的操做,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操做放在 computation() 中,不然 I/O 操做的等待时间会浪费 CPU。
另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操做将在 Android 主线程运行。
有了这几个 Scheduler ,就可使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。 * subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫作事件产生的线程。 * observeOn(): 指定 Subscriber 所运行在的线程。或者叫作事件消费的线程。
public class RxJavaScheduler { public static void showScheduler() { Observable.just(1, 2, 3, 4) // .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程 // .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程 .subscribe(new Action1<Integer>() { @Override public void call(Integer number) { System.out.println("number:" + number); } }); } public static void main(String[] args) { showScheduler(); } }
连接:https://pan.baidu.com/s/1Na7DH_N2rf-pXEadmQxgUQ
密码:xvr2