RxJava操做符(五) ----过滤操做符

RxJava各种型操做符详解以下:

RxJava操做符汇总
RxJava操做符(一) —-建立操做符
RxJava操做符(二)—-转换操做符
RxJava操做符(三)—-合并操做符
RxJava操做符(四)—-功能操做符
RxJava操做符(五) —-过滤操做符
RxJava操做符(六)—-条件操做符java



过滤操做符: 用于将Observable发送的数据进行过滤和选择。让Observable返回咱们所须要的数据。 过滤操做符有buffer(),filter(),skip(),take(),skipLast(),takeLast(),throttleFirst(),distainctUntilChange()。

一、filter() 操做符 git

/** * ========================filter() 操做符 ====================================== * * 对被观察者发送的事件作过滤操做。只有符合筛选条件的事件才会被观察者所接收。 * * return true : 继续发送 * * return false : 不发送 */
    public static void filter() {

        Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        for (int i = 0; i < 3; i++) {
                            emitter.onNext(i);
                        }
                    }
                })
                .filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(Integer integer) throws Exception {
                        //return true : 继续发送
                        //return false : 不发送
                        return integer != 2; // 本例子过滤了 等于2的状况
                    }
                })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG + "filter", String.valueOf(integer));
                    }
                });

    }

输出以下:github

1

二、distinct() 操做符web

/** * ========================distinct 操做符 ====================================== * * 简单地说就是去重。发射的数据包含重复的,会将重复的筛选掉。也就是,它只容许尚未被发射过的数据经过,被观察者接收。发射完数据会自动调用onComplete()方法 * * y如下代码:观察者只会接收到 : 1,2,3,5 四个数值 */
    public static void distinct() {
        Observable
                .just(1, 2, 3, 2, 3, 5)
                .distinct()
                .subscribe(
                        new Observer<Integer>() {
                            @Override
                            public void onSubscribe(Disposable d) {

                            }

                            @Override
                            public void onNext(Integer integer) {
                                Log.d(TAG + "distinct", String.valueOf(integer));
                            }

                            @Override
                            public void onError(Throwable e) {

                            }

                            @Override
                            public void onComplete() {
                                Log.d(TAG + "distinct", "onComplete");
                            }
                        });

    }

输出以下:ide

2

三、distinctUntilChanged() 操做符svg

/** * ========================distinctUntilChanged 操做符 ====================================== * * 过滤掉连续重复的事件 */
    public static void distinctUntilChanged() {
        Observable.just(1, 2, 3, 1, 2, 3, 3, 4, 4)
                .distinctUntilChanged()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, String.valueOf(integer));
                    }
                });
    }

输出以下:spa

3

四、skip(),skipLast()操做符.net

/** * ========================skip,skipLast====================================== * * skip 操做符是把Observable发射的数据过滤点掉前n项。而take操做符只能接收前n项。另外还有skipLast和takeLast则是从后往前进行过滤.接收完会调用onComplete()方法 * */

    /** * 如下代码输出: 3,4,5 */
    public static void skip() {
        Observable
                .just(1, 2, 3, 4, 5, 6, 7)
                .skip(2)  //过滤前2项
                .skipLast(3) //过滤后3项
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG + "skip", "根据顺序过滤" + String.valueOf(integer));
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG + "skip", "onComplete");
                    }
                });
//-------------------------------------------------------------------------------------------------
        Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
                .skip(1, TimeUnit.SECONDS)   //过滤地1s发送的数据
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Long aLong) {
                        Log.d(TAG + "skip", "根据事件过滤" + String.valueOf(aLong));
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });


    }

输出以下:3d

4

五、 take() 操做符code

/** * ====================== take 操做符=========================== * 只能接收两个事件 */
    public static void take() {
        Observable
                .just(1, 2, 3, 4, 5, 6, 7)
                .take(2)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG + "skip", String.valueOf(integer));
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG + "skip", "onComplete");
                    }
                });
    }

输出以下:

54

六、takeLast() 操做符

