主讲人:阳石柏java
RxJava 在 GitHub 主页上的自我介绍是 “a library for composing asynchronous and event-based programs using observable sequences for the Java VM”(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库),这就是RxJava。然而对于刚接触Rxjava的人来讲,这种归纳显然变得晦涩难懂,那么RxJava究竟是什么?其实初学RxJava只要把握两点:观察者模式和异步,就基本能够熟练使用RxJava了。react
异步在这里并不须要作太多的解释,由于在概念和使用上,并无太多高深的东西。大概就是你脑子里想能到的那些多线程,线程切换这些东西。我会在后面会讲解它的用法。咱们先把观察者模式说清楚,“按下开关,台灯灯亮”api
观察上图,其实已经很明了了,不过须要指出一下几点(对于下面理解RxJava很重要):多线程
这三点对于咱们理解RxJava很是重要。由于上述三条分别对应了RxJava中被观察者(Observable),观察者(Observer)和操做符的职能。而观察者模式又是RxJava程序运行的骨架。RxJava也是基于观察者模式来组建本身的程序逻辑的,就是构建被观察者(Observable),观察者(Observer/Subscriber),而后创建两者的订阅关系(就像那根电线,链接起台灯和开关)实现观察,在事件传递过程当中还能够对事件作各类处理。架构
Observable switcher=Observable.create(new Observable.OnSubscribe<String>(){ @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("On"); subscriber.onNext("Off"); subscriber.onNext("On"); subscriber.onNext("On"); subscriber.onCompleted(); } });
这是最正宗的写法,建立了一个开关类,产生了五个事件,分别是:开,关,开,开,结束。异步
Observable switcher=Observable.just("On","Off","On","On");
String [] kk={"On","Off","On","On"}; Observable switcher=Observable.from(kk);
Subscriber light=new Subscriber<String>() { @Override public void onCompleted() { //被观察者的onCompleted()事件会走到这里; Log.d("DDDDDD","结束观察...\n"; } @Override public void onError(Throwable e) { //出现错误会调用这个方法 } @Override public void onNext(String s) { //处理传过来的onNext事件 Log.d("DDDDD","handle this---"+s) }
Action1 light =new Action1<String>() { @Override public void call(String s) { Log.d("DDDDD","handle this---"+s) } }
switcher.subscribe(light); 这里有不少初学者的疑问就是明明是台灯观察开关,正常来看就应是light.subscribe(switcher)才对,之因此“开关订阅台灯”,是为了保证流式API调用风格
//这就是RxJava的流式API调用 Observable.just("On","Off","On","On") //在传递过程当中对事件进行过滤操做 .filter(new Func1<String, Boolean>() { @Override public Boolean call(String s) { return s!=null; } }) .subscribe(mSubscriber);
因为被观察者产生事件,是事件的起点,那么开头就是用Observable这个主体调用来建立被观察者,产生事件,为了保证流式API调用规则,就直接让Observable做为惟一的调用主体,一路调用下去。async
一句话,背后的真实的逻辑依然是台灯订阅了开关,可是在表面上,咱们让开关“伪装”订阅了台灯,以便于保持流式API调用风格不变。ide
好了,如今分解动做都完成了,已经架构了一个基本的RxJava事件处理流程。学习
咱们再来按照观察者模式的运做流程和流式Api的写法复习一遍:this
结合流程图的相应代码实例以下:
//建立被观察者,是事件传递的起点 Observable.just("On","Off","On","On") //这就是在传递过程当中对事件进行过滤操做 .filter(new Func1<String, Boolean>() { @Override public Boolean call(String s) { return s!=null; } }) //实现订阅 .subscribe( //建立观察者,做为事件传递的终点处理事件 new Subscriber<String>() { @Override public void onCompleted() { Log.d("DDDDDD","结束观察...\n"); } @Override public void onError(Throwable e) { //出现错误会调用这个方法 } @Override public void onNext(String s) { //处理事件 Log.d("DDDDD","handle this---"+s) } );
1.建立被观察者,产生事件
2.设置事件传递过程当中的过滤,合并,变换等加工操做。
3.订阅一个观察者对象,实现事件最终的处理。
Tips: 当调用订阅操做(即调用Observable.subscribe()方法)的时候,被观察者才真正开始发出事件。
好比被观察者产生的事件中只有图片文件路径;,可是在观察者这里只想要bitmap,那么就须要类型变换。
Observable.just(getFilePath())
//指定了被观察者执行的线程环境
.subscribeOn(Schedulers.newThread())
//将接下来执行的线程环境指定为io线程
.observeOn(Schedulers.io())
//使用map操做来完成类型转换
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String s) {
//显然自定义的createBitmapFromPath(s)方法,是一个极其耗时的操做
return createBitmapFromPath(s);
}
})
//将后面执行的线程环境切换为主线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
//建立观察者,做为事件传递的终点处理事件
new Subscriber<Bitmap>() {
@Override
public void onCompleted() {
Log.d("DDDDDD","结束观察...\n");
}
@Override
public void onError(Throwable e) {
//出现错误会调用这个方法
}
@Override
public void onNext(Bitmap s) {
//处理事件
showBitmap(s)
}
);
实际上在使用map操做时,new Func1() 就对应了类型的转变的方向,String是原类型,Bitmap是转换后的类型。在call()方法中,输入的是原类型,返回转换后的类型而进行读取文件,建立bitmap多是一个耗时操做,那么就应该在子线程中执行,主线程应该仅仅作展现。那么线程切换通常就会是比较复杂的事情了。可是在Rxjava中,是很是方便的。
异步是相对于主线程来说的子线程操做,在这里咱们不妨使用线程调度这个概念更加贴切。
首先介绍一下RxJava的线程环境有哪些选项:
使用代码解释更加简洁方便:
//new Observable.just()执行在新线程 Observable.just(getFilePath()) //指定在新线程中建立被观察者 .subscribeOn(Schedulers.newThread()) //将接下来执行的线程环境指定为io线程 .observeOn(Schedulers.io()) //map就处在io线程 .map(mMapOperater) //将后面执行的线程环境切换为主线程, //可是这一句依然执行在io线程 .observeOn(AndroidSchedulers.mainThread()) //指定线程无效,但这句代码自己执行在主线程 .subscribeOn(Schedulers.io()) //执行在主线程 .subscribe(mSubscriber);
1.subscribeOn()它指示Observable在一个指定的调度器上建立(只做用于被观察者建立阶段)。只能指定一次,若是指定屡次则以第一次为准
2.observeOn()指定在事件传递(加工变换)和最终被处理(观察者)的发生在哪个调度器。可指定屡次,每次指定完都在下一步生效。
RxJava是一个观察者模式的架构,当这个架构中被观察者(Observable)和观察者(Subscriber)处在不一样的线程环境中时,因为者各自的工做量不同,致使它们产生事件和处理事件的速度不同,这就会出现两种状况:
1.被观察者产生事件慢一些,观察者处理事件很快。那么观察者就会等着被观察者发送事件,(比如观察者在等米下锅,程序等待,这没有问题)。
2.被观察者产生事件的速度很快,而观察者处理很慢。那就出问题了,若是不做处理的话,事件会堆积起来,最终挤爆你的内存,致使程序崩溃。(比如被观察者生产的大米没人吃,堆积最后就会烂掉)。 下面咱们用代码演示一下这种崩溃的场景:
//被观察者在主线程中,每1ms发送一个事件 Observable.interval(1, TimeUnit.MILLISECONDS) //将观察者的工做放在新线程环境中 .observeOn(Schedulers.newThread()) //观察者处理每1000ms才处理一个事件 .subscribe(new Action1() { @Override public void call(Long aLong) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } Log.w("TAG","---->"+aLong); } });
这段代码运行以后:
... Caused by: rx.exceptions.MissingBackpressureException ... ...
抛出MissingBackpressureException每每就是由于,被观察者发送事件的速度太快,而观察者处理太慢,并且你尚未作相应措施,因此报异常.
简而言之:背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的状况下,一种告诉上游的被观察者下降发送速度的策略,即背压是流速控制的一种策略。
有两点要强调的是:
1.背压策略的一个前提是异步环境,也就是说,被观察者和观察者处在不一样的线程环境中。
2.背压(Backpressure)并非一个像flatMap同样能够在程序中直接使用的操做符,他只是一种控制事件流速的策略。
那么咱们再回看上面的程序异常就很好理解了,就是当被观察者发送事件速度过快的状况下,咱们没有作流速控制,致使了异常。
由以前咱们能够了解到,在RxJava的观察者模型中,被观察者是主动的推送数据给观察者,观察者是被动接收的。而响应式拉取则反过来,观察者主动从被观察者那里去拉取数据,而被观察者变成被动的等待通知再发送数据。其结构示意图以下:
总的来讲:
1.背压是一种策略,具体措施是下游观察者通知上游的被观察者发送事件
2.背压策略很好的解决了异步环境下被观察者和观察者速度不一致的问题
3.在RxJava1.X中,一样是Observable,有的不支持背压策略,致使某些状况下,显得特别麻烦,出了问题也很难排查,使得RxJava的学习曲线变得十份陡峭。
首先要强调的一点是,RxJava以观察者模式为骨架,在2.0中依然如此。
不过这次更新中,出现了两种观察者模式:
Observable(被观察者)/Observer(观察者)
Flowable(被观察者)/Subscriber(观察者)
Observable正经常使用法: Observable mObservable=Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onComplete(); } }); Observer mObserver=new Observer<Integer>() { //这是新加入的方法,在订阅后发送数据以前, //回首先调用这个方法,而Disposable可用于取消订阅 @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer value) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }; mObservable.subscribe(mObserver);
Flowable.range(0,10) .subscribe(new Subscriber<Integer>() { Subscription sub; //当订阅后,会首先调用这个方法,其实就至关于onStart(), //传入的Subscription s参数能够用于请求数据或者取消订阅 @Override public void onSubscribe(Subscription s) { Log.w("TAG","onsubscribe start"); sub=s; sub.request(1); Log.w("TAG","onsubscribe end"); } @Override public void onNext(Integer o) { Log.w("TAG","onNext--->"+o); sub.request(1); } @Override public void onError(Throwable t) { t.printStackTrace(); } @Override public void onComplete() { Log.w("TAG","onComplete"); } });
输出以下:
onsubscribe start
onNext—>0
onNext—>1
onNext—>2
…
onNext—>9
onComplete
onsubscribe end
这一块其实能够说没什么改动,大部分以前你用过的操做符都没变,即便有所变更,也只是包名或类名的改动。你们可能常常用到的就是Action和Function。
在1.0中,这类接口是从Action0,Action1…日后排(数字表明可接受的参数),如今作出了改动
Rx1.0———–Rx2.0
Action0——–Action
Action1——–Consumer
Action2——–BiConsumer
后面的Action都去掉了,只保留了ActionN
同上,也是命名方式的改变
上面那两个类,和RxJava1.0相比,他们都增长了throws Exception,也就是说,在这些方法作某些操做就不须要try-catch。
例如:
Flowable.just("file.txt") .map(name -> Files.readLines(name)) .subscribe(lines -> System.out.println(lines.size()), Throwable::printStackTrace);
Files.readLines(name)这类io方法原本是须要try-catch的,如今直接抛出异常,就能够放心的使用lambda ,保证代码的简洁优美。