就在前不久作了一个关于RXJava的相关教学视频,事后整理了关于RxJava的预习资料和相关内容以及图文和相关源码,须要借鉴的能够和我联系~java
承接上文:我把RXjava的源码和这份面试都给你了,你还告诉我面不过拿不到offer?(一)
源码和面试大全PDF
(VX:mm14525201314)git
在RxJava
中,一个实现了Observer接口的对象能够订阅(subscribe)一个Observable 类的实例。订阅者(subscriber)对Observable发射(emit)的任何数据或数据序列做出响应。这种模式简化了并发操做,由于它不须要阻塞等待Observable发射数据,而是建立了一个处于待命状态的观察者哨兵,哨兵在将来某个时刻响应Observable的通知。github
####Single面试
RxJava(以及它派生出来的RxGroovy
和RxScala
)中有一个名为Single的Observable变种。Single相似于Observable,不一样的是,它老是只发射一个值,或者一个错误通知,而不是发射一系列的值。缓存
所以,不一样于Observable须要三个方法onNext, onError, onCompleted
,订阅Single只须要两个方法:多线程
onSuccess
- Single发射单个的值到这个方法onError
- 若是没法发射须要的值,Single发射一个Throwable
对象到这个方法Single只会调用这两个方法中的一个,并且只会调用一次,调用了任何一个方法以后,订阅关系终止并发
Single也能够组合使用多种操做,一些操做符让你能够混合使用Observable和Single:ide
Subject能够当作是一个桥梁或者代理,在某些ReactiveX
实现中(如RxJava
),它同时充当了Observer
和Observable
的角色。由于它是一个Observer
,它能够订阅一个或多个Observable
;又由于它是一个Observable
,它能够转发它收到(Observe)的数据,也能够发射新的数据。测试
因为一个Subject
订阅一个Observable
,它能够触发这个Observable
开始发射数据(若是那个Observable
是"冷"的--就是说,它等待有订阅才开始发射数据)。所以有这样的效果,Subject能够把原来那个"冷"的Observable变成"热"的。this
针对不一样的场景一共有四种类型的Subject。他们并非在全部的实现中所有都存在,并且一些实现使用其它的命名约定(例如,在RxScala
中Subject被称做PublishSubject
)。
一个AsyncSubject只在原始Observable完成后,发射来自原始Observable的最后一个值。(若是原始Observable没有发射任何值,AsyncObject
也不发射任何值)它会把这最后一个值发射给任何后续的观察者。
然而,若是原始的Observable
由于发生了错误而终止,AsyncSubject
将不会发射任何数据,只是简单的向前传递这个错误通知
当观察者订阅BehaviorSubject
时,它开始发射原始Observable最近发射的数据(若是此时尚未收到任何数据,它会发射一个默认值),而后继续发射其它任何来自原始Observable的数据。
然而,若是原始的Observable由于发生了一个错误而终止,BehaviorSubject
将不会发射任何数据,只是简单的向前传递这个错误通知
PublishSubject
只会把在订阅发生的时间点以后来自原始Observable的数据发射给观察者。须要注意的是,PublishSubject
可能会一建立完成就马上开始发射数据(除非你能够阻止它发生),所以这里有一个风险:在Subject被建立后到有观察者订阅它以前这个时间段内,一个或多个数据可能会丢失。若是要确保来自原始Observable的全部数据都被分发,你须要这样作:或者使用Create建立那个Observable以便手动给它引入"冷"Observable的行为(当全部观察者都已经订阅时才开始发射数据),或者改用ReplaySubject
。
若是原始的Observable由于发生了一个错误而终止,PublishSubject
将不会发射任何数据,只是简单的向前传递这个错误通知。
ReplaySubject
会发射全部来自原始Observable的数据给观察者,不管它们是什么时候订阅的。也有其它版本的ReplaySubject
,在重放缓存增加到必定大小的时候或过了一段时间后会丢弃旧的数据(原始Observable发射的)。
若是你把ReplaySubject
看成一个观察者使用,注意不要从多个线程中调用它的onNext
方法(包括其它的on系列方法),这可能致使同时(非顺序)调用,这会违反Observable协议,给Subject的结果增长了不肯定性。
假设你有一个Subject,你想把它传递给其它的代理或者暴露它的Subscriber接口,你能够调用它的asObservable
方法,这个方法返回一个Observable。具体使用方法能够参考Javadoc文档。
若是你把 Subject 看成一个 Subscriber 使用,注意不要从多个线程中调用它的onNext方法(包括其它的on系列方法),这可能致使同时(非顺序)调用,这会违反Observable协议,给Subject的结果增长了不肯定性。
要避免此类问题,你能够将 Subject 转换为一个 SerializedSubject
,相似于这样:
mySafeSubject = new SerializedSubject( myUnsafeSubject );
若是你想给Observable操做符链添加多线程功能,你能够指定操做符(或者特定的Observable)在特定的调度器(Scheduler)上执行。
某些ReactiveX
的Observable操做符有一些变体,它们能够接受一个Scheduler参数。这个参数指定操做符将它们的部分或所有任务放在一个特定的调度器上执行。
使用ObserveOn
和SubscribeOn
操做符,你可让Observable在一个特定的调度器上执行,ObserveOn
指示一个Observable在一个特定的调度器上调用观察者的onNext
, onError
和onCompleted
方法,SubscribeOn
更进一步,它指示Observable将所有的处理过程(包括发射数据和通知)放在特定的调度器上执行。
下表展现了RxJava中可用的调度器种类:
在RxJava中,某些Observable操做符的变体容许你设置用于操做执行的调度器,其它的则不在任何特定的调度器上执行,或者在一个指定的默认调度器上执行。下面的表格个列出了一些操做符的默认调度器:
除了将这些调度器传递给RxJava
的Observable
操做符,你也能够用它们调度你本身的任务。下面的示例展现了Scheduler.Worker
的用法:
worker = Schedulers.newThread().createWorker(); worker.schedule(new Action0() { @Override public void call() { yourWork(); } }); // some time later... worker.unsubscribe();
要调度递归的方法调用,你可使用schedule
,而后再用schedule(this)
,示例:
worker = Schedulers.newThread().createWorker(); worker.schedule(new Action0() { @Override public void call() { yourWork(); // recurse until unsubscribed (schedule will do nothing if unsubscribed) worker.schedule(this); } }); // some time later... worker.unsubscribe();
Worker类的对象实现了Subscription接口,使用它的isUnsubscribed和unsubscribe方法,因此你能够在订阅取消时中止任务,或者从正在调度的任务内部取消订阅,示例:
Worker worker = Schedulers.newThread().createWorker(); Subscription mySubscription = worker.schedule(new Action0() { @Override public void call() { while(!worker.isUnsubscribed()) { status = yourWork(); if(QUIT == status) { worker.unsubscribe(); } } } });
Worker
同时是Subscription
,所以你能够(一般也应该)调用它的unsubscribe
方法通知能够挂起任务和释放资源了。
你可使用schedule(action,delayTime,timeUnit)
在指定的调度器上延时执行你的任务,下面例子中的任务将在500毫秒以后开始执行:
someScheduler.schedule(someAction, 500, TimeUnit.MILLISECONDS);
使用另外一个版本的schedule,schedulePeriodically(action,initialDelay,period,timeUnit)
方法让你能够安排一个按期执行的任务,下面例子的任务将在500毫秒以后执行,而后每250毫秒执行一次:
someScheduler.schedulePeriodically(someAction, 500, 250, TimeUnit.MILLISECONDS);
TestScheduler让你能够对调度器的时钟表现进行手动微调。这对依赖精确时间安排的任务的测试颇有用处。这个调度器有三个额外的方法:
advanceTimeTo(time,unit)
向前波动调度器的时钟到一个指定的时间点advanceTimeBy(time,unit)
将调度器的时钟向前拨动一个指定的时间段triggerActions( )
开始执行任何计划中的可是未启动的任务,若是它们的计划时间等于或者早于调度器时钟的当前时间(顺手留下GitHub连接,须要获取相关面试等内容的能够本身去找)
https://github.com/xiangjiana/Android-MS
(VX:mm14525201314)