Rxjava2 Observable的数据变换详解及实例(二)

接续上篇: Rxjava2 Observable的数据变换详解及实例(一)java

1. Window

按期未来自原始Observable的数据分解为一个Observable窗口,发射这些窗口,而不是每次发射一项数据。react

img-window
WindowBuffer 相似,但不是发射来自原始Observable的数据包,它发射的是 Observables,这些Observables中的每个都发射原始Observable数据的一个子集,最后发 射一个 onCompleted 通知。git

Buffer同样,Window 有不少变体,每一种都以本身的方式将原始Observable分解为多个做为结果的Observable,每个都包含一个映射原始数据的 window 。用 Window操做符的术语描述就是,当一个窗口打开(when a window "opens")意味着一个新的Observable已经发射 (产生)了,并且这个Observable开始发射来自原始Observable的数据;当一个窗口关闭 (when a window "closes")意味着发射(产生)的Observable中止发射原始Observable的数据, 而且发射终止通知 onCompleted 给它的观察者们。github

在RxJava中有许多种Window操做符的方法。缓存

1.1 window(closingSelector)

window 的这个方法会当即打开它的第一个窗口。每当它观察到closingSelector返回的 Observable发射了一个对象时,它就关闭当前打开的窗口并当即打开一个新窗口。用这个方法,这种 window 变体发射一系列不重叠的窗口,这些窗口的数据集合与原始Observable发射的数据是一一对应的。网络

img-window(closingSelector)
解析: 一开始开启一个 window 接收原始数据,每当它观察到closingSelector返回的 Observable发射了一个对象时,它就关闭当前打开的窗口并取消此时订阅closingSelector 的Observable ( 此时多是没有数据 window )并当即打开一个新窗口,注意: 每一个窗口开启前都会去订阅一个closingSelector返回的 Observable。app

实例代码:ide

// 1. window(Callable boundary)
    // 开启一个window,并订阅观察boundary返回的Observable发射了一个数据,
    // 则关闭此window,将收集的数据以Observable发送, 从新订阅boundary返回的Observable,开启新window
    Observable.intervalRange(1, 10, 0, 1, TimeUnit.SECONDS)
        .window(new Callable<Observable<Long>>() {

            @Override
            public Observable<Long> call() throws Exception {
                System.out.println("--> call(1)");
                return Observable.timer(2, TimeUnit.SECONDS); // 两秒后关闭当前窗口
            }
        }).subscribe(new Consumer<Observable<Long>>() {

            @Override
            public void accept(Observable<Long> t) throws Exception {
                // 接受每一个window接受的数据的Observable
                t.subscribe(new Consumer<Long>() {

                    @Override
                    public void accept(Long t) throws Exception {
                        System.out.println("--> accept(1): " + t);
                    }
                });
            }
        });

输出:函数

--> call(1)
--> accept(1): 1
--> accept(1): 2
--> accept(1): 3
--> call(1)
--> accept(1): 4
--> accept(1): 5
--> call(1)
--> accept(1): 6
--> accept(1): 7
--> call(1)
--> accept(1): 8
--> accept(1): 9
--> call(1)
--> accept(1): 10

Javadoc: window(closingSelector)
Javadoc: window(closingSelector, bufferSize)

1.2 window(openingIndicator, closingIndicator)

openingIndicator 发射一个数据,就会打开一个 window, 同时订阅 closingIndicator 返回的Observable,当这个Observable发射一个数据,就结束此 window 和 ,发送收集数据的 Observable。

img-window(openingIndicator, closingIndicator)
不管什么时候,只要 window 观察到 windowOpenings 这个Observable发射了一个 Opening 对象,它就打开一个窗口,而且同时调用 closingSelector 生成一个与那个窗口关联的关闭 (closing)Observable 。当这个关闭 (closing)Observable 发射了一个对象时,window 操做符就会关闭那个窗口以及关联的closingSelector的 Observable。

注意: 对这个方法来讲,因为当前窗口的关闭和新窗口的打开是由单独的 Observable 管理的,它建立的窗口可能会存在重叠(重复某些来自原始Observable的数据) 或间隙(丢弃某些来自原始Observable的数据)

实例代码:

