RXjava解析(二)我把RXjava的源码和这份面试都给你了,你还告诉我面不过拿不到offer?

就在前不久作了一个关于RXJava的相关教学视频,事后整理了关于RxJava的预习资料和相关内容以及图文和相关源码,须要借鉴的能够和我联系~java

承接上文:我把RXjava的源码和这份面试都给你了,你还告诉我面不过拿不到offer?(一)
源码和面试大全PDF
(VX:mm14525201314)
RXjava解析(二)我把RXjava的源码和这份面试都给你了,你还告诉我面不过拿不到offer?git

RxJava

RxJava中,一个实现了Observer接口的对象能够订阅(subscribe)一个Observable 类的实例。订阅者(subscriber)对Observable发射(emit)的任何数据或数据序列做出响应。这种模式简化了并发操做,由于它不须要阻塞等待Observable发射数据,而是建立了一个处于待命状态的观察者哨兵,哨兵在将来某个时刻响应Observable的通知。github

####Single面试

介绍

RxJava(以及它派生出来的RxGroovyRxScala)中有一个名为Single的Observable变种。Single相似于Observable,不一样的是,它老是只发射一个值,或者一个错误通知,而不是发射一系列的值。缓存

所以,不一样于Observable须要三个方法onNext, onError, onCompleted,订阅Single只须要两个方法:多线程

  • onSuccess- Single发射单个的值到这个方法
  • onError - 若是没法发射须要的值,Single发射一个Throwable对象到这个方法

Single只会调用这两个方法中的一个,并且只会调用一次,调用了任何一个方法以后,订阅关系终止并发

Single的操做符

Single也能够组合使用多种操做,一些操做符让你能够混合使用Observable和Single:
RXjava解析(二)我把RXjava的源码和这份面试都给你了,你还告诉我面不过拿不到offer?ide

Subject

Subject能够当作是一个桥梁或者代理,在某些ReactiveX实现中(如RxJava),它同时充当了ObserverObservable的角色。由于它是一个Observer,它能够订阅一个或多个Observable;又由于它是一个Observable,它能够转发它收到(Observe)的数据,也能够发射新的数据。测试

因为一个Subject订阅一个Observable,它能够触发这个Observable开始发射数据(若是那个Observable是"冷"的--就是说,它等待有订阅才开始发射数据)。所以有这样的效果,Subject能够把原来那个"冷"的Observable变成"热"的。this

Subject的种类

针对不一样的场景一共有四种类型的Subject。他们并非在全部的实现中所有都存在,并且一些实现使用其它的命名约定(例如,在RxScala中Subject被称做PublishSubject)。

AsyncSubject

一个AsyncSubject只在原始Observable完成后,发射来自原始Observable的最后一个值。(若是原始Observable没有发射任何值,AsyncObject也不发射任何值)它会把这最后一个值发射给任何后续的观察者。

然而,若是原始的Observable由于发生了错误而终止,AsyncSubject将不会发射任何数据,只是简单的向前传递这个错误通知

BehaviorSubject

当观察者订阅BehaviorSubject时,它开始发射原始Observable最近发射的数据(若是此时尚未收到任何数据,它会发射一个默认值),而后继续发射其它任何来自原始Observable的数据。

然而,若是原始的Observable由于发生了一个错误而终止,BehaviorSubject将不会发射任何数据,只是简单的向前传递这个错误通知

PublishSubject

PublishSubject只会把在订阅发生的时间点以后来自原始Observable的数据发射给观察者。须要注意的是,PublishSubject可能会一建立完成就马上开始发射数据(除非你能够阻止它发生),所以这里有一个风险:在Subject被建立后到有观察者订阅它以前这个时间段内,一个或多个数据可能会丢失。若是要确保来自原始Observable的全部数据都被分发,你须要这样作:或者使用Create建立那个Observable以便手动给它引入"冷"Observable的行为(当全部观察者都已经订阅时才开始发射数据),或者改用ReplaySubject

若是原始的Observable由于发生了一个错误而终止,PublishSubject将不会发射任何数据,只是简单的向前传递这个错误通知。

ReplaySubject

ReplaySubject会发射全部来自原始Observable的数据给观察者,不管它们是什么时候订阅的。也有其它版本的ReplaySubject,在重放缓存增加到必定大小的时候或过了一段时间后会丢弃旧的数据(原始Observable发射的)。

若是你把ReplaySubject看成一个观察者使用,注意不要从多个线程中调用它的onNext方法(包括其它的on系列方法),这可能致使同时(非顺序)调用,这会违反Observable协议,给Subject的结果增长了不肯定性。

