今天,咱们继续跟着 RxJava-Android-Samples 的脚步,一块儿看一下RxJava2
在实战当中的应用,在这个项目中,第二个的例子的描述以下: java
2s
内连续点击了一个按钮五次,那么咱们只会收到一个“你点击了该按钮五次”的时间,而不是五个"你点击了该按钮"的事件。这个示例的目的是让咱们学会如何应用
buffer
操做符。
仔细思考了一下,在平时的项目中,咱们彷佛不会遇到须要统计一段时间内用户点击了多少次按钮这种需求。git
可是,咱们有时候会须要计算一段时间内的平均数据,例如统计一段时间内的平均温度,或者统计一段时间内的平均位置。在接触RxJava
以前,咱们通常会将这段时间内统计到的数据都暂时存起来,等到须要更新的时间点到了以后,再把这些数据结合起来,计算这些数据的平均值。github
如今,咱们就来看一下,用RxJava2
如何去实现这个需求。bash
这里,咱们经过一个Handler
循环地发送消息,实现间隔必定时间进行温度的测量,可是在测量以后,咱们并不实时地更新界面的温度显示,而是每隔3s
统计一次过去这段时间内的平均温度。dom
public class BufferActivity extends AppCompatActivity {
private PublishSubject<Double> mPublishSubject;
private CompositeDisposable mCompositeDisposable;
private TextView mTv;
private SourceHandler mSourceHandler;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_buffer);
mTv = (TextView) findViewById(R.id.tv_buffer);
mPublishSubject = PublishSubject.create();
DisposableObserver<List<Double>> disposableObserver = new DisposableObserver<List<Double>>() {
@Override
public void onNext(List<Double> o) {
double result = 0;
if (o.size() > 0) {
for (Double d : o) {
result += d;
}
result = result / o.size();
}
Log.d("BufferActivity", "更新平均温度:" + result);
mTv.setText("过去3秒收到了" + o.size() + "个数据, 平均温度为:" + result);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
};
mPublishSubject.buffer(3000, TimeUnit.MILLISECONDS).observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
mCompositeDisposable = new CompositeDisposable();
mCompositeDisposable.add(disposableObserver);
//开始测量温度。
mSourceHandler = new SourceHandler();
mSourceHandler.sendEmptyMessage(0);
}
public void updateTemperature(double temperature) {
Log.d("BufferActivity", "温度测量结果:" + temperature);
mPublishSubject.onNext(temperature);
}
private class SourceHandler extends Handler {
@Override
public void handleMessage(Message msg) {
super.handleMessage(msg);
double temperature = Math.random() * 25 + 5;
updateTemperature(temperature);
//循环地发送。
sendEmptyMessageDelayed(0, 250 + (long) (250 * Math.random()));
}
}
@Override
protected void onDestroy() {
super.onDestroy();
mSourceHandler.removeCallbacksAndMessages(null);
mCompositeDisposable.clear();
}
}
复制代码
实际的运行结果为: ide
在上面的例子中,咱们使用了buffer(int time, Unit timeUnit)
,其原理图以下所示: 函数
mPublishSubject.onNext(temperature);
复制代码
事件并不会直接传递到Observer
的onNext
方法中,而是放在缓冲区中,直到事件到以后,再将全部在这段缓冲事件内放入缓冲区中的值,放在一个List
中一块儿发送到下游。学习
关于Buffer
的其它用法,这篇文章写得很全,我这里就不详细赘述了,你们能够参考:RxJava 的学习之变换操做符 - Buffer。spa