a library for composing asynchronous and event-based programs using observable sequences for the Java VM
解释:一个对于构成使用的Java虚拟机观察序列异步
和基于事件的程序库java
RxJava 是一个响应式编程框架,采用观察者设计模式。因此天然少不了 Observable 和 Subscriber 这两个东东了。
RxJava 是一个开源项目,地址:https://github.com/ReactiveX/RxJava
RxAndroid,用于 Android 开发,添加了 Android 用的接口。地址: https://github.com/ReactiveX/RxAndroidandroid
网上关于
RxJava
的博文也有不少,我也看过许多,其中不乏有优秀的文章,但绝大部分文章都有一个共同点,就是侧重于讲RxJava中各类强大的操做符,而忽略了最基本的东西——概念,因此一开始我也看的一脸懵逼,看到后面又忘了前面的,脑子里全是问号,这个是什么,那个又是什么,这两个长得怎么那么像。举个不太恰当的例子,概念之于初学者,就像食物之于人,当你饿了,你会想吃面包、牛奶,那你为何不去吃土呢,由于你知道面包牛奶是用来干吗的,土是用来干吗的。同理,前面已经说过,RxJava
无非是发送数据与接收数据,那么什么是发射源,什么是接收源,这就是你应该明确的事,也是RxJava
的入门条件之一,下面就依我我的理解,对发射源和接收源作个归类,以及RxJava
中频繁出现的几个“单词”解释一通;git
Observable
:发射源,英文释义“可观察的”,在观察者模式中称为“被观察者”或“可观察对象”;github
Observer
:接收源,英文释义“观察者”,没错!就是观察者模式中的“观察者”,可接收Observable
、Subject
发射的数据;web
Subject
:Subject
是一个比较特殊的对象,既可充当发射源,也可充当接收源,为避免初学者被混淆,本章将不对Subject
作过多的解释和使用,重点放在Observable
和Observer
上,先把最基本方法的使用学会,后面再学其余的都不是什么问题;数据库
Subscriber
:“订阅者”,也是接收源,那它跟Observer有什么区别呢?Subscriber
实现了Observer
接口,比Observer多了一个最重要的方法unsubscribe( ),用来取消订阅,当你再也不想接收数据了,能够调用unsubscribe( )
方法中止接收,Observer 在 subscribe() 过程当中,最终也会被转换成 Subscriber 对象,通常状况下,建议使用Subscriber做为接收源;编程
Subscription
:Observable
调用subscribe( )
方法返回的对象,一样有unsubscribe( )
方法,能够用来取消订阅事件;设计模式
Action0
:RxJava
中的一个接口,它只有一个无参call()
方法,且无返回值,一样还有Action1,Action2…Action9等,Action1封装了含有 1 个参的call()方法,即call(T t),Action2封装了含有 2 个参数的call方法,即call(T1 t1,T2 t2),以此类推;api
Func0
:与Action0很是类似,也有call()方法,可是它是有返回值的,一样也有Func0、Func1…Func9;网络
RxJava最核心的两个东西是Observables
(被观察者,事件源)和Subscribers
(观察者)。Observables
发出一系列事件,Subscribers
处理这些事件。这里的事件能够是任何你感兴趣的东西(触摸事件,web接口调用返回的数据…)
一个Observable
能够发出零个或者多个事件,知道结束或者出错。每发出一个事件,就会调用它的Subscriber
的onNext
方法,最后调用Subscriber.onNext()
或者Subscriber.onError()
结束。
Rxjava
的看起来很想设计模式中的观察者模式
,可是有一点明显不一样,那就是若是一个Observerble
没有任何的的Subscriber
,那么这个Observable
是不会发出任何事件的。
使用create( )
,最基本的建立方式:
1 Observable<String> myObservable = Observable.create(new Observable.OnSubscribe<String>() { 2 @Override 3 public void call(Subscriber<? super String> subscriber) { 4 subscriber.onNext("Hello, world!"); //发射一个"Hello, world!"的String 5 subscriber.onCompleted();//发射完成,这种方法须要手动调用onCompleted,才会回调Observer的onCompleted方法 6 }});
能够看到,这里传入了一个 OnSubscribe
对象做为参数。OnSubscribe
会被存储在返回的 Observable
对象中,它的做用至关于一个计划表,当 Observable
被订阅的时候,OnSubscribe
的 call()
方法会自动被调用,事件序列就会依照设定依次触发(对于上面的代码,就是观察者Subscriber
将会被调用一次 onNext() 和一次 onCompleted())。这样,由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式
。
这个例子很简单:事件的内容是字符串,而不是一些复杂的对象;事件的内容是已经定好了的,而不像有的观察者模式同样是待肯定的(例如网络请求的结果在请求返回以前是未知的);全部事件在一瞬间被所有发送出去,而不是夹杂一些肯定或不肯定的时间间隔或者通过某种触发器来触发的。总之,这个例子看起来毫无实用价值。但这是为了便于说明,实质上只要你想,各类各样的事件发送规则你均可以本身来写。至于具体怎么作,后面都会讲到,但如今不行。只有把基础原理先说明白了,上层的运用才能更容易说清楚。
上面定义的Observable
对象仅仅发出一个Hello World字符串,而后就结束了。接着咱们建立一个Subscriber
来处理Observable
对象发出的字符串:
1 Subscriber<String> mySubscriber = new Subscriber<String>() { 2 @Override 3 public void onNext(String s) { 4 System.out.println(s); //打印出"Hello, world!" 5 } 6 7 @Override 8 public void onCompleted() { } 9 10 @Override 11 public void onError(Throwable e) { } 12 };
除了 Observer
接口以外,RxJava 还内置了一个实现了 Observer
的抽象类:Subscriber
。 Subscriber
对 Observer
接口进行了一些扩展,但他们的基本使用方式是彻底同样的:
1 Observer<String> myObserver = new Observer<String>() { 2 @Override 3 public void onNext(String s) { 4 System.out.println(s); //打印出"Hello, world!" 5 } 6 7 @Override 8 public void onCompleted() { } 9 10 @Override 11 public void onError(Throwable e) { } 12 };
不只基本使用方式同样,实质上,在 RxJava
的 subscribe
过程当中,Observer
也老是会先被转换成一个 Subscriber
再使用。因此若是你只想使用基本功能,选择 Observer
和 Subscriber
是彻底同样
的。它们的区别对于使用者来讲主要有两点:
onStart()
: 这是 Subscriber
增长的方法。它会在 subscribe 刚开始,而事件还未发送以前被调用,能够用于作一些准备工做,例如数据的清零或重置。这是一个可选方法,默认状况下它的实现为空。须要注意的是,若是对准备工做的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行),onStart()
就不适用了,由于它老是在 subscribe
所发生的线程被调用,而不能指定线程。要在指定的线程来作准备工做,可使用 doOnSubscribe()
方法,具体能够在后面的文中看到。
unsubscribe()
: 这是 Subscriber
所实现的另外一个接口 Subscription
的方法,用于取消订阅。在这个方法被调用后,Subscriber
将再也不接收事件。通常在这个方法调用前,可使用 isUnsubscribed()
先判断一下状态。 unsubscribe()
这个方法很重要,由于在 subscribe() 以后, Observable
会持有 Subscriber
的引用,这个引用若是不能及时被释放,将有内存泄露的风险。因此最好保持一个原则:要在再也不使用的时候尽快在合适的地方(例如 onPause() onStop() 等方法中)调用 unsubscribe() 来解除引用关系,以免内存泄露的发生。
这里subscriber
仅仅就是打印observable
发出的字符串。经过subscribe
函数就能够将咱们定义的myObservable
对象和mySubscriber
对象关联起来,这样就完成了subscriber
对observable
的订阅。
1 myObservable.subscribe(myObserver); 2 // 或者: 3 myObservable.subscribe(mySubscriber);
一旦mySubscriber
订阅了myObservable
,myObservable
就是调用mySubscriber
对象的onNext
和onComplete
方法,mySubscriber
就会打印出Hello World!
当调用Observable.subscribe()
,会返回一个Subscription
对象。这个对象表明了被观察者和订阅者之间的联系。
1 Subscription subscription = Observable.just("Hello, World!") 2 .subscribe(s -> System.out.println(s));
你能够在后面使用这个Subscription
对象来操做被观察者和订阅者之间的联系.
1 subscription.unsubscribe();//接触订阅关系 2 System.out.println("Unsubscribed=" + subscription.isUnsubscribed()); 3 // Outputs "Unsubscribed=true"
RxJava的另一个好处就是它处理unsubscribing
的时候,会中止整个调用链。若是你使用了一串很复杂的操做符,调用unsubscribe
将会在他当前执行的地方终止。不须要作任何额外的工做!
Observable
:是否是以为仅仅为了打印一个hello world要写这么多代码太啰嗦?我这里主要是为了展现RxJava背后的原理而采用了这种比较啰嗦的写法,RxJava其实提供了不少便捷的函数来帮助咱们减小代码。
首先来看看如何简化Observable
对象的建立过程。RxJava内置了不少简化建立Observable
对象的函数,好比Observable.just
就是用来建立只发出一个事件就结束的Observable
对象,上面建立Observable
对象的代码能够简化为一行:
1 Observable<String> myObservable = Observable.just("Hello, world!"); //发送"Hello, world!"
其余方法:
1.使用just( ),将为你建立一个Observable并自动为你调用onNext( )发射数据:
1 justObservable = Observable.just("just1","just2");//依次发送"just1"和"just2"
2.使用from( ),遍历集合,发送每一个item:
1 List<String> list = new ArrayList<>(); 2 list.add("from1"); 3 list.add("from2"); 4 list.add("from3"); 5 fromObservable = Observable.from(list); //遍历list 每次发送一个 6 /** 注意,just()方法也能够传list,可是发送的是整个list对象,而from()发送的是list的一个item** /
3.使用defer( ),有观察者订阅时才建立Observable,而且为每一个观察者建立一个新的Observable:
1 deferObservable = Observable.defer(new Func0<Observable<String>>() { 2 @Override 3 //注意此处的call方法没有Subscriber参数 4 public Observable<String> call() { 5 return Observable.just("deferObservable"); 6 }});
4.使用interval( ),建立一个按固定时间间隔发射整数序列的Observable,可用做定时器:
1 intervalObservable = Observable.interval(1, TimeUnit.SECONDS);//每隔一秒发送一次
5.使用range( ),建立一个发射特定整数序列的Observable,第一个参数为起始值,第二个为发送的个数,若是为0则不发送,负数则抛异常:
1 rangeObservable = Observable.range(10, 5);//将发送整数10,11,12,13,14
6.使用timer( ),建立一个Observable,它在一个给定的延迟后发射一个特殊的值,等同于Android中Handler的postDelay( )方法:
1 timeObservable = Observable.timer(3, TimeUnit.SECONDS); //3秒后发射一个值
7.使用repeat( ),建立一个重复发射特定数据的Observable:
1 repeatObservable = Observable.just("repeatObservable").repeat(3);//重复发射3次
Subscriber
:接下来看看如何简化Subscriber,上面的例子中,咱们其实并不关心OnComplete和OnError,咱们只须要在onNext的时候作一些处理,这时候就可使用Action1类。
1 Action1<String> onNextAction = new Action1<String>() { 2 @Override 3 public void call(String s) { 4 System.out.println(s); 5 } 6 };
subscribe
方法有一个重载版本,接受三个Action1类型的参数,分别对应OnNext,OnComplete, OnError函数:
1 myObservable.subscribe(onNextAction, onErrorAction, onCompleteAction);
这里咱们并不关心onError和onComplete,因此只须要第一个参数就能够
1 myObservable.subscribe(onNextAction); 2 // Outputs "Hello, world!"
上面的代码最终能够写成这样:
1 Observable.just("Hello, world!") 2 .subscribe(new Action1<String>() { 3 @Override 4 public void call(String s) { 5 System.out.println(s); 6 } 7 });
使用java8的lambda可使代码更简洁:
不熟悉Lambda的能够看我以前写的:Java8之Lambda表达式(Android用法)
1 Observable.just("Hello, world!") 2 .subscribe(s -> System.out.println(s));
简单解释一下这段代码中出现的 Action1
和 Action0
。 Action0
是 RxJava
的一个接口,它只有一个方法 call()
,这个方法是无参无返回值的;因为 onCompleted()
方法也是无参无返回值的,所以 Action0
能够被当成一个包装对象,将 onCompleted()
的内容打包起来将本身做为一个参数传入 subscribe()
以实现不完整定义的回调。这样其实也能够看作将onCompleted()
方法做为参数传进了 subscribe(),至关于其余某些语言中的『闭包』。 Action1
也是一个接口,它一样只有一个方法 call(T param),这个方法也无返回值,但有一个参数;与 Action0
同理,因为 onNext(T obj)
和 onError(Throwable error)
也是单参数无返回值的,所以 Action1 能够将 onNext(obj) 和 onError(error)
打包起来传入 subscribe() 以实现不完整定义的回调。事实上,虽然 Action0 和 Action1 在 API 中使用最普遍,但 RxJava
是提供了多个 ActionX
形式的接口 (例如 Action2, Action3) 的,它们能够被用以包装不一样的无返回值的方法。
注:正如前面所提到的,Observer
和 Subscriber
具备相同的角色,并且 Observer
在 subscribe()
过程当中最终会被转换成 Subscriber
对象,所以,从这里开始,后面的描述我将用 Subscriber
来代替 Observer
,这样更加严谨。
操做符就是为了解决对Observable对象的 变换
(关键词) 的问题,操做符用于在Observable和最终的Subscriber之间修改Observable发出的事件。RxJava提供了不少颇有用的操做符。 好比map操做符,就是用来把把一个事件转换为另外一个事件的。
1 Observable.just("images/logo.png") // 输入类型 String 2 .map(new Func1<String, Bitmap>() { 3 @Override 4 public Bitmap call(String filePath) { // 参数类型 String 5 return getBitmapFromPath(filePath); // 返回类型 Bitmap 6 } 7 }) 8 .subscribe(new Action1<Bitmap>() { 9 @Override 10 public void call(Bitmap bitmap) { // 参数类型 Bitmap 11 showBitmap(bitmap); 12 } 13 });
使用lambda
能够简化为:
1 Observable.just("images/logo.png") // 输入类型 String 2 .map( 3 filePath -> getBitmapFromPath(filePath); // 返回类型 Bitmap 4 ) 5 .subscribe( 6 bitmap -> showBitmap(bitmap); 7 );
能够看到,map()
方法将参数中的 String
对象转换成一个 Bitmap
对象后返回,而在通过 map()
方法后,事件的参数类型也由 String
转为了 Bitmap
。这种直接变换对象并返回的,是最多见的也最容易理解的变换。不过 RxJava
的变换远不止这样,它不只能够针对事件对象,还能够针对整个事件队列,这使得 RxJava
变得很是灵活。
map()
操做符进阶:
1 Observable.just("Hello, world!") 2 .map(s -> s.hashCode()) 3 .map(i -> Integer.toString(i)) 4 .subscribe(s -> System.out.println(s));
是否是很酷?map()
操做符就是用于变换Observable
对象的,map操做符返回一个Observable
对象,这样就能够实现链式调用
,在一个Observable对象上屡次
使用map操做符,最终将最简洁的数据传递给Subscriber
对象。
假设我有这样一个方法: 这个方法根据输入的字符串返回一个网站的url列表
1 Observable<List<String>> query(String text);
Observable.flatMap()接收一个Observable的输出做为输入,同时输出另一个Observable。直接看代码:
1 query("Hello, world!") 2 .flatMap(new Func1<List<String>, Observable<String>>() { 3 @Override 4 public Observable<String> call(List<String> urls) { 5 return Observable.from(urls); 6 } 7 }) 8 .subscribe(url -> System.out.println(url));
这里我贴出了整个的函数代码,以方便你了解发生了什么,使用lambda能够大大简化代码长度:
1 query("Hello, world!") 2 .flatMap(urls -> Observable.from(urls)) 3 .subscribe(url -> System.out.println(url));
flatMap()
是否是看起来很奇怪?为何它要返回另一个Observable
呢?理解flatMap
的关键点在于,flatMap
输出的新的Observable
正是咱们在Subscriber
想要接收的。如今Subscriber
再也不收到List<String>
,而是收到一些列单个
的字符串,就像Observable.from()
的输出同样。
flatMap()
和map()
有一个相同点:它也是把传入的参数转化以后返回另外一个对象。但须要注意,和 map()
不一样的是, flatMap()
中返回的是个 Observable
对象,而且这个 Observable
对象并非被直接发送到了 Subscriber
的回调方法中。flatMap()
的原理是这样的:
Observable
对象;Observable
, 而是将它激活,因而它开始发送事件;Observable
发送的事件,都被汇入同一个 Observable
,而这个 Observable
负责将这些事件统一交给 Subscriber
的回调方法。这三个步骤,把事件拆成了两级,经过一组新建立的 Observable
将初始的对象『铺平』
以后经过统一路径分发了下去。而这个『铺平』
就是 flatMap()
所谓的 flat
。值得注意的是.from()
是Observable建立时候用的,.flatMap()
才是操做符;
目前为止,咱们已经接触了两个操做符,RxJava中还有更多的操做符,那么咱们如何使用其余的操做符来改进咱们的代码呢?
更多RxJava的操做符请查看:RxJava操做符大全
getTitle()
返回null若是url不存在。咱们不想输出”null”,那么咱们能够从返回的title列表中过滤掉null值!
1 query("Hello, world!") 2 .flatMap(urls -> Observable.from(urls)) 3 .flatMap(url -> getTitle(url)) 4 .filter(title -> title != null) 5 .subscribe(title -> System.out.println(title));
filter()
输出和输入相同的元素,而且会过滤掉那些不知足检查条件的。
若是咱们只想要最多5个结果:
1 query("Hello, world!") 2 .flatMap(urls -> Observable.from(urls)) 3 .flatMap(url -> getTitle(url)) 4 .filter(title -> title != null) 5 .take(5) 6 .subscribe(title -> System.out.println(title));
take()
输出最多指定数量的结果。
若是咱们想在打印以前,把每一个标题保存到磁盘:
1 query("Hello, world!") 2 .flatMap(urls -> Observable.from(urls)) 3 .flatMap(url -> getTitle(url)) 4 .filter(title -> title != null) 5 .take(5) 6 .doOnNext(title -> saveTitle(title)) 7 .subscribe(title -> System.out.println(title));
doOnNext()
容许咱们在每次输出一个元素以前作一些额外的事情,好比这里的保存标题。
看到这里操做数据流是多么简单了么。你能够添加任意多的操做,而且不会搞乱你的代码。
RxJava
包含了大量的操做符。操做符的数量是有点吓人,可是很值得你去挨个看一下,这样你能够知道有哪些操做符可使用。弄懂这些操做符可能会花一些时间,可是一旦弄懂了,你就彻底掌握了RxJava
的威力。
感受如何?
好吧,你是一个怀疑主义者,而且还很难被说服,那为何你要关心这些操做符呢?
由于操做符可让你对数据流作任何操做。
将一系列的操做符连接起来就能够完成复杂的逻辑。代码被分解成一系列能够组合的片断。这就是响应式函数编程
的魅力。用的越多,就会越多的改变你的编程思惟。
假设你编写的Android app
须要从网络请求数据。网络请求须要花费较长的时间,所以你打算在另一个线程中加载数据。那么问题来了!
编写多线程的Android
应用程序是很难的,由于你必须确保代码在正确的线程中运行,不然的话可能会致使app
崩溃。最多见的就是在非主线程更新UI。
在不指定线程的状况下, RxJava
遵循的是线程不变的原则,即:在哪一个线程调用 subscribe()
,就在哪一个线程生产事件;在哪一个线程生产事件,就在哪一个线程消费事件。若是须要切换线程,就须要用到 Scheduler
(调度器)。
使用RxJava,你可使用subscribeOn()
指定观察者代码运行的线程,使用observerOn()
指定订阅者运行的线程
在RxJava 中,Scheduler
——调度器,至关于线程控制器,RxJava
经过它来指定每一段代码应该运行在什么样的线程。RxJava
已经内置了几个 Scheduler
,它们已经适合大多数的使用场景:
Schedulers.immediate(): 直接在当前线程运行,至关于不指定线程。这是默认的 Scheduler。
Schedulers.newThread(): 老是启用新线程,并在新线程执行操做。
Schedulers.io(): I/O 操做(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差很少,区别在于 io() 的内部实现是是用一个无数量上限的线程池,能够重用空闲的线程,所以多数状况下 io() 比 newThread() 更有效率。不要把计算工做放在 io() 中,能够避免建立没必要要的线程。
Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操做限制性能的操做,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操做放在 computation() 中,不然 I/O 操做的等待时间会浪费 CPU。
另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操做将在 Android 主线程运行。
有了以上这几个 Scheduler
,就可使用 subscribeOn()
和 observeOn()
两个方法来对线程进行控制了。
subscribeOn(): 指定 subscribe()
所发生的线程,即 Observable.OnSubscribe
被激活时所处的线程。或者叫作事件产生的线程。
observeOn(): 指定 Subscriber
所运行在的线程。或者叫作事件消费的线程。
注意:observeOn()
指定的是 Subscriber
的线程,而这个 Subscriber
并不必定是 subscribe()
参数中的 Subscriber
(这块参考RxJava变换部分),而是 observeOn()
执行时的当前 Observable
所对应的 Subscriber
,即它的直接下级 Subscriber
。
换句话说,observeOn()
指定的是它以后的操做所在的线程。所以若是有屡次切换线程的需求,只要在每一个想要切换线程的位置调用一次 observeOn()
便可。
代码示例:
1 Observable.just(1, 2, 3, 4) 2 .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程 3 .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程 4 .subscribe(new Action1<Integer>() { 5 @Override 6 public void call(Integer number) { 7 Log.d(tag, "number:" + number); 8 } 9 });
上面这段代码中,因为 subscribeOn(Schedulers.io())
的指定,被建立的事件的内容 一、二、三、4 将会在 IO
线程发出; 而因为 observeOn(AndroidScheculers.mainThread())
的指定,所以 subscriber
数字的打印将发生在主线程
。 事实上,这种在 subscribe()
以前写上两句subscribeOn(Scheduler.io())
和 observeOn(AndroidSchedulers.mainThread())
的使用方式很是常见,它适用于多数的 『后台线程取数据,主线程显示』
的程序策略。
下面的实例,在Observable.OnSubscribe
的call()
中模拟了长时间获取数据过程,在Subscriber
的noNext()
中显示数据到UI。
1 Observable.create(new Observable.OnSubscribe<String>() { 2 @Override 3 public void call(Subscriber<? super String> subscriber) { 4 subscriber.onNext("info1"); 5 6 SystemClock.sleep(2000); 7 subscriber.onNext("info2-sleep 2s"); 8 9 SystemClock.sleep(3000); 10 subscriber.onNext("info2-sleep 3s"); 11 12 SystemClock.sleep(5000); 13 subscriber.onCompleted(); 14 } 15 }) 16 .subscribeOn(Schedulers.io()) //指定 subscribe() 发生在 IO 线程 17 .observeOn(AndroidSchedulers.mainThread()) //指定 Subscriber 的回调发生在主线程 18 .subscribe(new Subscriber<String>() { 19 @Override 20 public void onCompleted() { 21 Log.v(TAG, "onCompleted()"); 22 } 23 24 @Override 25 public void onError(Throwable e) { 26 Log.v(TAG, "onError() e=" + e); 27 } 28 29 @Override 30 public void onNext(String s) { 31 showInfo(s); //UI view显示数据 32 } 33 });
至此,咱们能够看到call()
将会发生在 IO 线程,而showInfo(s)
则被设定在了主线程。这就意味着,即便加载call()
耗费了几十甚至几百毫秒的时间,也不会形成丝毫界面的卡顿。
值得注意:subscribeOn ()
与 observeOn()
都会返回了一个新的Observable
,所以若不是采用上面这种直接流方式,而是分步调用方式,须要将新返回的Observable
赋给原来的Observable
,不然线程调度将不会起做用。
使用下面方式,最后发现“OnSubscribe”仍是在默认线程中运行;缘由是subscribeOn这类操做后,返回的是一个新的Observable。
1 observable.subscribeOn(Schedulers.io()); 2 observable.observeOn(AndroidSchedulers.mainThread()); 3 observable .subscribe(subscribe);
能够修改成下面两种方式:
1 observable = observable.subscribeOn(Schedulers.io()); 2 observable = observable.observeOn(AndroidSchedulers.mainThread()); 3 observable .subscribe(subscribe); 4 //OR 5 observable.subscribeOn(Schedulers.io()) 6 .observeOn(AndroidSchedulers.mainThread()) 7 .subscribe(subscribe);
前面讲到了,能够利用 subscribeOn()
结合 observeOn()
来实现线程控制,让事件的产生和消费发生在不一样的线程。但是在了解了 map()
flatMap()
等变换方法后,有些好事的(其实就是当初刚接触 RxJava 时的我)就问了:能不能多切换几回线程?
答案是:能。 由于 observeOn()
指定的是 Subscriber
的线程,而这个 Subscriber
并非(严格说应该为『不必定是』,但这里不妨理解为『不是』)subscribe()
参数中的 Subscriber
,而是 observeOn()
执行时的当前 Observable
所对应的 Subscriber
,即它的直接下级 Subscriber
。换句话说,observeOn()
指定的是它以后的操做所在的线程。所以若是有屡次切换线程的需求,只要在每一个想要切换线程的位置调用一次 observeOn()
便可。上代码:
1 Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定 2 .subscribeOn(Schedulers.io()) 3 .observeOn(Schedulers.newThread()) 4 .map(mapOperator) // 新线程,由 observeOn() 指定 5 .observeOn(Schedulers.io()) 6 .map(mapOperator2) // IO 线程,由 observeOn() 指定 7 .observeOn(AndroidSchedulers.mainThread) 8 .subscribe(subscriber); // Android 主线程,由 observeOn() 指定
如上,经过 observeOn() 的屡次调用,程序实现了线程的屡次切换。
不过,不一样于 observeOn()
, subscribeOn()
的位置放在哪里均可以,但它是只能调用一次的。
又有好事的(其实仍是当初的我)问了:若是我非要调用屡次 subscribeOn()
呢?会有什么效果?
这个问题先放着,咱们仍是从 RxJava
线程控制的原理提及吧。
其实, subscribeOn()
和 observeOn()
的内部实现,也是用的 lift()
。具体看图(不一样颜色的箭头表示不一样的线程):
subscribeOn()
原理图:
observeOn()
原理图:
从图中能够看出,subscribeOn()
和 observeOn()
都作了线程切换的工做(图中的 “schedule…” 部位)。不一样的是, subscribeOn()
的线程切换发生在 OnSubscribe
中,即在它通知上一级 OnSubscribe
时,这时事件尚未开始发送,所以 subscribeOn()
的线程控制能够从事件发出的开端就形成影响;而 observeOn()
的线程切换则发生在它内建的 Subscriber
中,即发生在它即将给下一级 Subscriber
发送事件时,所以 observeOn()
控制的是它后面的线程。
最后,我用一张图来解释当多个 subscribeOn()
和 observeOn()
混合使用时,线程调度是怎么发生的(因为图中对象较多,相对于上面的图对结构作了一些简化调整):
图中共有 5 处含有对事件的操做。由图中能够看出,①和②两处受第一个 subscribeOn()
影响,运行在红色线程;③和④处受第一个 observeOn()
的影响,运行在绿色线程;⑤处受第二个 onserveOn()
影响,运行在紫色线程;而第二个 subscribeOn()
,因为在通知过程当中线程就被第一个 subscribeOn()
截断,所以对整个流程并无任何影响。这里也就回答了前面的问题:当使用了多个 subscribeOn()
的时候,只有第一个 subscribeOn()
起做用。
doOnSubscribe()
通常用于执行一些初始化操做.
然而,虽然超过一个的 subscribeOn()
对事件处理的流程没有影响,但在流程以前倒是能够利用的。
在前面讲 Subscriber
的时候,提到过 Subscriber
的 onStart()
能够用做流程开始前的初始化。然而 onStart()
因为在 subscribe()
发生时就被调用了,所以不能指定线程,而是只能执行在 subscribe()
被调用时的线程。这就致使若是 onStart()
中含有对线程有要求的代码(例如在界面上显示一个 ProgressBar
,这必须在主线程执行),将会有线程非法的风险,由于有时你没法预测 subscribe()
将会在什么线程执行。
而与 Subscriber.onStart()
相对应的,有一个方法 Observable.doOnSubscribe()
。它和 Subscriber.onStart()
一样是在 subscribe()
调用后并且在事件发送前执行,但区别在于它能够指定线程。默认状况下, doOnSubscribe()
执行在 subscribe()
发生的线程;而若是在 doOnSubscribe()
以后有 subscribeOn()
的话,它将执行在离它最近的 subscribeOn()
所指定的线程。
示例:
1 Observable.create(onSubscribe) 2 .subscribeOn(Schedulers.io()) 3 .doOnSubscribe(new Action0() { 4 @Override 5 public void call() { 6 progressBar.setVisibility(View.VISIBLE); // 须要在主线程执行 7 } 8 }) 9 .subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程 10 .observeOn(AndroidSchedulers.mainThread()) 11 .subscribe(subscriber);
如上,在 doOnSubscribe()
的后面跟一个 subscribeOn()
,就能指定准备工做的线程了。
Retrofit 是 Square 的一个著名的网络请求库。对于
Retrofit
不了解的同窗 能够参考我以前写的文章:全新的网络加载框架Retrofit2,上位的小三
Retrofit
除了提供了传统的 Callback
形式的 API
,还有 RxJava
版本的 Observable
形式 API
。下面我用对比的方式来介绍 Retrofit
的 RxJava
版 API
和传统版本的区别。
以获取一个 MovieEntity
对象的接口做为例子。使用Retrofit
的传统 API,你能够用这样的方式来定义请求:
1 @GET("top250") 2 Call<MovieEntity> getTopMovie(@Query("start") int start, @Query("count") int count);//正常返回Call对象
咱们来写getMovie
方法的代码:
1 //进行网络请求 2 private void getMovie(){ 3 String baseUrl = "https://api.douban.com/v2/movie/"; 4 5 Retrofit retrofit = new Retrofit.Builder() 6 .baseUrl(baseUrl) 7 .addConverterFactory(GsonConverterFactory.create()) 8 .build(); 9 10 MovieService movieService = retrofit.create(MovieService.class); 11 Call<MovieEntity> call = movieService.getTopMovie(0, 10); 12 call.enqueue(new Callback<MovieEntity>() { 13 @Override 14 public void onResponse(Call<MovieEntity> call, Response<MovieEntity> response) { 15 resultTV.setText(response.body().toString()); 16 } 17 18 @Override 19 public void onFailure(Call<MovieEntity> call, Throwable t) { 20 resultTV.setText(t.getMessage()); 21 } 22 }); 23 }
以上为没有通过封装的、原生态的Retrofit
写网络请求的代码。
而使用 RxJava
形式的 API
,定义一样的请求是这样的:
1 @GET("top250") 2 Observable<MovieEntity> getTopMovie(@Query("start") int start, @Query("count") int count);//RxJava返回Observable对象
Retrofit
自己对Rxjava
提供了支持,getMovie
方法改成:
1 //进行网络请求 2 private void getMovie(){ 3 String baseUrl = "https://api.douban.com/v2/movie/"; 4 5 Retrofit retrofit = new Retrofit.Builder() 6 .baseUrl(baseUrl) 7 .addConverterFactory(GsonConverterFactory.create()) 8 .addCallAdapterFactory(RxJavaCallAdapterFactory.create())//提供RXjava支持 9 .build(); 10 11 MovieService movieService = retrofit.create(MovieService.class); 12 13 movieService.getTopMovie(0, 10)//返回Observable对象 14 .subscribeOn(Schedulers.io()) 15 .observeOn(AndroidSchedulers.mainThread()) 16 .subscribe(new Subscriber<MovieEntity>() { 17 @Override 18 public void onCompleted() { 19 Toast.makeText(MainActivity.this, "Get Top Movie Completed", Toast.LENGTH_SHORT).show(); 20 } 21 22 @Override 23 public void onError(Throwable e) { 24 resultTV.setText(e.getMessage()); 25 } 26 27 @Override 28 public void onNext(MovieEntity movieEntity) { 29 resultTV.setText(movieEntity.toString()); 30 } 31 }); 32 }
这样基本上就完成了Retrofit
和Rxjava
的结合,你们能够本身进行封装;那么用上了RxJava
,咱们就能够用它强大的操做符
来对数据进行处理和操做,各位看官能够具体去实现,我在这里不作多作赘述。
参考文章:RxJava 与 Retrofit 结合的最佳实践
RxBinding
是 Jake Wharton
的一个开源库,它提供了一套在 Android
平台上的基于 RxJava
的 Binding API
。所谓 Binding
,就是相似设置 OnClickListener
、设置 TextWatcher
这样的注册绑定对象的 API
。
举个设置点击监听的例子。使用 RxBinding
,能够把事件监听用这样的方法来设置:
1 Button button = ...; 2 RxView.clickEvents(button) // 以 Observable 形式来反馈点击事件 3 .subscribe(new Action1<ViewClickEvent>() { 4 @Override 5 public void call(ViewClickEvent event) { 6 // Click handling 7 } 8 });
看起来除了形式变了没什么区别,实质上也是这样。甚至若是你看一下它的源码,你会发现它连实现都没什么惊喜:它的内部是直接用一个包裹着的 setOnClickListener()
来实现的。然而,仅仅这一个形式的改变,却刚好就是 RxBinding
的目的:扩展性。经过 RxBinding
把点击监听转换成 Observable
以后,就有了对它进行扩展的可能。扩展的方式有不少,根据需求而定。一个例子是前面提到过的 throttleFirst()
操做符,用于去抖动,也就是消除手抖致使的快速连环点击:
1 RxView.clickEvents(button) 2 .throttleFirst(500, TimeUnit.MILLISECONDS) 3 .subscribe(clickAction);
若是想对 RxBinding
有更多了解,能够去它的 GitHub
项目 下面看看。
RxLifecycle
配合 Activity/Fragment
生命周期来管理订阅的。 因为 RxJava
Observable
订阅后(调用 subscribe
函数),通常会在后台线程执行一些操做(好比访问网络请求数据),当后台操做返回后,调用 Observer
的 onNext
等函数,而后在 更新 UI 状态。 可是后台线程请求是须要时间的,若是用户点击刷新按钮请求新的微博信息,在刷新尚未完成的时候,用户退出了当前界面返回前面的界面,这个时候刷新的 Observable
若是不取消订阅,则会致使以前的 Activity
没法被 JVM
回收致使内存泄露。 这就是 Android
里面的生命周期管理须要注意的地方,RxLifecycle
就是用来干这事的。好比下面的示例:
1 myObservable 2 .compose(RxLifecycle.bindUntilEvent(lifecycle, ActivityEvent.DESTROY)) 3 .subscribe();
这样Activity
在destroy
的时候就会自动取消这个observer
RxBus
并非一个库,而是一种模式。相信大多数开发者都使用过EventBus
或者Otto
,做为事件总线通讯库,若是你的项目已经加入RxJava
和EventBus
,不妨用RxBus
代替EventBus
,以减小库的依赖。RxJava
也能够轻松实现事件总线,由于它们都依据于观察者模式。
拓展连接: 用RxJava实现事件总线(Event Bus) [深刻RxBus]:支持Sticky事件
RxPermission
是基于RxJava
开发的用于帮助在Android 6.0
中处理运行时权限检测的框架。在Android 6.0
中,系统新增了部分权限的运行时动态获取。而再也不是在之前的版本中安装的时候授予权限。
拓展连接: 使用RxPermission框架对android6.0权限进行检测
简而言之Rxjava
是一个很牛逼的库,若是你的项目中尚未使用RxJava
的话,建议能够尝试去集成使用;对大多数人而已RxJava
是一个比较难上手的库了,不亚于Dagger
的上手难度;不过当你认识学习使用过了,你就会发现RxJava
的魅力所在;若是看一遍没有看懂的童鞋,建议多看几回;动手写写代码,我想信本文能够给到大家一些帮助;大家真正的体会到什么是 从入门到放弃再到不离不弃
;这就是RxJava
的魅力所在。
拓展阅读: