【知识整理】这多是最好的RxJava 2.x 入门教程(三)

 

 

这多是最好的RxJava 2.x入门教程系列专栏

文章连接:

这多是最好的RxJava 2.x 入门教程(完结版)【强力推荐】

这多是最好的RxJava 2.x 入门教程(一)

这多是最好的RxJava 2.x 入门教程(二)

这多是最好的RxJava 2.x 入门教程(三)

GitHub 代码同步更新:https://github.com/nanchen2251/RxJava2Examples

为了知足你们的饥渴难耐,GitHub将同步更新代码,主要包含基本的代码封装,RxJava 2.x全部操做符应用场景介绍和实际应用场景,后期除了RxJava可能还会增添其余东西,总之,GitHub上的Demo专为你们倾心打造。传送门:https://github.com/nanchen2251/RxJava2Exampleshtml

 

1、前言

      年轻的老司机们,我这么勤的为你们分享,却少有催更的,好吧。其实写这个系列不是为了吸睛,那我们继续写咱们的 RxJava 2.x 的操做符。react

2、正题

七、distinct

       这个操做符很是的简单、通俗、易懂,就是简单的去重嘛,我甚至都不想贴代码,但人嘛,总得锲而不舍。git

1 Observable.just(1, 1, 1, 2, 2, 3, 4, 5)
2                 .distinct()
3                 .subscribe(new Consumer<Integer>() {
4                     @Override
5                     public void accept(@NonNull Integer integer) throws Exception {
6                         mRxOperatorsText.append("distinct : " + integer + "\n");
7                         Log.e(TAG, "distinct : " + integer + "\n");
8                     }
9                 });

输出:github

Log 日志显而易见,咱们在通过dinstinct() 后接收器接收到的事件只有1,2,3,4,5了。app

八、Filter

      信我,Filter 你会很经常使用的,它的做用也很简单,过滤器嘛。能够接受一个参数,让其过滤掉不符合咱们条件的值ide

 1 Observable.just(1, 20, 65, -5, 7, 19)
 2                 .filter(new Predicate<Integer>() {
 3                     @Override
 4                     public boolean test(@NonNull Integer integer) throws Exception {
 5                         return integer >= 10;
 6                     }
 7                 }).subscribe(new Consumer<Integer>() {
 8             @Override
 9             public void accept(@NonNull Integer integer) throws Exception {
10                 mRxOperatorsText.append("filter : " + integer + "\n");
11                 Log.e(TAG, "filter : " + integer + "\n");
12             }
13         });

输出:spa

能够看到,咱们过滤器舍去了小于10的值,因此最好的输出只有20,65,19。线程

 

九、buffer

 buffer 操做符接受两个参数,buffef(count,skip),做用是将 Observable 中的数据按 skip (步长) 分红最大不超过 count 的 buffer ,而后生成一个 Observable 。也许你还不太理解,咱们能够经过咱们的示例图和示例代码来进一步深化它。3d

 1 Observable.just(1, 2, 3, 4, 5)
 2                 .buffer(3, 2)
 3                 .subscribe(new Consumer<List<Integer>>() {
 4                     @Override
 5                     public void accept(@NonNull List<Integer> integers) throws Exception {
 6                         mRxOperatorsText.append("buffer size : " + integers.size() + "\n");
 7                         Log.e(TAG, "buffer size : " + integers.size() + "\n");
 8                         mRxOperatorsText.append("buffer value : ");
 9                         Log.e(TAG, "buffer value : " );
10                         for (Integer i : integers) {
11                             mRxOperatorsText.append(i + "");
12                             Log.e(TAG, i + "");
13                         }
14                         mRxOperatorsText.append("\n");
15                         Log.e(TAG, "\n");
16                     }
17                 });

输出:日志

如图,咱们把1,2,3,4,5依次发射出来,通过buffer 操做符,其中参数 count 为3, skip 为2,而咱们的输出 依次是 123,345,5。显而易见,咱们 buffer 的第一个参数是count,表明最大取值,在事件足够的时候,通常都是取count个值,而后每次跳过skip个事件。其实看 Log 日志,我相信你们都明白了。

 

十、timer

timer 颇有意思,至关于一个定时任务。在1.x 中它还能够执行间隔逻辑,但在2.x中此功能被交给了 interval,下一个会介绍。但须要注意的是,timer 和 interval 均默认在新线程。

 1 mRxOperatorsText.append("timer start : " + TimeUtil.getNowStrTime() + "\n");
 2         Log.e(TAG, "timer start : " + TimeUtil.getNowStrTime() + "\n");
 3         Observable.timer(2, TimeUnit.SECONDS)
 4                 .subscribeOn(Schedulers.io())
 5                 .observeOn(AndroidSchedulers.mainThread()) // timer 默认在新线程,因此须要切换回主线程
 6                 .subscribe(new Consumer<Long>() {
 7                     @Override
 8                     public void accept(@NonNull Long aLong) throws Exception {
 9                         mRxOperatorsText.append("timer :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n");
10                         Log.e(TAG, "timer :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n");
11                     }
12                 });

输出:

显而易见,当咱们两次点击按钮触发这个事件的时候,接收被延迟了2秒。

 

十一、interval

