import android.util.Log; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.TimeUnit; import io.reactivex.Flowable; import io.reactivex.Observable; import io.reactivex.ObservableEmitter; import io.reactivex.ObservableOnSubscribe; import io.reactivex.ObservableSource; import io.reactivex.Observer; import io.reactivex.Scheduler; import io.reactivex.Single; import io.reactivex.SingleObserver; import io.reactivex.android.schedulers.AndroidSchedulers; import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; import io.reactivex.functions.BiFunction; import io.reactivex.functions.Consumer; import io.reactivex.functions.Function; import io.reactivex.functions.Predicate; import io.reactivex.schedulers.Schedulers; import io.reactivex.subjects.PublishSubject; /** * Created by vein on 2018/1/31. */ public class RxDemo { /** * 1.建立一个发射对象,并对其进行观察 */ public void create() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { //发射数据源 // e.onNext(0); // e.onComplete(); // e.onError(new Throwable()); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { //Disposable能够终止事件传递,若是执行 d.dispose();,后续onNext则接收不到发射源数据 } @Override public void onNext(Integer integer) { //接收发射内容 } @Override public void onError(Throwable e) { } @Override public void onComplete() { //若是发射源 } }); } /** * 2. 合并发射事件 * <p> * 分别从两个上游事件中各取出一个组合 * 一个事件只能被使用一次,顺序严格按照事件发送的顺序 * 最终下游事件收到的是和上游事件最少的数目相同(必须两两配对,多余的舍弃) * <p> */ public void zip() { Observable.zip(getStringObservable(), getIntegerObservable(), new BiFunction<String, Integer, String>() { @Override public String apply(String s, Integer integer) throws Exception { return s + integer; } }).subscribe(new Consumer<String>() {//Consumer内部实现,可用subscribe(new Observer<Integer>() {}代替 @Override public void accept(String resultStr) throws Exception { Log.d("", resultStr); //结果为 A1 B2 C3 //依据最短数据源为结果长度,多余舍弃 } }); } //构造事件1,做为zip()的数据源 private Observable<String> getStringObservable() { return Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception { if (!e.isDisposed()) { e.onNext("A"); e.onNext("B"); e.onNext("C"); } } }); } //构造事件2,做为zip()数据源 private Observable<Integer> getIntegerObservable() { return Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { if (!e.isDisposed()) { e.onNext(1); e.onNext(2); e.onNext(3); e.onNext(4); e.onNext(5); } } }); } /** * 3.对上游发送的每个事件应用一个函数,使得每个事件都按照指定的函数去变化 */ public void map() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onNext(3); } }).map(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Exception { //此处添加转换方法 String resultStr = "map result " + integer; return resultStr; } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { //结果 //map result 1 //map result 2 //map result 3 } }); } /** * 4. * FlatMap将一个发送事件的上游Observable变换成多个发送事件的Observables, * 而后将它们发射的事件合并后放进一个单独的Observable里 * 可是flatMap并不保证事件的顺序,若是想保证顺序性用concatMap */ public void flatmap() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onNext(3); } }).flatMap(new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { //事件合并方法 List<String> list = new ArrayList<>(); for (int i = 0; i < 3; i++) { list.add("I am value " + integer); } return Observable.fromIterable(list); // int delayTime = (int) (1 + Math.random() * 50); // return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS); } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { //结果(注意,每条输出的顺序不固定) //I am value 1 //I am value 2 //I am value 1 //I am value 2 //I am value 2 //I am value 3 //I am value 2 //I am value 3 //I am value 3 } }); } /** * 5. 与flatMap功能同样,惟一区别是能保证顺序 */ public void concatMap() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { } }).concatMap(new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { List<String> list = new ArrayList<>(); for (int i = 0; i < 3; i++) { list.add("I am value " + integer); } return Observable.fromIterable(list); } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { //结果 //I am value 1 //I am value 1 //I am value 1 //I am value 2 //I am value 2 //I am value 2 //I am value 3 //I am value 3 //I am value 3 } }); } /** * 6. 让订阅者在接收到数据前作一些操做的操做符, * 例如来了消息以后,先保存数据,保存数据以后再显示,能够用该操做符 */ public void doOnNext() { Observable.just("1", "2").doOnNext(new Consumer<String>() { @Override public void accept(String s) throws Exception { //save 1 ,2 } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { } }); } /** * 7. 过滤操做符,取正确的值 */ public void filter() { Observable.just(1, 20, 65, -5, 7, 19) .filter(new Predicate<Integer>() { @Override public boolean test(@NonNull Integer integer) throws Exception { //过滤条件 return integer >= 10; } }).subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { //返回过滤以后的值,结果 //20,65,19 } }); } /** * 8.跳过多少个事件后开始接收 */ public void skip() { Observable.just(1, 2, 3, 4, 5) .skip(2) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { //结果3,4,5 } }); } /** * 9.用于指定订阅者最多收到多少数据 */ public void take() { Flowable.fromArray(1, 2, 3, 4, 5) .take(2) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { //结果3,4,5 } }); } /** * 10.延迟(间隔)执行,rxjava2中已经废弃,用11.interval代替 */ @Deprecated public void timer() { Observable.timer(2, TimeUnit.SECONDS) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Long>() { @Override public void accept(@NonNull Long aLong) throws Exception { } }); } /** * 11.间隔执行操做,默认在新线程 */ public void interval() { Disposable mDisposable;//销毁界面时注意取消订阅 //延迟3秒开始执行,每2秒执行一次 mDisposable = Observable.interval(3, 2, TimeUnit.SECONDS) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) // 因为interval默认在新线程,因此咱们应该切回主线程 .subscribe(new Consumer<Long>() { @Override public void accept(@NonNull Long aLong) throws Exception { } }); //注意在activity的onDestroy()里取消订阅 if (mDisposable != null && !mDisposable.isDisposed()) { mDisposable.dispose(); } } /** * 12.Single只会接收一个参数,SingleObserver只会调用onError或者onSuccess */ public void single() { Single.just(new Random().nextInt()).subscribe(new SingleObserver<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onSuccess(Integer integer) { } @Override public void onError(Throwable e) { } }); } /** * 13. 链接操做符,可接受Observable的可变参数,或者Observable的集合 */ public void concat() { Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5, 6)) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { //结果1,2,3,4,5,6 } }); } /** * 14.去重 */ public void distinct() { Observable.just(1, 1, 2, 2, 3, 4, 5, 6, 6) .distinct() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { //结果1,2,3,4,5,6 } }); } /** * 15.一次用一个方法处理一个值,能够有一个seed做为初始值 * (scan用法与reduce相同,区别是scan输出整个执行过程,reduce输出最后计算的值) */ public void reduce() { Observable.just(1, 2, 3) .reduce(new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception { //integer + integer2用于求1+2+3+4+5+6 //integer*integer2 用于1*2*3*4*5*6 return integer + integer2; } }).subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { //结果6 //若是是scan用法,则输出1,3,6 } }); } /** * 按照时间划分窗口,将数据发送给不一样的Observable */ public void window() { Observable.interval(600, TimeUnit.MILLISECONDS) // 间隔一秒发一次 .take(15) // 最多接收15个 .window(3, TimeUnit.SECONDS) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Observable<Long>>() { @Override public void accept(@NonNull Observable<Long> longObservable) throws Exception { //时间按3秒总体传递一次事件,该处为3秒执行一次 longObservable.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Long>() { @Override public void accept(@NonNull Long aLong) throws Exception { //该输出为再take范围内的,每个3秒传送过来的数据 } }); } }); } //1.若是有被压问题使用Flowable代替Observable //2.RxJava 2.x 新增Consumer,可自定义实现,accept 里面至关于本来的onNext }