什么是RxJava? GitHub给出的介绍是:RxJava是ReactiveX(Reactive Extensions)的Java VM实现:用于经过使用可观察序列来编写异步和基于事件的程序的库。java
在个人理解中RxJava主要能够实现异步任务,和事件总线的功能,这也是RxJava的厉害之处。git
RxJava的GitHub地址:github
关于RxJava的使用详解,该篇文章会介绍更加详细,并且主要偏向于对RxJava的使用。数据结构
建立一个观察者,有两种方式实现:建立Observer和Subscriber。并发
Observer:app
Observer<String> observer = new Observer<String>() {
@Override
public void onError(Throwable e) {
Log.i("test", "onError");
}
@Override
public void onComplete() {
Log.i("test", "onComplete");
}
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.i("test", "onSubscribe");
}
@Override
public void onNext(String s) {
Log.i("test", "onNext----->" + s);
}
};
复制代码
Subscriber:异步
Subscriber subscriber = new Subscriber() {
@Override
public void onSubscribe(Subscription s) {
Log.i("test", "onSubscribe");
}
@Override
public void onNext(Object o) {
Log.i("test", "onNext");
}
@Override
public void onError(Throwable t) {
Log.i("test", "onError");
}
@Override
public void onComplete() {
Log.i("test", "onComplete");
}
}
复制代码
建立被观察者,而且须要和观察者订阅起来,在RxJava中的被观察者是Observable使用subscribe方法订阅。ide
建立被观察者有三种方式能够实现:函数
1)使用Observable.create建立
Observable.create(new ObservableOnSubscribe<String>(){
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("你好");
}
}).subscribe(observer);//订阅
复制代码
在建立方式须要在subscribe方法里,手动调用Observer的onNext、onError、onComplete方法,而onSubscribe方法是自动调用的。
2)Observable.just 可使用Observable.just方式来建立一个Observable
Observable.just("你好","hello world").subscribe(observer);
复制代码
使用Observable.just建立,而后subscribe订阅,这种方式会自动调用onSubscribe、onNext、onError和onComplete方法。
3)Observable.fromArray
使用Observable.fromArray来建立一个Observable对象。
String[] quotations = {"热爱祖国", "热爱人民"};
Observable.fromArray(quotations).subscribe(observer);
复制代码
使用Observable.fromArray建立,而后订阅,和Observable.just同样,会自动调用观察者的方法。
在上面咱们已经建立了一个观察,建立一个观察者,其中包括四个方法:onError、onComplete、onSubscribe和onNext。
那么这几个方法都表示什么呢?
onSubscribe:被观察者订阅观察者的时候,就会触发该方法。
onCompleted:事件队列完结。RxJava 不只把每一个事件单独处理,还会把它们看作一个队列。RxJava 规定,当不会再有新的 onNext 发出时,须要触发 onCompleted方法做为标志。
onError: 事件队列异常。在事件处理过程当中出异常时,onError方法会被触发,同时队列自动终止,不容许再有事件发出。
onNext:表示普通事件,能够在该方法作一些业务逻辑处理。
操做符包括普通的操做符、变换操做符、过滤操做符、组合操做符、辅助操做符、错误处理操做符、条件操做符、布尔操做符和转换操做符。
使用,如:
Observable.intervalRange(0,6,0,3,TimeUnit.SECONDS).create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("666");
}
}).subscribe(observer);
复制代码
intervalRange该操做符是用于延迟执行、而且按期执行。
map:指定一个Function对象,将Observable转换为一个新的Observable并发射。
Observable.just("你好","hello world").map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
return s;
}
}).subscribe(observer);
复制代码
flatMap、cast:
Observable.just("你好","hello world").flatMap(new Function<String, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull String s) throws Exception {
return Observable.just(s);
}
}).cast(String.class).subscribe(observer);
复制代码
flatMap将Observable发射的数据集合变换为Observable集合,而后将这些Observable发射的数据平坦地放进一个单独的Observable,而cast则强制将Observable发射的全部数据转换为指定类型。
buffer操做符功能:
一、能一次性集齐多个结果到列表中,订阅后自动清空相应结果,直到彻底清除
二、 也能够周期性的集齐多个结果到列表中,订阅后自动清空相应结果,直到彻底清除
Observable
.range(0,5)
.buffer(2)
.subscribe(new Observer<List<Integer>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull List<Integer> integers) {
Log.i("test","----------------->onNext:" + integers);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
复制代码
Observable
.just("你好","hello world","我爱我家")
.buffer(3)
.subscribe(new Observer<List<String>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull List<String> strings) {
Log.i("test",""+strings);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
复制代码
除了以上的变换操做符,还有groupBy操做符,进行分组操做。
过滤操做符包括filter、skip、take、element等等。
filter:对Observable产生的结果自定义规则进行过滤,只有知足条件的结果才提交给订阅者。
Observable
.just("你好","hello world","我爱我家")
.filter(new Predicate<String>() {
@Override
public boolean test(@NonNull String s) throws Exception {
Log.i("test",""+s);
return s.equals("你好");
}
}).subscribe(observer);
复制代码
distinct:去重
Observable
.just("你好","hello world","我爱我家","我爱我家")
.distinct()
.subscribe(observer);
复制代码
skip:过滤掉前n项
Observable
.just("你好","hello world","我爱我家","我爱我家")
.skip(2)
.subscribe(observer);
复制代码
take:取前n项
Observable
.just("你好","hello world","我爱我家","我爱我家")
.take(2)
.subscribe(observer);
复制代码
throttleWithTimeout:若是在限定的时间内,源Observable有新的数据发射出来,该数据就会被丢弃,同时throttleWithTimeout从新开始计时,若是每次都是在计时结束前发射数据,那么这个限流就会走向极端(只会发射最后一个数据)
Observable
.just("你好","hello world","我爱我家","我爱我家")
.throttleWithTimeout(200, TimeUnit.MILLISECONDS)
.subscribe(observer);
复制代码
组合操做符包括:merge、startWidth、concat、jion、switch和zip等等。
merge:将多个Observable合并到一个Observable中进行发射。
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.i("test", "onSubscribe");
}
@Override
public void onNext(@NonNull String s) {
Log.i("test", "onNext--->" + s);
}
@Override
public void onError(@NonNull Throwable e) {
Log.i("test", "onError");
}
@Override
public void onComplete() {
Log.i("test", "onComplete");
}
};
Observable<String> observable1 = Observable.just("你好", "hello World");
Observable<String> observable2 = Observable.just("new obj", "mergeobj");
Observable.merge(observable1, observable2).subscribe(observer);
复制代码
concat:将多个Observable发射的数据合并发射,其具备严格的顺序,发射顺序具备队列的特色。前一个数据没有发射完成不会发射后一个数据。
Observable<String> observable1 = Observable.just("你好", "hello World");
Observable<String> observable2 = Observable.just("new obj", "mergeobj");
Observable.concat(observable1, observable2).subscribe(observer);
复制代码
除了以上的组合操做符,还有zip、combineLastest等。
zip:合并两个或者多个Obserable发射出的数据项,根据指定的函数变换它们,并发射一个新值。
辅助操做符包括DO、delay、observeOn、timeout、timeInterval、timestamp、subscribeOn、meterialize和to等。
delay:延迟执行发射数据
Observable<String> observable1 = Observable.just("你好", "hello World");
Observable<String> observable2 = Observable.just("new obj", "mergeobj");
Observable.concat(observable1, observable2).delay(5, TimeUnit.SECONDS).subscribe(observer);
复制代码
subscribeOn:指定Obserable自身在那个线程上运行。
observeOn:指定Obserable发射出的数据在那个线程运行。
其余的操做符读者能够自行实践。
在rxjava中,错误操做符包括catch和retry。
catch可以拦截原始Observable的onError通知,将它替换为其余数据项或者数据序列,让产生的Observable可以正常终止或者根本不终止。 catch实现分为三个不一样的操做符:
一、onErrorReturn:返回原有Observable行为的备用Observable, 备用的Observable忽略原有的Observable的onError调用,即不会将错误传递给观察者,而是发射一个特殊的项,以及调用观察者的onCompleted。
二、onErrorResumeNext:跟onErrorReturn同样返回备用的Observable,不会调用原有的Observable的onError,它会发射备用的Observable数据。
三、onExceptionResumeNext:若是onError收到的Throwable不是一个Exception,它会将错误传递给观察者的onError方法,不会使用备用的Observable。
retry:不会将原有的Observable的onError通知传递给观察者,这会订阅这个Observable,再给它一次机会无错误地完成其数据序列,而它总会传递onNext通知给观察者。该操做符有可能形成数据重复,由于从新订阅。若是超过了从新订阅的次数,就不会再次订阅了,而是把最新的一个onError通知传递给观察者。
条件操做符包括:defaultEmpty、skipUntil、amb、skipWhile、takeUtil、takeWhile
defaultEmpty:若是原有的Observable没有发射数据,就发射一个默认的数据。
skipUntil:订阅原始的Observable,可是忽略它的发射物,直到第二个Observable发射了一项数据那一刻,它开始发射原始Observable。
布尔操做符包括:all、isEmpty、contains、exists和sequenceEqual。
关于条件操做符和布尔操做符,读者能够关注《RxJava操做符(08-条件和布尔操做) 》这篇文章,文章地址:
转换操做符可以将Observable转换为另外一个对象或者数据结构,其中转换操做符包括:toMap、toMultiMap、toList、toSortedList、nest和getIterator等。
toMap:将原始的Observable发射的全部数据项集合到一个Map中,而后发射该Map。
String s1 = "你好";
String s2 = "hello world";
String s3 = "lalala";
Observable.just(s1,s2,s3).toMap(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
return s;
}
}).subscribe(new SingleObserver<Map<String, String>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onSuccess(@NonNull Map<String, String> stringStringMap) {
Log.i("test",""+stringStringMap);
}
@Override
public void onError(@NonNull Throwable e) {
}
});
复制代码
toMultiMap:相似于toMap,不一样的地方在于map的value是一个集合。
toList:将发射的数据组成一个List。
String s1 = "你好";
String s2 = "hello world";
String s3 = "lalala";
Observable.just(s1,s2,s3).toList().subscribe(new SingleObserver<List<String>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onSuccess(@NonNull List<String> strings) {
Log.i("test",""+strings);
}
@Override
public void onError(@NonNull Throwable e) {
}
});
复制代码
关于其余操做符,读者能够参考《RxJava操做符大全》这篇文章。
前面讲解辅助操做符的时候,咱们知道使用subscribeOn能够指定Obserable自身在那个线程上运行。使用observeOn能够指定Obserable发射出的数据在那个线程运行。RxJava默认线程是在调用subcribe方法的线程上进行回调,可是若是想切换线程,就须要使用Scheduler。
在RxJava中内置了如下几个Scheduler:
一、Scheduler.immediate():运行在当前线程,是timeout、timestamp和timeInterval操做符的默认调度器。
二、Scheduler.io():I/O操做使用的Scheduler。
三、Scheduler.newThread():开启一个新的线程执行操做。
2和3的区别就是:2的内部实现了一个无数量上限的线程池,重用空闲的线程,所以2具备更高的效率。
四、Scheduler.trampoline():能够将任务经过trampoline方法加入队列,该调度器会按顺序处理队列的任务,是repeat和retry操做符的默认调度器。
五、Scheduler.computation():计算所使用的调度器,它具备固定的线程池,大小为cpu核数,注意不要将io操做放到computation中,不然io操做的等待时间会浪费cpu。该调度器是buffer、delay、sample、debounce、interval和skip的默认调度器。
六、AndroidSchedulers.mainThread():表示在主线程中运行,该调度器是RxAndroid提供的。
Observable.just("你好","hello world")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
复制代码
写到这里,关于RxJava的知识基本讲解完了,相信读者读完该文,也懂得使用RxJava了,接下来我更新RxJava结合Retrofix和OkHttp的使用,敬请关注,谢谢!