RxJava接触过蛮长时间了,可是让我说个因此然来仍是说不出来,归根结底仍是仍是理解不够深入,趁着年末这个时候争取写个系列出来给本身的学习作个记录ide
注意区分RxJava1.0和2.0的区别,如下默认是在2.0的基础上作的测试学习
先来理解几个概念:测试
一、Observable : 字面意思可观察的,被观察者,也就是事件的发生者spa
二、Observer:观察者,也就是事件的接受者设计
三、subscribe():二者产生订阅关系,须要注意一点的是 observable.subscribe(observer),感受像是被观察者订阅了观察者,与常理不符,为何这么设计呢?我估计是为了链式调用吧。。code
1、最简单的使用方式:server
1 Observable.create(new ObservableOnSubscribe<Integer>() { 2 @Override 3 public void subscribe(ObservableEmitter<Integer> e) throws Exception { 4 e.onNext(1); 5 e.onNext(2); 6 e.onNext(3); 7 e.onComplete(); 8 } 9 }).subscribe(new Observer<Integer>() { 10 11 @Override 12 public void onSubscribe(Disposable d) { 13 Log.i(TAG, "onSubscribe: "); 14 } 15 16 @Override 17 public void onNext(Integer integer) { 18 Log.i(TAG, "onNext: "+integer); 19 } 20 21 @Override 22 public void onError(Throwable e) { 23 Log.i(TAG, "onError: "+e.getMessage()); 24 } 25 26 @Override 27 public void onComplete() { 28 Log.i(TAG, "onComplete: complete"); 29 } 30 });
一、onNext()能够屡次发送事件,onComplete()发送一次,屡次调用不会报错,onError()发送一次,屡次调用会报错,不可和onComplete()共存blog
二、调用onComplete()或者onError()后,观察者也没法接受到onNext()事件
三、Disposable(2.0新增),当调用了dispose()后,观察者就没法接受到事件了ip
2、Cold Observable和Hot Observable
Cold Observable:只有当订阅者订阅的时候,数据流才开始发送,而且每一个订阅者订阅的时候都会独立执行一遍数据流的发送(create(),just()....)
Hot Observable :无论有没有订阅者订阅,一旦建立,就开始发送数据流
publish转化:
1 ConnectableObservable<Long> ob= Observable.interval(200, TimeUnit.MILLISECONDS).publish();//转化成Cold Observable 2 ob.connect();//开始发送数据流
若是不调用connnect(),不会发送数据流,一旦调用,就会建立一个subscription并订阅到原Observable,将接受的数据转发给订阅者。
connect()与disConnect()
1.0 connect() 返回Subscription
2.0 connect() 返回Disposable
//注意区分要释放哪一个 //释放s,则表明中断数据传输,再次链接则从新发送数据 //释放d1或者d2,则表明取消注册,数据已然在传输 public void doSubscribe(View v){ s= ob.connect(); //public final void subscribe(Observer<? super T> observer) {} 无返回值,没法取消注册 d1= ob.subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.i(TAG, "onNext: first============"+aLong); } }); d2=ob.subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.i(TAG, "accept: second=========="+aLong); } }); }
RefCount
Observable<Long> ob= Observable.interval(200, TimeUnit.MILLISECONDS).publish().refCount();
若是有订阅者就会发送数据流,无订阅数据流即中止,再次订阅从新开始发送(可能会和Cold Observable混淆,注意此处每一个订阅者接受到的数据是相同的)
Reply
ConnectableObservable<Long> ob= Observable.interval(200, TimeUnit.MILLISECONDS).replay();
ob.connect();
当和源 Observable 连接后,开始收集数据。当有 Observer 订阅的时候,就把收集到的数据线发给 Observer。而后和其余 Observer 同时接受数据
能够同时设置收集数据的个数及时间
Cache
Observable<Long> ob= Observable.interval(200, TimeUnit.MILLISECONDS).take(5).cache();//只有当订阅者订阅后才开始发送数据
与Reply相似,订阅者所有取消后也不会中止发送。