RxJava操做符(09-算术/聚合操做&链接操做)

版权声明:本文为openXu原创文章【openXu的博客】,未经博主容许不得以任何形式转载git

目录:github




#算术&聚合

1. Count

  Count操做符将一个Observable转换成一个发射单个值的Observable,这个值表示原始Observable发射的数据的数量。
   若是原始Observable发生错误终止,Count不发射数据而是直接传递错误通知。若是原始Observable永远不终止,Count既不会发射数据也不会终止。web

    这里写图片描述

示例代码:缓存

Observable.from(new String[] { "one", "two", "three" })
        .count()
        .subscribe(integer->Log.v(TAG, "count:"+integer));

Observable.from(new String[] { "one", "two", "three" })
        .countLong()
        .subscribe(aLong->Log.v(TAG, "countLong:"+aLong));

输出:ide

count:3
countLong:3svg


2. Concat

  concat操做符会依次发射多个Observable的数据,第一个Observable发射的全部数据在第二个Observable发射的任何数据前面,以此类推,直到前面一个Observable终止,Concat才会订阅额外的一个Observable。
Merge操做符也差很少,它结合两个或多个Observable的发射物,可是数据可能交错,而Concat不会让多个Observable的发射物交错。函数

    这里写图片描述

示例代码:.net

//还有一个实例方法叫concatWith,这二者是等价的:Observable.concat(a,b)和a.concatWith(b)
Observable.concat(
        Observable.interval(100,TimeUnit.MILLISECONDS).take(4),
        Observable.interval(200,TimeUnit.MILLISECONDS).take(5))
        .subscribe(aLong -> Log.v(TAG, "concat:"+aLong));

输出:code

concat:0
concat:1
concat:2
concat:3orm

concat:0
concat:1
concat:2
concat:3
concat:4


## 3. Reduce   Reduce操做符对原始Observable发射数据的第一项应用一个函数,而后再将这个函数的返回值与第二项数据一块儿传递给函数,以此类推,持续这个过程知道原始Observable发射它的最后一项数据并终止,此时Reduce返回的Observable发射这个函数返回的最终值。   注意若是原始Observable没有发射任何数据,reduce抛出异常IllegalArgumentException。   在其它场景中,这种操做有时被称为累积,汇集,压缩,折叠,注射等。

    这里写图片描述

示例代码:

Observable.just(1,2,3,4)
        .reduce(new Func2<Integer, Integer, Integer>() {
            //integer为前面几项只和,integer2为当前发射的数据
            @Override
            public Integer call(Integer integer, Integer integer2) {
                Log.v(TAG, "integer:"+integer+"  integer2:"+integer2);
                return integer+integer2;
            }
        }).subscribe(integer -> Log.v(TAG, "reduce:"+integer));

输出:

integer:1 integer2:2
integer:3 integer2:3
integer:6 integer2:4
reduce:10



#链接操做

1. Publish

  Publish 操做符将普通的Observable转换为可链接的Observable(ConnectableObservable),ConnectableObservable是Observable的子类。 可链接的Observable (connectable Observable)与普通的Observable差很少,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect操做符时才会开始,这样能够更灵活的控制发射数据的时机。
  注意:若是一个ConnectableObservable已经开始发射数据,再对其进行订阅只能接受以后发射的数据,订阅以前已经发射过的数据就丢失了。

    这里写图片描述

示例代码:

SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");

Observable<Long> obs = Observable.interval(1, TimeUnit.SECONDS).take(5);
//使用publish操做符将普通Observable转换为可链接的Observable
ConnectableObservable<Long> connectableObservable = obs.publish();
//第一个订阅者订阅,不会开始发射数据
connectableObservable.subscribe(new Subscriber<Long>() {
    @Override
    public void onCompleted() {
        Log.v(TAG, "1.onCompleted");
    }
    @Override
    public void onError(Throwable e) {
        Log.v(TAG, "1.onError");
    }
    @Override
    public void onNext(Long along) {
        Log.v(TAG, "1.onNext:"+along+"->time:"+ sdf.format(new Date()));
    }
});
//开始发射数据
Log.v(TAG, "start time:" + sdf.format(new Date()));
connectableObservable.connect();
//第二个订阅者延迟2s订阅,这将致使丢失前面2s内发射的数据
connectableObservable
        .delaySubscription(2, TimeUnit.SECONDS)
        .subscribe(new Subscriber<Long>() {
    @Override
    public void onCompleted() {
        Log.v(TAG, "2.onCompleted");
    }
    @Override
    public void onError(Throwable e) {
        Log.v(TAG, "2.onError");
    }
    @Override
    public void onNext(Long along) {
        Log.v(TAG, "2.onNext:"+along+"->time:"+ sdf.format(new Date()));
    }
});