// 2. window(ObservableSource openingIndicator, Function<T, ObservableSource<R>> closingIndicator)
    // 当openingIndicator发射一个数据,就会打开一个window, 同时订阅closingIndicator返回的Observable,
    // 当这个Observable发射一个数据,就结束此window以及对应的closingIndicator,发送收集数据的 Observable。
    Observable<Long> openingIndicator = Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
            .doOnSubscribe(new Consumer<Disposable>() {

                @Override
                public void accept(Disposable t) throws Exception {
                    System.out.println("--> openingIndicator is subscribe!");
                }
            }).doOnComplete(new Action() {
                
                @Override
                public void run() throws Exception {
                    System.out.println("--> openingIndicator is completed!");
                }
            }).doOnNext(new Consumer<Long>() {

                @Override
                public void accept(Long t) throws Exception {
                    System.out.println("--> openingIndicator emitter: " + t);
                }
            });
    
    Observable<Long> dataSource = Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
            .doOnSubscribe(new Consumer<Disposable>() {

                @Override
                public void accept(Disposable t) throws Exception {
                    System.out.println("--> DataSource is subscribe!");
                }
            }).doOnNext(new Consumer<Long>() {

                @Override
                public void accept(Long t) throws Exception {
                    System.out.println("--> DataSource emitter: " + t);
                }
            });
    
    dataSource.window(openingIndicator, new Function<Long, Observable<Long>>() {

                @Override
                public Observable<Long> apply(Long t) throws Exception {
                    System.out.println("--> apply(2): " + t);
                    return Observable.timer(2, TimeUnit.SECONDS).doOnSubscribe(new Consumer<Disposable>() {

                        @Override
                        public void accept(Disposable t) throws Exception {
                            System.out.println("--> closingIndicator is subscribe!");
                        }
                    });
                }
            }).subscribe(new Consumer<Observable<Long>>() {

                @Override
                public void accept(Observable<Long> t) throws Exception {
                    System.out.println("-------------------> new window data");
                    t.subscribe(new Consumer<Long>() {

                        @Override
                        public void accept(Long t) throws Exception {
                            System.out.println("--> accept(2): " + t);
                        }
                    });
                }
            });

输出:

--> DataSource is subscribe!
--> openingIndicator is subscribe!
--> openingIndicator emitter: 1
--> DataSource emitter: 1
-------------------> new window data
--> apply(2): 1
--> closingIndicator is subscribe!
--> openingIndicator emitter: 2
--> DataSource emitter: 2
-------------------> new window data
--> apply(2): 2
--> closingIndicator is subscribe!
--> accept(2): 2
--> accept(2): 2
--> openingIndicator emitter: 3
--> DataSource emitter: 3
-------------------> new window data
--> apply(2): 3
--> closingIndicator is subscribe!
--> accept(2): 3
--> accept(2): 3
--> accept(2): 3
--> DataSource emitter: 4
--> openingIndicator emitter: 4
--> accept(2): 4
--> accept(2): 4
-------------------> new window data
--> apply(2): 4
--> closingIndicator is subscribe!
--> DataSource emitter: 5
--> accept(2): 5
--> accept(2): 5
--> openingIndicator emitter: 5

Javadoc: window(openingIndicator, closingIndicator)
Javadoc: window(openingIndicator, closingIndicator,bufferSize)

1.3 window(count)

这个 window 的方法当即打开它的第一个窗口。每当当前窗口发射了 count 项数据,它就关闭当前窗口并打开一个新窗口。若是从原始Observable收到了 onErroronCompleted 通知它也会关闭当前窗口。

这种 window 方法发射一系列不重叠的窗口,这些窗口的数据集合与原始 Observable发射的数据是 一一对应 的。

img-window(count)

实例代码:

// 3. window(count)
    // 以count为缓存大小收集的不重叠的Observables对象,接受的数据与原数据彼此对应
    Observable.range(1, 20)
        .window(5)  // 设置缓存大小为5
        .subscribe(new Consumer<Observable<Integer>>() {

            @Override
            public void accept(Observable<Integer> t) throws Exception {
                System.out.println("--------------> new data window");
                t.subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer t) throws Exception {
                        System.out.println("--> accept window(3): " + t);
                    }
                });
            }
        });

输出:

--------------> new data window
--> accept window(3): 1
--> accept window(3): 2
--> accept window(3): 3
--> accept window(3): 4
--> accept window(3): 5
--------------> new data window
--> accept window(3): 6
--> accept window(3): 7
--> accept window(3): 8
--> accept window(3): 9
--> accept window(3): 10
--------------> new data window
--> accept window(3): 11
--> accept window(3): 12
--> accept window(3): 13
--> accept window(3): 14
--> accept window(3): 15
--------------> new data window
--> accept window(3): 16
--> accept window(3): 17
--> accept window(3): 18
--> accept window(3): 19
--> accept window(3): 20

Javadoc: window(count)

1.4 window(count, skip)