/** * =======================takeLast() 操做符 ====================== * * 只能接收被观察者发送的最后几个事件 */

    public static void takeLast() {
        Observable
                .just(1, 2, 3, 4, 5, 6, 7)
                .takeLast(2)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG + "skip", String.valueOf(integer));
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG + "skip", "onComplete");
                    }
                });
    }

输出以下:

55

七、elementAt() 操做符

/** * ========================elementAt() 操做符 ====================================== * * 只发射第n项数据. * 一个参数和两个参数时: * elementAt(第n项) * elementAt(第n项,第N项不存在时默认值) * * n为负数时,报IndexOUtOfBoundExection。为正数但超过发射数据长度不会报异常会使用默认值代替 */

    public static void elementAt() {

        Observable.range(1, 5)
                .elementAt(6, 10)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG + "elementAt", String.valueOf(integer));
                    }
                });
    }

输出以下:

56

八、elementAtOrError()操做符

/** * ==============elementAtOrError()=================================================== * * 在elementAtError()的基础上,当出现越界状况(当获取位置的索引>事件序列的长度),即抛出异常 */
    public static void elementAtOrError() {
        Observable.range(1, 5)
                .elementAtOrError(6)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG + "elementAtOrErr", String.valueOf(integer));
                    }
                });
    }

九、ignoreElements() 操做符

/** * ========================ignoreElements() 操做符 ====================================== * * 无论发射的数据.只但愿在它完成时和遇到错误时收到通知 */
    public static void ignoreElements() {

        Observable
                .range(0, 10)
                .ignoreElements()
                .subscribe(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.d(TAG + "ignoreEles", "完成了");
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.d(TAG + "ignoreEles", "出错了");
                    }
                });
    }

输出以下:

57

十、ofType() 操做符

/** * ========================ofType() 操做符 ====================================== * * 经过数据的类型过滤数据,只发送指定类型数据。 * * 如下代码观察者只接收到: 1,2 */
    public static void ofType() {

        Observable.just("哈哈", 1, "嘻嘻", 2, 3.5)
                .ofType(Integer.class)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG + "ofType", String.valueOf(integer));
                    }
                });
    }

输出以下:

58

十一、throttleFirst()/throttleLast() 操做符

/** * ========================throttleFirst()/throttleLast() 操做符 ====================================== * * throttleFirst() 在某段时间内,只发送该段事件第一次事件 * * throttleLast() 在某段时间内,只发送该段事件最后一次事件 */

    public static void throttleFirst() {
        Observable.interval(300, TimeUnit.MILLISECONDS) //每一个0.3秒发送一个事件
                .throttleFirst(1, TimeUnit.SECONDS) //只接收每秒内发送的第一个数据
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.d(TAG + "throttleFirst", String.valueOf(aLong));
                    }
                });
    }

输出以下:

59

十二、sample()

/** * ===================sample()================================= * * 在某段时间内,只发送该段时间内最新(最后)1次事件 * * 与throttleLast相似 */
    public static void sample() {
        Observable.interval(300, TimeUnit.MILLISECONDS) //每一个0.3秒发送一个事件
                .sample(1, TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.d(TAG + "sample", String.valueOf(aLong));
                    }
                });
    }

输出以下:

60

1三、firstElement()/lastElement()

/** * ========================firstElement()/lastElement()========================== * * 选取第一个元素/最后一个元素 */
    public static void firstElement() {
        Observable.just(1, 2, 3)
                .firstElement()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG + "firstElement", String.valueOf(integer));
                    }
                });
    }

输出以下:
61

public static void lastElement() {
        Observable
                .just(1, 2, 3)
                .lastElement()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG + "lastElement", String.valueOf(integer));
                    }
                });
    }

输出以下:
62


上面代码地址


RxJava各种型操做符详解以下:

RxJava操做符汇总
RxJava操做符(一) —-建立操做符
RxJava操做符(二)—-转换操做符
RxJava操做符(三)—-合并操做符
RxJava操做符(四)—-功能操做符
RxJava操做符(五) —-过滤操做符
RxJava操做符(六)—-条件操做符