RxJava2.0入门教程(一)
1、建立操做符
(1)crete():建立一个被观察者对象 & 链式调用数组
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { } }, new Action() { @Override public void run() throws Exception { } }, new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { } });
(2)just():快速建立1个被观察者对象(直接发送传入的事件,最多只能发送10个参数)缓存
//建立对象后至关于执行了onNext(1)、onNext(2)、onNext(3)、onNext(4)方法 Observable.just(1, 2, 3, 4).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer value) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });
(3)formArray():快速建立1个被观察者对象(直接发送传入的数组数据,不限发送事件个数)数据结构
Integer[] items = {0, 1, 2, 3, 4}; Observable.fromArray(items).subscribe();
(4)fromIterable():快速建立1个被观察者对象(直接发送传入的集合List数据,不限发送事件个数)app
List<Integer> list = new ArrayList<>(); list.add(1); list.add(2); list.add(3); Observable.fromIterable(list).subscribe();
(5)never():不发送任何事件(观察者接收后什么都不调用)ide
Observable.never().subscribe();
(6)empty():仅发送Complete事件,直接通知完成(用于测试使用)函数
Observable.empty().subscribe();
(7)error():仅发送Error事件,直接通知异常(用于测试使用)测试
Observable.error(new RuntimeException()).subscribe();
(8)defer():直到有观察者订阅时,才动态建立被观察者对象 & 发送事件(动态建立被观察者对象 & 获取最新的Observable对象数据)
在订阅时才建立,因此i值会取第2次的赋值code
Integer i = 1; Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() { @Override public ObservableSource<? extends Integer> call() throws Exception { return Observable.just(i); } }); i = 2; observable.subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer value) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });
(9)timer():快速建立1个被观察者对象(延迟指定时间后,发送1个数值0用于检测)orm
//参数:一、延迟时间。二、时间单位 Observable.timer(2, TimeUnit.SECONDS) .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Long value) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });
(10)interval():快速建立1个被观察者对象(每隔指定时间就发送事件)server
/** * 第一次延长时间 * 间隔时间数字(每隔一秒产生一个数字,从0开始递增1,无限个) * 时间单位 */ Observable.interval(3,1,TimeUnit.SECONDS).subscribe();
(11)intervalRange():快速建立1个被观察者对象(每隔指定时间就发送事件,可指定发送的数据的数量)
/** * start:事件序列起始点 * count:事件数量 * initialDelay:第1次事件延迟发送时间 * period:间隔时间数字 * unit:时间单位 */ Observable.intervalRange(3, 10, 2, 1, TimeUnit.SECONDS).subscribe();
(12)range():快速建立1个被观察者对象(连续发送1个事件序列,可指定范围,无延迟发送事件,不可指定负数)
/** * start:事件序列起点 * count:事件数量 */ Observable.range(3,10) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer value) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });
(13)rangeLong():相似于range(),区别在于该方法支持数据类型 = Long
2、变换操做符
(1)Map():对被观察者发送的每1个事件都经过指定的函数处理,从而变换成另一种事件
示例:数据加1转化为String类型
private void initMap() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { Log.d(TAG, "subscribe: "); e.onNext(1); e.onNext(2); e.onNext(3); } }).map(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Exception { String mapStr = String.valueOf(integer + 1); return mapStr; } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "accept: " + s); } }); }
(2)FlatMap():将被观察者发送的事件序列进行拆分 & 单独转换,再合并成一个新的事件序列,最后再进行发送(无序的将被观察者发送的整个事件序列进行变换)
示例:将一个发送事件的被观察者Observable变换为多个发送事件的Observables,而后将它们发射的事件合并后放进一个单独的Observable里
private void initFlatMap() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); } }).flatMap(new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { final List<String> list = new ArrayList<>(); for (int i = 0; i < 3; i++) { list.add("主事件 " + integer + "子事件" + i); } return Observable.fromIterable(list); } }).subscribe(new Consumer<String>() { @Override public void accept(String string) throws Exception { } }); }
(3)ConcatMap():相似FlatMap()操做符(concatMap严格按照被观察者发送的顺序来发送)
示例:有序的将被观察者发送的整个事件序列进行变换
private void initConcatMap() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); } }).concatMap(new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { final List<String> list = new ArrayList<>(); for (int i = 0; i < 3; i++) { list.add("主事件 " + integer + "子事件" + i); } return Observable.fromIterable(list); } }).subscribe(new Consumer<String>() { @Override public void accept(String string) throws Exception { } }); }
(4)Buffer():按期从被观察者须要发送的事件中获取必定数量的事件 & 放到缓存区中,最终发送(缓存被观察者发送的事件)
/** * count:每次从被观察者中获取的事件数量 * skip:每次获取新事件的数量 */ Observable.just(1, 2, 3, 4, 5) .buffer(3, 1) .subscribe(new Observer<List<Integer>>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(List<Integer> value) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });
(5)Scan():Scan连续地对数据序列的每一项应用一个函数,而后连续发射结果
示例:累加计算
private void initScan() { Observable.just(1, 2, 3, 4, 5).scan(new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer integer, Integer integer2) throws Exception { return integer + integer2; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "accept: " + integer); } }); }
3、组合\合并操做符:组合多个被观察者 & 合并须要发送的事件
1 ~ 3:组合多个被观察者一块儿发送数据
(1)concat() & concatArray():合并后按发送顺序串行执行
//concat()组合被观察者数量小于等于4个 Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5, 6), Observable.just(7, 8, 9), Observable.just(10, 11, 12)) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer value) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); //concatArray()组合被观察者数量可大于4个 Observable.concatArray(Observable.just(1, 2, 3), Observable.just(4, 5, 6), Observable.just(7, 8, 9), Observable.just(10, 11, 12), Observable.just(13, 14, 15)) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer value) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });
(2)merge() & mergeArray():合并后按时间线并行执行
//组合多个被观察者一块儿发送数据小于等于4 Observable.merge(Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), Observable.intervalRange(2, 3, 1, 1, TimeUnit.SECONDS)) .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Long value) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); //组合多个被观察者一块儿发送数据能够大于4 Observable.mergeArray(Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), Observable.intervalRange(2, 3, 1, 1, TimeUnit.SECONDS), Observable.intervalRange(5, 9, 1, 1, TimeUnit.SECONDS), Observable.intervalRange(1, 3, 1, 1, TimeUnit.SECONDS), Observable.intervalRange(3, 6, 1, 1, TimeUnit.SECONDS)) .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Long value) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });
(3)concatDelayError() & mergeDelayError() & concatArrayDelayError() & mergeArrayDelayError():使用concat() & merge() & concatArray() & mergeArray()操做符时,其中一个被观察者发送onError,则会立刻终止其余被观察者发送的事件。使用DelayError方法onError事件推迟到其余被观察者发送事件结束才触发,
Observable.concatArrayDelayError(Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onError(new NullPointerException()); emitter.onComplete(); } }), Observable.just(4, 5, 6)).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer value) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });
(4)Zip():合并多个被观察者发送的事件,生成一个新的事件序列,并最终发送(最终合并的事件数量多个被观察者中数量最少的数量)
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); } }).subscribeOn(Schedulers.io()); Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("A"); emitter.onNext("B"); emitter.onNext("C"); emitter.onNext("D"); emitter.onComplete(); } }).subscribeOn(Schedulers.newThread()); Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() { @Override public String apply(Integer integer, String string) throws Exception { return integer + string; } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String value) { Log.d(TAG, "最终接收到的事件 = " + value); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });
尽管被观察者2的事件D没有事件与其合并,但仍是会继续发送
若在被观察者1 & 被观察者2的事件序列最后发送onComplete()事件,则被观察者2的事件D也不会发送
(5)combineLatest():当两个Observables中的任何一个发送了数据后,将先发送了数据的Observables 的最新(最后)一个数据与另一个Observable发送的每一个数据结合,最终基于该函数的结果发送数据
Observable.combineLatest(Observable.just(1L, 2L, 3L), Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), new BiFunction<Long, Long, Long>() { @Override public Long apply(Long o1, Long o2) throws Exception { return o1 + o2; } }).subscribe(new Consumer<Long>() { @Override public void accept(Long s) throws Exception { Log.e(TAG, "合并的结果是: " + s); } });
(6)combineLatestDelayError():与(3)中的做用相似(错误处理)
(7)reduce():把被观察者须要发送的事件聚合成1个事件 & 发送
Observable.just(1, 2, 3 ,4, 5, 6).reduce(new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(@NonNull Integer s1, @NonNull Integer s2) throws Exception { Log.e(TAG, "本次计算的数据是: " + s1 + " 乘 " + s2); return s1 * s2; } }).subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer s) throws Exception { Log.e(TAG, "最终计算的结果是: " + s); } });
(8)collect():将被观察者发送的数据事件收集到一个数据结构里
Observable.just(1, 2, 3, 4, 5, 6).collect( // 1. 建立数据结构(容器),用于收集被观察者发送的数据 new Callable<ArrayList<Integer>>() { @Override public ArrayList<Integer> call() throws Exception { return new ArrayList<>(); } // 2. 对发送的数据进行收集 }, new BiConsumer<ArrayList<Integer>, Integer>() { @Override public void accept(ArrayList<Integer> list, Integer integer) throws Exception { list.add(integer); // 对发送的数据进行收集 } }).subscribe(new Consumer<ArrayList<Integer>>() { @Override public void accept(@NonNull ArrayList<Integer> s) throws Exception { Log.e(TAG, "本次发送的数据是: " + s); } });
(9)startWith() & startWithArray():在一个被观察者发送事件前,追加发送一些数据 / 一个新的被观察者(后调用先追加)
Observable.just(4, 5, 6) .startWith(0) // 追加单个数据 = startWith() .startWithArray(1, 2, 3) // 追加多个数据 = startWithArray() .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer value) { Log.d(TAG, "接收到了事件" + value); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });
(10)count():统计被观察者发送事件的数量
Observable.just(1, 2, 3, 4).count().subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.e(TAG, "发送的事件数量 = " + aLong); } });