这个 window 的方法当即打开它的第一个窗口。原始Observable每发射 skip 项数据它就打开 一个新窗口(例如,若是 skip 等于3,每到第三项数据,它会建立一个新窗口)。每当当前窗口发射了 count 项数据,它就关闭当前窗口并打开一个新窗口。若是从原始Observable 收到了onErroronCompleted 通知它也会关闭当前窗口。

img-window(count, skip)

解析: window 一开始打开一个 window,每发射 skip 项数据就会打开一个 window 独立收集 原始数据,当 window 收集了 count 个数据就会关闭,开启另一个。当原始Observable发送了onError或者onCompleted通知也会关闭当前窗口。

  • skip = count: 会依次顺序接受原始数据,同window(count)
  • skip > count: 两个窗口可能会有 skip-count 项数据丢失
  • skip < count: 两个窗口可能会有 count-skip 项数据重叠

实例代码:

// 4. window(count,skip)
    // window一开始打开一个window,每发射skip项数据就会打开一个window独立收集原始数据
    // 当window收集了count个数据就会关闭window,开启另一个。
    // 当原始Observable发送了onError 或者 onCompleted 通知也会关闭当前窗口。
    // 4.1 skip = count: 会依次顺序接受原始数据,同window(count)
    Observable.range(1, 10)
        .window(2, 2)   // skip = count, 数据会依次顺序输出
        .subscribe(new Consumer<Observable<Integer>>() {

            @Override
            public void accept(Observable<Integer> t) throws Exception {
                
                t.observeOn(Schedulers.newThread())
                    .subscribe(new Consumer<Integer>() {
    
                        @Override
                        public void accept(Integer t) throws Exception {
                            System.out.println("--> accept window(4-1): " + t +" , ThreadID: "+ Thread.currentThread().getId());
                        }
                    });
            }
        });
    
    // 4.2 skip > count: 两个窗口可能会有 skip-count 项数据丢失
    Observable.range(1, 10)
        .window(2, 3)   // skip > count, 数据会存在丢失
        .subscribe(new Consumer<Observable<Integer>>() {

            @Override
            public void accept(Observable<Integer> t) throws Exception {
                
                t.observeOn(Schedulers.newThread())
                    .subscribe(new Consumer<Integer>() {
    
                        @Override
                        public void accept(Integer t) throws Exception {
                            System.out.println("--> accept window(4-2): " + t +" , ThreadID: "+ Thread.currentThread().getId());
                        }
                    });
            }
        });
    
    // 4.3 skip < count: 两个窗口可能会有 count-skip 项数据重叠
    Observable.range(1, 10)
        .window(3, 2)   // skip < count, 数据会重叠
        .subscribe(new Consumer<Observable<Integer>>() {

            @Override
            public void accept(Observable<Integer> t) throws Exception {
                
                t.observeOn(Schedulers.newThread())
                    .subscribe(new Consumer<Integer>() {
    
                        @Override
                        public void accept(Integer t) throws Exception {
                            System.out.println("--> accept window(4-3): " + t +" , ThreadID: "+ Thread.currentThread().getId());
                        }
                    });
            }
        });

输出:

--> accept window(4-1): 1 , ThreadID: 11
--> accept window(4-1): 2 , ThreadID: 11
--> accept window(4-1): 4 , ThreadID: 12
--> accept window(4-1): 3 , ThreadID: 11
--> accept window(4-1): 5 , ThreadID: 12
--> accept window(4-1): 6 , ThreadID: 12
--> accept window(4-1): 7 , ThreadID: 13
--> accept window(4-1): 8 , ThreadID: 13
--> accept window(4-1): 9 , ThreadID: 13
--> accept window(4-1): 10 , ThreadID: 14
--> accept window(4-2): 1 , ThreadID: 15
--> accept window(4-2): 2 , ThreadID: 15
--> accept window(4-2): 4 , ThreadID: 16
--> accept window(4-2): 5 , ThreadID: 16
--> accept window(4-2): 7 , ThreadID: 17
--> accept window(4-2): 8 , ThreadID: 17
--> accept window(4-2): 10 , ThreadID: 18
--> accept window(4-3): 1 , ThreadID: 19
--> accept window(4-3): 2 , ThreadID: 19
--> accept window(4-3): 3 , ThreadID: 19
--> accept window(4-3): 3 , ThreadID: 20
--> accept window(4-3): 4 , ThreadID: 20
--> accept window(4-3): 5 , ThreadID: 20
--> accept window(4-3): 5 , ThreadID: 21
--> accept window(4-3): 6 , ThreadID: 21
--> accept window(4-3): 7 , ThreadID: 21
--> accept window(4-3): 7 , ThreadID: 22
--> accept window(4-3): 8 , ThreadID: 22
--> accept window(4-3): 9 , ThreadID: 22
--> accept window(4-3): 9 , ThreadID: 23
--> accept window(4-3): 10 , ThreadID: 23

