Rxjava(七):条件操做符和布尔操做符

博客主页java

RxJava 的条件操做符主要包括如下几个:segmentfault

  • amb :给定多个 Observable ,只让第一个发射数据的 Observable 发射所有数据
  • defaultlfEmpty :发射来自原始 Observable 的数据,若是原始 Observable 没有发射数据,则发射一个默认数据
  • skipUntil :丢弃原始 Observable 发射的数据,直到第二个 Observable 发射了一个数据,而后发射原始 Observable 的剩余数据
  • skipWhile :丢弃原始 Observable 发射的数据,直到一个特定的条件为假,而后发射原始 Observable 剩余的数据
  • takeUntil :发射来自原始 Observable 的数据,直到第二个 Observable 发射了一个数据或一个通知
  • takeWhile and takeWhileWithIndex:发射原始 Observable 的数据,直到一个特定的条件为真,而后跳过剩余的数据

RxJava 的布尔操做符主要包括:ide

  • all :判断是否全部的数据项都知足某个条件
  • contains :判断 Observable 是否会发射一个指定的值
  • exists and isEmpty :判断 Observable 是否发射了一个值
  • sequenceEqual :判断两个 Observables 发射的序列是否相等

1. all、contains 和 amb

1.1 all 操做符

断定 Observable 发射的全部数据是否都知足某个条件
函数

传递一个谓词函数给 all 操做符,这个函数接受原始 Observable 发射的数据,根据计算返回一个布尔值。 all 返回一个只发射单个布尔值的 Observable,若是原始 Observable 正常终止而且每一项数据都知足条件,就返回 true。若是原始 Observabl 的任意一项数据不知足条件,就返回falseui

Observable.just(1, 2, 3, 4, 5)
        .all(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer < 10;
            }
        }).subscribe(new Consumer<Boolean>() {
    @Override
    public void accept(Boolean aBoolean) throws Exception {
        Log.d(TAG, "Success: " + aBoolean);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.d(TAG, "Error: " + throwable);
    }
});

// 执行结果
 Success: true

判断 Observable 发射的全部数据是否都大于 3spa

Observable.just(1, 2, 3, 4, 5)
        .all(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer > 3;
            }
        }).subscribe(new Consumer<Boolean>() {
    @Override
    public void accept(Boolean aBoolean) throws Exception {
        Log.d(TAG, "Success: " + aBoolean);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.d(TAG, "Error: " + throwable);
    }
});

// 执行结果
 Success: false

1.2 contains 操做符

断定一个 Observable 是否发射了一个特定的值
code

给 contains 传一个指定的值,若是原始 Observable 发射了那个值,那么返回的 Observable 将发射 true,不然发射 false 。与它相关的一个操做符是 isEmpty ,用于断定原始 Observable 是否未发射任何数据。对象

Observable.just(2, 30, 22, 5, 60, 1)
        .contains(22)
        .subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(Boolean aBoolean) throws Exception {
                Log.d(TAG, "Success: " + aBoolean);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d(TAG, "Error: " + throwable);
            }
        });

// 执行结果
 Success: true

1.3 amb 操做符

给定两个或多个 Observable ,它只发射首先发射数据或通知的那个 Observable 的全部数据
blog

当传递多个 Observable 给 amb 时,它只发射其中一个 Observable 数据和通知: 首先发送通知给 amb 的那个 Observable ,无论发射的是一项数据 ,仍是一个 onError 或 onCompleted 通知。 amb 忽略和丢弃其余全部 Observables 的发射物。ip

在 RxJava 中, amb 还有一个相似的操做符 ambWith。 例如, Observable.amb(ol, o2 )和
ol.ambWith(o2)是等价的

在 RxJava 2.x 中, amb 须要传递 Iterable 对象,或者使用 ambArray 来传递可变参数。

Observable.ambArray(
        Observable.just(1, 2, 3),
        Observable.just(4, 5, 6)
).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "Next: " + integer);
    }
});

// 执行结果
 Next: 1
 Next: 2
 Next: 3

修改一下代码,第一个 Observable 延迟 ls 后再发射数据

Observable.ambArray(
        Observable.just(1, 2, 3).delay(1, TimeUnit.SECONDS),
        Observable.just(4, 5, 6)
).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "Next: " + integer);
    }
});

// 执行结果
 Next: 4
 Next: 5
 Next: 6

因为第一个 Observable 延迟发射,所以咱们只消费了第二个 Observable 的数据,第一个 Observable 发射的数据就再也不处理了。

2. defaultlfEmpty

发射来自原始 Observable 值,若是原始 Observable 没有发射任何值,就发射一个默认值

defaultIfEmpty 简单精确地发射原始 Observable 的值,若是原始 Observable 没有发射任何数据,就正常终止(以 onComplete 形式了),那么 defaultlfEmpty 返回的 Observable 就发射一个咱们提供的默认值。

Observable.empty()
        .defaultIfEmpty(8)
        .subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Exception {
                Log.d(TAG, "Next: " + o);
            }
        });

// 执行结果
 Next: 8

在 defaultIfEmpty 方法内部,其实调用的是 switchIfEmpty 操做符,源码以下:

public final Observable<T> defaultIfEmpty(T defaultItem) {
    ObjectHelper.requireNonNull(defaultItem, "defaultItem is null");
    return switchIfEmpty(just(defaultItem));
}

