RxJava操做符大全

整理归类了比较全的比较经常使用的操做符,但不是最全的。还有个别没有添加,欢迎你们交流补充。java

建立操做符

create

完整建立1个被观察者对象(Observable)数组

just

  1. 快速建立1个被观察者对象(Observable)
  2. 发送事件的特色:直接发送 传入的事件

快速建立 被观察者对象(Observable) & 发送10个如下事件数据结构

from

fromeArray

  1. 快速建立1个被观察者对象(Observable)
  2. 发送事件的特色:直接发送 传入的数组数据

将数组元素一次发射出,能够用来遍历数组app

fromIterable

  1. 快速建立1个被观察者对象(Observable)
  2. 发送事件的特色:直接发送 传入的集合List数据

同上,可用来遍历集合ide

发送事件

下列方法通常用于测试使用函数

<-- empty()  -->
// 该方法建立的被观察者对象发送事件的特色:仅发送Complete事件,直接通知完成
Observable observable1=Observable.empty(); 
// 即观察者接收后会直接调用onCompleted()

<-- error()  -->
// 该方法建立的被观察者对象发送事件的特色:仅发送Error事件,直接通知异常
// 可自定义异常
Observable observable2=Observable.error(new RuntimeException())
// 即观察者接收后会直接调用onError()

<-- never()  -->
// 该方法建立的被观察者对象发送事件的特色:不发送任何事件
Observable observable3=Observable.never();
// 即观察者接收后什么都不调用
复制代码

延时操做符

  1. 定时操做:在通过了x秒后,须要自动执行y操做
  2. 周期性操做:每隔x秒后,须要自动执行y操做

delay

使得被观察者延迟一段时间再发送事件测试

// 1. 指定延迟时间
// 参数1 = 时间;参数2 = 时间单位
delay(long delay,TimeUnit unit)

// 2. 指定延迟时间 & 调度器
// 参数1 = 时间;参数2 = 时间单位;参数3 = 线程调度器
delay(long delay,TimeUnit unit,mScheduler scheduler)

// 3. 指定延迟时间  & 错误延迟
// 错误延迟,即:若存在Error事件,则如常执行,执行后再抛出错误异常
// 参数1 = 时间;参数2 = 时间单位;参数3 = 错误延迟参数
delay(long delay,TimeUnit unit,boolean delayError)

// 4. 指定延迟时间 & 调度器 & 错误延迟
// 参数1 = 时间;参数2 = 时间单位;参数3 = 线程调度器;参数4 = 错误延迟参数
delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 指定延迟多长时间并添加调度器,错误通知能够设置是否延迟
复制代码

defer

直到有观察者(Observer )订阅时,才动态建立被观察者对象(Observable) & 发送事件this

  1. 经过 Observable工厂方法建立被观察者对象(Observable)
  2. 每次订阅后,都会获得一个刚建立的最新的Observable对象,这能够确保Observable对象里的数据是最新的
<-- 1. 第1次对i赋值 ->>
Integer i = 10;

// 2. 经过defer 定义被观察者对象
// 注:此时被观察者对象还没建立
Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
    @Override
    public ObservableSource<? extends Integer> call() throws Exception {
        return Observable.just(i);
    }
});

<-- 2. 第2次对i赋值 ->>
i = 15;

<-- 3. 观察者开始订阅 ->>
// 注:此时,才会调用defer()建立被观察者对象(Observable)
observable.subscribe(new Observer<Integer>() {

    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "开始采用subscribe链接");
    }

    @Override
    public void onNext(Integer value) {
        Log.d(TAG, "接收到的整数是"+ value  );
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "对Error事件做出响应");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "对Complete事件做出响应");
    }
});
复制代码

timer

  1. 快速建立1个被观察者对象(Observable)
  2. 发送事件的特色:延迟指定时间后,发送1个数值0(Long类型)

timer操做符默认运行在一个新线程上
也可自定义线程调度器(第3个参数):timer(long, TimeUnit, Scheduler)spa

interval

  1. 快速建立1个被观察者对象(Observable)线程

  2. 发送事件的特色:每隔指定时间就发送事件

    /**
     * initialDelay 初始延时时间
     * unit 时间单位
     * period 间隔时间
     * scheduler 线程调度器
     */
     public static Observable<Long> interval(long interval, TimeUnit unit) {
     	return interval(interval, interval, unit, Schedulers.computation());
     }
     
     public static Observable<Long> interval(long interval, TimeUnit unit, Scheduler scheduler) {
     	return interval(interval, interval, unit, scheduler);
     }
     
     public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit) {
     	return interval(initialDelay, period, unit, Schedulers.computation());
     }
    复制代码

range/rangeLong

  1. 快速建立1个被观察者对象(Observable)

  2. 发送事件的特色:连续发送1个事件序列,可指定范围

    /**
     * start 起始数字
     * count 数量
     */
     public static Observable<Integer> range(int start, int count)
    复制代码

