本文是 "RxJava 沉思录" 系列的第三篇分享。本系列全部分享:html
在上一篇分享中,咱们应该已经对 Observable 在空间维度上从新组织事件的能力 印象深入了,那么天然而然的,咱们容易联想到时间维度,事实上就我我的而言,我认为 Observable 在时间维度上的从新组织事件的能力 相比较其空间维度的能力更为突出。与上一篇相似,本文接下来将经过列举真实的例子来阐述这一论点。java
这是一个比较常见的情景,用户在手机比较卡顿的时候,点击某个按钮,正常应该启动一个页面,可是手机比较卡,没有当即启动,用户就点了好几下,结果等手机回过神来的时候,就会启动好几个同样的页面。react
这个需求用 Callback 的方式比较难处理,可是相信用过 RxJava 的开发者都知道怎么处理:后端
RxView.clicks(btn)
.debounce(500, TimeUnit.MILLISECONDS)
.observerOn(AndroidSchedulers.mainThread())
.subscribe(o -> {
// handle clicks
})
复制代码
debounce
操做符产生一个新的Observable
, 这个Observable
只发射原Observable
中时间间隔小于指定阈值的最大子序列的最后一个元素。 参考资料:Debounceapi
虽然这个例子比较简单,可是它很好的表达了 Observable 能够在时间维度上对其发射的事件进行从新组织 , 从而作到以前 Callback 形式不容易作到的事情。服务器
点赞与取消点赞是社交软件上常常出现的需求,假设咱们目前有下面这样的点赞与取消点赞的代码:网络
boolean like;
likeBtn.setOnClickListener(v -> {
if (like) {
// 取消点赞
sendCancelLikeRequest(postId);
} else {
// 点赞
sendLikeRequest(postId);
}
like = !like;
});
复制代码
如下图片素材资源来自 Dribbble 函数
若是你碰巧实现了一个很是酷炫的点赞动画,用户可能会玩得不亦乐乎,这个时候可能会对后端服务器形成必定的压力,由于每次点赞与取消点赞都会发起网络请求,假如不少用户同时在玩这个点赞动画,服务器可能会不堪重负。post
和前一个例子的防抖动思路差很少,咱们首先想到须要防抖动:优化
boolean like;
PublishSubject<Boolean> likeAction = PublishSubject.create();
likeBtn.setOnClickListener(v -> {
likeAction.onNext(like);
like = !like;
});
likeAction.debounce(1000, TimeUnit.MILLISECONDS)
.observerOn(AndroidSchedulers.mainThread())
.subscribe(like -> {
if (like) {
sendCancelLikeRequest(postId);
} else {
sendLikeRequest(postId);
}
});
复制代码
写到这个份上,其实已经能够解决服务器压力过大的问题了,可是仍是有优化空间,假设当前是已赞状态,用户快速点击 2 下,按照上面的代码,仍是会发送一次点赞的请求,因为当前是已赞状态,再发送一次点赞请求是没有意义的,因此咱们优化的目标就是将这一类事件过滤掉:
Observable<Boolean> debounced = likeAction.debounce(1000, TimeUnit.MILLISECONDS);
debounced.zipWith(
debounced.startWith(like),
(last, current) -> last == current ? new Pair<>(false, false) : new Pair<>(true, current)
)
.flatMap(pair -> pair.first ? Observable.just(pair.second) : Observable.empty())
.subscribe(like -> {
if (like) {
sendCancelLikeRequest(postId);
} else {
sendLikeRequest(postId);
}
});
复制代码
zipWith
操做符能够把两个Observable
发射的相同序号(同为第 x 个)的元素,进行运算转换,获得的新元素做为新的Observable
对应序号所发射的元素。参考资料:ZipWith
上面的代码,咱们能够看到,首先咱们对事件流作了一次 debounce
操做,获得 debounced
事件流,而后咱们把 debounced
事件流和 debounced.startWith(like)
事件流作了一次 zipWith
操做。至关于新的这个 Observable
中发射的第 n 个元素(n >= 2)是由 debounced
事件流中的第 n 和 第 n-1 个元素运算获得的(新的这个 Observable
中发射的第 1 个元素是由 debounced
事件流中的第 1 个元素和原始点赞状态 like
运算而来)。
运算的结果是获得一个 Pair
对象,它是一个双布尔类型二元组,二元组第一个元素为 true 表明这个事件不应被忽略,应该被观察者观察到;若为 false 则应该被忽略。二元组的第二个元素仅在第一个元素为 true 的状况下才有意义,true 表示应该发起一次点赞操做,而 false 表示应该发起一次取消点赞操做。上面提到的“运算”具体运算的规则是,比较两个元素,若相等,则把二元组的第一个元素置为 false,若不相等,则把二元组的第一个元素置为 true, 同时把二元组的第二个元素置为 debounced
事件流发射的那个元素。
随后的 flatMap
操做符完成了两个逻辑,一是过滤掉二元组第一个元素为 false 的二元组,二是把二元组转化回最初的 Boolean
事件流。其实这个逻辑也可由 filter
和 map
两个操做符配合完成,这里为了简单用了一个操做符。
虽然上面用了很多篇幅解释了每一个操做符的意义,但其实核心思想是简单的,就是在原先 debounce
操做符的基础上,把获得的事件流里每一个元素和它的上一个元素作比较,若是这个元素和上个元素相同(例如在已赞状态下再次发起点赞操做), 就把这个元素过滤掉,这样最终的观察者里只会在在真正须要改变点赞状态的时候才会发起网络请求了。
咱们考虑用 Callback 实现相同逻辑,虽然比较本次操做与上次操做这样的逻辑经过 Callback 也能够作到,可是 debounce
这个操做符完成的任务,若是要使用 Callback 来实现就很是复杂了,咱们须要定义一个计时器,还要负责启动与关闭这个计时器,咱们的 Callback 内部会掺杂进不少和观察者自己无关的逻辑,相比 RxJava 版本的纯粹相去甚远。
首先,咱们须要定义双击事件,不妨先规定两次点击小于 500 毫秒则为一次双击事件。咱们先使用 Callback 的方式实现:
long lastClickTimeStamp;
btn.setOnClickListener(v -> {
if (System.currentTimeMillis() - lastClickTimeStamp < 500) {
// handle double click
}
});
复制代码
上面的代码很容易理解,咱们引入一个中间变量 lastClickTimeStamp
, 经过比较点击事件发生时和上一次点击事件的时间差是否小于 500 毫秒,来确认是否发生了一次双击事件。那么如何经过 RxJava 来实现呢?就和上一个例子同样,咱们能够在时间维度对 Observable
发射的事件进行从新组织,只过滤出与上次点击事件间隔小于 500 毫秒的点击事件,代码以下:
Observable<Long> clicks = RxView.clicks(btn)
.map(o -> System.currentTimeMillis())
.share();
clicks.zipWith(clicks.skip(1), (t1, t2) -> t2 - t1)
.filter(interval -> interval < 500)
.subscribe(o -> {
// handle double click
});
复制代码
咱们再一次用到了 zipWith
操做符来对事件流自身相邻的两个元素作比较,另外此次代码中使用了 share
操做符,用来保证点击事件的 Observable
被转为 Hot Observable。
在
RxJava
中,Observable
能够被分为Hot Observable
与Cold Observable
,引用《Learning Reactive Programming with Java 8》中一个形象的比喻(翻译后的意思):咱们能够这样认为,Cold Observable
在每次被订阅的时候为每个Subscriber
单独发送可供使用的全部元素,而Hot Observable
始终处于运行状态当中,在它运行的过程当中,向它的订阅者发射元素(发送广播、事件),咱们能够把Hot Observable
比喻成一个电台,听众从某个时刻收听这个电台开始就能够听到此时播放的节目以及以后的节目,可是没法听到电台此前播放的节目,而Cold Observable
就像音乐 CD ,人们购买 CD 的时间可能先后有差距,可是收听 CD 时都是从第一个曲目开始播放的。也就是说同一张 CD ,每一个人收听到的内容都是同样的, 不管收听时间早或晚。
仅仅是上面这个双击检测的例子,还不能体现 RxJava 的优越性,咱们把需求改得更复杂一点:若是用户在“短期”内连续屡次点击,只能算一次双击操做。这个需求是合理的,由于若是按照上面 Callback 的写法,虽然能够检测出双击操做,可是若是用户快速点击 n 次(间隔均小于 500 毫秒,n >= 2), 就会触发 n - 1 次双击事件,假设双击处理函数里须要发起网络请求,会对服务器形成压力。要实现这个需求其实也简单,和上一个例子相似,咱们用到了 debounce
操做符:
Observable<Object> clicks = RxView.clicks(btn).share()
clicks.buffer(clicks.debounce(500, TimeUnit.MILLISECONDS))
.filter(events -> events.size >= 2)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(o -> {
// handle double click
});
复制代码
buffer
操做符接受一个Observable
为参数,这个Observable
所发射的元素是什么不重要,重要的是这些元素发射的时间点,这些时间点会在时间维度上把原来那个Observable
所发射的元素划分为一系列元素的组,buffer
操做符返回的新的Observable
发射的元素即为那些“组”。
参考资料: Buffer
上面的代码经过 buffer
和 debounce
两个操做符很巧妙的把点击事件流转化为了咱们关心的 “短期内点击次数超过 2 次” 的事件流,并且新的事件流中任意两个相邻事件间隔一定大于 500 毫秒。
在这个例子中,若是咱们想要使用 Callback 去实现类似逻辑,代码量确定是巨大的,并且鲁棒性也没法保证。
咱们平时使用的搜索框中,经常是当用户输入一部份内容后,下方就会显示对应的搜索提示,以支付宝为例,当在搜索框输入“蚂蚁”关键词后,下方自动刷新和关键词相关的结果:
为了简化这个例子,咱们不妨定义根据关键词搜索的接口以下:
public interface Api {
@GET("path/to/api")
Observable<List<String>> queryKeyword(String keyword);
}
复制代码
查询接口如今已经肯定下来,咱们考虑一下在实现这个需求的过程当中须要考虑哪些因素:
综合考虑上面的因素之后,咱们使用 RxJava 实现的对应的代码以下:
RxTextView.textChanges(input)
.debounce(300, TimeUnit.MILLISECONDS)
.switchMap(text -> api.queryKeyword(text.toString()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(results -> {
// handle results
});
复制代码
switchMap
这个操做符与flatMap
操做符相似,可是区别是若是原Observable
中的两个元素,经过switchMap
操做符都转为Observable
以后,若是后一个元素对应的Observable
发射元素时,前一个元素对应的Observable
还没有发射完全部元素,那么前一个元素对应的Observable
会被自动取消订阅,还没有发射完的元素也不会体如今switchMap
操做符调用后产生的新的Observable
发射的元素中。 参考资料:SwitchMap
咱们分析上面的代码,能够发现: debounce
操做符解决了问题 1,switchMap
操做符解决了问题 2、3。这个例子能够很好的说明,RxJava 的 Observable
能够经过一系列操做符从时间的维度上从新组织事件,从而简化观察者的逻辑。这个例子若是使用 Callback 来实现,确定是十分复杂的,须要设置计时器以及一堆中间变量,观察者中也会掺杂进不少额外的逻辑,用来保证事件与事件的依赖关系。
(未完待续)
本文属于 "RxJava 沉思录" 系列,欢迎阅读本系列的其余分享:
若是您对个人技术分享感兴趣,欢迎关注个人我的公众号:麻瓜日记,不按期更新原创技术分享,谢谢!:)