/*
输出:
start time:23:01:30
1.onNext:0->time:23:01:31
1.onNext:1->time:23:01:32
2.onNext:1->time:23:01:32
1.onNext:2->time:23:01:33
2.onNext:2->time:23:01:33
1.onNext:3->time:23:01:34
2.onNext:3->time:23:01:34
1.onNext:4->time:23:01:35
2.onNext:4->time:23:01:35
1.onCompleted
2.onCompleted
 */

2. Connect

  connect是ConnectableObservable接口的一个方法,它的做用就是让ConnectableObservable开始发射数据(即便没有任何订阅者订阅这个Observable,调用connect都会开始发射数据)。
  connect方法返回一个Subscription对象,能够调用它的unsubscribe方法让Observable中止发射数据给观察者。

示例代码:

SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");

Observable<Long> obs = Observable.interval(1, TimeUnit.SECONDS);
//使用publish操做符将普通Observable转换为可链接的Observable
ConnectableObservable<Long> connectableObservable = obs.publish();
//开始发射数据
Subscription sub = connectableObservable.connect();
//第二个订阅者延迟2s订阅,这将致使丢失前面2s内发射的数据
connectableObservable
        .delaySubscription(3, TimeUnit.SECONDS)
        .subscribe(new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                Log.v(TAG, "onCompleted");
            }
            @Override
            public void onError(Throwable e) {
                Log.v(TAG, "onError");
            }
            @Override
            public void onNext(Long along) {
                Log.v(TAG, "onNext:"+along+"->time:"+ sdf.format(new Date()));
            }
        });

new Timer().schedule(new TimerTask() {
    @Override
    public void run() {
        //6s以后中止发射数据
        sub.unsubscribe();
    }
},6000);

/*
输出:
onNext:3->time:23:10:49
onNext:4->time:23:10:50
onNext:5->time:23:10:51
 */

3. RefCount

  RefCount操做符能够看作是Publish的逆向,它能将一个ConnectableObservable对象再从新转化为一个普通的Observable对象,若是转化后有订阅者对其进行订阅将会开始发射数据,后面若是有其余订阅者订阅,将只能接受后面的数据(这也是转化以后的Observable 与普通的Observable的一点区别 )。
  还有一个操做符叫share,它的做用等价于对一个Observable同时应用publish和refCount操做。

    这里写图片描述

示例代码:

SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");

Observable<Long> obs = Observable.interval(1, TimeUnit.SECONDS).take(4);
//使用publish操做符将普通Observable转换为可链接的Observable
ConnectableObservable<Long> connectableObservable = obs.publish();
//refCount:将ConnectableObservable转化为普通Observable
Observable obsRefCount = connectableObservable.refCount();

obs.subscribe(new Subscriber<Long>() {
    @Override
    public void onCompleted() {
        Log.v(TAG, "普通obs1:onCompleted");
    }
    @Override
    public void onError(Throwable e) {
        Log.v(TAG, "普通obs1:onError");
    }
    @Override
    public void onNext(Long along) {
        Log.v(TAG, "普通obs1:onNext:"+along+"->time:"+ sdf.format(new Date()));
    }
});
obs.delaySubscription(3, TimeUnit.SECONDS)
        .subscribe(new Subscriber<Long>() {
    @Override
    public void onCompleted() {
        Log.v(TAG, "普通obs2:onCompleted");
    }
    @Override
    public void onError(Throwable e) {
        Log.v(TAG, "普通obs2:onError");
    }
    @Override
    public void onNext(Long along) {
        Log.v(TAG, "普通obs2:onNext:"+along+"->time:"+ sdf.format(new Date()));
    }
});

