如今RxJava已是2.0以上版本了,看到不少资料上还在讲1.0的版本,所以作个简单的笔记,供你们参考,不足之处请兄弟们留言我添加完善java
一、概述编程
RxJava(Reactive Extensions Java),RxJava基于观察者模式,是一种响应式编程模型,目的是提供统一的接口帮助开发者便捷的处理异步数据流。app
RxJava本质上是一个异步库,使用简洁的逻辑处理繁琐复杂任务。异步
二、观察者模式ide
观察者模式涉及到2主角函数
1)Observable :被观察者spa
2)Observer:观察者线程
被观察者中有一个观察者的列表,当被观察者发生变化的时候回根据这张表一一通知观察者。code
三、1.0和2.0的区别 orm
3.一、背压
1.0背压集中在Obserable中处理,致使有点混乱,。
2.0版本把对背压的处理提出来了,出现了两种观察者模式:
Observable(被观察者)/Observer(观察者) 不支持背压(Backpressure)
Flowable(被观察者)/Subscriber(观察者) 支持背压(Backpressure)
Flowable是新增的
3.二、操做符修改
1.0版本
Action这类接口命名Action0、Action1…(数字表明可接受的参数),如今作出了改动
1.x | 2.x |
Action0 | Action |
Action1 | Consumer |
Action2 | BiConsumer |
... | |
ActionN |
Funx修改
1.x | 2.x |
Func | Function<T, R> A functional interface that takes a value and returns another value |
Func2 | BiFunction<T1, T2, R> |
Func3 ~ Func9 | Function3 ~ Function9
Function3<T1, T2, T3, R> Function9<T1, T2, T3, T4, T5, T6, T7, T8, T9, R> |
FuncN | Function<T, R> |
First()
1.x | 2.x |
first | firstElement |
first(Func1) | filter(predicate).first() |
firstOrDefault(T) | first(T) |
firstOrDefault(Func1, T) | first(T) |
3.三、线程调度 Schedulers
2.0去掉了Schedulers.immediate()、Schedulers.test()
3.4 、建立被观察者的变化
1.0版本 Observable observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("hello"); subscriber.onNext("hello"); subscriber.onNext("hello"); subscriber.onCompleted(); } }); 2.0版本 变化点1:create参数Observable.OnSubscribe改成ObservableOnSubscribe 变化点2:回到函数call(Subscriber)改成 subscribe(ObservableEmitter) 变化点3:可发射三种事件:emitter.onNext(T value)、onComplete、onError(Throwable e) Observable<Integer> observable=Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext("hello!"); e.onNext("world!"); e.onNext("are you ok!"); e.onError(new Throwable("error")); e.onComplete(); } });
3.五、建立观察者的变化
1.0 中建立观察者Observer有两种方式 方式1:采用 Observer 接口 Observer<String> observer = new Observer<String>() { @Override public void onNext(String s) { } @Override public void onCompleted() { } @Override public void onError(Throwable e) { } }; 方式2:采用Subscriber接口建立, 其对 Observer接口进行了扩展,增长了onStart()、unSubscribe() Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onNext(String s) { } @Override public void onCompleted() { } @Override public void onError(Throwable e) { } }; 2.0 建立观察者Observer 变化点1:增长了回调方法onSubscribe(),其最早被调用,可用来作一些初始化的工做 变化点2:onCompleted()改为 onComplete() Observer<Integer> observer = new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer value) { } @Override public void onError(Throwable e) { } // 注意 @Override public void onComplete() { } }
3.六、接口都增长了异常抛出
public interface Action { /** * Runs the action and optionally throws a checked exception. * @throws Exception if the implementation wishes to throw a checked exception */ void run() throws Exception; } public interface BiConsumer<T1, T2> { /** * Performs an operation on the given values. * @param t1 the first value * @param t2 the second value * @throws Exception on error */ void accept(T1 t1, T2 t2) throws Exception; } public interface Function<T, R> { /** * Apply some calculation to the input value and return some other value. * @param t the input value * @return the output value * @throws Exception on error */ R apply(T t) throws Exception; } /** * A functional interface (callback) that accepts a single value. * @param <T> the value type */ public interface Consumer<T> { /** * Consume the given value. * @param t the value * @throws Exception on error */ void accept(T t) throws Exception; }
3.7 from 变化
from -> fromArray
1.0版本 private void doFrom() { String[] items = {"item1", "item2", "item3"}; Observable.from(items) .subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println(s); } }); } 2.0版本 private void doFrom() { String[] items = {"item1", "item2", "item3"}; Observable.fromArray(items).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { System.out.println(s); } }); }