RxJava2.0基础操做符(二)

RxJava2.0入门教程(一)
1、建立操做符
(1)crete():建立一个被观察者对象 & 链式调用数组

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {

            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {

            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {

            }
        }, new Action() {
            @Override
            public void run() throws Exception {

            }
        }, new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {

            }
        });

(2)just():快速建立1个被观察者对象(直接发送传入的事件,最多只能发送10个参数)缓存

//建立对象后至关于执行了onNext(1)、onNext(2)、onNext(3)、onNext(4)方法
        Observable.just(1, 2, 3, 4).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                
            }

            @Override
            public void onNext(Integer value) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

(3)formArray():快速建立1个被观察者对象(直接发送传入的数组数据,不限发送事件个数)数据结构

Integer[] items = {0, 1, 2, 3, 4};
        Observable.fromArray(items).subscribe();

(4)fromIterable():快速建立1个被观察者对象(直接发送传入的集合List数据,不限发送事件个数)app

List<Integer> list = new ArrayList<>();
        list.add(1);
        list.add(2);
        list.add(3);
        Observable.fromIterable(list).subscribe();

(5)never():不发送任何事件(观察者接收后什么都不调用)ide

Observable.never().subscribe();

(6)empty():仅发送Complete事件,直接通知完成(用于测试使用)函数

Observable.empty().subscribe();

(7)error():仅发送Error事件,直接通知异常(用于测试使用)测试

Observable.error(new RuntimeException()).subscribe();

(8)defer():直到有观察者订阅时,才动态建立被观察者对象 & 发送事件(动态建立被观察者对象 & 获取最新的Observable对象数据)
在订阅时才建立,因此i值会取第2次的赋值code

Integer i = 1;
        Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
            @Override
            public ObservableSource<? extends Integer> call() throws Exception {
                return Observable.just(i);
            }
        });
        i = 2;
        observable.subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer value) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

(9)timer():快速建立1个被观察者对象(延迟指定时间后,发送1个数值0用于检测)orm

//参数:一、延迟时间。二、时间单位
Observable.timer(2, TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {

                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Long value) {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

(10)interval():快速建立1个被观察者对象(每隔指定时间就发送事件)server

/**
         * 第一次延长时间
         * 间隔时间数字(每隔一秒产生一个数字,从0开始递增1,无限个)
         * 时间单位
         */
        Observable.interval(3,1,TimeUnit.SECONDS).subscribe();

(11)intervalRange():快速建立1个被观察者对象(每隔指定时间就发送事件,可指定发送的数据的数量)

/**
         * start:事件序列起始点
         * count:事件数量
         * initialDelay:第1次事件延迟发送时间
         * period:间隔时间数字
         * unit:时间单位
         */
        Observable.intervalRange(3, 10, 2, 1, TimeUnit.SECONDS).subscribe();

(12)range():快速建立1个被观察者对象(连续发送1个事件序列,可指定范围,无延迟发送事件,不可指定负数)

/**
         * start:事件序列起点
         * count:事件数量
         */
        Observable.range(3,10)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer value) {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

(13)rangeLong():相似于range(),区别在于该方法支持数据类型 = Long

2、变换操做符
(1)Map():对被观察者发送的每1个事件都经过指定的函数处理,从而变换成另一种事件
示例:数据加1转化为String类型

private void initMap() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                Log.d(TAG, "subscribe: ");
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                String mapStr = String.valueOf(integer + 1);
                return mapStr;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "accept: " + s);
            }
        });
    }

(2)FlatMap():将被观察者发送的事件序列进行拆分 & 单独转换,再合并成一个新的事件序列,最后再进行发送(无序的将被观察者发送的整个事件序列进行变换)
示例:将一个发送事件的被观察者Observable变换为多个发送事件的Observables,而后将它们发射的事件合并后放进一个单独的Observable里

private void initFlatMap() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        }).flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                final List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("主事件 " + integer + "子事件" + i);
                }
                return Observable.fromIterable(list);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String string) throws Exception {

            }
        });
    }

(3)ConcatMap():相似FlatMap()操做符(concatMap严格按照被观察者发送的顺序来发送)
示例:有序的将被观察者发送的整个事件序列进行变换

private void initConcatMap() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        }).concatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                final List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("主事件 " + integer + "子事件" + i);
                }
                return Observable.fromIterable(list);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String string) throws Exception {

            }
        });
    }

(4)Buffer():按期从被观察者须要发送的事件中获取必定数量的事件 & 放到缓存区中,最终发送(缓存被观察者发送的事件)

/**
         * count:每次从被观察者中获取的事件数量 
         * skip:每次获取新事件的数量
         */
        Observable.just(1, 2, 3, 4, 5)
                .buffer(3, 1)
                .subscribe(new Observer<List<Integer>>() {

                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(List<Integer> value) {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

(5)Scan():Scan连续地对数据序列的每一项应用一个函数,而后连续发射结果
示例:累加计算

private void initScan() {
        Observable.just(1, 2, 3, 4, 5).scan(new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "accept: " + integer);
            }
        });
    }

3、组合\合并操做符:组合多个被观察者 & 合并须要发送的事件
1 ~ 3:组合多个被观察者一块儿发送数据
(1)concat() & concatArray():合并后按发送顺序串行执行

