RxJava(五):变换操做符

博客主页java

RxJava 的变换操做符主要包括如下几种:segmentfault

  • map:对序列的每一项都用一个函数来变换 Observable 发射的数据序列
  • flatMap、 concatMap 和 flatMapIterable:将 Observable 发射的数据集合变换为 Observables 集合,而后将这些 Observable 发射的数据平坦化地放进一个单独的 Observable 中
  • switchMap :将 Observable 发射的数据集合变换为 Observables 集合,而后只发射这些 Observables 最近发射过的数据。
  • scan :对 Observable 发射的每一项数据应用一个函数,而后按顺序依次发射每个值
  • groupBy :将 Observable 拆分为 Observable 集合,将原始 Observable 发射的数据按 Key 分组,每个 Observable 发射过一组不一样的数据
  • buffer :按期从 Observable 收集数据到一个集合,而后把这些数据集合打包发射,而不是一次发射一个
  • window :按期未来自 Observable 的数据拆分红一些 Observable 窗口,而后发射这些窗口,而不是每次发射一项
  • cast :在发射以前强制将 Observable 发射的全部数据转换为指定类型

1. map 和 flatMap

1.1 map 操做符

对 Observable 发射的每一项数据应用一个函数,执行变换操做
数组

map 操做符对原始 Observable 发射的每一项数据应用一个你选择的函数,而后返回一个发射这些结果的 Observable缓存

RxJava 将这个操做符实现为 map 函数,这个操做符默认不在任何特定的调度器上执行。数据结构

Observable.just("HELLO")
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {
                return s.toLowerCase();
            }
        })
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {
                return s + " world!";
            }
        })
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "Next: " + s);
            }
        });

// 执行结果
 Next: hello world!

第一次转换,将字符串 “HELLO“ 转换成全是小写字母的字符串 ”hello“。第二次转换,在字符串 ”hello“ 后面添加新的字符串 ” world!“,它们组成了新的字符串也就是执行结果。app

1.2 flatMap 操做符

flatMap 将一个发射数据的 Observable 变换为多个 Observables, 而后将它们发射的数据合并后放进一个单独 Observable
ide

flatMap 操做符使用一个指定的函数对原始 Observable 发射的每一项数据执行变换操做,这个函数返回一个自己也发射数据 Observable,而后 flatMap 合并这些 Observables 发射的数据,最后将合并后的结果看成它本身的数据序列发射。函数

下面看一个例子。先定义一个用户对象,包含用户名和地址,因为地址可能会包括生活、工做等地方,因此使用一个 List 对象来表示用户的地址spa

public class User {
    public String username;
    public List<Address> addresses;

    public static class Address {
        public String street;
        public String city;
    }
}

若是想打印出某个用户全部的地址,那么能够借助 map 操做符返回一个地址的列表。3d

Observable.just(user)
        .map(new Function<User, List<User.Address>>() {
            @Override
            public List<User.Address> apply(User user) throws Exception {
                return user.addresses;
            }
        })
        .subscribe(new Consumer<List<User.Address>>() {
            @Override
            public void accept(List<User.Address> addresses) throws Exception {
                for (User.Address address : addresses) {
                    Log.d(TAG, "Next: " + address.city);
                }
            }
        });

// 执行结果
 Next: shang hai
 Next: su zhou

换成 flatMap 操做符以后,flatMap 内部将用户的地址列表转换成一个 Observable

Observable.just(user)
        .flatMap(new Function<User, ObservableSource<User.Address>>() {
            @Override
            public ObservableSource<User.Address> apply(User user) throws Exception {
                return Observable.fromIterable(user.addresses);
            }
        })
        .subscribe(new Consumer<User.Address>() {
            @Override
            public void accept(User.Address address) throws Exception {
                Log.d(TAG, "Next: " + address.city);
            }
        });

// 执行结果
 Next: shang hai
 Next: su zhou

flatMap 对这些 Observables 发射的数据作的是合并 (merge) 操做, 所以它们多是交错的。还有一个操做符不会让变换后 Observables 发射的数据交错,它严格按照顺序发射这些数据,这个操做符就是 concatMap

2. groupBy

groupBy 操做符将一个 Observable 拆分为一些 Observables 集合,它们中的每个都发射原始
Observable 的一个子序列

哪一个数据项由哪个 Observable 发射是由一个函数断定的,这个函数给每一项指定一个 Key, Key 相同的数据会被同一个 Observable 发射。

最终返回的是 Observable 一个特殊子类 GroupedObservable 。它是一个抽象类。getKey()
方法是 GroupedObservable 方法,这个 Key 用于将数据分组到指定的 Observable

Observable.range(1, 8)
        .groupBy(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return integer % 2 == 0 ? "偶数组" : "奇数组";
            }
        })
        .subscribe(new Consumer<GroupedObservable<String, Integer>>() {
            @Override
            public void accept(GroupedObservable<String, Integer> stringIntegerGroupedObservable) throws Exception {
                Log.d(TAG, "Next-> group name: " + stringIntegerGroupedObservable.getKey());
            }
        });

// 执行结果
 Next-> group name: 奇数组
 Next-> group name: 偶数组