Javadoc: window(count, skip)

1.5 window(timespan, TimeUnit)

这个 window 的方法当即打开它的第一个窗口收集数据。每当过了 timespan 这么长的时间段它就关闭当前窗口并打开一个新窗口(时间单位是 unit ,可选在调度器 scheduler 上执行)收集数据。若是从原始 Observable 收到了 onError 或 onCompleted 通知它也会关闭当前窗口。

这种 window 方法发射一系列不重叠的窗口,这些窗口的数据集合与原始Observable发射的数据也是 一一对应 的。

实例代码:

// 5. window(long timespan, TimeUnit unit)
    // 每当过了 timespan 的时间段,它就关闭当前窗口并打开另外一个新window收集数据
    Observable.intervalRange(1, 10, 0, 1, TimeUnit.SECONDS)
        .window(2, TimeUnit.SECONDS)                            // 间隔2秒关闭当前 window 并打开一个新 window 收集数据
    //  .window(2, TimeUnit.SECONDS, Schedulers.newThread())    // 指定在 newThread 线程中
        .subscribe(new Consumer<Observable<Long>>() {

            @Override
            public void accept(Observable<Long> t) throws Exception {
                t.observeOn(Schedulers.newThread())
                    .subscribe(new Consumer<Long>() {
    
                            @Override
                            public void accept(Long t) throws Exception {
                                System.out.println("--> accept window(5): " + t +" , ThreadID: "+ Thread.currentThread().getId() );
                            }
                        });
            }
        });

输出:

--> accept window(5): 1 , ThreadID: 11
--> accept window(5): 2 , ThreadID: 11
--> accept window(5): 3 , ThreadID: 11
--> accept window(5): 4 , ThreadID: 14
--> accept window(5): 5 , ThreadID: 14
--> accept window(5): 6 , ThreadID: 15
--> accept window(5): 7 , ThreadID: 16
--> accept window(5): 8 , ThreadID: 16
--> accept window(5): 9 , ThreadID: 17
--> accept window(5): 10 , ThreadID: 17

Javadoc: window(timespan, TimeUnit)
Javadoc: window(timespan, TimeUnit, scheduler)

1.6 window(timespan, TimeUnit, count)

这个 window 的方法当即打开它的第一个窗口。这个变体是 window(count) 和 window(timespan, unit[, scheduler]) 的结合,每当过了 timespan 的时长或者当前窗口收到了 count 项数据,它就关闭当前窗口并打开另外一个。若是从原始 Observable收到了 onErroronCompleted 通知它也会关闭当前窗口。

这种window方法发射 一系列不重叠的窗口,这些窗口的数据集合与原始Observable发射的数据也是 一一对应 的。

img-window(timespan, TimeUnit, count)

实例代码:

// 6. window(long timespan, TimeUnit unit, long count)
    // 每当过了timespan的时间段或者当前窗口收到了count项数据,它就关闭当前window并打开另外一个window收集数据
    Observable.intervalRange(1, 12, 0, 500, TimeUnit.MILLISECONDS)
        .window(2, TimeUnit.SECONDS, 5) // 每隔2秒关闭当前收集数据的window并开启一个window收集5项数据
    //  .window(2, TimeUnit.SECONDS,Schedulers.newThread(), 5 ) // 指定在 newThread 线程中
        .subscribe(new Consumer<Observable<Long>>() {

            @Override
            public void accept(Observable<Long> t) throws Exception {
                t.observeOn(Schedulers.newThread())
                    .subscribe(new Consumer<Long>() {

                            @Override
                            public void accept(Long t) throws Exception {
                                System.out.println("--> accept window(6): " + t + " , ThreadID: "+ Thread.currentThread().getId() );
                            }
                        });
            }
        });

输出:

--> accept window(6): 1 , ThreadID: 11
--> accept window(6): 2 , ThreadID: 11
--> accept window(6): 3 , ThreadID: 11
--> accept window(6): 4 , ThreadID: 11
--> accept window(6): 5 , ThreadID: 11
--> accept window(6): 6 , ThreadID: 14
--> accept window(6): 7 , ThreadID: 14
--> accept window(6): 8 , ThreadID: 14
--> accept window(6): 9 , ThreadID: 14
--> accept window(6): 10 , ThreadID: 14
--> accept window(6): 11 , ThreadID: 15
--> accept window(6): 12 , ThreadID: 15