defaultIfEmpty 和 switchIfEmpty 的区别是, defaultIfEmpty 操做符只能在被观察者不发送数据时发送一个默认的数据 ,若是想要发送更多数据,则可使用 switchIfEmpty 操做符,发送自定义的被观察者做为替代。

Observable.empty()
        .switchIfEmpty(Observable.just(1, 2, 3))
        .subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Exception {
                Log.d(TAG, "Next: " + o);
            }
        });

// 执行结果
 Next: 1
 Next: 2
 Next: 3

3. sequenceEqual

断定两个 Observable 是否发射相同的数据序列

传递两个 Observable 给 sequenceEqual 操做符时,它会比较两个 Observable 发射物,若是两个序列相同(相同的数据,相同的顺序,相同的终止状态〉 ,则发射 true 不然发射 false

Observable.sequenceEqual(
        Observable.just(1, 2, 3, 4, 5),
        Observable.just(1, 2, 3, 4, 5)
).subscribe(new Consumer<Boolean>() {
    @Override
    public void accept(Boolean aBoolean) throws Exception {
        Log.d(TAG, "Success: " + aBoolean);
    }
});

// 执行结果
 Success: true

将两个 Observable 改为不一致

Observable.sequenceEqual(
        Observable.just(1, 2, 3, 4, 5),
        Observable.just(1, 2, 3, 4, 5, 6)
).subscribe(new Consumer<Boolean>() {
    @Override
    public void accept(Boolean aBoolean) throws Exception {
        Log.d(TAG, "Success: " + aBoolean);
    }
});

// 执行结果
 Success: false

sequenceEqual 还有一个版本接受第三个参数,能够传递一个函数用于比较两个数据项是否相同。对于复杂对象的比较,用三个参数的版本更为合适。

Observable.sequenceEqual(
        Observable.just(1, 2, 3, 4, 5),
        Observable.just(1, 2, 3, 4, 5),
        new BiPredicate<Integer, Integer>() {
            @Override
            public boolean test(Integer integer, Integer integer2) throws Exception {
                return integer == integer2;
            }
        }
).subscribe(new Consumer<Boolean>() {
    @Override
    public void accept(Boolean aBoolean) throws Exception {
        Log.d(TAG, "Success: " + aBoolean);
    }
});

// 执行结果
 Success: true

4. skipUntil 和 skipWhile

4.1 skipUntil 操做符

丢弃原始 Observable 发射的数据,直到第二个 Observable 发射了一项数据

skipUntil 订阅原始的 Observable,可是忽略它的发射物,直到第二个 Observable 发射一项数据那一刻,它才开始发射原始的 Observabl。 skipUntil 默认不在任何特定的调度器上执行。

Observable.intervalRange(1, 9, 0, 1, TimeUnit.MILLISECONDS)
        .skipUntil(Observable.timer(4, TimeUnit.MILLISECONDS))
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                Log.d(TAG, "Next: " + aLong);
            }
        });

// 执行结果
 Next: 4
 Next: 5
 Next: 6
 Next: 7
 Next: 8
 Next: 9

上述代码,原始的 Observable 发射 1 到 9 这 9 个数 ,初始延迟时间是 0,每间隔 lms。因为使用 skipUntil,所以它会发射原始 Observable 在 3ms 以后的数据。

4.2 skipWhile 操做符

丢弃 Observable 发射的数据,直到一个指定的条件不成立。

skipWhile 订阅原始的 Observable ,可是忽略它的发射物,直到指定的某个条件变为 false。它才开始发射原始的 Observable。skipWhile 默认不在任何特定的调度器上执行

Observable.just(1, 2, 3, 4, 5)
        .skipWhile(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer <= 2;
            }
        }).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "Next: " + integer);
    }
});

// 执行结果
 Next: 3
 Next: 4
 Next: 5

5. takeUntil 和 takeWhile

5.1 takeUntil 操做符

当第二个 Observable 发射了一项数据或者终止时,丢弃原始 Observable 发射的任何数据

takeUntil 订阅并开始发射原始 Observable ,它还监视你提供的第二个 Observable。若是第二个 Observable 发射了一项数据或者发射了一个终止通知,则 takeUntil 返回的 Observable 会中止发射原始 Observable 并终止。

Observable.just(1, 2, 3, 4, 5, 6)
        .takeUntil(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer == 4;
            }
        }).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "Next: " + integer);
    }
});

// 执行结果
 Next: 1
 Next: 2
 Next: 3
 Next: 4

5.2 takeWhile 操做符

发射原始 Observable 发射的数据,直到一个指定的条件不成立

takeWhile 发射原始的 Observable 直到某个指定的条件不成立,它会当即中止发射原始 Observable,并终止本身的 Observable。

RxJava 中的 takeWhile 操做符返回一个原始 Observable 行为的 Observable,直到某项数据,指定的函数返回 false ,这个新的 Observable 就会发射 onComplete 终止通知

Observable.just(1, 2, 3, 4, 5, 6)
        .takeWhile(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer <= 2;
            }
        }).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "Next: " + integer);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.d(TAG, "Error: " + throwable);
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "Complete.");
    }
});

// 执行结果
 Next: 1
 Next: 2
 Complete.

若是个人文章对您有帮助,不妨点个赞鼓励一下(^_^)

相关文章
相关标签/搜索