对上述代码作一些修改, 对 GroupedObservable 使用 getKey() 方法,从而可以选出奇数组的
GroupedObservable ,最后打印出该 GroupedObservable 下的所有成员

Observable.range(1, 8)
        .groupBy(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return integer % 2 == 0 ? "偶数组" : "奇数组";
            }
        })
        .subscribe(new Consumer<GroupedObservable<String, Integer>>() {
            @Override
            public void accept(GroupedObservable<String, Integer> stringIntegerGroupedObservable) throws Exception {

                final String key = stringIntegerGroupedObservable.getKey();
                if (key.equals("奇数组")) {
                    stringIntegerGroupedObservable.subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            Log.d(TAG, "Next->" + key + ":" + integer);
                        }
                    });
                }
            }
        });

// 执行结果
 Next->奇数组:1
 Next->奇数组:3
 Next->奇数组:5
 Next->奇数组:7

3. buffer 和 window

3.1 buffer 操做符

buffer 会按期收集 Observable 数据并放进一个数据包裹,而后发射这些数据包裹,而不是一次发射一个值

buffer 操做符将 Observable 变换为另外一个,原来的 Observable 正常发射数据,由变换产生 Observable 发射这些数据的缓存集合。

Observable.range(1, 10)
        .buffer(2)
        .subscribe(new Consumer<List<Integer>>() {
            @Override
            public void accept(List<Integer> integers) throws Exception {
                Log.d(TAG, "Next: " + integers);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d(TAG, "Error: " + throwable);
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "Complete.");
            }
        });

// 执行结果
 Next: [1, 2]
 Next: [3, 4]
 Next: [5, 6]
 Next: [7, 8]
 Next: [9, 10]
 Complete.

上述代码,发射了从 1 到 10 这 10 个数字,因为使用了 buffer 操做符,它会将原先的 Observable 转换成新的 Observable,而新的 Observable 每次可发射两个数字,发射完毕后调用 onComplete()
方法。

查看 buffer 操做符的源码,能够看到使用 buffer 操做符以后转换成

public final Observable<List<T>> buffer(int count) {
    return buffer(count, count);
}

若是将发射的数据变成11,range(1, 11) 执行结果以下:

Next: [1, 2]
 Next: [3, 4]
 Next: [5, 6]
 Next: [7, 8]
 Next: [9, 10]
 Next: [11]
 Complete.

再修改一下代码,缓存5个数字,执行结果以下:

Next: [1, 2, 3, 4, 5]
 Next: [6, 7, 8, 9, 10]
 Next: [11]
 Complete.

在 RxJava 有许多 buffer 的重载方法,例如比较经常使用的 buffer(count, skip)

buffer(count, skip) 从原始 Observable 的第一项数据开始建立新的缓存,此后每当收到 skip 项数据,就用 count 项数据填充缓存: 开头的一项和后续的 count - 1 项。它以列表 (List) 的形式发射缓存,这些缓存可能会有重叠部分 (好比 skip < count),也可能会有间隙(好比 skip > count时),取决于 count 和 skip 的值。

Observable.range(1, 11)
        .buffer(5, 1)
        .subscribe(new Consumer<List<Integer>>() {
            @Override
            public void accept(List<Integer> integers) throws Exception {
                Log.d(TAG, "Next: " + integers);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d(TAG, "Error: " + throwable);
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "Complete.");
            }
        });

// 执行结果
 Next: [1, 2, 3, 4, 5]
 Next: [2, 3, 4, 5, 6]
 Next: [3, 4, 5, 6, 7]
 Next: [4, 5, 6, 7, 8]
 Next: [5, 6, 7, 8, 9]
 Next: [6, 7, 8, 9, 10]
 Next: [7, 8, 9, 10, 11]
 Next: [8, 9, 10, 11]
 Next: [9, 10, 11]
 Next: [10, 11]
 Next: [11]
 Complete.

若是原来 Observable 发射了一个 onError 通知,那么 buffer 会当即传递这个通知,而不是首先发射缓存的数据,即便在这以前缓存中包含了原始 Observable 发射的数据。

window 操做符与 buffer 相似, 但它在发射以前是把收集到的数据放进单独的 Observable,而不是放进 一个数据结构。

3.2 window 操做符

按期未来自原始 Observable 的数据分解为一个 Observable 窗口,发射这些窗口,而不是每次发射一项数据

window 发射的不是原始 Observable 数据包,而是 Observables,这些 Observables 中的每个都发射原始 Observable 数据的一个子集,最后发射一个 onComplete 通知。

Observable.range(1, 10)
        .window(2)
        .subscribe(new Consumer<Observable<Integer>>() {
            @Override
            public void accept(Observable<Integer> integerObservable) throws Exception {
                Log.d(TAG, "Next-> ");
                integerObservable.subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "       " + integer);
                    }
                });
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d(TAG, "Error: " + throwable);
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "Complete.");
            }
        });

// 执行结果
 Next-> 
        1
        2
 Next-> 
        3
        4
 Next-> 
        5
        6
 Next-> 
        7
        8
 Next-> 
        9
        10
 Complete.

若是个人文章对您有帮助,不妨点个赞鼓励一下(^_^)

相关文章
相关标签/搜索