Javadoc: window(timespan, TimeUnit, count)
Javadoc: window(timespan, TimeUnit, scheduler, count)

1.7 window(timespan, timeskip, TimeUnit)

这个 window 的方法当即打开它的第一个窗口。随后每当过了 timeskip 的时长就打开一个新窗口(时间单位是 unit ,可选在调度器 scheduler 上执行),当窗口打开的时长达 到 timespan ,它就关闭当前打开的窗口。若是从原始Observable收到 了 onError 或 onCompleted 通知它也会关闭当前窗口。窗口的数据可能重叠也可能有间隙,取决于你设置的 timeskiptimespan 的值。

img-window(timespan,timeskip, TimeUnit)
解析: 在每个 timeskip 时期内都建立一个新的 window,而后独立收集 timespan 时间段的原始Observable发射的每一项数据。注意:由于每一个 window 都是独立接收数据,当接收数据的时间与建立新 window 的时间不一致时会有数据项重复,丢失等状况。

  • skip = timespan: 会依次顺序接受原始数据,同window(count)
  • skip > timespan: 两个窗口可能会有 skip-timespan 项数据丢失
  • skip < timespan: 两个窗口可能会有 timespan-skip 项数据重叠

实例代码:

// 7. window(long timespan, long timeskip, TimeUnit unit)
        // 在每个timeskip时期内都建立一个新的window,而后独立收集timespan时间段的原始Observable发射的每一项数据,
        // 若是timespan长于timeskip,它发射的数据包将会重叠,所以可能包含重复的数据项。
        // 7.1 skip = timespan: 会依次顺序接受原始数据,同window(count)
        Observable.intervalRange(1, 5, 0, 1000, TimeUnit.MILLISECONDS)
            .window(1, 1, TimeUnit.SECONDS)                             // 设置每秒建立一个window,收集2秒的数据
        //  .window(2, 1, TimeUnit.SECONDS, Schedulers.newThread())     // 指定在 newThread 线程中
            .subscribe(new Consumer<Observable<Long>>() {

                @Override
                public void accept(Observable<Long> t) throws Exception {
                    t.observeOn(Schedulers.newThread())
                        .subscribe(new Consumer<Long>() {
    
                            @Override
                            public void accept(Long t) throws Exception {
                                System.out.println("--> accept window(7-1): " + t + " , ThreadID: "+ Thread.currentThread().getId());
                            }
                        });
                }
            });
        
        // 7.2 skip > timespan: 两个窗口可能会有 skip-timespan 项数据丢失
        Observable.intervalRange(1, 5, 0, 1000, TimeUnit.MILLISECONDS)
            .window(1, 2, TimeUnit.SECONDS)                             // 设置每秒建立一个window,收集2秒的数据
        //  .window(2, 1, TimeUnit.SECONDS, Schedulers.newThread())     // 指定在 newThread 线程中
            .subscribe(new Consumer<Observable<Long>>() {

                @Override
                public void accept(Observable<Long> t) throws Exception {
                    t.observeOn(Schedulers.newThread())
                        .subscribe(new Consumer<Long>() {
    
                            @Override
                            public void accept(Long t) throws Exception {
                                System.out.println("--> accept window(7-2): " + t + " , ThreadID: "+ Thread.currentThread().getId());
                            }
                        });
                }
            });
        
        // 7.3 skip < timespan: 两个窗口可能会有 timespan-skip 项数据重叠
        Observable.intervalRange(1, 5, 0, 1000, TimeUnit.MILLISECONDS)
            .window(2, 1, TimeUnit.SECONDS)                             // 设置每秒建立一个window,收集2秒的数据
        //  .window(2, 1, TimeUnit.SECONDS, Schedulers.newThread())     // 指定在 newThread 线程中
            .subscribe(new Consumer<Observable<Long>>() {

                @Override
                public void accept(Observable<Long> t) throws Exception {
                    t.observeOn(Schedulers.newThread())
                        .subscribe(new Consumer<Long>() {
    
                            @Override
                            public void accept(Long t) throws Exception {
                                System.out.println("--> accept window(7-3): " + t + " , ThreadID: "+ Thread.currentThread().getId());
                            }
                        });
                }
            });

输出:

