RxJava(三):建立操做符

博客主页java

RxJava 的建立操做符主要包括以下内容:react

  • create —— 使用一个函数从头建立一个 Observable
  • defer —— 只有当订阅者订阅才建立 Observable ,为每一个订阅建立一个新的 Observable
  • empty —— 建立一个什么都不作直接通知完成的 Observable
  • never —— 建立一个不发射任何数据 Observable
  • error —— 建立一个什么都不作直接通知错误的 Observable
  • from —— 将一个 Iterable、一个 Future 或者一个数组转换成 Observable
  • interval —— 建立一个按照给定的时间间隔发射整数序列的 Observable
  • just —— 将一个或多个对象转换成发射这个或这些对象的一个 Observable
  • range —— 建立一个发射指定范围的整数序列的 Observable
  • timer —— 建立一个在给定的延时以后发射单个数据的 Observable
  • repeat —— 建立一个发射特定数据重复屡次的 Observable

1. create、just 和 from

1.1 create

使用一个函数从头开始建立一个 Observable
segmentfault

咱们可使用 create 操做符从头开始建立一个 Observable 给这个操做符传递一个接受观察者做为参数的函数,编写这个函数让它的行为表现为一个 Observable ——恰当地调用观察者 onNext、onError、onComplete 方法。一个形式正确的有限 Observable 必须尝试调用观察者 onComplete() 一次或者它的 onError() 一次,并且此后不能再调用观察者的任何其余方法。数组

RxJava 建议咱们在传递给 create 方法的函数时,先检查一下观察者的 isDisposed 状态,以便在没有观察者的时候,让咱们的 Observable 中止发射数据,防止运行昂贵的运算。缓存

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        try {
            if (!emitter.isDisposed()) {
                for (int i = 0; i < 5; i++) {
                    emitter.onNext(i);
                }
                emitter.onComplete();
            }
        } catch (Exception e) {
            emitter.onError(e);
        }
    }
}).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "Next-> " + integer);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.d(TAG, "Error-> " + throwable.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "Complete.");
    }
});

// 执行结果
 Next-> 0
 Next-> 1
 Next-> 2
 Next-> 3
 Next-> 4
 Complete.

1.2 just

建立一个发射指定值的 Observable

just 将单个数据转换为发射这个单个数据的 Observable数据结构

Observable.just("Observable#just")
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "Next: " + s);
            }
        });

// 执行结果
 Next: Observable#just

just 相似于 from,可是 from 会将数组或 Iterable 的数据取出而后逐个发射,而 just 只是简单地原样发射,将数组或 Iterable 看成单个数据。app

它能够接受一至十个参数,返回一个按参数列表顺序发射这些数据的 Observableide

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Next: " + integer);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d(TAG, "Error-> " + throwable.getMessage());
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "Complete.");
            }
        });

// 执行结果
 Next: 1
 Next: 2
 Next: 3
 Next: 4
 Next: 5
 Next: 6
 Next: 7
 Next: 8
 Next: 9
 Next: 10
 Complete.

1.3 from

from 能够将其余种类的对象和数据类型转换为 Observable

当咱们使用 Observable 时,若是要处理的数据均可以转换成 Observables ,而不是须要混合使用 Observables 和其余类型的数据,会很是方便。这让咱们在数据流的整个生命周期中,可使用一组统一的操做符来管理它们。函数

例如, Iterable 能够当作同步的 Observable; Future 能够当作老是只发射单个数据的 Observable。经过显式地将那些数据转换为 Observables ,咱们能够像使用 Observable 样与它们交互。spa

所以,大部分 ReactiveX 实现都提供了将特定语言的对象和数据结构转换为 Observables 的方法。

RxJava 中, from 操做符能够将 Future、Iterable 和数组转换成 Obseruable 。对于 Iterable
和数组,产生的 Observable 会发射 Iterable 或数组的每一项数据。

fromArray

Observable.fromArray("Observable", "fromArray")
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "Next: " + s);
            }
        });

// 执行结果
 Next: Observable
 Next: fromArray

fromIterable

List<Integer> items = new ArrayList<>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}

Observable.fromIterable(items)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Next: " + integer);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d(TAG, "Error-> " + throwable.getMessage());
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "Complete.");
            }
        });

// 执行结果
 Next: 0
 Next: 1
 Next: 2
 Next: 3
 Next: 4
 Complete.

Future ,它会发射 Future.get() 方法返回的单个数据。

ExecutorService executorService = Executors.newCachedThreadPool();
Future<String> future = executorService.submit(new MyCallable());

Observable.fromFuture(future)
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "Next: " + s);
            }
        });

private class MyCallable implements Callable<String> {

    @Override
    public String call() throws Exception {
        Log.d(TAG, "模拟一些耗时的任务...");
        Thread.sleep(2000L);
        return "OK";
    }
}

// 执行结果
15:54:06.832  模拟一些耗时的任务...
15:54:08.833  Next: OK

fromFuture 方法有一个可接受两个可选参数的版本,分别指定超时时长和时间单位。若是过了指定的时长, Future 尚未返回一个值,那么这个 Observable 就会发射错误通知井终止。 下面的代码,把超时时间设置为 1s, Observable.fromFuture(future, 1, TimeUnit.SECONDS),执行结果以下:

16:00:44.260  模拟一些耗时的任务...
16:00:45.367  io.reactivex.exceptions.OnErrorNotImplementedException

2. repeat

建立一个发射特定数据重复屡次的 Observable

repeat 会重复地发射数据。某些实现容许咱们重复发射某个数据序列,还有一些容许咱们限制重复的次数。

repeat 不是建立一个 Observable,而是重复发射原始 Observable 的数据序列,这个序列或者是无限的,或者是经过 repeat(n) 指定的重复次数。

Observable.just("hello, repeat")
        .repeat(3) // 重复3次
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "Next-> " + s);
            }
        });

在RxJava 2.x 中还有两个 repeat 相关的操做符: repeatWhen 和 repeatUntil

2.1 repeatWhen

repeatWhen 不是缓存和重放原始 Observable 的数据序列,而是有条件地从新订阅和发射原来的 Observable

将原始 Observable 终止通知(完成或错误)看成一个 void 数据传递给一个通知处理器,以此来决定是否要从新订阅和发射原来的 Observable。这个通知处理器就像一个 Observable 操做符,接受一个发射 void 通知的 Observable 做为输入,返回一个发射 void 数据(意思是,从新订阅和发射原始 Observable )或者直接终止(即便用 repeatWhen 终止发射数据)的 Observable

Observable.range(0, 5)
        .repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
                return Observable.timer(10, TimeUnit.SECONDS);
            }
        }).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "Next-> " + integer);
    }
});

// 执行结果
16:35:18.627  Next-> 0
16:35:18.627  Next-> 1
16:35:18.627  Next-> 2
16:35:18.627  Next-> 3
16:35:18.627  Next-> 4
16:35:28.627  Next-> 0
16:35:28.627  Next-> 1
16:35:28.627  Next-> 2
16:35:28.627  Next-> 3
16:35:28.627  Next-> 4

会先发射 0 到 4 这5个数据 ,因为使用了 repeatWhen 操做符,所以在 10s 以后还会再发射一次这些数据。

2.2 repeatUntil

repeatUntil 是 RxJava 2.x 新增的操做符,表示直到某个条件就再也不重复发射数据。当 BooleanSupplier的 getAsBoolean() 返回 false 时,表示重复发射上游的 Observable,当 getAsBoolean() 为 true 时,表示停止重复发射上游的 Observable

final long startTimeMillis = System.currentTimeMillis();
Observable.interval(500, TimeUnit.MILLISECONDS)
        .take(5)
        .repeatUntil(new BooleanSupplier() {
            @Override
            public boolean getAsBoolean() throws Exception {
                return System.currentTimeMillis() - startTimeMillis > 5000;
            }
        }).subscribe(new Consumer<Long>() {
    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "Next-> " + aLong);
    }
});

// 执行结果
 Next-> 0
 Next-> 1
 Next-> 2
 Next-> 3
 Next-> 4
 Next-> 0
 Next-> 1
 Next-> 2
 Next-> 3
 Next-> 4

执行的结果里打印了两遍 0 到 4 ,之因此再也不打印第三遍是由于符合了 System.currentTimeMillis() - startTimeMillis > 5000 这个条件。

3. defer、interval 和 timer

3.1 defer

直到有观察者订阅时才建立 Observable ,而且为每一个观察者建立一个全新的 Observable

defer 操做符会一直等待直到有观察者订阅它,而后它使用 Observable 工厂方法生成一个 Observable。它对每一个观察者都这样作,所以尽管每一个订阅者都觉得本身订阅的是同一个 Observable ,但事实上每一个订阅者获取的是它们本身单独的数据序列。

在某些状况下,直到最后一分钟(订阅发生时)才生成 Observable ,以确保 Observable 包含最新的数据。

Observable<String> observable = Observable.defer(new Callable<ObservableSource<? extends String>>() {
    @Override
    public ObservableSource<? extends String> call() throws Exception {
        return Observable.just("hello, defer");
    }
});

observable.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, "Next: " + s);
    }
});

// 执行结果
 Next: hello, defer

3.2 interval

建立一个按固定时间间隔发射整数序列的 Observable

interval 操做符返回一个 Observable ,它按固定的时间间隔发射一个无限递增的整数序列。

interval 接受一个表示时间间隔的参数和一个表示时间单位的参数。 interval 默认在 computation 调度器上执行

Observable.interval(1, TimeUnit.SECONDS)
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                Log.d(TAG, "Next: " + aLong);
            }
        });

// 执行结果
 Next: 0
 Next: 1
 Next: 2
 Next: 3
 ......

每隔 1秒 打印一个数字。

3.3 timer

建立一个 Observable, 它在一个给定的延迟后发射一个特殊的值

timer 操做符建立一个在给定的时间段以后返回一个特殊值的 Observable

timer 返回一个 Observable,它在延迟一段给定的时间后发射一个简单的数字 0。 timer 操做符默认在 computation 调度器上执行。

Observable.timer(2, TimeUnit.SECONDS)
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                // 2秒后打印
                Log.d(TAG, "Next: " + aLong);
            }
        });

// 执行结果
 Next: 0

若是个人文章对您有帮助,不妨点个赞鼓励一下(^_^)

相关文章
相关标签/搜索