过滤操做符

  • take, takeFirst, takeLast

  • skip, skipFirst, skipLast

  • first

  • last

  • firstOrDefault, lastOrDefault (只发射最后一项(或者知足某个条件的最后一项)数据,能够指定默认值。)

    // 跳过前面几项
      public final Observable<T> skip(int count)
      // 跳过前面的时间,以后产生的数据提交
      public final Observable<T> skip(long time, TimeUnit unit)
      // skipLast和skip相反,跳事后面的几项。
      // 忽略最后时间单位内产生的数据
      skipLast(long time,TimeUnit)  
      
      // 并非娶第n个,而是取前面n个数据
      take(n) 
      // 是在制定时间内取数据,若是超过了这个时间源Observable产生的数据将做废
      take(long time, TimeUnit unit)  
    复制代码

takeFirst操做符和first操做符相似,取知足条件的第一个
区别:first取不到要抛异常,takeFirst不会

takeLast操做符与last操做符类似。区别在于,若是取不到知足条件的值,last将抛出异常

filter

过滤数据,不知足条件的数据将被过滤不发射。

filter(Fun) 自定义过滤条件
return false的数据将不被发射

ofType

过滤指定类型的数据

Observable.just(1,2,"3")
        .ofType(Integer.class)
        .subscribe(item -> Log.d("JG",item.toString()));
复制代码

elementAt/elementAtOrDefault/elementAtOrError

发射某一项数据,若是超过了范围能够指定默认值。内部经过OperatorElementAt过滤。

Observable.just(3,4,5,6)
             .elementAt(2)
    .subscribe(item->Log.d("JG",item.toString())); //5
复制代码

firstElement/lastElement

仅选取第1个元素 / 最后一个元素

ignoreElements

丢弃全部数据,只发射错误或正常终止的通知。内部经过OperatorIgnoreElements实现。

distinct

过滤重复数据,内部经过OperatorDistinct实现。

distinctUntilChanged

过滤掉连续重复的数据。内部经过OperatorDistinctUntilChanged实现

Observable.just(3,4,5,6,3,3,4,9)
   .distinctUntilChanged()
  .subscribe(item->Log.d("JG",item.toString())); //3,4,5,6,3,4,9
复制代码

timeout

若是原始Observable过了指定的一段时长没有发射任何数据,就发射一个异常或者使用备用的Observable。

Debounce/throtleWithTimeout

根据你指定的时间间隔进行限流

发送数据事件时,若2次发送事件的间隔<指定时间,就会丢弃前一次的数据,直到指定时间内都没有新数据发射时才会发送后一次的数据

条件操做符

single/singleOrDefault

检测源Observable产生的数据项是否只有一个,不然报错

onError()
java.lang.IllegalArgumentException: Sequence contains too many elements

all

all操做符接收一个函数参数,建立并返回一个单布尔值的Observable,
若是原Observable正常终止而且每一项数据都知足条件,就返回true,
若是原Observable的任何一项数据不知足条件或者非正常终止就返回False。

判断全部的数据项是否知足某个条件,内部经过OperatorAll实现。

amb/ambWith

amb操做符对于给定两个或多个Observables,它只发射首先发射数据或通知的那个Observable的全部数据。

当你传递多个Observable给amb操做符时,该操做符只发射其中一个Observable的数据和通知:首先发送通知给amb操做符的的那个Observable,无论发射的是一项数据仍是一个onError或onCompleted通知,amb将忽略和丢弃其它全部Observables的发射物。

amb(T o1, T ... o2)(可接受2到9个参数)

给定多个Observable,只让第一个发射数据的Observable发射所有数据,其余Observable将会被忽略。

contains

contains操做符将接收一个特定的值做为一个参数,断定原Observable是否发射该值,若已发射,则建立并返回的Observable将发射true,不然发射false。
判断在发射的全部数据项中是否包含指定的数据,内部调用的实际上是exists

contains操做符默认不在任何特定的调度器上执行。

可用来判断Observable发射的值中是否包含该值。

exists

exists操做符相似与contains操做符,不一样的是,其接受一个函数参数,在函数中,对原Observable发射的数据,设定比对条件并作判断。若任何一项知足条件就建立并返回一个发射true的Observable,不然返回一个发射false的Observable。

该操做符默认不在任何特定的调度器上执行。

判断是否存在数据项知足某个条件。内部经过OperatorAny实现。

isEmpty

isEmpty操做符用于断定原始Observable是否没有发射任何数据。若原Observable未发射任何数据,建立建立并返回一个发射true的Observable,不然返回一个发射false的Observable。

isEmpty操做符默认不在任何特定的调度器上执行。

能够用来判断是否没有数据发射。

defaultIfEmpty

defaultIfEmpty操做接受一个备用数据,在原Observable没有发射任何数据正常终止(以onCompletedd的形式),该操做符以备用数据建立一个Observable并将数据发射出去。

RxJava将这个操做符实现为defaultIfEmpty。它默认不在任何特定的调度器上执行。

switchIfEmpty

若是原始Observable正常终止后仍然没有发射任何数据,就使用备用的Observable。

若是原始Observable正常终止后仍然没有发射任何数据
defaultIfEmpty使用默认值发射,switchIfEmpty使用默认Observable发射

sequenceEqual

