1、Rx介绍html
1. 能够把Observable当作Iterable的推送方式的等价物。java
2. Observable类型给GOF的观察者模式添加了两种缺乏的语义,这样就和Iterable类型中可用的操做一致了:react
有了这两种功能,Rx就能使Observable与Iterable保持一致了,惟一的不一样是数据流的方向。任何对Iterable的操做,你均可以对Observable使用。android
2、Observablegit
1. 在异步模型中流程更像这样的:github
2. 取消订阅的结果会传递给这个Observable的操做符链,并且会致使这个链条上的每一个环节都中止发射数据项。这些并不保证会当即发生,然而,对一个Observable来讲,即便没有观察者了,它也能够在一个while循环中继续生成并尝试发射数据项。数据库
3. Observable何时开始发射数据序列?这取决于Observable的实现,一个"热"的Observable可能一建立完就开始发射数据,所以全部后续订阅它的观察者可能从序列中间的某个位置开始接受数据(有一些数据错过了)。一个"冷"的Observable会一直等待,直到有观察者订阅它才开始发射数据,所以这个观察者能够确保会收到整个数据序列。编程
3、Single缓存
1. Single相似于Observable,不一样的是,它老是只发射一个值,或者一个错误通知,而不是发射一系列的值。网络
所以,不一样于Observable须要三个方法onNext, onError, onCompleted,订阅Single只须要两个方法:
2. Single只会调用这两个方法中的一个,并且只会调用一次,调用了任何一个方法以后,订阅关系终止。
4、Subject
1. Subject能够当作是一个桥梁或者代理,在某些ReactiveX实现中(如RxJava),它同时充当了Observer和Observable的角色。
因为一个Observable订阅一个Observable,它能够触发这个Observable开始发射数据(若是那个Observable是"冷"的--就是说,它等待有订阅才开始发射数据)。所以有这样的效果,Subject能够把原来那个"冷"的Observable变成"热"的。
1. Schedulers.immediate():
直接在当前线程运行,至关于不指定线程。这是默认的 Scheduler。
2. Schedulers.newThread():
老是启用新线程,并在新线程执行操做。
3. Schedulers.io( ):
用于IO密集型任务,如异步阻塞IO操做,这个调度器的线程池会根据须要增加;对于普通的计算任务,请使用Schedulers.computation();
Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器。
Schedulers.io(): I/O 操做(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。
行为模式和 newThread() 差很少,区别在于 io() 的内部实现是是用一个无数量上限的线程池,能够重用空闲的线程,所以多数状况下 io() 比 newThread() 更有效率。
不要把计算工做放在 io() 中,能够避免建立没必要要的线程。
4. Schedulers.computation():
计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操做限制性能的操做,例如图形的计算。
这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。
不要把 I/O 操做放在 computation() 中,不然 I/O 操做的等待时间会浪费 CPU。
6. Schedulers.trampoline( ):
当其它排队的任务完成后,在当前线程排队开始执行。
7. 能够用Scheduler.Worker调度你本身的任务:
worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {...});
8. Worker同时是Subscription,所以你能够(一般也应该)调用它的unsubscribe方法通知能够挂起任务和释放资源了:
worker.unsubscribe();
9. 延时和周期调度器:
你可使用Worker.schedule(action,delayTime,timeUnit)在指定的调度器上延时执行你的任务;
Worker.schedulePeriodically(action,initialDelay,period,timeUnit)方法让你能够安排一个按期执行的任务。
1. FlatMap:能够认为是一个将嵌套的数据结构展开的过程。
FlatMap
对这些Observables发射的数据作的是合并(merge
)操做,所以它们多是交错的。
2. Map:实质是对序列中的每一项执行一个函数,函数的参数就是这个数据项。
3. Scan:
Scan
操做符对原始Observable发射的第一项数据应用一个函数,而后将那个函数的结果做为本身的第一项数据发射。它将函数的结果同第二项数据一块儿填充给这个函数来产生它本身的第二项数据。它持续进行这个过程来产生剩余的数据序列。这个操做符在某些状况下被叫作accumulator(累加器)
。
4. Do:
doOnEach,doOnNext,doOnSubscribe,doOnUnsubscribe,doOnCompleted,doOnError,doOnTerminate,finallyDo;
5. ObserveOn:指定观察者观察Observable的调度程序(工做线程);
6. SubscribeOn:指定Observable应该在哪一个调度程序上执行;当使用了多个
subscribeOn()
的时候,只有第一个 subscribeOn()
起做用。
7. Subscribe:收到Observable发射的数据和通知后执行的操做;
8. Connect:指示一个可链接的Observable开始发射数据给订阅者;
9. Publish:将一个普通的Observable转换为可链接的;
10. RefCount:使一个可链接的Observable表现得像一个普通的Observable;
11. Replay:确保全部的观察者收到一样的数据序列,即便他们在Observable开始发射数据以后才订阅;
12. To:将Observable或者Observable发射的数据序列转换为另外一个对象或数据结构;
13. Blocking:
BlockingObservable
的方法不是将一个Observable变换为另外一个,也不是过滤Observables,它们会打断Observable的调用链,会阻塞等待直到Observable发射了想要的数据,而后返回这个数据(而不是一个Observable);
forEach( ),
first( ),
last( ),
getIterator( )
14. Unsubscribe:这个方法很重要,由于在subscribe()
以后, Observable
会持有 Subscriber
的引用,这个引用若是不能及时被释放,将有内存泄露的风险。因此最好保持一个原则:要在再也不使用的时候尽快在合适的地方(例如 onPause()
onStop()
等方法中)调用 unsubscribe()
来解除引用关系,以免内存泄露的发生。
15. doOnSubscribe():而与 Subscriber.onStart()
相对应的,有一个方法 Observable.doOnSubscribe()
。它和 Subscriber.onStart()
一样是在 subscribe()
调用后并且在事件发送前执行,但区别在于它能够指定线程。默认状况下, doOnSubscribe()
执行在 subscribe()
发生的线程;而若是在 doOnSubscribe()
以后有 subscribeOn()
的话,它将执行在离它最近的 subscribeOn()
所指定的线程;
16. 根据响应式函数编程的概念,Subscribers更应该作的事情是“响应”,响应Observable发出的事件,而不是去修改。Subscribers越轻量越好;
17. defer:只有当订阅者订阅才建立Observable;为每一个订阅建立一个新的Observable;使用defer()来包装缓慢的代码(用到才生成);
18. cache
:记住Observable发射的数据序列并发射相同的数据序列给后续的订阅者;cache() (或者 replay())会继续执行网络请求(并记住数据序列)(甚至你调用了unsubscribe也不会中止);
19. firstOrDefault:firstOrDefault
与first
相似,可是在Observagle没有发射任何数据时发射一个你在参数中指定的默认值。
20. toList:一般,发射多项数据的Observable会为每一项数据调用onNext
方法。你能够用toList
操做符改变这个行为,让Observable将多项数据组合成一个List
,而后调用一次onNext
方法传递整个列表。
21. onErrorResumeNext:返回一个镜像原有Observable行为的新Observable,后者会忽略前者的onError
调用,不会将错误传递给观察者,做为替代,它会开始镜像另外一个,备用的Observable。
22. onErrorReturn:返回一个镜像原有Observable行为的新Observable,后者会忽略前者的onError
调用,不会将错误传递给观察者,做为替代,它会发射一个特殊的项并调用观察者的onComleted
方法。
23. concatMap:相似于最简单版本的flatMap
,可是它按次序链接而不是合并那些生成的Observables,而后产生本身的数据序列。
Further Reading:
1、Deferring Observable code until subscription in RxJava
just()
, from()
, and other Observable
creation tools store the value of data when created, not when subscribed.
defer(): none of the code inside of defer()
is executed until subscription.
Observable.create()
- no need to call onCompleted()
.2、Loading data from multiple sources with RxJava
first() vs. takeFirst(): The difference between the two calls is that first()
will throw a NoSuchElementException
if none of the sources emits valid data, whereas takeFirst()
will simply complete without exception.
3、Don't break the chain: use RxJava's compose() operator
1. compose(): feed it an Observable
of one type and it'll return an Observable
of another. reusable and the chain is preserved.
compose() vs flatMap(): The difference is that compose()
is a higher level abstraction: it operates on the entire stream, not individually emitted items(flatMap).
If you want to replace some operators with reusable code, use compose()
.
2. Reusing Transformers: don't have to care about type
final Transformer schedulersTransformer = observable -> observable.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()); @SuppressWarnings("unchecked") <T> Transformer<T, T> applySchedulers() { return (Transformer<T, T>) schedulersTransformer; }
4、Functional Reactive Programming on Android With RxJava
1. Problems of concurrent programming with AsyncTask:
(1) There are many things that can go wrong, so we want to recover from errors, and add a try-catch block. Perhaps we want to inform the user about this error too, which likely involves interacting with the UI. Wait, we cannot do that because we are not allowed to update any user-interface elements from a background thread.
(2) how do we obtain a reference to a Context
, without which we cannot do anything meaningful with the UI? Apparently, we have to bind it to the task instance up front, at the point of instantiation, and keep a reference to it throughout a task’s execution. But what if the download takes a minute to run? Do we want to hold on to an Activity
instance for an entire minute?
(3) Composing service objects means nesting AsyncTask
, which leads to what is commonly referred to as “callback hell” because you start tasks from a task callback from a task callback from a …
(4) Dealing with AsyncTask and Screen Orientation. (AsyncTask configuration change)
2. Rxjava solves all of the problems in one fell swoop:
Context
5、What are the Hot and Cold observables?
hot happens even when nobody is subscribed, cold happens "on demand". Also, Publish() converts cold to hot and Defer() converts hot to cold.
passive sequences are Cold and active are described as being Hot.
7、HOW TO KEEP YOUR RXJAVA SUBSCRIBERS FROM LEAKING
1. The straightforward solution to this is to unsubscribe from your Observable
when theActivity
is about to be destroyed
2. If we subclass Observable
to wrap our Subscribers
in a Subscriber
decorator that delegates work to its weakly held, wrapped Subscriber
, we can keep clients from having to worry about leaking their Subscribers
without forcing them to write boilerplate code.
Add throttling behaviour: debounce() is what you usually need.
Kill the previous requests: introduce switchMap instead of flatMap.
No error functionality / no network functionality: You need a retry mechanism for these.
RxTextView.textChanges(searchEditText) .debounce(150, MILLISECONDS) // throttling behaviour .switchMap(Api::searchItems) // Kill the previous requests .retryWhen(new RetryWithConnectivityIncremental(context, 5, 15, SECONDS)) // a retry mechanism .subscribe(this::updateList, t->showErrorToUser());
9、Observe on the correct thread
Instead of doing this: Observable.just(1,2,3).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).flatMap(/** logic which doesn't touch ui **//).subscribe();
do this: Observable.just(1,2,3).subscribeOn(Schedulers.newThread()).flatMap(/** logic which doesn't touch ui **//).observeOn(AndroidSchedulers.mainThread()).subscribe();
Earlier .subscribeOn() wins.
AsyncSubject:仅释放Observable释放的最后一个数据,而且仅在Observable完成以后(subject.onComplete())。
observer will receive no onNext events if the subject.onCompleted() isn't called.
当Observer订阅了一个BehaviorSubject,它一开始就会释放Observable最近释放的一个数据对象,当尚未任何数据释放时,它则是一个默认值。
PublishSubject:仅会向Observer释放在订阅以后Observable释放的数据。
无论Observer什么时候订阅ReplaySubject,ReplaySubject会向全部Observer释放Observable释放过的数据。
假设你有一个Subject,你想把它传递给其它的代理或者暴露它的Subscriber接口,你能够调用它的asObservable方法,这个方法返回一个Observable。具体使用方法能够参考Javadoc文档。
因为一个Observable订阅一个Observable,它能够触发这个Observable开始发射数据(若是那个Observable是"冷"的--就是说,它等待有订阅才开始发射数据)。所以有这样的效果,Subject能够把原来那个"冷"的Observable变成"热"的。
若是你把 Subject
看成一个 Subscriber
使用,注意不要从多个线程中调用它的onNext方法(包括其它的on系列方法),这可能致使同时(非顺序)调用,这会违反Observable协议,给Subject的结果增长了不肯定性。
要避免此类问题,你能够将 Subject
转换为一个 SerializedSubject
,相似于这样:
mySafeSubject = new SerializedSubject( myUnsafeSubject );
12、Implementing an Event Bus With RxJava - RxBus
There’s an important semantic difference between the Observer and Pub-sub patterns though:
In the pub-sub pattern the focus is on “broadcasting” messages outside.
The Observable here doesn’t want to know who the events are going out to, just that they’ve gone out.
In other words the Observable (a.k.a Publisher) doesn’t want to know who the Observers (a.k.a Subscribers) are.
Why the anonymity: “decoupling”, subscriber need not have logic coded in them that establish the dependencies between the two.
How the anonymity: "event bus", get hold of a middleman and let that middleman take care of all the communication.