如同咱们上面可说,interval 操做符用于间隔时间执行某个操做,其接受三个参数,分别是第一次发送延迟,间隔时间,时间单位。

 

 1 mRxOperatorsText.append("interval start : " + TimeUtil.getNowStrTime() + "\n");
 2         Log.e(TAG, "interval start : " + TimeUtil.getNowStrTime() + "\n");
 3         Observable.interval(3,2, TimeUnit.SECONDS)
 4                 .subscribeOn(Schedulers.io())
 5                 .observeOn(AndroidSchedulers.mainThread()) // 因为interval默认在新线程,因此咱们应该切回主线程
 6                 .subscribe(new Consumer<Long>() {
 7                     @Override
 8                     public void accept(@NonNull Long aLong) throws Exception {
 9                         mRxOperatorsText.append("interval :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n");
10                         Log.e(TAG, "interval :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n");
11                     }
12                 });

输出:

如同 Log 日志同样,第一次延迟了3秒后接收到,后面每次间隔了2秒。

然而,心细的小伙伴可能会发现,因为咱们这个是间隔执行,因此当咱们的Activity都销毁的时候,实际上这个操做还依然在进行,因此,咱们得花点当心思让咱们在不须要它的时候干掉它。查看源码发现,咱们subscribe(Cousumer<? super T> onNext)返回的是Disposable,咱们能够在这上面作文章。

 1 @Override
 2     protected void doSomething() {
 3         mRxOperatorsText.append("interval start : " + TimeUtil.getNowStrTime() + "\n");
 4         Log.e(TAG, "interval start : " + TimeUtil.getNowStrTime() + "\n");
 5         mDisposable = Observable.interval(3, 2, TimeUnit.SECONDS)
 6                 .subscribeOn(Schedulers.io())
 7                 .observeOn(AndroidSchedulers.mainThread()) // 因为interval默认在新线程,因此咱们应该切回主线程
 8                 .subscribe(new Consumer<Long>() {
 9                     @Override
10                     public void accept(@NonNull Long aLong) throws Exception {
11                         mRxOperatorsText.append("interval :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n");
12                         Log.e(TAG, "interval :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n");
13                     }
14                 });
15     }
16 
17     @Override
18     protected void onDestroy() {
19         super.onDestroy();
20         if (mDisposable != null && !mDisposable.isDisposed()) {
21             mDisposable.dispose();
22         }
23     }

哈哈,再次验证,解决了咱们的疑惑。

 

十二、doOnNext

其实以为 doOnNext 应该不算一个操做符,但考虑到其经常使用性,咱们仍是咬咬牙将它放在了这里。它的做用是让订阅者在接收到数据以前干点有意思的事情。假如咱们在获取到数据以前想先保存一下它,无疑咱们能够这样实现。

 1 Observable.just(1, 2, 3, 4)
 2                 .doOnNext(new Consumer<Integer>() {
 3                     @Override
 4                     public void accept(@NonNull Integer integer) throws Exception {
 5                         mRxOperatorsText.append("doOnNext 保存 " + integer + "成功" + "\n");
 6                         Log.e(TAG, "doOnNext 保存 " + integer + "成功" + "\n");
 7                     }
 8                 }).subscribe(new Consumer<Integer>() {
 9             @Override
10             public void accept(@NonNull Integer integer) throws Exception {
11                 mRxOperatorsText.append("doOnNext :" + integer + "\n");
12                 Log.e(TAG, "doOnNext :" + integer + "\n");
13             }
14         });

输出:

 

1三、skip

 skip 颇有意思,其实做用就和字面意思同样,接受一个 long 型参数 count ,表明跳过 count 个数目开始接收。

1  Observable.just(1,2,3,4,5)
2                 .skip(2)
3                 .subscribe(new Consumer<Integer>() {
4                     @Override
5                     public void accept(@NonNull Integer integer) throws Exception {
6                         mRxOperatorsText.append("skip : "+integer + "\n");
7                         Log.e(TAG, "skip : "+integer + "\n");
8                     }
9                 });

输出:

 

 

1四、take

take,接受一个 long 型参数 count ,表明至多接收 count 个数据。

1  Flowable.fromArray(1,2,3,4,5)
2                 .take(2)
3                 .subscribe(new Consumer<Integer>() {
4                     @Override
5                     public void accept(@NonNull Integer integer) throws Exception {
6                         mRxOperatorsText.append("take : "+integer + "\n");
7                         Log.e(TAG, "accept: take : "+integer + "\n" );
8                     }
9                 });

输出:

 

1五、just

 just,没什么好说的,其实在前面各类例子都说明了,就是一个简单的发射器依次调用onNext()方法。

 1 Observable.just("1", "2", "3")
 2                 .subscribeOn(Schedulers.io())
 3                 .observeOn(AndroidSchedulers.mainThread())
 4                 .subscribe(new Consumer<String>() {
 5                     @Override
 6                     public void accept(@NonNull String s) throws Exception {
 7                         mRxOperatorsText.append("accept : onNext : " + s + "\n");
 8                         Log.e(TAG,"accept : onNext : " + s + "\n" );
 9                     }
10                 });

 输出:

 

3、写在最后

      好吧,本节先讲到这里,下节咱们仍是继续讲简单的操做符,虽然咱们的教程比较枯燥,如今也不那么受人关注,但后面的系列我相信你们必定会很是喜欢的,咱们下期再见!

     代码所有同步到GitHub:https://github.com/nanchen2251/RxJava2Examples

相关文章
相关标签/搜索