sequenceEqual(Observable,Observable,Func2)变体接收两个Observable参数和一个函数参数,在函数参数中,能够比较两个参数是否相同。

该操做符默认不在任何特定的调度器上执行。

用于判断两个Observable发射的数据是否相同(数据,发射顺序,终止状态)

skipUntil

skipUntil操做符在观察者订阅原Observable时,该操做符将是忽略原Observable的发射的数据,直到第二个Observable发射了一项数据那一刻,它才 开始发射原Observable发射的数据。

该操做符默认不在任何特定的调度器上执行。

skipWhile

skipWhile操做符丢弃原Observable发射的数据,直到发射的数据不知足一个指定的条件,才开始发射原Observable发射的数据。

在观察者订阅原Observable时,skipWhile操做符将忽略原Observable的发射物,直到你指定的某个条件变为false时,它开始发射原Observable发射的数据。

skipWhile操做符默认不在任何特定的调度器上执行。

takeUntil

takeUntil操做符与skipUntil操做符做用相反,当第二个Observable发射了一项数据或者终止时,丢弃原Observable发射的任何数据。
takeUntil(Func1)变体接受一个函数参数,当知足条件时终止发射数据。

takeWhile

takeWhile操做符与skipWhile操做符做用相反。在观察者订阅原Observable时,takeWhile建立并返回原Oservable的镜像Observable,暂命名为_observable,发射原Observable发射的数据。当你指定的某个条件变为false时,_observable发射onCompleted终止通知。

takeWhile操做符默认不在任何特定的调度器上执行。

变换操做符

map

对被观察者发送的每1个事件都经过指定的函数处理,从而变换成另一种事件

即,将被观察者发送的事件转换为任意的类型事件。

若是是list,可对list的每一个元素进行类型转换,最后tolist发射转换后的list。

flatmap

对Observable发射的数据都应用(apply)一个函数,这个函数返回一个Observable,而后合并这些Observables,而且发送(emit)合并的结果。 flatMap和map操做符很相像,flatMap发送的是合并后的Observables,map操做符发送的是应用函数后返回的结果集

将原Observable发射的每一个数据转换为新的Observable,发射每个转换的Observable

新合并生成的事件序列顺序是无序的,即与旧序列发送事件的顺序无关

concatMap

做用同flatMap

与flatMap的区别是,新合并生成的事件序列顺序是有序的

switchMap

当源Observable发射一个新的数据项时,若是旧数据项订阅还未完成,就取消旧订阅数据和中止监视那个数据项产生的Observable,开始监视新的数据项.

cast

cast操做符将原始Observable发射的每一项数据都强制转换为一个指定的类型,而后再发射数据,它是map的一个特殊版本

所相互转换的类之间须要存在某种关系,如继承、实现

concat

组合多个被观察者一块儿发送数据,合并后 按发送顺序串行执行

按发送顺序串行执行

merge

组合多个被观察者一块儿发送数据,合并后 按时间线并行执行

区别上述concat()操做符:一样是组合多个被观察者一块儿发送数据,但concat()操做符合并后是按发送顺序串行执行

并行执行

zip

合并多个被观察者(Observable)发送的事件,生成一个新的事件序列(即组合事后的事件序列),并最终发送

事件组合方式 = 严格按照原先事件序列 进行对位合并
最终合并的事件数量 = 多个被观察者(Observable)中数量最少的数量

reduce

把被观察者须要发送的事件聚合成1个事件 & 发送

聚合的逻辑根据需求撰写,但本质都是前2个数据聚合,而后与后1个数据继续进行聚合,依次类推

自定义聚合条件,前2个数据聚合获得结果与第三个数据再聚合。以此类推...

collect

将被观察者Observable发送的数据事件收集到一个数据结构里

Observable.just(1, 2, 3, 4)
    .collect(new Func0<ArrayList<Integer>>() {
        @Override
        public ArrayList<Integer> call() {
        	//建立收集容器
            return new ArrayList<>();
        }
    }, new Action2<ArrayList<Integer>, Integer>() {
        @Override
        public void call(ArrayList<Integer> list1, Integer integer) {
        	//开始收集每一项数据
            list1.add(integer);
        }
    }).subscribe(new Action1<ArrayList<Integer>>() {
        @Override
        public void call(ArrayList<Integer> integers) {
			//获得收集后的数据
        }
    });
复制代码

startWith

在一个被观察者发送事件前,追加发送一些数据或是一个新的被观察者

//源码是经过concat实现,在前面追加一个Observable
public final Observable<T> startWith(Observable<T> values) {
    return concat(values, this);
}
复制代码

compose

其余操做符

retry

重试,即当出现错误时,让被观察者(Observable)从新发射数据

retryUntil

出现错误后,判断是否须要从新发送数据

retryWhen

遇到错误时,将发生的错误传递给一个新的被观察者(Observable),并决定是否须要从新订阅原始被观察者(Observable)& 发送事件

repeat

无条件地、重复发送 被观察者事件

具有重载方法,可设置重复建立次数

repeatWhen

有条件地、重复发送 被观察者事件

count

统计被观察者发送事件的数量

相关文章
相关标签/搜索