//concat()组合被观察者数量小于等于4个
        Observable.concat(Observable.just(1, 2, 3),
                Observable.just(4, 5, 6),
                Observable.just(7, 8, 9),
                Observable.just(10, 11, 12))
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    }

                    @Override
                    public void onNext(Integer value) {
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onComplete() {
                    }
                });
        //concatArray()组合被观察者数量可大于4个
        Observable.concatArray(Observable.just(1, 2, 3),
                Observable.just(4, 5, 6),
                Observable.just(7, 8, 9),
                Observable.just(10, 11, 12),
                Observable.just(13, 14, 15))
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    }

                    @Override
                    public void onNext(Integer value) {
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onComplete() {
                    }
                });

(2)merge() & mergeArray():合并后按时间线并行执行

//组合多个被观察者一块儿发送数据小于等于4
        Observable.merge(Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS),
                Observable.intervalRange(2, 3, 1, 1, TimeUnit.SECONDS))
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    }

                    @Override
                    public void onNext(Long value) {
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onComplete() {
                    }
                });
        //组合多个被观察者一块儿发送数据能够大于4
        Observable.mergeArray(Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS),
                Observable.intervalRange(2, 3, 1, 1, TimeUnit.SECONDS),
                Observable.intervalRange(5, 9, 1, 1, TimeUnit.SECONDS),
                Observable.intervalRange(1, 3, 1, 1, TimeUnit.SECONDS),
                Observable.intervalRange(3, 6, 1, 1, TimeUnit.SECONDS))
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    }

                    @Override
                    public void onNext(Long value) {
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onComplete() {
                    }
                });

(3)concatDelayError() & mergeDelayError() & concatArrayDelayError() & mergeArrayDelayError():使用concat() & merge() & concatArray() & mergeArray()操做符时,其中一个被观察者发送onError,则会立刻终止其余被观察者发送的事件。使用DelayError方法onError事件推迟到其余被观察者发送事件结束才触发,

Observable.concatArrayDelayError(Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onError(new NullPointerException());
                emitter.onComplete();
            }
        }), Observable.just(4, 5, 6)).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Integer value) {
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        });

(4)Zip():合并多个被观察者发送的事件,生成一个新的事件序列,并最终发送(最终合并的事件数量多个被观察者中数量最少的数量)

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.io());
        Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("A");
                emitter.onNext("B");
                emitter.onNext("C");
                emitter.onNext("D");
                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.newThread());

        Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String string) throws Exception {
                return integer + string;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(String value) {
                Log.d(TAG, "最终接收到的事件 =  " + value);
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        });

尽管被观察者2的事件D没有事件与其合并,但仍是会继续发送
若在被观察者1 & 被观察者2的事件序列最后发送onComplete()事件,则被观察者2的事件D也不会发送

(5)combineLatest():当两个Observables中的任何一个发送了数据后,将先发送了数据的Observables 的最新(最后)一个数据与另一个Observable发送的每一个数据结合,最终基于该函数的结果发送数据

Observable.combineLatest(Observable.just(1L, 2L, 3L),
                Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS),
                new BiFunction<Long, Long, Long>() {
                    @Override
                    public Long apply(Long o1, Long o2) throws Exception {
                        return o1 + o2;
                    }
                }).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long s) throws Exception {
                Log.e(TAG, "合并的结果是: " + s);
            }
        });

(6)combineLatestDelayError():与(3)中的做用相似(错误处理)

(7)reduce():把被观察者须要发送的事件聚合成1个事件 & 发送

Observable.just(1, 2, 3 ,4, 5, 6).reduce(new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(@NonNull Integer s1, @NonNull Integer s2) throws Exception {
                Log.e(TAG, "本次计算的数据是: " + s1 + " 乘 " + s2);
                return s1 * s2;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer s) throws Exception {
                Log.e(TAG, "最终计算的结果是: " + s);
            }
        });

(8)collect():将被观察者发送的数据事件收集到一个数据结构里

Observable.just(1, 2, 3, 4, 5, 6).collect(
                // 1. 建立数据结构(容器),用于收集被观察者发送的数据 
                new Callable<ArrayList<Integer>>() {
                    @Override
                    public ArrayList<Integer> call() throws Exception {
                        return new ArrayList<>();
                    } // 2. 对发送的数据进行收集 
                }, new BiConsumer<ArrayList<Integer>, Integer>() {
                    @Override
                    public void accept(ArrayList<Integer> list, Integer integer) throws Exception {
                        list.add(integer); // 对发送的数据进行收集 
                    }
                }).subscribe(new Consumer<ArrayList<Integer>>() {
            @Override
            public void accept(@NonNull ArrayList<Integer> s) throws Exception {
                Log.e(TAG, "本次发送的数据是: " + s);
            }
        });

(9)startWith() & startWithArray():在一个被观察者发送事件前,追加发送一些数据 / 一个新的被观察者(后调用先追加)

Observable.just(4, 5, 6)
                .startWith(0) // 追加单个数据 = startWith()
                .startWithArray(1, 2, 3) // 追加多个数据 = startWithArray()
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件" + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onComplete() {
                    }
                });

(10)count():统计被观察者发送事件的数量

Observable.just(1, 2, 3, 4).count().subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                Log.e(TAG, "发送的事件数量 = " + aLong);
            }
        });

RxJava2.0功能性操做符(三)