目录html
几种主要的需求java
建立Observable的各类方式react
使用 Create
操做符从头开始建立一个Observable,给这个操做符传递一个接受观察者做为参数的函数,编写这个函数能够调用观察者的 onNext
,onError
和 onCompleted
方法,当发生订阅的时候会自动调用观察者的 onSubscribe
方法。git
经过 Subscribe 进行Observable 与 Observer 的订阅,其中 subscribe 方法能够接收一个完整通知参数的 Observer 对象,也能够接收部分通知参数的 Consumer
(接收数据) 或者 Action
(仅接收通知) 对象。github
实例代码:数组
// 建立Observable(被观察者) Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("Hello"); emitter.onNext("World"); emitter.onComplete(); } }); // 建立Observer(观察者), 能够接受全部通知 Observer<String> observer = new Observer<String>() { public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe"); } public void onNext(String t) { System.out.println("--> onNext = " + t); } public void onError(Throwable e) { System.out.println("--> onError"); } public void onComplete() { System.out.println("--> onComplete"); } }; // 建立只接受 onNext(item) 通知的Consumer(观察者) Consumer<String> nextConsumer = new Consumer<String>() { @Override public void accept(String t) throws Exception { System.out.println("--> accept nextConsumer: " + t); } }; // 建立只接受 onError(Throwable) 通知的Consumer(观察者) Consumer<Throwable> errorConsumer = new Consumer<Throwable>() { @Override public void accept(Throwable t) throws Exception { System.out.println("-- accept errorConsumer: " + t); } }; // 建立只接受 onComplete() 通知的Action(观察者) Action completedAction = new Action() { @Override public void run() throws Exception { System.out.println("--> run completedAction"); } }; // 建立只接受 onSubscribe 通知的Consumer(观察者) Consumer<Disposable> onSubscribeComsumer = new Consumer<Disposable>() { @Override public void accept(Disposable t) throws Exception { System.out.println("--> accept onSubscribeComsumer "); } }; // 1. 进行订阅,subscribe(Observer) observable.subscribe(observer); System.out.println("---------------------------------------------"); // 2. 进行订阅,subscribe(Consumer onNext) observable.subscribe(nextConsumer); System.out.println("---------------------------------------------"); // 3. 进行订阅,subscribe(Consumer onNext, Consumer onError) observable.subscribe(nextConsumer, errorConsumer); System.out.println("---------------------------------------------"); // 4. 进行订阅,subscribe(Consumer onNext, Consumer onError, Action onCompleted) observable.subscribe(nextConsumer, errorConsumer, completedAction); System.out.println("---------------------------------------------"); // 5. 进行订阅,subscribe(Consumer onNext, Consumer onError, Action onCompleted, Consumer onSubscribe) observable.subscribe(nextConsumer, errorConsumer, completedAction, onSubscribeComsumer);
输出:缓存
--> onSubscribe --> onNext = Hello --> onNext = World --> onComplete --------------------------------------------- --> accept nextConsumer: Hello --> accept nextConsumer: World --------------------------------------------- --> accept nextConsumer: Hello --> accept nextConsumer: World --------------------------------------------- --> accept nextConsumer: Hello --> accept nextConsumer: World --> run completedAction --------------------------------------------- --> accept onSubscribeComsumer --> accept nextConsumer: Hello --> accept nextConsumer: World --> run completedAction
注意:create 方法默认不在任何特定的调度器上执行。app
- onSubscribe(Disposable): 在发生订阅时接收。
- onNext(item): 在被观察者发射数据接收。
- onError(Throwable): 在被观察者发射Error时接收。
onComplete(): 在被观察者完成数据发送时接收。ide
Javadoc: create(OnSubscribe)
Javadoc: subscribe()
Javadoc: subscribe(observer)
Javadoc: subscribe(onNext)
Javadoc: subscribe(onNext, onError)
Javadoc: subscribe(onNext, onError, onComplete)
Javadoc: subscribe(onNext, onError, onComplete, onSubscribe)函数
直到有观察者订阅时才建立 Observable,而且为每一个观察者建立一个新的 Observable.
Defer
操做符会一直等待直到有观察者订阅它,而后它使用Observable工厂方法生成一个 Observable。它对每一个观察者都这样作,所以尽管每一个订阅者都觉得本身订阅的是同一个 Observable,事实上每一个订阅者获取的是它们本身的单独的数据序列。
实例代码:
// 建立一个Defer类型的Observable Observable<Integer> deferObservable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() { public ObservableSource<? extends Integer> call() throws Exception { // 建立每一个观察者订阅所返回的 Observable Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onNext(4); emitter.onNext(5); emitter.onComplete(); } }); return observable; } }); // 建立第一个观察者并订阅defer Observable deferObservable.subscribe(new Consumer<Integer>() { public void accept(Integer t) throws Exception { System.out.println("No.1 --> accept = " + t); } }); // 建立第二个观察者并订阅defer Observable deferObservable.subscribe(new Consumer<Integer>() { public void accept(Integer t) throws Exception { System.out.println("No.2 --> accept = " + t); } }); // 建立第三个观察者并订阅defer Observable deferObservable.subscribe(new Consumer<Integer>() { public void accept(Integer t) throws Exception { System.out.println("No.3 --> accept = " + t); } });
输出:
No.1 --> accept = 1 No.1 --> accept = 2 No.1 --> accept = 3 No.1 --> accept = 4 No.1 --> accept = 5 No.2 --> accept = 1 No.2 --> accept = 2 No.2 --> accept = 3 No.2 --> accept = 4 No.2 --> accept = 5 No.3 --> accept = 1 No.3 --> accept = 2 No.3 --> accept = 3 No.3 --> accept = 4 No.3 --> accept = 5
注意:defer 方法默认不在任何特定的调度器上执行。
Javadoc: defer(Func0)
Empty
:建立一个不发射任何数据可是正常终止的Observable
Never
:建立一个不发射数据也不终止的Observable
Error
:建立一个不发射数据以一个错误终止的Observable
这三个操做符生成的 Observable 行为很是特殊和受限,多用于一些特殊的场景(某些操做状态异常后返回一个error、empty、never 的 Observable)。测试的时候颇有用,有时候也用于结合其它的 Observables,或者做为其它须要 Observable 的操做符的参数。
实例代码:
System.out.println("--> 1 -----------------------------------"); // 1. 建立一个不发射任何数据可是正常终止的Observable Observable.empty() .subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { System.out.println("onSubscribe"); } @Override public void onNext(Object t) { System.out.println("onNext: " + t); } @Override public void onError(Throwable e) { System.out.println("onError: " + e); } @Override public void onComplete() { System.out.println("onComplete"); } }); System.out.println("--> 2 -----------------------------------"); // 2. 建立一个不输出数据,而且不会终止的Observable Observable.never() .subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { System.out.println("onSubscribe"); } @Override public void onNext(Object t) { System.out.println("onNext: " + t); } @Override public void onError(Throwable e) { System.out.println("onError: " + e); } @Override public void onComplete() { System.out.println("onComplete"); } }); System.out.println("--> 3 -----------------------------------"); // 3. 建立一个不发射数据以一个错误终止的Observable Observable.error(new NullPointerException("error test")) .subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { System.out.println("onSubscribe"); } @Override public void onNext(Object t) { System.out.println("onNext: " + t); } @Override public void onError(Throwable e) { System.out.println("onError: " + e); } @Override public void onComplete() { System.out.println("onComplete"); } });
输出:
--> 1 ----------------------------------- onSubscribe onComplete --> 2 ----------------------------------- onSubscribe --> 3 ----------------------------------- onSubscribe onError: java.lang.NullPointerException: error test
注意:
- RxJava将这些操做符实现为 empty,never和 error。
- error 操做符须要一 个 Throwable参数,你的Observable会以此终止。
- 这些操做符默认不在任何特定的调度器上执行,可是 empty 和 error 有一个可选参数是Scheduler,若是你传递了Scheduler参数,它 们会在这个调度器上发送通知.
Javadoc: empty()
Javadoc: never()
Javadoc: error(java.lang.Throwable)
建立一个发射指定值的Observable。
Just
将单个数据转换为发射那个数据的Observable。相似于From
,可是From会将数组或Iterable的数据取出而后逐个发射,而Just只是简单的原样发射,将数组或Iterable当作单个数据。
注意: 若是你传递 null
给 Just
,它会返回一个发射 null 值的 Observable。不要误认为它会返回一个空Observable(彻底不发射任何数据的Observable),若是须要空Observable你应该使用Empty
操做符。
实例代码:
// 单个对象发送 Observable.just(1) .subscribe(new Consumer<Integer>() { public void accept(Integer t) throws Exception { System.out.println("--> singe accept: " + t); } }); System.out.println("---------------------------------"); // 多个对象发送,内部实际使用from实现 (接受一至九个参数,返回一个按参数列表顺序发射这些数据的Observable) Observable.just(1, 2, 3, 4, 5) .subscribe(new Consumer<Integer>() { public void accept(Integer t) throws Exception { System.out.println("--> mutil accept: " + t); } });
输出:
--> singe accept: 1 --------------------------------- --> mutil accept: 1 --> mutil accept: 2 --> mutil accept: 3 --> mutil accept: 4 --> mutil accept: 5
Javadoc: just(item ...)
将其它种类的对象和数据类型转换为Observable,发射来自对应数据源数据类型的数据,在RxJava中,from
操做符能够转换 Future
、Iterable
和数组
。对于Iterable和数组,产生的Observable会发射Iterable或数组的每一项数据。
实例代码:
// 初始化数据 Integer[] array = { 1, 2, 3, 4, 5, 6 }; List<String> iterable = new ArrayList<String>(); iterable.add("A"); iterable.add("B"); iterable.add("C"); iterable.add("D"); iterable.add("E"); // 1. fromArray Observable.fromArray(array).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept(1):fromArray: " + t); } }); System.out.println("---------------------------------------"); // 2. fromIterable Observable.fromIterable(iterable) .subscribe(new Consumer<String>() { @Override public void accept(String t) throws Exception { System.out.println("--> accept(2) fromIterable: " + t); } }); System.out.println("---------------------------------------"); // 3. fromCallable Observable.fromCallable(new Callable<Integer>() { @Override public Integer call() throws Exception { return 1; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept(3): fromCallable: " + t); } }); System.out.println("---------------------------------------"); // 4. fromFuture Observable.fromFuture(new Future<String>() { @Override public boolean cancel(boolean mayInterruptIfRunning) { return false; } @Override public String get() throws InterruptedException, ExecutionException { System.out.println("--> fromFutrue: get()"); return "hello"; } @Override public boolean isCancelled() { return false; } @Override public boolean isDone() { return false; } @Override public String get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return null; } }).subscribe(new Consumer<String>() { @Override public void accept(String t) throws Exception { System.out.println("--> accept(4): fromFuture: " + t); } });
输出:
--> accept(1):fromArray: 1 --> accept(1):fromArray: 2 --> accept(1):fromArray: 3 --> accept(1):fromArray: 4 --> accept(1):fromArray: 5 --> accept(1):fromArray: 6 --------------------------------------- --> accept(2) fromIterable: A --> accept(2) fromIterable: B --> accept(2) fromIterable: C --> accept(2) fromIterable: D --> accept(2) fromIterable: E --------------------------------------- --> accept(3): fromCallable: 1 --------------------------------------- --> fromFutrue: get() --> accept(4): fromFuture: hello
注意:from默认不在任何特定的调度器上执行。然而你能够将Scheduler做为可选的第二个参数传递给Observable,它会在那个调度器上管理这个Future。
Javadoc: from(array)
Javadoc: from(Iterable)
Javadoc: from(Callable)
Javadoc: from(Future)
Javadoc: from(Future,Scheduler)
Javadoc: from(Future,timeout,timeUnit)
建立一个发射特定数据重复屡次的Observable,它不是建立一个Observable,而是重复发射原始 Observable的数据序列,这个序列或者是无限的,或者经过 repeat(n)
指定重复次数。
实例代码:
// 1. repeat(): 一直重复发射原始 Observable的数据序列 Observable.range(1, 5) .repeat() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept(1): " + t); } }); System.out.println("----------------------------------------"); // 2. repeat(n): 重复执行5次 Observable.range(1, 2) .repeat(3) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept(2): " + t); } });
输出:
--> accept(1): 1 --> accept(1): 2 --> accept(1): 3 --> accept(1): 4 --> accept(1): 5 --> accept(1): 1 --> accept(1): 2 --> accept(1): 3 --> accept(1): 4 --> accept(1): 5 --> accept(1): 1 --> accept(1): 2 --> accept(1): 3 --> accept(1): 4 --> accept(1): 5 ...... ---------------------------------------- --> accept(2): 1 --> accept(2): 2 --> accept(2): 1 --> accept(2): 2 --> accept(2): 1 --> accept(2): 2
注意: repeat 操做符默认在 trampoline 调度器上执行。有一个变体能够经过可选参数指定 Scheduler。
Javadoc: repeat()
Javadoc: repeat(long)
Javadoc: repeat(Scheduler)
Javadoc: repeat(long,Scheduler)
repeatWhen
的操做符,它不是缓存和重放原始 Observable 的数据序列,接收到原始 Observable 终止通知后,有条件的决定是否从新订阅原来的 Observable 。
将原始 Observable 的终止通知(完成或错误)当作一个 void 数据传递给一个通知处理器,它以此来决定是否要从新订阅和发射原来的 Observable。这个通知处理器就像一个 Observable 操做符,接受一个发射 void通知的 Observable为输入,返回一个发射 void 数据(意思是,从新订阅和发射原始 Observable)或者直接终止(意思是,使用 repeatWhen 终止发射数据)的 Observable。
实例代码:
// repeatWhen(Func1()):接收到终止通知后,在函数中决定是否从新订阅原来的Observable // 须要注意的是repeatWhen的objectObservable处理(也能够单独自定义Observable返回),这里使用flathMap进行处理, // 让它延时发出onNext,这里onNext发出什么数据都不重要,它只是仅仅用来处理重订阅的通知,若是发出的是onComplete/onError,则不会触发重订阅 Observable.range(1, 2) .doOnComplete(new Action() { @Override public void run() throws Exception { System.out.println("-----------> 完成一次订阅"); } }).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() { private int n = 0; @Override public ObservableSource<?> apply(Observable<Object> t) throws Exception { // 接收到原始Observable的终止通知,决定是否从新订阅 System.out.println("--> apply repeat "); return t.flatMap(new Function<Object, ObservableSource<?>>() { @Override public ObservableSource<?> apply(Object t) throws Exception { if(n < 3) { // 从新订阅3次 n ++; return Observable.just(0); } else { return Observable.empty(); } } }); // return Observable.timer(1, TimeUnit.SECONDS); // 间隔一秒后从新订阅一次 // return Observable.interval(1, TimeUnit.SECONDS); // 每间隔一秒从新订阅一次 } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept: " + t); } });
输出:
--> apply repeat --> accept: 1 --> accept: 2 -----------> 完成一次订阅 --> accept: 1 --> accept: 2 -----------> 完成一次订阅 --> accept: 1 --> accept: 2 -----------> 完成一次订阅 --> accept: 1 --> accept: 2 -----------> 完成一次订阅
注意:repeatWhen操做符默认在 trampoline 调度器上执行。
Javadoc: repeatWhen(Func1)
根据条件(函数BooleanSupplier
)判断是否须要继续订阅: false:继续订阅; true:取消订阅
实例代码:
// repeatUntil 根据条件(BooleanSupplier)判断是否须要继续订阅 Observable.range(1, 2) .doOnComplete(new Action() { @Override public void run() throws Exception { System.out.println("-----------> 完成一次订阅"); } }).repeatUntil(new BooleanSupplier() { private int n = 0; @Override public boolean getAsBoolean() throws Exception { System.out.println("getAsBoolean = " + (n < 3? false:true) ); // 是否须要终止 if (n < 3) { n++; return false; // 继续从新订阅 } return true; // 终止从新订阅 } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept: " + t); } });
输出:
--> accept: 1 --> accept: 2 -----------> 完成一次订阅 getAsBoolean = false --> accept: 1 --> accept: 2 -----------> 完成一次订阅 getAsBoolean = false --> accept: 1 --> accept: 2 -----------> 完成一次订阅 getAsBoolean = false --> accept: 1 --> accept: 2 -----------> 完成一次订阅 getAsBoolean = true
Javadoc: repeatWhen(Func1)
建立一个发射特定整数序列的Observable。
Range
操做符发射一个范围内的有序整数序列,你能够指定范围的起始和长度。
RxJava将这个操做符实现为 range 函数,它接受两个参数,一个是范围的起始值,一个是范围的数据的数目。若是你将第二个参数设为0,将致使Observable不发射任何数据(若是设置 为负数,会抛异常)。
实例代码:
// 1. range(n,m) 发射从n开始的m个整数序列,序列区间[n,n+m-1) Observable.range(0, 5) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("-- accept(range): " + t); } }); System.out.println("------------------------------"); // 2. rangeLong(n,m) 发射从n开始的m个长整型序列,序列区间[n,n+m-1) Observable.rangeLong(1, 5) .subscribe(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("-- accept(rangeLong): " + t); } });
输出:
-- accept(range): 0 -- accept(range): 1 -- accept(range): 2 -- accept(range): 3 -- accept(range): 4 ------------------------------ -- accept(rangeLong): 1 -- accept(rangeLong): 2 -- accept(rangeLong): 3 -- accept(rangeLong): 4 -- accept(rangeLong): 5
Javadoc: range(int start,int count)
Javadoc: rangeLong(long start, long count)
建立一个按固定时间间隔发射整数序列的Observable,它按固定的时间间隔发射一个无限递增的整数序列。
RxJava将这个操做符实现为 interval 方法。它接受一个表示时间间隔的参数和一个表示时间单位的参数。
实例代码:
// [1] interval(long period, TimeUnit unit) // 每间隔period时间单位,发射一次整数序列 Observable.interval(1, TimeUnit.SECONDS) .subscribe(new Consumer<Long>() { public void accept(Long l) throws Exception { System.out.println("--> accept(1): " + l); } }); System.out.println("------------------------------------"); // [2] interval(long initialDelay, long period, TimeUnit unit) // 在延迟initialDelay秒后每隔period时间单位发射一个整数序列 Observable.interval(0, 1, TimeUnit.SECONDS) .subscribe(new Consumer<Long>() { public void accept(Long t) throws Exception { System.out.println("--> accept(2): " + t); } }); System.out.println("------------------------------------"); // [3] intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit) // 延迟initialDelay秒后从起始数据start开始,每隔period秒发送一个数字序列,一共发送count个数据 Observable.intervalRange(1, 5, 3, 2, TimeUnit.SECONDS) .subscribe(new Consumer<Long>() { public void accept(Long t) throws Exception { System.out.println("--> accept(3): " + t); } });
注意:interval 默认在 computation 调度器上执行, 有一个变体能够经过可选参数指定 Scheduler。
Javadoc: interval(long period, TimeUnit unit)
Javadoc: interval(long period, TimeUnit unit, Scheduler scheduler)
Javadoc: interval(long initialDelay, long period, TimeUnit unit)
Javadoc: interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
Javadoc: intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
Javadoc: intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
输出:
--> accept(1): 0 --> accept(1): 1 --> accept(1): 2 --> accept(1): 3 --> accept(1): 4 --> accept(1): 5 ... ------------------------------------ --> accept(2): 0 --> accept(2): 1 --> accept(2): 2 --> accept(2): 3 --> accept(2): 4 --> accept(2): 5 ... ------------------------------------ --> accept(3): 1 --> accept(3): 2 --> accept(3): 3 --> accept(3): 4 --> accept(3): 5
建立一个给定的延迟后发射一个特殊的值的Observable。
RxJava将这个操做符实现为 timer 函数。timer 返回一个Observable,它在延迟一段给定的时间后发射一个简单的数字0
实例代码:
// timer(long delay, TimeUnit unit, Scheduler scheduler) // 定时delay时间 单位后发送数字0,指定可选参数Schedule调度器为trampoline(当前线程排队执行) Observable.timer(5, TimeUnit.SECONDS, Schedulers.trampoline()) .subscribe(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> accept: " + t); } });
输出:
--> accept: 0
注意:timer 操做符默认在 computation 调度器上执行。有一个变体能够经过可选参数指定 Scheduler。
Javadoc: timer(long delay, TimeUnit unit)
Javadoc: timer(long delay, TimeUnit unit, Scheduler scheduler)
根据实际状况,使用不一样的方式建立不一样种类的Observable,这个在开发中很是有用,能够减小不少重复、复杂、冗余的操做,能够快速的建立一个符合要求的Observable,必定程度上提升了开发的效率。
提示:以上使用的Rxjava2版本: 2.2.12
Rx介绍与讲解及完整目录参考:Rxjava2 介绍与详解实例
实例代码: