Android进阶系列之第三方库知识点整理。java
知识点总结,整理也是学习的过程,若有错误,欢迎批评指出。web
第一篇:Rxjava2(一)、基础概念及使用
第二篇:Rxjava2(二)、五种观察者模式及背压数组
终于到操做符了,我以为rxjava2
如此好用,绝对少不了操做符的功劳,下面这张图你就简单的扫一眼,别慌,咱们慢慢啃。并发
上一篇讲了,rxjava
有五种观察者建立模式,其中Observable
和Flowable
差很少,只是Flowable
支持背压,而其它三种,都是简化版的Observable
,因此,本篇以Observable
方式来说操做符的使用。app
Observable
源码ide
Observable
是一个抽象类,继承ObservableSource
函数
ObservableSource
:post
这类操做符,建立直接返回Observable
学习
create
是最经常使用的一个操做符,该操做符的参数中产生的emitter
发射器,经过onNext
不断给下游发送数据,也能够发送onComplete
、onError
事件给下游。fetch
须要发送给下游的数据,就经过emitter.onNext()给下游发送。
当发送了
onComplete
或者onError
事件后,下游中止接收剩下的onNext
事件
示意图:
方法:
static <T> Observable<T> create(ObservableOnSubscribe<T> source)
复制代码
demo:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A");
emitter.onNext("B");
// .....
emitter.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: s=" + s);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError");
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete");
}
});
复制代码
结果:
这类操做符,直接将一个数组
、集合
拆分红单个ObJect数据依次发送给下游,也能够直接发送Object数据。
转换一个或多个 Object
数据,依次将这些数据发射到下游。
最多接收十个
Object
参数。
示意图:
方法:
A : 最多只接收十个参数。
Demo:
Observable.just("A", "B", "C", "D")
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: s=" + s);
}
});
复制代码
结果:
直接传入一个数组数据,操做符将数组里面的元素按前后顺序依次发送给下游,能够发送十个以上的数据。
示意图:
方法:
static <T> Observable<T> fromArray(T... items)
复制代码
Demo:
String[] data = new String[]{"A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K"};
Observable.fromArray(data)
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "fromArray--accept: s=" + s
);
}
});
复制代码
结果:
直接传入一个集合数据,操做符将集合里面的元素按前后顺序依次发送给下游,能够发送十个以上的数据。
示意图:
方法:
static <T> Observable<T> fromIterable(Iterable<? extends T> source)
复制代码
Demo:
List<String> mData = new ArrayList<>();
mData.add("A");
mData.add("B");
mData.add("C");
Observable.fromIterable(mData)
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "fromIterable--accept: s=" + s);
}
});
复制代码
结果:
快速建立一个被观察者对象,连续发送一个指定开始和总数的事件序列
当即发送,无延时
示意图:
方法:
static Observable<Integer> range(final int start, final int count)
复制代码
Demo:
// 从3开始发送,直到发送了十个数据中止。
Observable.range(3, 10).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "range--accept: integer=" + integer);
}
});
复制代码
结果:
快速建立一个被观察者,延迟必定时间后再每隔指定的一个时间发送一个事件(从0开始的整数)给下游。
发送数据从0开始,依次+1整数递增
延迟时间能够为0,重载方法不设置默认使用第二个参数数值。
示意图:
方法:
// initialDelay:发射第一个值须要等待时间
// period:后续每隔多少秒发射一个值
// unit:前两个参数的时间单位
Observable<Long> interval(long initialDelay, long period, TimeUnit unit)
// 两参方法
public static Observable<Long> interval(long period, TimeUnit unit) {
// 第一个参数和第二个参数一致,即延迟period后再每隔period秒发送一个事件。
// 默认使用 Schedulers.computation()
return interval(period, period, unit, Schedulers.computation());
}
复制代码
示意图:
方法:
// initialDelay:发射第一个值须要等待时间
// period:后续每隔多少秒发射一个值
// unit:前两个参数的时间单位
// scheduler:等待发生并发出项目的调度程序
static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
复制代码
demo:
// 延迟2秒后发送一个事件,后续每隔五秒发送一个事件
Observable.interval(2, 5, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "onNext: aLong=" + aLong);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: error" + e);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
复制代码
快速建立1个被观察者对象,每隔指定时间发送1个事件,能够指定事件发送开始的值和总的值。
示意图:
方法:
// start:范围起始值
// count:要发出的值的总数,若是为零,则操做员在初始延迟后发出onComplete。
// initialDelay:发出第一个值(开始)以前的初始延迟
// period:后续值之间的时间段
// unit:前面时间参数单位
static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
复制代码
demo:
// 第一个延迟三秒后发送int值2,后续每隔1秒累加发送给下游,一共发送10个数据。
Observable.intervalRange(2, 10, 3, 1, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "onNext: aLong=" + aLong);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: error" + e);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
复制代码
建立一个Observable
对象,被观察者逻辑真正执行的时机是在其被订阅的时候。
当下游订阅后,上游才开始处理逻辑。
示意图:
方法:
//
static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier)
复制代码
demo:
String[] mStrings = new String[]{"A", "B", "C", "D"};
Observable observable = Observable.defer(new Callable<ObservableSource<String>>() {
@Override
public ObservableSource<String> call() throws Exception {
// 上游发送mStrings数组
return Observable.fromArray(mStrings);
}
});
// 在订阅以前,将数组数据改变
mStrings = new String[]{"defer,订阅时候才建立"};
// 订阅
observable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: s=" + s);
}
});
复制代码
建立一个被观察者对象,上游延时指定的时间后发送一个事件到下游。
发送的数值为Long型的0
示意图:
方法:
// delay:发射单个数据以前的延时
// unit:前者时间单位
// scheduler:指定的调度程序 (默认为Schedulers.computation())
static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler)
复制代码
demo:
public void timer() {
// 延迟5秒后发送Long型值0到下游,可指定Schedulers,默认Schedulers.computation()
Observable.timer(5, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG, "accept: aLong=" + aLong);
}
});
}
复制代码
结果:
对上游发送的每个事件都进行指定的函数处理,从而变换成另外一个事件再发送给下游。
常使用场景:用做数据类型转换
示意图:
方法:
// R:输出类型
// mapper:应用于ObservableSource发出的每一个项目的函数
final <R> Observable<R> map(Function<? super T, ? extends R> mapper)
复制代码
demo:
public void map() {
// 经过just发送整型数值一、二、3
Observable.just(1, 2, 3).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
// 经过Map操做符对上游的数据进行函数处理,再转换成指定的事件发送给下游
return integer + "变换";
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: s=" + s);
}
});
}
复制代码
将一个发送事件的上游Observable变换为多个发送事件的Observables,而后将它们发射的事件单独作处理后再合并放进一个单独的Observable里发送给下游。
示意图:
能够看到上游发送了三个事件(注意颜色),中间对每一个事假数据进行处理后(每个圆变成两个矩形),再合并成包含六个矩形事件的Observable对象发送给下游,注意矩形颜色,他是无规律,无序的,并非严格按照上游发送的顺序来发送给下游。
方法:
final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
复制代码
demo:
public void flatMap() {
// 被观察者经过just发送整型数值一、二、3
Observable.just(1, 2, 3).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
// 对收到的数值再进行函数处理。
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("变换后的数据" + integer);
}
// 将函数处理后的数据,在包装成一个Observable对象发送给下游。
return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: s=" + s);
}
});
}
复制代码
同flatMap
同样的功能,只是flatMap
不能保证转换后发送给下游事件的时序,concatMap转换后能严格按照上游发送的顺序再发送给下游。
示意图:
同
flatMap
同样,重点注意颜色,转换后颜色和上游发送的顺序一致,有序发送
方法:
final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, int prefetch)
复制代码
demo:
public void concatMap() {
// 被观察者经过just发送整型数值一、二、3
Observable.just(1, 2, 3).concatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
// 对收到的数值再进行函数处理。
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("变换后的数据" + integer);
}
// 将函数处理后的数据,在包装成一个Observable对象发送给下游。
return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: s=" + s);
}
});
}
复制代码
组合多个被观察者一块儿发送数据,合并后 按发送顺序串行执行
组合的被观察者数量要求小于等于4个,从提供的方法参数里面能够得知。
示意图:
方法:
public static <T> Observable<T> concat(
ObservableSource<? extends T> source1, ObservableSource<? extends T> source2,
ObservableSource<? extends T> source3, ObservableSource<? extends T> source4)
复制代码
demo:
public void concat() {
// 用just操做符建立三个Observable对象
Observable<String> observable1 = Observable.just("1", "2");
Observable<String> observable2 = Observable.just("A", "B", "C");
Observable<String> observable3 = Observable.just("hello", "rxjava");
// 使用concat操做符合并三个Observable对象,并将合并后的数据顺序(串行)发送给下游
Observable.concat(observable1
, observable2, observable3)
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: s=" + s);
}
});
}
复制代码
同concat
同样,组合多个被观察者一块儿发送数据,合并后 按发送顺序串行执行
concatArray对组合的被观察者对象没有个数限制,能够大于4个。
示意图:
上游发送的是一个组合的观察者数组,没有数量限制(注意颜色)
转换后串行发送(颜色和上游发送顺序对应)
方法:
static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources)
复制代码
demo:
public void concatArray() {
Observable<String> observable1 = Observable.just("1", "2");
Observable<String> observable2 = Observable.just("A", "B", "C");
Observable<String> observable3 = Observable.just("D", "E");
Observable<String> observable4 = Observable.just("F");
Observable<String> observable5 = Observable.just("G");
Observable.concatArray(observable1, observable2, observable3, observable4, observable5)
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: s=" + s);
}
});
}
复制代码
使用concat操做符时,若是遇到其中一个被观察者发出onError
事件则会立刻终止其余被观察者的事件,若是但愿onError
事件推迟到其余被观察者都结束后才触发,可使用对应的concatDelayError。
方法:
public static <T> Observable<T> concatDelayError(Iterable<? extends ObservableSource<? extends T>> sources) {
ObjectHelper.requireNonNull(sources, "sources is null");
return concatDelayError(fromIterable(sources));
}
public static <T> Observable<T> concatDelayError(ObservableSource<? extends ObservableSource<? extends T>> sources) {
return concatDelayError(sources, bufferSize(), true);
}
public static <T> Observable<T> concatDelayError(ObservableSource<? extends ObservableSource<? extends T>> sources, int prefetch, boolean tillTheEnd)
复制代码
demo:
public void concatArrayDelayError() {
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A");
emitter.onNext("B");
emitter.onNext("C");
emitter.onError(new NullPointerException(""));
emitter.onNext("D");
}
});
Observable.concatArrayDelayError(observable, Observable.just("E", "F"))
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: s="+s);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: e" + e.getMessage(), e);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
复制代码
结果:
能够看到,第一个observable发送到c后,手动抛出一个错误,可是并灭有影响到Observable.just("E", "F")的执行,咱们依旧打印出了 E,F两个参数后才去执行咱们手动抛出的NullPointerException错误
。。。。
操做符这部份内容比较多,先整理这部分,后面会对其余操做符再作整理。