--> accept window(7-1): 1 , ThreadID: 11
--> accept window(7-1): 2 , ThreadID: 11
--> accept window(7-1): 3 , ThreadID: 14
--> accept window(7-1): 4 , ThreadID: 15
--> accept window(7-1): 5 , ThreadID: 17
----------------------------------------------------------------------
--> accept window(7-2): 1 , ThreadID: 11
--> accept window(7-2): 3 , ThreadID: 14
--> accept window(7-2): 5 , ThreadID: 15
----------------------------------------------------------------------
--> accept window(7-3): 1 , ThreadID: 11
--> accept window(7-3): 2 , ThreadID: 11
--> accept window(7-3): 2 , ThreadID: 14
--> accept window(7-3): 3 , ThreadID: 14
--> accept window(7-3): 3 , ThreadID: 15
--> accept window(7-3): 4 , ThreadID: 15
--> accept window(7-3): 4 , ThreadID: 16
--> accept window(7-3): 5 , ThreadID: 16
--> accept window(7-3): 5 , ThreadID: 17

Javadoc: window(timespan, timeskip, TimeUnit)
Javadoc: window(timespan, timeskip, TimeUnit, scheduler)

2. GroupBy

将一个 Observable 分拆为一些 Observables 集合,它们中的每个发射原始 Observable 的一个子序列。

RxJava实现了 groupBy 操做符。它返回Observable的一个特殊子类 GroupedObservable ,实现了GroupedObservable 接口的对象有一个额外的方法 getKey ,这个 Key 用于将数据分组到指定的Observable。有一个版本的 groupBy 容许你传递一个变换函数,这样它能够在发射结果 GroupedObservable 以前改变数据项。

若是你取消订阅一个 GroupedObservable ,那个 Observable 将会终止。若是以后原始的 Observable又发射了一个与这个Observable的Key匹配的数据, groupBy 将会为这个 Key 建立一个新的 GroupedObservable。

img-GroupBy

注意: groupBy 将原始 Observable 分解为一个发射多个 GroupedObservable 的Observable,一旦有订阅,每一个 GroupedObservable 就开始缓存数据。所以,若是你忽略这 些 GroupedObservable 中的任何一个,这个缓存可能造成一个潜在的内存泄露。所以,若是你不想观察,也不要忽略 GroupedObservable 。你应该使用像 take(0) 这样会丢弃本身的缓存的操做符。

2.1 groupBy(keySelector)

GroupBy 操做符将原始 Observable 分拆为一些 Observables 集合,它们中的每个发射原始 Observable 数据序列的一个子序列。哪一个数据项由哪个 Observable 发射是由一个函数断定的,这个函数给每一项指定一个KeyKey相同的数据会被同一个 Observable 发射。还有一个 delayError 参数的方法,指定是否延迟 Error 通知的Observable。

实例代码:

// 1. groupBy(keySelector)
    // 将原始数据处理后加上分组tag,经过GroupedObservable发射分组数据
    Observable.range(1, 10)
        .groupBy(new Function<Integer, String>() {

            @Override
            public String apply(Integer t) throws Exception {
                // 不一样的key将会产生不一样分组的Observable
                return t % 2 == 0 ? "Even" : "Odd"; // 将数据奇偶数进行分组,
            }
        }).observeOn(Schedulers.newThread())
            .subscribe(new Consumer<GroupedObservable<String, Integer>>() {

                @Override
                public void accept(GroupedObservable<String, Integer> grouped) throws Exception {
                    // 获得每一个分组数据的的Observable
                    grouped.subscribe(new Consumer<Integer>() {

                        @Override
                        public void accept(Integer t) throws Exception {
                            // 获得数据
                            System.out.println("--> accept groupBy(1):   groupKey: " + grouped.getKey() + ", value: " + t);
                        }
                    });
                }
            });

输出:

--> accept groupBy(1):   groupKey: Odd, value: 1
--> accept groupBy(1):   groupKey: Odd, value: 3
--> accept groupBy(1):   groupKey: Odd, value: 5
--> accept groupBy(1):   groupKey: Odd, value: 7
--> accept groupBy(1):   groupKey: Odd, value: 9
--> accept groupBy(1):   groupKey: Even, value: 2
--> accept groupBy(1):   groupKey: Even, value: 4
--> accept groupBy(1):   groupKey: Even, value: 6
--> accept groupBy(1):   groupKey: Even, value: 8
--> accept groupBy(1):   groupKey: Even, value: 10

Javadoc: groupBy(keySelector)
Javadoc: groupBy(keySelector, delayError)

2.2 groupBy(keySelector, valueSelector)