obsRefCount.subscribe(new Subscriber<Long>() {
    @Override
    public void onCompleted() {
        Log.v(TAG, "obsRefCount1:onCompleted");
    }
    @Override
    public void onError(Throwable e) {
        Log.v(TAG, "obsRefCount1:onError");
    }
    @Override
    public void onNext(Long along) {
        Log.v(TAG, "obsRefCount1:onNext:"+along+"->time:"+ sdf.format(new Date()));
    }
});
obsRefCount.delaySubscription(3, TimeUnit.SECONDS)
        .subscribe(new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                Log.v(TAG, "obsRefCount2:onCompleted");
            }
            @Override
            public void onError(Throwable e) {
                Log.v(TAG, "obsRefCount2:onError");
            }
            @Override
            public void onNext(Long along) {
                Log.v(TAG, "obsRefCount2:onNext:"+along+"->time:"+ sdf.format(new Date()));
            }
        });

/*
输出:
普通obs1:onNext:0->time:23:28:28
普通obs1:onNext:1->time:23:28:29
普通obs1:onNext:2->time:23:28:30
普通obs1:onNext:3->time:23:28:31
普通obs1:onCompleted

普通obs2:onNext:0->time:23:28:31
普通obs2:onNext:1->time:23:28:32
普通obs2:onNext:2->time:23:28:33
普通obs2:onNext:3->time:23:28:34
普通obs2:onCompleted

obsRefCount1:onNext:0->time:23:28:28
obsRefCount1:onNext:1->time:23:28:29
obsRefCount1:onNext:2->time:23:28:30
obsRefCount1:onNext:3->time:23:28:31
obsRefCount1:onCompleted

obsRefCount2:onNext:3->time:23:28:31
obsRefCount2:onCompleted
 */

4. Replay

  经过上面的介绍咱们了解到,ConnectableObservable和普通的Observable最大的区别就是,调用Connect操做符开始发射数据,后面的订阅者会丢失以前发射过的数据。
  使用Replay操做符返回的ConnectableObservable 会缓存订阅者订阅以前已经发射的数据,这样即便有订阅者在其发射数据开始以后进行订阅也能收到以前发射过的数据。Replay操做符能指定缓存的大小或者时间,这样能避免耗费太多内存。

示例代码:

SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
Log.v(TAG, "start time:" + sdf.format(new Date()));

//没有缓存的状况
ConnectableObservable<Long> obs = Observable.interval(1, TimeUnit.SECONDS)
        .take(5)
        .publish();
obs.connect();  //开始发射数据
obs.delaySubscription(3, TimeUnit.SECONDS)
        .subscribe(aLong -> Log.v(TAG, "onNext:"+aLong+"->time:"+ sdf.format(new Date())));


//缓存一个数据
ConnectableObservable<Long> obs1 = Observable.interval(1, TimeUnit.SECONDS)
        .take(5)
        .replay(1);   //缓存1个数据
obs1.connect();  //开始发射数据
obs1.delaySubscription(3, TimeUnit.SECONDS)
        .subscribe(aLong -> Log.v(TAG,
                "1.onNext:"+aLong+"->time:"+ sdf.format(new Date())));

//缓存3s内发射的数据
ConnectableObservable<Long> obs2 = Observable.interval(1, TimeUnit.SECONDS)
        .take(5)
        .replay(3, TimeUnit.SECONDS);   //缓存3s
obs2.connect();  //开始发射数据
obs2.delaySubscription(3, TimeUnit.SECONDS)
        .subscribe(aLong -> Log.v(TAG,
                "2.onNext:"+aLong+"->time:"+ sdf.format(new Date())));

/*
输出:
start time:14:25:51
onNext:3->time:14:25:55
onNext:4->time:14:25:56

1.onNext:2->time:14:25:54
1.onNext:3->time:14:25:55
1.onNext:4->time:14:25:56

2.onNext:0->time:14:25:54
2.onNext:1->time:14:25:54
2.onNext:2->time:14:25:54
2.onNext:3->time:14:25:55
2.onNext:4->time:14:25:56
 */

从log能够看出,没有缓存机制的只能收到3.4;缓存1个数据的能收到前面已经发射过的2;缓存3s的将全部已经发射的数据都缓存起来了,因此数据都能收到。缓存的数据在订阅者订阅以后立马发射给订阅者。


#源码下载:

https://github.com/openXu/RxJavaTest