Observable)发送的事件 & 观察者(Observer)接收的事件
3.1 filter() ----- 见rxdocs.pdf第103页
作用:过滤特定条件的事件
/**
 * Demonstrates {@code filter()}: emits 1, 2, 3, 7, 6, 9 and lets only the
 * values greater than 5 reach the observer, which logs every callback
 * under {@code TAG}.
 */
public static void filter() {
    // Condition an item has to satisfy in order to be forwarded downstream.
    final Predicate<Integer> greaterThanFive = new Predicate<Integer>() {
        @Override
        public boolean test(Integer candidate) throws Exception {
            return candidate > 5;
        }
    };

    // Observer that simply logs each lifecycle callback.
    final Observer<Integer> loggingObserver = new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            Log.d(TAG, "onSubscribe");
        }

        @Override
        public void onNext(Integer item) {
            Log.d(TAG, "onNext: value = " + item);
        }

        @Override
        public void onError(Throwable error) {
            Log.d(TAG, "onError: " + error.toString());
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    };

    Observable<Integer> source = Observable.just(1, 2, 3, 7, 6, 9);
    source.filter(greaterThanFive).subscribe(loggingObserver);
}
输出:
08-09 17:01:19.982 28378 28378 D Operation: onSubscribe
08-09 17:01:19.982 28378 28378 D Operation: onNext: value = 7
08-09 17:01:19.982 28378 28378 D Operation: onNext: value = 6
08-09 17:01:19.982 28378 28378 D Operation: onNext: value = 9
08-09 17:01:19.982 28378 28378 D Operation: onComplete
3.2 ofType() ----- 见rxdocs.pdf第105页
作用:过滤特定数据类型的数据
/**
 * Demonstrates {@code ofType()}: emits an Integer, a String and a Float and
 * forwards only the {@code String} item to the observer, which logs every
 * callback under {@code TAG}.
 */
public static void ofType() {
    // Observer that simply logs each lifecycle callback.
    final Observer<String> loggingObserver = new Observer<String>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            Log.d(TAG, "onSubscribe");
        }

        @Override
        public void onNext(String item) {
            Log.d(TAG, "onNext: value = " + item);
        }

        @Override
        public void onError(Throwable error) {
            Log.d(TAG, "onError: " + error.toString());
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    };

    // The mixed-type source is kept inline; only the String-typed result is named.
    Observable<String> stringsOnly = Observable.just(1, "A", 1.0f).ofType(String.class);
    stringsOnly.subscribe(loggingObserver);
}
输出:
08-09 17:13:24.019 28953 28953 D Operation: onSubscribe
08-09 17:13:24.019 28953 28953 D Operation: onNext: value = A
08-09 17:13:24.019 28953 28953 D Operation: onComplete
3.3 skip() / skipLast() ----- 见rxdocs.pdf第120页
作用:跳过前面或后面的某些事件(某几个事件或某段时间内的事件)
/**
 * Demonstrates {@code skip()} combined with {@code skipLast()}: from the
 * sequence 1..7 the first two and the last three items are dropped, and the
 * remaining items are logged under {@code TAG}.
 */
public static void skip() {
    // Observer that simply logs each lifecycle callback.
    final Observer<Integer> loggingObserver = new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            Log.d(TAG, "onSubscribe");
        }

        @Override
        public void onNext(Integer item) {
            Log.d(TAG, "onNext: value = " + item);
        }

        @Override
        public void onError(Throwable error) {
            Log.d(TAG, "onError: " + error.toString());
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    };

    Observable<Integer> numbers = Observable.just(1, 2, 3, 4, 5, 6, 7);
    // Drop the leading 2 items first, then the trailing 3 of what is left.
    Observable<Integer> middle = numbers.skip(2).skipLast(3);
    middle.subscribe(loggingObserver);
}
输出:
08-09 17:20:42.436 29210 29210 D Operation: onSubscribe
08-09 17:20:42.437 29210 29210 D Operation: onNext: value = 3
08-09 17:20:42.437 29210 29210 D Operation: onNext: value = 4
08-09 17:20:42.437 29210 29210 D Operation: onComplete
3.4 distinct() / distinctUntilChanged() ----- 见rxdocs.pdf第97页
作用:过滤事件序列中重复的事件 / 连续重复的事件
/**
 * Demonstrates {@code distinct()}: emits 1, 2, 2, 3, 5, 6, 8, 8 and forwards
 * each distinct value only once to the observer, which logs every callback
 * under {@code TAG}.
 */
public static void distinct() {
    // Observer that simply logs each lifecycle callback.
    final Observer<Integer> loggingObserver = new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            Log.d(TAG, "onSubscribe");
        }

        @Override
        public void onNext(Integer item) {
            Log.d(TAG, "onNext: value = " + item);
        }

        @Override
        public void onError(Throwable error) {
            Log.d(TAG, "onError: " + error.toString());
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    };

    Observable<Integer> withDuplicates = Observable.just(1, 2, 2, 3, 5, 6, 8, 8);
    withDuplicates.distinct().subscribe(loggingObserver);
}
输出:
08-09 17:29:09.744 29578 29578 D Operation: onSubscribe
08-09 17:29:09.744 29578 29578 D Operation: onNext: value = 1
08-09 17:29:09.744 29578 29578 D Operation: onNext: value = 2
08-09 17:29:09.744 29578 29578 D Operation: onNext: value = 3
08-09 17:29:09.744 29578 29578 D Operation: onNext: value = 5
08-09 17:29:09.744 29578 29578 D Operation: onNext: value = 6
08-09 17:29:09.744 29578 29578 D Operation: onNext: value = 8
08-09 17:29:09.744 29578 29578 D Operation: onComplete
3.5 take()/ takeLast()----- 见rxdocs.pdf第124页
作用:指定观察者最多能接收到最前面或最后面的事件数量
/**
 * Demonstrates {@code take()}: emits 1, 2, 3, 4 and forwards only the first
 * two items to the observer, which logs every callback under {@code TAG}.
 */
public static void take() {
    // Observer that simply logs each lifecycle callback.
    final Observer<Integer> loggingObserver = new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            Log.d(TAG, "onSubscribe");
        }

        @Override
        public void onNext(Integer item) {
            Log.d(TAG, "onNext: value = " + item);
        }

        @Override
        public void onError(Throwable error) {
            Log.d(TAG, "onError: " + error.toString());
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    };

    Observable<Integer> numbers = Observable.just(1, 2, 3, 4);
    numbers.take(2).subscribe(loggingObserver);
}
输出:
08-09 17:36:23.703 29843 29843 D Operation: onSubscribe
08-09 17:36:23.703 29843 29843 D Operation: onNext: value = 1
08-09 17:36:23.703 29843 29843 D Operation: onNext: value = 2
08-09 17:36:23.703 29843 29843 D Operation: onComplete
3.6 throttleFirst()/ throttleLast()----- 见rxdocs.pdf第119页
作用:在某段时间内,只发送该段时间内第1次事件 / 最后1次事件
/**
 * Demonstrates {@code throttleFirst()}: a timed source created with
 * {@code intervalRange(0, 6, 0, 1, SECONDS)} is throttled with a 2-second
 * window, and whatever passes through is logged under {@code TAG}.
 */
public static void throttleFirst() {
    // Observer that simply logs each lifecycle callback.
    final Observer<Long> loggingObserver = new Observer<Long>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            Log.d(TAG, "onSubscribe");
        }

        @Override
        public void onNext(Long tick) {
            Log.d(TAG, "onNext: value = " + tick);
        }

        @Override
        public void onError(Throwable error) {
            Log.d(TAG, "onError: " + error.toString());
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    };

    Observable<Long> ticks = Observable.intervalRange(0, 6, 0, 1, TimeUnit.SECONDS);
    // Within each 2-second window only the first tick is forwarded.
    ticks.throttleFirst(2, TimeUnit.SECONDS).subscribe(loggingObserver);
}
输出:
08-09 17:47:52.213 30470 30470 D Operation: onSubscribe
08-09 17:47:52.215 30470 30494 D Operation: onNext: value = 0
08-09 17:47:55.216 30470 30494 D Operation: onNext: value = 3
08-09 17:47:57.215 30470 30494 D Operation: onComplete
3.7 sample()----- 见rxdocs.pdf第118页
作用:在某段时间内,只发送该段时间内最新(最后)1次事件。与 throttleLast() 操作符类似
3.8 throttleWithTimeout () / debounce()----- 见rxdocs.pdf第95页
作用:发送数据事件时,若2次发送事件的间隔 < 指定时间,就会丢弃前一次的数据,直到指定时间内都没有新数据发射时才会发送后一次的数据
/**
 * Demonstrates {@code debounce()}: a timed source created with
 * {@code intervalRange(0, 6, 0, 1, SECONDS)} is debounced with a 2-second
 * timeout, and whatever passes through is logged under {@code TAG}.
 */
public static void debounce() {
    // Observer that simply logs each lifecycle callback.
    final Observer<Long> loggingObserver = new Observer<Long>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            Log.d(TAG, "onSubscribe");
        }

        @Override
        public void onNext(Long tick) {
            Log.d(TAG, "onNext: value = " + tick);
        }

        @Override
        public void onError(Throwable error) {
            Log.d(TAG, "onError: " + error.toString());
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    };

    Observable<Long> ticks = Observable.intervalRange(0, 6, 0, 1, TimeUnit.SECONDS);
    // Ticks arrive every second, i.e. faster than the 2-second debounce timeout.
    ticks.debounce(2, TimeUnit.SECONDS).subscribe(loggingObserver);
}
输出:
08-09 18:00:40.068 31014 31014 D Operation: onSubscribe
08-09 18:00:45.073 31014 31038 D Operation: onNext: value = 5
08-09 18:00:45.073 31014 31038 D Operation: onComplete
3.9 firstElement() / lastElement()
作用:仅选取第1个元素 / 最后1个元素
/**
 * Demonstrates {@code firstElement()}: takes the first item of the sequence
 * 1, 2, 3 and delivers it to a {@code MaybeObserver}, which logs every
 * callback under {@code TAG}.
 */
public static void firstElement() {
    // Maybe-style observer: a value arrives through onSuccess rather than onNext.
    final MaybeObserver<Integer> loggingObserver = new MaybeObserver<Integer>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            Log.d(TAG, "onSubscribe");
        }

        @Override
        public void onSuccess(Integer item) {
            Log.d(TAG, "onSuccess: value = " + item);
        }

        @Override
        public void onError(Throwable error) {
            Log.d(TAG, "onError: " + error.toString());
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    };

    Observable<Integer> numbers = Observable.just(1, 2, 3);
    numbers.firstElement().subscribe(loggingObserver);
}
输出:
08-09 19:25:41.413 2486 2486 D Operation: onSubscribe
08-09 19:25:41.413 2486 2486 D Operation: onSuccess: value = 1
3.10 elementAt()----- 见rxdocs.pdf第101页
作用:指定接收某个元素(通过索引值确定)
/**
 * Demonstrates {@code elementAt()}: requests the item at index 2 of the
 * sequence 1, 2, 3, 4, 5 and delivers it to a {@code MaybeObserver}, which
 * logs every callback under {@code TAG}.
 */
public static void elementAt() {
    // Maybe-style observer: a value arrives through onSuccess rather than onNext.
    final MaybeObserver<Integer> loggingObserver = new MaybeObserver<Integer>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            Log.d(TAG, "onSubscribe");
        }

        @Override
        public void onSuccess(Integer item) {
            Log.d(TAG, "onSuccess: value = " + item);
        }

        @Override
        public void onError(Throwable error) {
            Log.d(TAG, "onError: " + error.toString());
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    };

    Observable<Integer> numbers = Observable.just(1, 2, 3, 4, 5);
    numbers.elementAt(2).subscribe(loggingObserver);
}
输出:
08-09 19:29:54.401 2972 2972 D Operation: onSubscribe
08-09 19:29:54.402 2972 2972 D Operation: onSuccess: value = 3
3.11 elementAtOrError()
作用:在 elementAt() 的基础上,当出现越界情况(即获取的位置索引 ≥ 发送事件序列长度,索引从0开始)时,即抛出异常(NoSuchElementException)
/**
 * Demonstrates {@code elementAtOrError()}: requests index 10 from the
 * five-item sequence 1, 2, 3, 4, 5 and delivers the outcome to a
 * {@code SingleObserver}, which logs every callback under {@code TAG}.
 */
public static void elementAtOrError() {
    // Single-style observer: exactly one of onSuccess / onError is expected.
    final SingleObserver<Integer> loggingObserver = new SingleObserver<Integer>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            Log.d(TAG, "onSubscribe");
        }

        @Override
        public void onSuccess(Integer item) {
            Log.d(TAG, "onSuccess: value = " + item);
        }

        @Override
        public void onError(Throwable error) {
            Log.d(TAG, "onError: " + error.toString());
        }
    };

    Observable<Integer> numbers = Observable.just(1, 2, 3, 4, 5);
    // Index 10 lies beyond the five emitted items.
    numbers.elementAtOrError(10).subscribe(loggingObserver);
}
输出:
08-09 19:35:06.800 3849 3849 D Operation: onSubscribe
08-09 19:35:06.800 3849 3849 D Operation: onError: java.util.NoSuchElementException