RxJava是android的异步框架,官方介绍是可观测的序列,组成异步基于事件程序的库。特色是观察者模式,基于事件流的链式调用,随着异步操做调度过程复杂的状况下,程序逻辑也变得愈来愈复杂,但RxJava依然可以保持简洁。java
简单的说观察者A与被观察者B创建订阅关系,当被观察者B发生某种改变时,当即通知观察者Areact
compile 'io.reactivex.rxjava2:rxjava:2.1.0'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
注意各地方添加泛型避免大片警告,onNext()是事件的回调,onComplete()是事件的结尾。onComplete()与onError互斥须要保持惟一性,并只能调用一次。android
Observable<String> observable= Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { e.onNext("消息1"); e.onNext("消息2"); e.onNext("消息3"); e.onComplete(); } });
建立观察者时回调的onSubscribe能够获取Disposable对象,在合适的时候判断条件,调用dispose()便可接触订阅关系缓存
Observer<String> observer=new Observer<String>() { @Override public void onSubscribe(Disposable d) { //经过判断解除订阅关系 d.dispose(); } @Override public void onNext(String o) { //对应observable的onNext方法 } @Override public void onError(Throwable e) { //对应observable的onError方法 } @Override public void onComplete() { //对应observable的onComplete方法 } };
observable.subscribeOn(Schedulers.io()) //指定事件生产在子线程 .observeOn(AndroidSchedulers.mainThread()) //指定事件消费在UI线程 .subscribe(observer);
//just模式,将自动发送onNext()事件 Observable<String> observable = Observable.just("发送消息"); //fromIterable模式,遍历集合,并自动发送onNext()事件 Observable<String> observable = Observable.fromIterable((Iterable<String>) mList); //interval模式,定时自动发送整数序列,从0开始每隔2秒计数, Observable<Long> observable = Observable.interval(0,2, TimeUnit.SECONDS) //range模式,自动发送特定的整数序列,0表示不发送,负数会抛异常,从1开始发送到20 Observable<Integer> observable = Observable.range(1,20); //timer模式,定时执行观察者的onNext()方法 Observable<Integer> observable = Observable.timer(2, TimeUnit.SECONDS);
如建立操做,数据过滤操做,条件操做,转载如下博客,很详细:框架
Schedulers.immediate() 默认模式,在当前线程运行异步
Schedulers.newThread() 建立新的子线程运行ide
Schedulers.io() 建立新的子线程运行,内部使用的是无上限的线程池,可重用空闲的线程,效率高spa
AndroidSchedulers.mainThread() 在UI主线程运行线程
subscribeOn() 指定Observable(被观察者)所在的线程,或者叫作事件产生的线程code
observeOn() 指定 Observer(观察者)所运行在的线程,或者叫作事件消费的线程
Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() { @Override public void subscribe(FlowableEmitter<String> e) throws Exception { e.onNext("hello RxJava!"); e.onComplete(); } },BackpressureStrategy.BUFFER);//增长背压模式
onSubscribe()会返回Subscription对象,调用cancel()便可取消订阅关系,request()便可指定消费事件的数量
Subscriber<String> subscriber=new Subscriber<String>() { @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); } @Override public void onNext(String s) { Log.i("RxJava", "onNext: "+s); } @Override public void onError(Throwable t) { Log.i("RxJava", "onError"); } @Override public void onComplete() { Log.i("RxJava", "onComplete"); } }; flowable.subscribe(subscriber);//创建订阅关系
若是生产者和消费者不在同一线程的状况下,若是生产者的速度大于消费者的速度,就会产生Backpressure问题。即异步状况下,Backpressure问题才会存在。
当消费者处理不了事件,就丢弃。
消费者经过request()传入其需求n,而后生产者把n个事件传递给消费者供其消费。其余消费不掉的事件就丢掉
LATEST与DROP功能基本一致,惟一的区别就是LATEST总能使消费者可以接收到生产者产生的最后一个事件
这种方式会在产生Backpressure问题的时候直接抛出一个异常,这个异常就是著名的MissingBackpressureException