RxJava操做符汇总
RxJava操做符(一) —-建立操做符
RxJava操做符(二)—-转换操做符
RxJava操做符(三)—-合并操做符
RxJava操做符(四)—-功能操做符
RxJava操做符(五) —-过滤操做符
RxJava操做符(六)—-条件操做符java
一、all() 操做符git
/** * ================all() 操做符================================================= * * 判断发送到数据是否都知足指定的条件 */
public static void all() {
Observable
.range(1, 5)
.all(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer < 5;
}
})
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d(TAG + "all", aBoolean ? "发送数据都小于5" : "发送的数据不知足全小于5");
}
});
}
输出以下:
github
二、repeatUntil() 操做符web
/** * ===================repeatUntil() 操做符======================================= * * repeat操做符的升级版。能够动态控制是否继续重复发射事件序列。 return 则中止重复,return 则继续重复发射 */
static int count = 0;
public static void repeatUntil() {
Observable
.just(1, 2, 3)
.repeatUntil(new BooleanSupplier() {
@Override
public boolean getAsBoolean() throws Exception {
count++;
if (count >= 2)
return true;
return false;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "repeat", String.valueOf(integer));
}
});
}
输出以下:ide
三、takeUntil() 操做符svg
/** * ========================takeUntil 操做符 ====================================== * * 发送complete的结束条件,固然发送结束以前也会包含这个值. return true 时结束,false继续 * * 如下代码:观察者会接收到 0,1,2,3,4,5 */
public static void takeUntil() {
Observable
.range(0, 10)
.takeUntil(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
if (integer.equals(5))
return true;
return false;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "takeUntil", String.valueOf(integer));
}
});
}
输出以下:函数
/** * takeUntil 也能传入一个被观察者Observable,当该Obervable开始发送数据时(注意:观察者Observer不会接收事件),那么原始的Observable则中止发送 */
public static void takeUntil2() {
Observable
.interval(1, TimeUnit.SECONDS)
.takeUntil(Observable.timer(5, TimeUnit.SECONDS))
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG + "takeUntil2", String.valueOf(aLong));
}
});
}
输出以下:
spa
四、takeWhile() 操做符.net
/** * ========================takeWhile 操做符 ====================================== * * 不知足这个条件时会发送结束。 reture true 继续发送,return false 中止发送 * * 如下代码:观察者会接收到 0,1,2,3,4,5 */
public static void takeWhile() {
Observable.range(0, 10)
.takeWhile(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
if (integer < 6)
return true;
return false;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "takeWhile", String.valueOf(integer));
}
});
}
输出以下:3d
五、skipWhile()操做符
/** * =====================skipWhile()================================ * * 判断发送的每项数据是否知足指定函数条件。直到该判断条件为false时,才开始发送observable的数据(前面的实际会丢弃) * * 如下代码:从6开始接收 */
public static void skipWhile() {
Observable
.interval(1, TimeUnit.SECONDS)
.skipWhile(new Predicate<Long>() {
@Override
public boolean test(Long aLong) throws Exception {
if (aLong > 5)
return false;
else return true;
}
})
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG + "skipWhile", String.valueOf(aLong));
}
});
}
输出以下:
六、sequenceEqual操做符
/** * =================sequenceEqual================================ * * 判断两个obervable须要发送的数据是否相等,若是相同则返回true,不然返回false */
public static void sequenceEqual() {
Observable
.sequenceEqual(Observable.just(4, 5, 6), Observable.just(4, 5, 6))
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d(TAG + "sequenceEq", "两个Obervable是否相等:" + aBoolean);
}
});
}
输出以下:
七、contains()操做符
/** * =====================contains()================================= * * 判断发送的数据是否包含指定数据 */
public static void contains() {
Observable
.just(1, 2, 3, 4, 5)
.contains(3)
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d(TAG + "contains", "发送的数据是否包含3:" + (aBoolean ? "是" : "否"));
}
});
}
输出以下:
八、isEmpty()操做符
/** * ==================isEmpty() ======================================= * * 判断被观察者发送的数据是否为空 */
public static void isEmpty() {
Observable
.just(1)
.isEmpty()
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d(TAG + "isEmpty", "发送的数据是否为空:" + (aBoolean ? "是" : "否"));
}
});
}
输出以下:
九、amb() 操做符
/** * =================amb() 操做符===================================== * * 当须要发送多个Observable时,只发送 先发送数据的Observerable 的数据 ,其他Observable会被丢弃 */
public static void amb() {
List<ObservableSource<Integer>> list = new ArrayList<>();
Observable observable1 = Observable.just(1, 2, 3);
Observable observable2 = Observable.just(4, 5, 6).delay(2, TimeUnit.SECONDS);
list.add(observable1);
list.add(observable2);
Observable
.amb(list)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
//只能接收到observable1 的发送的数据,而observable2会被丢弃
Log.d(TAG + "amb", String.valueOf(integer));
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
输出以下:
十、defaultEmpty()操做符
/** * ==========================defaultEmpty() =============================== * * 在不发送一个有效事件(next事件)、仅发送了complete事件的前提下,发送一个默认值 */
public static void defaultEmpty() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onComplete();
}
})
.defaultIfEmpty(6)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "defaultIfEmp", String.valueOf(integer));
}
});
}
输出以下:
RxJava操做符汇总
RxJava操做符(一) —-建立操做符
RxJava操做符(二)—-转换操做符
RxJava操做符(三)—-合并操做符
RxJava操做符(四)—-功能操做符
RxJava操做符(五) —-过滤操做符
RxJava操做符(六)—-条件操做符