上节初步了解了RxJava,本节主要讲讲RxJava的操做符。官方文档中能够看到操做符特别的多,一样也是RxJava比较重要的内容,在我看来灵活使用RxJava离不开对它的操做符的理解。那本节就根据图书《RxJava Essentials》上操做符的描述,根据操做符类型分类分别介绍一些操做符的使用。另外本节引用了RxJava-Android-Samples一些实例更好理解操做在实际开发中如何使用。git
take程序员
发射事件流中的前n个事件
Observable.just(1,2,3,4,5,6,7,8,9,10) //只取前4个事件 1,2,3,4 .take(4) .subscribe(...)
takeLastgithub
发射事件流中的后n个事件
Observable.just(1,2,3,4,5,6,7,8,9,10) //只取4个事件 7,8,9,10 .takeLast(4) .subscribe(...)
distinct编程
过滤事件流重复发射的事件
Observable.just(1,2,3,3,2,3,2,4,5,4,5,5) //只取5个事件 1,2,3,4,5 .distinct() .subscribe(...)
distinctUntilChange数组
过滤同样的事件直到事件发生变化才进行发射
Observable.just(1,2,2,3,3,4,4,4,5,5) //只取5个事件 1,2,3,4,5 .distinct() .subscribe(...)
repeat缓存
重复发射事件流
Observable.just(1,2,3) .repeat(3) .subscribe(...) //订阅获取 1,2,3,1,2,3,1,2,3
first并发
发射首个事件
Observable.just(1,2,3) .first() .subscribe(...) //订阅获取 1
lastide
发射最后一个事件
Observable.just(1,2,3) .last() .subscribe(...) //订阅获取 1
skipLast函数式编程
跳过发射事件流后n个事件
Observable.just(1,2,3,4,5,6,7,8,9,10) .skipLast(4) .subscribe(...) //订阅获取 1,2,3,4,5,6
timeout函数
不发射超出指定时间内外的事件
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); deplyTime();//延迟 subscriber.onNext(4); subscriber.onNext(5); } }) .timeout(2, TimeUnit.SECONDS) .subscribe(...) //订阅获取 1,2,3
sample
发射在指定时间内事件流中的最后一个事件
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); deplyTime(); subscriber.onNext(4); subscriber.onNext(5); deplyTime(); subscriber.onNext(6); } }) .sample(2,TimeUnit.SECONDS) .subscribe(...) //订阅获取 3,5,6
throttleFirst
发射在指定时间内事件流中的第一个事件
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); deplyTime(); subscriber.onNext(4); subscriber.onNext(5); deplyTime(); subscriber.onNext(6); } }) .throttleFirst(2,TimeUnit.SECONDS) .subscribe(...) //订阅获取 1,4,6
debounce
在计时时间内事件流没有产生新事件则发射当前事件,如有新事件产生则从新计时
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); deplyTime();//延时两秒 subscriber.onNext(3); subscriber.onNext(4); subscriber.onNext(5); deplyTime(); deplyTime(); subscriber.onNext(3); subscriber.onNext(4); deplyTimeLittle();//延时一秒 subscriber.onNext(5); subscriber.onNext(4); deplyTimeLittle(); subscriber.onNext(7); deplyTime(); subscriber.onNext(6); } }) .debounce(2,TimeUnit.SECONDS) .subscribe(...) //订阅获取 2,5,7,6
实例应用:监听EditTextView字符变化,在输入字符后400ms内无变化则订阅字符串事件
_subscription = RxTextView.textChangeEvents(_inputSearchText) .debounce(400, TimeUnit.MILLISECONDS)// default Scheduler is Computation .filter(changes -> isNotNullOrEmpty(_inputSearchText.getText().toString())) .observeOn(AndroidSchedulers.mainThread()) .subscribe(_getSearchObserver());
flatMap
faltMap用法和map相似但flatMap返回的是Observable<T>对象,另外flatMap支持无序,最后订阅的事件流并不必定和原来的序列保持一致。
concatMap
concatMap用法和flatMap同样,只是concatMap可以保证事件流保持原来的序列。
flatMapIterable
flatMapIterable和faltMap相似,但返回类型是Iterable
switchMap
scan
获取当前事件和后一个事件作特殊处理返回同类型事件,主要应用对事件的包装。
groupBy
对事件进行分类订阅,根据自定义筛选规则对事件流分类,经过GroupedObservable.getKey()区分处理事件。
Observable.just(1,2,3,4,5,6,7,8,9,10) .groupBy(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer % 2 == 0; } }).subscribe(new Action1<GroupedObservable<Boolean, Integer>>() { @Override public void call(GroupedObservable<Boolean, Integer> booleanIntegerGroupedObservable) { if(booleanIntegerGroupedObservable.getKey()){ //True booleanIntegerGroupedObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i("Subscriber true",integer + "\n"); } }); }else{ //False booleanIntegerGroupedObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i("Subscriber false",integer + "\n"); } }); } } }); //订阅结果 1,3,5,7,9由false处理,2,4,6,8,10由true处理
buffer
将事件流组装为数组发射,大小由Buffer决定
Observable.just(1,2,3,4,5,6,7,8,9,10) .buffer(3) .subscribe(...) //订阅获取 [1, 2, 3],[4, 5, 6],[7, 8, 9],[10]
实例应用:点击事件,buffer作定时组装数组,计时时间内无新事件产生则组装当前数组发射
RxView.clickEvents(_tapBtn) .map(onClickEvent -> { Timber.d("--------- GOT A TAP"); _log("GOT A TAP"); return 1; }) .buffer(2, TimeUnit.SECONDS) .observeOn(AndroidSchedulers.mainThread()) .subscribe(...);
window
window操做符会在时间间隔内缓存结果,相似于buffer缓存一个list集合,区别在于window将这个结果集合封装成了observable
bservable.interval(1,TimeUnit.SECONDS) .take(10) .window(3,TimeUnit.SECONDS) .subscribe(...)
cast
设置事件的指定类型
merge
合并事件流,用法和groupBy偏偏相反
Observable.merge(Observable.just(2,3),Observable.just(3,5)) .subscribe(...); //订阅获取 2,3,3,5
zip
整合多个事件流将事件结果整合处理再发射事件
Observable.zip(Observable.just(1, 2, 3), Observable.just(1, 2, 3), new Func2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) { return integer + integer2; } }) .subscribe(...) //订阅结果 2,4,6
实战应用:两个http请求并发,等待两个结果返回再处理结果实现屡次请求一次处理
Observable.zip( service.getUserPhoto(id), service.getPhotoMetadata(id), (photo, metadata) -> createPhotoWithData(photo, metadata)) .subscribe(photoWithData -> showPhoto(photoWithData));
join
join一样是将多个事件流结果合并统一处理,当join可控制每一个事件流结果生命周期,,在每一个结果的生命周期内,能够与另外一个Observable产生的结果按照必定的规则进行合并。
combineLatest
CombineLatest操做符行为相似于zip,可是只有当原始的Observable中的每个都发射了一条数据时zip才发射数据。CombineLatest则在原始的Observable中任意一个发射了数据时发射一条数据。当原始Observables的任何一个发射了一条数据时,CombineLatest使用一个函数结合它们最近发射的数据,而后发射这个函数的返回值。
本节一些地方引用的例子使用了函数式编程写法,这是Java SE 8中一个重要特性。这里稍微作一个简短的Lambda介绍。
Lambda表达式是Java SE 8中一个重要的新特性。lambda表达式容许你经过表达式来代替功能接口。 lambda表达式就和方法同样,它提供了一个正常的参数列表和一个使用这些参数的主体(body,能够是一个表达式或一个代码块)。Lambda表达式还加强了集合库。
lambda的确让Java代码紧凑简洁,可并行处理集合例如filter、map、reduce等并行函数。但目前看来据我所知使用Lambda的程序员不是大多数,Lambda也下降了代码的可读性在开发企业项目不易于维护开发,但不妨先学习了解。
在学习RxJava的过程当中我对于RxJava有了本身的理解,找了一张来自泡在网上的日子的一张图。整个RxJava被订阅的过程是一个Subscription能够比喻成工厂生产商品到消费者的过程,产品最终是否能够到达消费者手中由Subscription决定,observable能够理解成未加工过的产品(至少在为被Subscriber消费以前是的),Operations为整个生产线上每一条流水线。Schers为一个车间,它表明着Operation在哪车间里运做。Observable经过流水线的传递最终到达消费者Subscer手中。这个过程就像是给一个初始的产品模型在生产制做过程中不断加工制做组装,最终达到消费者手中是一个制做加工所想要的商品。这就是我对RxJava的理解,可能有点误差但大体上和其思想比较接近。
其实关于RxJava的资料有不少,我主要是以学习分享为目的来讲说本身学习RxJava所感所想,但愿更和你们一块儿讨论学习进步。