RxJava2.0入门篇

传统用法:开启子线程去作耗时任务,业务逻辑越复杂,代码嵌套越严重,Rx系列出来好久了,想本身作一个总结,但愿能帮到一部分人缓存

观察者模式先提一嘴

这个老模式简直不想说太多,就说一下流程服务器

1建立被观察者app

2建立观察者ide

3被观察者与观察者进行绑定函数

4当被观察者状态改变,观察者收到后作响应处理spa

第一步,RxJava建立被观察者

第一种方法:经过Observable.create(ObservableOnSubscribe)线程

这里边的emitter来发射数据和信息code

二:经过Observable.just(参数);server

三:经过Observable.from();blog

第二部,建立观察者

 1 Observer<Object> observer = new Observer<Object>() {
 2             @Override
 3             public void onSubscribe(Disposable d) {
 4                 //被订阅时调用
 5             }
 6 
 7             @Override
 8             public void onNext(Object o) {
 9           //当被观察者改变的时候调用的方法
10             }
11 
12             @Override
13             public void onError(Throwable e) {
14           //处理异常的方法
15             }
16 
17             @Override
18             public void onComplete() {
19           //再也不有新的事件的时候调用
20             }
21         };

订阅

observable.subscribe(observer);

订阅以后,代码将依次调用observer的onSubscribe(),observable的subscribe(),observer的onNext与onComplete

一个简单的模式就造成了

操做符

map -->把一个事件转化成另外一个事件

举个栗子:Integer转String操做

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                Log.d(TAG, "subscribe: ");
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                String mapStr = String.valueOf(integer + 1);
                return mapStr;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "accept: " + s);
            }
        });

flatMap -->flatMap是一个很是强大的操做符,flatMap将一个发送事件的上游Observable变换为多个发送事件的Observables,而后将它们发射的事件合并后放进一个单独的Observable里,可是flatmap不能保证事件的顺序

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        }).flatMap(new Function<Integer, Observable<String>>() {
            @Override
            public Observable<String> apply(Integer integer) throws Exception {
                ArrayList<String> arrayList = new ArrayList<>();
                for (int i = 0; i < 5; i++) {
                    String iStr = "flatMap value" + integer;
                    arrayList.add(iStr);
                }
                return Observable.fromIterable(arrayList).delay(10, TimeUnit.MICROSECONDS);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "accept: " + s);
            }
        });

 

concatMap -->做用和flatMap同样,可是保证了顺序

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(11);
                e.onNext(111);
            }
        }).concatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                ArrayList<String> arrayList = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    arrayList.add("concatMap value" + i + "integer" + integer);
                }
                return Observable.fromIterable(arrayList).delay(5, TimeUnit.MILLISECONDS);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "accept: " + s);
            }
        });

Buffer -->

Buffer操做符会按期收集Observable的数据放进一个数据包裹,而后发射这些包裹,并非一次发射一个值
Buffer操做符将一个Observable变换为另外一个,原来的Observable正常发射数据,变换产生的Observable发射这些数据的缓存集合。若是原来的Observable发射了一个onError通知,Buffer会当即传递这个通知,而不是首先发射缓存的数据。

scan -->
Scan连续地对数据序列的每一项应用一个函数,而后连续发射结果
Scan操做符对原始Observable发射的第一项数据应用一个函数,而后将这个函数的结果做为本身的第一项数据发射。将函数的结果同第二项数据一块儿填充给这个函数来产生本身的第二项数据。持续进行这个过程来产生剩余的数据序列。
Observable.just(1,2,3,4,5).scan(new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "accept: " + integer);
            }
        });

window -->

Window按期未来自原始Observable的数据分解为一个Observable窗口,发射这些窗口而不是每次发射一项数据

window和Buffer相似,但不是发射来自原始Observable的数据包,发射的是Observables,这些Observables中的每个都发射原始Observable数据的一个子集,最后发射一个onComplete通知。

zip -->

ZIP经过一个函数将多个Observable发送的事件结合到一块儿,而后发送这些组合到一块儿的事件。按照严格的顺序应用这个函数,只发射与发射项最少的那个Observable同样多的数据,zip在Android中的使用,能够适用于以下场景,一个界面须要展现用户的一些信息,这些信息分别要从两个服务器接口中获取,只有当两个数据都获取后才能进行展现。这类同时的信息请求比较适用zip

        //第一个事件
        Observable<Integer> observable1 = Observable.range(1, 5);
        //第二个事件
        Observable<Integer> observable2 = Observable.range(6, 10);
        //合并事件
        Observable.zip(observable1, observable2, new BiFunction<Integer, Integer, String>() {
            @Override
            public String apply(Integer integer, Integer integer2) throws Exception {
                return String.valueOf(integer + integer2);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "accept: " + s);
            }
        });
相关文章
相关标签/搜索