GroupBy 操做符经过 keySelector 将原始 Observable 按照 Key 分组,产生不一样的 Observable,再经过 valueSelector 对原始的数据进行处理,在发送每个被处理完成的数据。

实例代码:

// 2. groupBy(Function(T,R),Function(T,R))
    // 第一个func对原数据进行分组处理(仅仅分组添加key,不处理原始数据),第二个func对原始数据进行处理
    Observable.range(1, 10)
        .groupBy(new Function<Integer, String>() {

            @Override
            public String apply(Integer t) throws Exception {
                // 对原始数据进行分组处理
                return t % 2 == 0 ? "even" : "odd";
            }
        },new Function<Integer, String>() {

            @Override
            public String apply(Integer t) throws Exception {
                // 对原始数据进行数据转换处理
                return t + " is " + (t % 2 == 0 ? "even" : "odd");
            }
            }).observeOn(Schedulers.newThread()).subscribe(new Consumer<GroupedObservable<String, String>>() {

                @Override
                public void accept(GroupedObservable<String, String> grouped) throws Exception {
                    grouped.subscribe(new Consumer<String>() {

                        @Override
                        public void accept(String t) throws Exception {
                            // 接受最终的分组处理以及原数据处理后的数据
                            System.out.println("--> accept groupBy(2):   groupKey = " + grouped.getKey()
                                    + ", value = " + t);
                        }
                    });
                }
            });

输出:

--> accept groupBy(2):   groupKey = odd, value = 1 is odd
--> accept groupBy(2):   groupKey = odd, value = 3 is odd
--> accept groupBy(2):   groupKey = odd, value = 5 is odd
--> accept groupBy(2):   groupKey = odd, value = 7 is odd
--> accept groupBy(2):   groupKey = odd, value = 9 is odd
--> accept groupBy(2):   groupKey = even, value = 2 is even
--> accept groupBy(2):   groupKey = even, value = 4 is even
--> accept groupBy(2):   groupKey = even, value = 6 is even
--> accept groupBy(2):   groupKey = even, value = 8 is even
--> accept groupBy(2):   groupKey = even, value = 10 is even

Javadoc: groupBy(keySelector, valueSelector)
Javadoc: groupBy(keySelector, valueSelector, delayError)
Javadoc: groupBy(keySelector, valueSelector, delayError, bufferSize)

3. Scan

连续地对数据序列的每一项应用一个函数,而后连续发射结果。

3.1 scan(accumulator)

Scan 操做符对原始 Observable 发射的第一项数据应用一个函数,而后将那个函数的结果做为 本身的第一项数据发射。它将函数的结果同第二项数据一块儿填充给这个函数来产生它本身的第二项数据。它持续进行这个过程来产生剩余的数据序列。这个操做符在某些状况下被叫作 accumulator

img-scan(accumulator)
解析: 先发送原始数据第一项数据,而后将这个数据与下一个原始数据做为参数传递给 accumulator, 处理后发送这个数据,并与下一个原始数据一块儿传递到下一次 accumulator ,直到数据序列结束。相似一个累积的过程

实例代码:

// 1. scan(BiFunction(Integer sum, Integer t2)) 
        // 接受数据序列,从第二个数据开始,每次会将上次处理数据和如今接受的数据进行处理后发送
        Observable.range(1, 10)
            .scan(new BiFunction<Integer, Integer, Integer>() {
                
                @Override
                public Integer apply(Integer LastItem, Integer item) throws Exception {
                    System.out.println("--> apply: LastItem = " + LastItem + ", CurrentItem = " + item);
                    return LastItem + item; // 实现求和操做
                }
            }).subscribe(new Consumer<Integer>() {

                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept scan(1): " + t);
                    
                }
            });

输出:

--> accept scan(1): 1
--> apply: LastItem = 1, CurrentItem = 2
--> accept scan(1): 3
--> apply: LastItem = 3, CurrentItem = 3
--> accept scan(1): 6
--> apply: LastItem = 6, CurrentItem = 4
--> accept scan(1): 10
--> apply: LastItem = 10, CurrentItem = 5
--> accept scan(1): 15
--> apply: LastItem = 15, CurrentItem = 6
--> accept scan(1): 21
--> apply: LastItem = 21, CurrentItem = 7
--> accept scan(1): 28
--> apply: LastItem = 28, CurrentItem = 8
--> accept scan(1): 36
--> apply: LastItem = 36, CurrentItem = 9
--> accept scan(1): 45
--> apply: LastItem = 45, CurrentItem = 10
--> accept scan(1): 55

Javadoc: scan(accumulator)

3.2 scan(initialValue, accumulator)

