RxAndroid Tutorial
响应式编程(Reactive programming)不是一种API,而是一种新的很是有用的范式,而RxJava就是一套基于此思想的框架,在Android开发中咱们经过这个框架就能探索响应式的世界,同时结合另外一个库,RxAndroid,这是一个扩展库,更好的兼容了Android特性,好比主线程,UI事件等。
在这篇指南中,你将会学习到如下这些内容:html
从 the starter project for this tutorial 能够下载这篇文章中项目的全部代码, 能够直接在Android Studio中打开。
大部分的代码都在 CheeseActivity.java
这个类里面,继承于 BaseSearchActivity
;里面有一些基础方法:
showProgressBar(): 显示一个进度条
hideProgressBar(): 隐藏一个进度条
showResult(List<String> result): 显示一个列表数据
mCheeseSearchEngine: CheeseSearchEngine类的一个对象,内部有一个search方法,接收一个数据查询并返回一个匹配的列表list。
直接运行的话,跑出来是这样子,就是一个查询的界面:java
在建立第一个observable以前,先看一下响应式编程的理论 :]react
通常的程序是这样的,表达式只会计算一次,而后把赋值给变量android
int a = 2; int b = 3; int c = a * b; // c is 6 a = 10; // c is still 6
在a从新赋值后,前面的c并不会变化,而响应式编程会对值的变化作出响应。
有时候颇有可能你已经作过一些响应式编程,可是并无意识到这一点。
好比Excel中的表格,咱们能够在表格里面填上一些值,同时将某个格子的值设为一个表达式,就像下面这样git
设置这个表格里面 B1区域的值为2,B2区域的值为3,B3是一个表达式,B3 = B1* B2,当其中一个值改变的时候,这个观察者B3也会变化,如图把B1改为10,B3就会自动计算成30。github
RxJava使用的是观察者模式,其中有两个关键的接口:Observable 和 Observer,当Observable(被观察的对象)状态改变,全部subscribed(订阅)的Observer(观察者)会收到一个通知。
在Observable的接口中有一个方法 subscribe()
,这样Observer 能够调用来进行订阅。
一样,在Observer 接口中有三个方法,会被Observable 回调:编程
做为一个表现良好的Observable,发射0到多个数据时后面都会跟上一个completion 或是error的回调。
听起来有点复杂,可是一些例子能够很清晰的解释。小程序
一个网络请求observable 一般只发射一个数据而且马上completes。网络
每个圆表明了从observable 发射出去的item数据,黑色的block表明告终束或是错误。
一个鼠标的移动observable 将会不断的发送鼠标当前坐标,而且从不会结束。app
在一个observable 已经结束后不能再发射新的item数据,下面这个就是一个很差的示范,违反了Observable 的准则
在已经发信号结束后依然发射了一个item。
你能够直接经过 Observable.create()
建立一个Observable
Observable<T> create(ObservableOnSubscribe<T> source)
看起来十分的简洁,可是这段代码是什么意思呢?这个 “source” 又是什么? 想要理解这个,只须要知道 ObservableOnSubscribe
是什么。 这是一个接口,其中有一个方法:
public interface ObservableOnSubscribe<T> { void subscribe(ObservableEmitter<T> emitter) throws Exception; }
这个你建立Observable 时的一个“source” 须要暴露一个 subscribe()
方法,从这里又引出来另外一个 emitter(发射器),那么什么又是emitter?
RxJava中的 Emitter
接口和 Observer 比较类似,都有如下方法
public interface Emitter<T> { void onNext(T value); void onError(Throwable error); void onComplete(); }
ObservableEmitter
提供了一个方法用来取消订阅,用一个实际场景来形容一下。想象一个水龙头和水流,这个管道就至关于Observable,从里面能放出水,ObservableEmitter 就至关因而水龙头,控制开关,而水龙头链接到管道就是 Observable.create()。
举个例子省得前面描述太过于抽象,先来看看第一个例子
在 CheeseActivity
类中有这么一段代码
// 1 private Observable<String> createButtonClickObservable() { // 2 return Observable.create(new ObservableOnSubscribe<String>() { // 3 @Override public void subscribe(final ObservableEmitter<String> emitter) throws Exception { // 4 mSearchButton.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View view) { // 5 emitter.onNext(mQueryEditText.getText().toString()); } }); // 6 emitter.setCancellable(new Cancellable() { @Override public void cancel() throws Exception { // 7 mSearchButton.setOnClickListener(null); } }); } }); }
上面这段代码作了如下几件事情
Observable.create()
建立了一个observable ,并提供了一个ObservableOnSubscribe。subscribe()
方法。setCancellable()
方法。经过重写cancel()方法,而后当Observable 被处理的时候这个实现会被回调,好比已经结束或者是全部的观察者都解除了订阅。如今被观察者Observable 已经有了,还须要观察者来进行订阅,在此以前,咱们先看看另外一个接口, Consumer
,它能够十分简单的从emitter 接收到数据。
public interface Consumer<T> { void accept(T t) throws Exception; }
若是仅是想要简单的订阅一下Observable,这个接口是很方便的。
Observable 的接口方法 subscribe() 能够接收不少类型的参数,你能够订阅一个全参数的版本,只要你实现其中全部的方法就能够。若是只是想要接收一下发射的数据,可使用单一的 Consumer 的版本,这样只须要实现一个方法,并且也是 onNext
。
咱们能够直接在Activity的OnStart方法中来实现这个
@Override protected void onStart() { super.onStart(); // 1 Observable<String> searchTextObservable = createButtonClickObservable(); searchTextObservable // 2 .subscribe(new Consumer<String>() { //3 @Override public void accept(String query) throws Exception { // 4 showResult(mCheeseSearchEngine.search(query)); } }); }
其中Consumer须要导的包是
import io.reactivex.functions.Consumer;
依次解释一下上面每一步
这样一个简单的实现也写完了,运行一下APP,跑出来的结果就像下面这样
虽然已经像模像样的写了一个小程序,但其实存在一些问题。当按钮按下去后这个UI线程实际上被阻塞住了
若是在控制台可能能够看到这样的提示
> 08-24 14:36:34.554 3500-3500/com.raywenderlich.cheesefinder I/Choreographer: Skipped 119 frames! The application may be doing too much work on its main thread.
这是因为search 发生在主线程,若是是一个网络请求的话,Android会直接crash,抛出一个NetworkOnMainThreadException 的异常。若是不指定线程,那么RxJava的操做会一直在一个线程上。
经过 subscribeOn
和 observeOn
两个操做符能改变线程的执行状态。subscribeOn
在操做链上最好只调用一次,若是屡次调用,依然只有第一次生效subscribeOn
用来指定 observable 在哪一个线程上建立执行操做,若是想要经过observables 发射事件给Android的View,那么须要保证订阅者在Android的UI线程上执行操做。
另外一方面, observeOn
能够在链上调用屡次,它主要是用来指定下一个操做在哪个线程上执行,来个例子:
myObservable // observable will be subscribed on i/o thread .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .map(/* this will be called on main thread... */) .doOnNext(/* ...and everything below until next observeOn */) .observeOn(Schedulers.io()) .subscribe(/* this will be called on i/o thread */);
主要用到三种schedulers:
Schedulers.io(): 适合I/O类型的操做,好比网络请求,磁盘操做。
Schedulers.computation(): 适合计算任务,好比事件循环或者回调处理。
AndroidSchedulers.mainThread() : 回调主线程,好比UI操做。
map操做符经过运用一个方法把从一个observable 发射的数据再返回成另外一个observable给那些调用的。
好比你有一个observable称之为numbers,而且会发射一系列的值,以下所示
经过map操做符的apply方法
numbers.map(new Function<Integer, Integer>() { @Override public Integer apply(Integer number) throws Exception { return number * number; } }
而后结果就像下面这样
再来个实例,咱们用这个操做符可以把前面的代码拆分一下
@Override protected void onStart() { super.onStart(); Observable<String> searchTextObservable = createButtonClickObservable(); searchTextObservable // 1 .observeOn(Schedulers.io()) // 2 .map(new Function<String, List<String>>() { @Override public List<String> apply(String query) { return mCheeseSearchEngine.search(query); } }) // 3 .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<List<String>>() { @Override public void accept(List<String> result) { showResult(result); } }); }
简述一下代码,首先,指定下一次操做在I/O线程上,而后经过给的String,执行search返回一个结果列表,
再将线程从I/O上变动为主线程,showResult
,展现返回的数据。
为了用户体验,咱们须要一个进度条
这里能够引入 doOnNext
操做符,doOnNext
有一个 Consumer
,而且在每次observable 发射数据的时候都会被调用,再改一下前面的代码
@Override protected void onStart() { super.onStart(); Observable<String> searchTextObservable = createButtonClickObservable(); searchTextObservable // 1 .observeOn(AndroidSchedulers.mainThread()) // 2 .doOnNext(new Consumer<String>() { @Override public void accept(String s) { showProgressBar(); } }) .observeOn(Schedulers.io()) .map(new Function<String, List<String>>() { @Override public List<String> apply(String query) { return mCheeseSearchEngine.search(query); } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<List<String>>() { @Override public void accept(List<String> result) { // 3 hideProgressBar(); showResult(result); } }); }
每次在点击按钮的时候就能收到一个事件
首先把线程切换到主线程,而后在 doOnNext
里面来显示进度条,再把线程切换到子线程,来进行请求数据,最后在切换回来关闭进度条,展现数据。RxJava很是适合这种需求,代码也很清晰。
把这个例子跑起来的效果就像下面这样,点击的时候就显示进度条:
除了经过点击按钮来搜索,更好的方式就是根据EditText的text内容变化自动的搜索。
首先,就须要对EditText的内容变化进行订阅观察,先看代码实例:
//1 private Observable<String> createTextChangeObservable() { //2 Observable<String> textChangeObservable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(final ObservableEmitter<String> emitter) throws Exception { //3 final TextWatcher watcher = new TextWatcher() { @Override public void beforeTextChanged(CharSequence s, int start, int count, int after) {} @Override public void afterTextChanged(Editable s) {} //4 @Override public void onTextChanged(CharSequence s, int start, int before, int count) { emitter.onNext(s.toString()); } }; //5 mQueryEditText.addTextChangedListener(watcher); //6 emitter.setCancellable(new Cancellable() { @Override public void cancel() throws Exception { mQueryEditText.removeTextChangedListener(watcher); } }); } }); // 7 return textChangeObservable; }
分析一下上面这几步代码:
Observable.create
建立一个textChangeObservable ,传入一个ObservableOnSubscribe 对象beforeTextChanged()
和 afterTextChanged()
,在onTextChanged 里面,把这个数据经过emitter.onNext 发射出去,这样订阅的观察者就能接收到实现了这个Observable后就能够把前面的给替换掉
Observable<String> searchTextObservable = createTextChangeObservable();
再跑一次程序,就能够边输入边搜索了
如今可能有一个需求是在输入长度比较短的时候不进行搜索,达到必定字符后才搜索,RxJava引入了一个 filter
操做符。
filter只会经过那些知足条件的item,filter经过一个 Predicate
,这个接口内部有一个 test
方法用来决定是否知足条件,最后会返回一个boolean 值。
这里,Predicate 拿到的是一个输入字符String,若是长度大于或等于2,就返回true,表示知足条件。
return textChangeObservable .filter(new Predicate<String>() { @Override public boolean test(String query) throws Exception { return query.length() >= 2; } });
注意Predicate须要导的包是:
import io.reactivex.functions.Predicate;
再前面建立Observable的代码后面加一个 filter
后,当query的长度不足2时,那这个值就不会被发射出去,而后订阅的就收不到这个消息。
跑起来就像这样,只输一个数,返回false,不会触发搜索。
再输一个字符就经过了filter的过滤。
有时咱们对于EditText内容频繁变化的场景并不想每次变化都去新发送一个请求,因此,这里又引入了一个新的操做符 debounce
,意思就是防抖动,这个和filter比较相似,也是一种拦截的策略。
这个操做符是根据item被发射的时间来进行过滤。每次在一个item被发射后,debounce 会等待一段指定长度的时间,而后才去发射下一个item。
若是在这段时间内都没有一个item发生,那么上一个最后的item会被发射出去,这样能保证起码有一个item能被发射成功。
从图里看到,2,3,4,5触发的时间很是的接近,因此这一段时间内前三个都被过滤了,只留下了5。
在前面的 createTextChangeObservable()
中,咱们再添加一个 debounce
操做符在 filter
的后面
return textChangeObservable .filter(new Predicate<String>() { @Override public boolean test(String query) throws Exception { return query.length() >= 2; } }).debounce(1000, TimeUnit.MILLISECONDS); // add this line
再跑一下APP,能够看到中间阶段直接省略了,最后搜索了一下结果值
一开始咱们实现了一个observable 是监听点击按钮的事件,而后又实现了一个observable 是监听EditText的内容变化,那么怎么把这两个合二为一呢。
RxJava提供了不少的操做符来联合observables,可是其中最有用和简单的就是 merge
。merge
能够将两个或更多的observable 联合起来,合成一个单一的observable。
这里咱们把前面两个observable 绑定起来
Observable<String> buttonClickStream = createButtonClickObservable(); Observable<String> textChangeStream = createTextChangeObservable(); Observable<String> searchTextObservable = Observable.merge(textChangeStream, buttonClickStream);
如今的效果就是前面的两种效果的结合体,不管是自动搜索仍是手动搜索都是能够触发的。
前面咱们实现过 setCancellable
方法,这个方法会在解除订阅的时候回调。Observable.subscribe()
会返回一个Disposable,Disposable是一个接口,其中有两个方法:
public interface Disposable { void dispose(); // ends a subscription boolean isDisposed(); // returns true if resource is disposed (unsubscribed) }
咱们先在 CheeseActivity
中定义一个Disposable
private Disposable mDisposable;
在 onStart()
中,把 subscribe()
的返回值赋给mDisposable
mDisposable = searchTextObservable // change this line .observeOn(AndroidSchedulers.mainThread()) .doOnNext(new Consumer<String>() { @Override public void accept(String s) { showProgressBar(); } }) .observeOn(Schedulers.io()) .map(new Function<String, List<String>>() { @Override public List<String> apply(String query) { return mCheeseSearchEngine.search(query); } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<List<String>>() { @Override public void accept(List<String> result) { hideProgressBar(); showResult(result); } });
最后咱们就能在 onStop()
中去解除这个订阅,代码以下:
@Override protected void onStop() { super.onStop(); if (!mDisposable.isDisposed()) { mDisposable.dispose(); } }
这样就解除了订阅。
你能够下载这篇文章中的代码程序,下载地址
固然这篇文章只是讲到了RxJava世界的一小点,好比,JakeWharton大神的库 RxBinding ,这个库里面包括大量的Android View的API,你能够经过调用 RxView.clicks(viewVariable)
来建立一个点击事件observable 。
除此以外,学习更多有关RxJava的知识,能够看 官方文档。
做者:sheepm连接:http://www.jianshu.com/p/031745744bfa來源:简书著做权归做者全部。商业转载请联系做者得到受权,非商业转载请注明出处。