本篇文章已受权微信公众号 YYGeeker
独家发布转载请标明出处java
CSDN学院课程地址react
- RxJava2从入门到精通-初级篇:edu.csdn.net/course/deta…
- RxJava2从入门到精通-中级篇:edu.csdn.net/course/deta…
- RxJava2从入门到精通-进阶篇:edu.csdn.net/course/deta…
- RxJava2从入门到精通-源码分析篇:edu.csdn.net/course/deta…
在RxJava使用以前记得在Gradle中添加依赖引入android
implementation "io.reactivex.rxjava2:rxjava:2.1.12"
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
复制代码
在上面的例子中,咱们能够注意到被观察者中有个Emitter(发射器),发射器的位置位于subscribe回调参数ObservableEmitter<String> e
中,经过名字咱们能够知道,RxJava的事件通知就是经过它来进行发送的,因此它是一个事件发射器,发射器能发送的事件有onNext()
,onComplete()
,onError()
,在观察者的回调中,分别对应着相同方法名进行回调,这里对观察者的回调方法进行简单介绍编程
人类就喜欢酷炫的东西,炫耀自身的优势,固然RxJava也少不了人类这种心理,就是链式编程,下面这段代码能够完美替代例子上面的全部代码,其效果是和上面同样的。这里须要注意的是,建立Observer过程当中,会将全部的接收方法都建立出来,若是此时程序发生异常,RxJava默认会将异常信息try-catch缓存
public static void main(String[] args) {
//建立被观察者
Observable.create(new ObservableOnSubscribe<String>() {
@Override
//默认在主线程里执行该方法
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("俊俊俊很帅");
e.onNext("你值得拥有");
e.onNext("取消关注");
e.onNext("但仍是要保持微笑");
e.onComplete();
}
})
//建立观察者并订阅
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println("onNext=" + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
复制代码
长此以往,人类喜欢简洁,喜欢定制服务,巧了,RxJava也给你知足了,下面这段代码中,实现的方法跟上面的实现方法是对应起来的,你们看参数就知道哪一个对应哪一个了,你能够经过建立Consumer,不须要实现的方法你能够不写,看上去更简洁,这里我为了方便你们看,都new出来了,Consumer就是消费者的意思,能够理解为消费了onNext等等等事件。这里须要注意的是,建立Consumer过程当中,若是没有将第二个Throwable的Consumer建立出来,若是此时程序发生异常,程序将会崩溃bash
public static void main(String[] args) {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("俊俊俊很帅");
e.onNext("你值得拥有");
e.onNext("取消关注");
e.onNext("但仍是要保持微笑");
e.onComplete();
}
}).subscribe(
new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
//对应onNext()
System.out.println("accept=" + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
//对应onError()
}
}, new Action() {
@Override
public void run() throws Exception {
//对应onComplete()
}
}, new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
//对应onSubscribe()
}
});
}
复制代码
固然Rxjava的使用不单单这么简单的事件发送,他还能完成一些业务上的逻辑。好比注册登陆操做,正常的逻辑是经过注册去获取用户的Token,而后经过Token进行登陆,这个过程涉及到注册须要在子线程去进行网络请求,而后在UI线程中更新界面提示,而后再切换到子线程进行登陆操做,最后又得切换到UI线程去更新界面,这一系列的操做,也是能够经过RxJava的线程切换来进行实现,在RxJava中的线程切换特别简单,只要下面这两句话就能自由的在子线程和UI线程中自由切换微信
public static void main(String[] args) {
//建立被观察者
Observable.create(new ObservableOnSubscribe<String>() {
@Override
//默认在主线程里执行该方法
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("俊俊俊很帅");
e.onNext("你值得拥有");
e.onNext("取消关注");
e.onNext("但仍是要保持微笑");
e.onComplete();
}
})
//将被观察者切换到子线程
.subscribeOn(Schedulers.io())
//将观察者切换到主线程 须要在Android环境下运行
.observeOn(AndroidSchedulers.mainThread())
//建立观察者并订阅
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println("onNext=" + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
复制代码
说到线程切换,就必须不得不说的是RxJava的线程调度器,其调度器就是Schedulers,在调度器中封装了各式各样的线程提供给咱们使用,下面举例其现有的调度器列表网络
调度器类型 | 效果 |
---|---|
Schedulers.computation() | 用于计算任务,如事件循环或和回调处理,不要用于IO操做(IO操做请使用Schedulers.io());默认线程数等于处理器的数量 |
Schedulers.from(executor) | 使用指定的Executor做为调度器 |
Schedulers.immediate() | 在当前线程当即开始执行任务 |
Schedulers.io() | 用于IO密集型任务,如异步阻塞IO操做,这个调度器的线程池会根据须要增加;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io()默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器 |
Schedulers.newThread() | 为每一个任务建立一个新线程 |
Schedulers.trampoline() | 当其它排队的任务完成后,在当前线程排队开始执行 |
RxJava事件发出去并非置之不顾,要有合理的管理者来管理它们,在合适的时机要进行释放事件,这样才不会致使内存泄漏,这里的管理者咱们称为事件调度器(或事件管理者)CompositeDisposable
。Rxjava的事件流发出去后,会返回Disposable
类型的对象,咱们能够将该对象添加到事件调度器上,而后进行相关操做,这里的事件调度器咱们能够简单的理解为事件的容器异步
public class Main {
private static CompositeDisposable mRxEvent = new CompositeDisposable();
public static void main(String[] args) {
Disposable subscribe = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("俊俊俊很帅");
e.onNext("你值得拥有");
e.onNext("取消关注");
e.onNext("但仍是要保持微笑");
e.onComplete();
}
}).subscribe(
new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
//对应onNext()
System.out.println("accept=" + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
//对应onError()
}
}, new Action() {
@Override
public void run() throws Exception {
//对应onComplete()
}
}, new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
//对应onSubscribe()
}
});
mRxEvent.add(subscribe);
mRxEvent.clear();
}
}
复制代码
CompositeDisposable
提供的方法中,都是对事件的管理ide
RxJava的事件发射分为冷与热,一个"热"的Observable可能一建立完就开始发射数据,所以全部后续订阅它的观察者可能从序列中间的某个位置开始接受数据(有一些数据错过了)。一个"冷"的Observable会一直等待,直到有观察者订阅它才开始发射数据,所以这个观察者能够确保会收到整个数据序列
RxJava能够简单的归结为三步骤