有一个 scan 操做符的方法,你能够传递一个种子值给累加器函数的第一次调用(Observable 发射的第一项数据)。若是你使用这个版本,scan 将发射种子值做为本身的第一项数据。

注意: 传递 null 做为种子值与不传递是不一样的,null 种子值是合法的。

img-scan(initialValue, accumulator)

解析: 指定初始种子值,第一次发送种子值,后续发送原始数据序列以及累计处理数据。

实例代码:

// 2. scan(R,Func2)
    // 指定初始种子值,第一次发送种子值,后续发送原始数据序列以及累计处理数据
    Observable.range(1, 10)
        .scan(100, new BiFunction<Integer, Integer, Integer>() {    // 指定初始种子数据为100

            @Override
            public Integer apply(Integer lastValue, Integer item) throws Exception {
                System.out.println("--> apply: lastValue = " + lastValue + ", item = " + item);
                return lastValue + item;    // 指定初值的求和操做
            }
        }).subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept sacn(2) = " + t);
            }
        });

输出:

--> accept sacn(2) = 100
--> apply: lastValue = 100, item = 1
--> accept sacn(2) = 101
--> apply: lastValue = 101, item = 2
--> accept sacn(2) = 103
--> apply: lastValue = 103, item = 3
--> accept sacn(2) = 106
--> apply: lastValue = 106, item = 4
--> accept sacn(2) = 110
--> apply: lastValue = 110, item = 5
--> accept sacn(2) = 115
--> apply: lastValue = 115, item = 6
--> accept sacn(2) = 121
--> apply: lastValue = 121, item = 7
--> accept sacn(2) = 128
--> apply: lastValue = 128, item = 8
--> accept sacn(2) = 136
--> apply: lastValue = 136, item = 9
--> accept sacn(2) = 145
--> apply: lastValue = 145, item = 10
--> accept sacn(2) = 155

注意: 这个操做符默认不在任何特定的调度器上执行。
Javadoc: scan(initialValue, accumulator)

4. Cast

Cast 将原始Observable发射的每一项数据都强制转换为一个指定的类型,而后再发射数据,它是 map 的一个特殊版本。转换失败会有Error通知。

4.1 cast(clazz)

将原始数据强制转换为指定的 clazz 类型,若是转换成功发送转换后的数据,不然发送Error通知。通常用于 数据类型的转换数据实际类型的检查(多态)

img-cast(clazz)

实例代码:

//  cast(clazz) 
    // 1. 基本类型转换
    Observable.range(1, 5)
        .cast(Integer.class)
        .subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("-- accept cast(1): " + t);
            }
        });
            
    // 2. 转换失败通知
    System.out.println("------------------------------------");
    Observable.just((byte)1)
        .cast(Integer.class)
        .subscribe(new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("--> onSubscribe(2)");
            }

            @Override
            public void onNext(Integer t) {
                System.out.println("--> onNext(2) = " + t);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("--> onError(2) = " + e.toString());
            }

            @Override
            public void onComplete() {
                System.out.println("--> onComplete(2)");
            }
        });
    
    System.out.println("------------------------------------");
    class Animal{
        public int id;
    }

    class Dog extends Animal{
        public String name;

        @Override
        public String toString() {
            return "Dog [name=" + name + ", id=" + id + "]";
        }
    }
    
    //  3. 多态转换,检查数据的实际类型
    Animal animal = new Dog();
    animal.id = 666;
    Observable.just(animal)
        .cast(Dog.class)
        .subscribe(new Consumer<Dog>() {

            @Override
            public void accept(Dog t) throws Exception {
                System.out.println("--> accept cast(3): " + t);
            }
        });

输出:

-- accept cast(1): 1
-- accept cast(1): 2
-- accept cast(1): 3
-- accept cast(1): 4
-- accept cast(1): 5
------------------------------------
--> onSubscribe(2)
--> onError(2) = java.lang.ClassCastException: Cannot cast java.lang.Byte to java.lang.Integer
------------------------------------
--> accept cast(3): Dog [name=null, id=666]

Javadoc: cast(clazz)

小结:

在实际开发场景中,好比网络数据请求场景,原始的数据格式或类型可能并不知足开发的实际须要,须要对数据进行处理。数据变换操做在实际开发场景中仍是很是多的,因此数据的变换是很是重要的。使用Rx的数据变换操做能够轻松完成大多数场景的数据变换操做,提升开发效率。

Rx介绍与讲解及完整目录参考:Rxjava2 介绍与详解实例

实例代码:

相关文章
相关标签/搜索