RxJava的对应类

假设你有一个Subject,你想把它传递给其它的代理或者暴露它的Subscriber接口,你能够调用它的asObservable方法,这个方法返回一个Observable。具体使用方法能够参考Javadoc文档。

串行化

若是你把 Subject 看成一个 Subscriber 使用,注意不要从多个线程中调用它的onNext方法(包括其它的on系列方法),这可能致使同时(非顺序)调用,这会违反Observable协议,给Subject的结果增长了不肯定性。

要避免此类问题,你能够将 Subject 转换为一个 SerializedSubject,相似于这样:

mySafeSubject = new SerializedSubject( myUnsafeSubject );

调度器 Scheduler

若是你想给Observable操做符链添加多线程功能,你能够指定操做符(或者特定的Observable)在特定的调度器(Scheduler)上执行。

某些ReactiveX的Observable操做符有一些变体,它们能够接受一个Scheduler参数。这个参数指定操做符将它们的部分或所有任务放在一个特定的调度器上执行。

使用ObserveOnSubscribeOn操做符,你可让Observable在一个特定的调度器上执行,ObserveOn指示一个Observable在一个特定的调度器上调用观察者的onNext, onErroronCompleted方法,SubscribeOn更进一步,它指示Observable将所有的处理过程(包括发射数据和通知)放在特定的调度器上执行。

RxJava示例

调度器的种类

下表展现了RxJava中可用的调度器种类:
RXjava解析(二)我把RXjava的源码和这份面试都给你了,你还告诉我面不过拿不到offer?

默认调度器

在RxJava中,某些Observable操做符的变体容许你设置用于操做执行的调度器,其它的则不在任何特定的调度器上执行,或者在一个指定的默认调度器上执行。下面的表格个列出了一些操做符的默认调度器:
RXjava解析(二)我把RXjava的源码和这份面试都给你了,你还告诉我面不过拿不到offer?

使用调度器

除了将这些调度器传递给RxJavaObservable操做符,你也能够用它们调度你本身的任务。下面的示例展现了Scheduler.Worker的用法:

worker = Schedulers.newThread().createWorker();
  worker.schedule(new Action0() {
    @Override
    public void call() {
      yourWork();
   }
  });
  // some time later...
  worker.unsubscribe();

递归调度器

要调度递归的方法调用,你可使用schedule,而后再用schedule(this),示例:

worker = Schedulers.newThread().createWorker();
  worker.schedule(new Action0() {
    @Override
    public void call() {
      yourWork();
      // recurse until unsubscribed (schedule will do nothing if unsubscribed)
      worker.schedule(this);
   }
  });
  // some time later...
  worker.unsubscribe();

检查或设置取消订阅状态

Worker类的对象实现了Subscription接口,使用它的isUnsubscribed和unsubscribe方法,因此你能够在订阅取消时中止任务,或者从正在调度的任务内部取消订阅,示例:

Worker worker = Schedulers.newThread().createWorker();
  Subscription mySubscription = worker.schedule(new Action0() {

       @Override
       public void call() {
           while(!worker.isUnsubscribed()) {
               status = yourWork();
               if(QUIT == status) { worker.unsubscribe(); }

           }
       }
  });

Worker同时是Subscription,所以你能够(一般也应该)调用它的unsubscribe方法通知能够挂起任务和释放资源了。

延时和周期调度器

你可使用schedule(action,delayTime,timeUnit)在指定的调度器上延时执行你的任务,下面例子中的任务将在500毫秒以后开始执行:

someScheduler.schedule(someAction, 500, TimeUnit.MILLISECONDS);

使用另外一个版本的schedule,schedulePeriodically(action,initialDelay,period,timeUnit)方法让你能够安排一个按期执行的任务,下面例子的任务将在500毫秒以后执行,而后每250毫秒执行一次:

someScheduler.schedulePeriodically(someAction, 500, 250, TimeUnit.MILLISECONDS);

测试调度器

TestScheduler让你能够对调度器的时钟表现进行手动微调。这对依赖精确时间安排的任务的测试颇有用处。这个调度器有三个额外的方法:

  • advanceTimeTo(time,unit) 向前波动调度器的时钟到一个指定的时间点
  • advanceTimeBy(time,unit) 将调度器的时钟向前拨动一个指定的时间段
  • triggerActions( )开始执行任何计划中的可是未启动的任务,若是它们的计划时间等于或者早于调度器时钟的当前时间

顺手留下GitHub连接,须要获取相关面试等内容的能够本身去找
https://github.com/xiangjiana/Android-MS
(VX:mm14525201314)

相关文章
相关标签/搜索