RxJava是Reactive Extensions的Java VM实现:一个库,用于经过使用可观察序列来编写异步和基于事件的程序。它扩展了观察者模式,以支持数据/事件序列,并添加了容许您以声明方式组合序列的运算符,同时抽象出对低级线程,同步,线程安全和并发数据结构等问题的关注。编程
简单理解就是,ReactiveX用更好的方式为异步编程提供一套API接口。为何说更好的方式呢?这里就得说说现有的Android实现异步的方式AsyncTask和Handler了。这两种方式均可以处理异步操做,而不阻塞线程。可是它们抽象不够,实现比较复杂,并且难以处理比较复杂的业务逻辑。好比页面有两个请求,当两个请求都成功的时候才能展现页面。像这种须要处理多个异步线程的时候,AsyncTask和Handler就比较难以实现。这时RxJava就登场了,它能够很好的解决线程切换,处理多个线程的关系,同时可使业务逻辑扁平化,逻辑更清晰。安全
RxJava是基于观察者模式的,因此有Observable和Observer,经过subscribe来实现订阅。bash
public abstract class Observable<T> implements ObservableSource<T> {
void subscribe(@NonNull Observer<? super T> observer);
}复制代码
public interface Observer<T> {
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();}复制代码
Observable经过subscribe拿到Observer的引用,当Observable有数据的时候通知Observer,数据从Observable(被观察者)流向Observer(观察者)数据结构
简单的代码实现并发
//1.建立Observable
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}
})
//2.订阅观察者
.subscribe(
//3.建立观察者
new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.i(TAG, "onNext:" + integer);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError:" + e.getMessage());
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete");
}});复制代码
源码分析流程:异步
步骤1. Observable.create()建立了一个被观察者,经过subscribe方法订阅了一个观察者ide
Observable.create():
异步编程
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}复制代码
返回的是ObservableCreate这个对象就是返回的Observable对象,走到这一步只是建立了Observable,并把建立的ObservableOnSubscribe对象传递进来。类中subscribeActual()这个方法和CreateEmitter这个类如今尚未调用,后面分析。源码分析
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
//在Observable调用subscribe(Observer)方法时候调用,这里先不看 @Override
protected void subscribeActual(Observer<? super T> observer) {
}
}复制代码
步骤2.如今建立了ObservableCreate这个Observable,而后它调用了subscribe()方法订阅观察者ui
public final void subscribe(Observer<? super T> observer) {
//检查是否为空
ObjectHelper.requireNonNull(observer, "observer is null");
try {
//用来调试的,,真实环境返回的仍是observer,忽略此步骤
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
}
}复制代码
能够看到这里就是调用了subscribeActual(observer);这个抽象方法。其实调用的是步骤1中建立的ObservableCreate这个对象的subscribeActual()方法,并把第3步建立的Observer传入
如今后头看看步骤1中建立的ObservableCreate这个类subscribeActual()
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
//这是一个封装了Observer的类,是步骤1中onSubscribe方法中传入的参数
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//这里调用了步骤3中的onSubscribe回调。
observer.onSubscribe(parent);
try {
//这里调用了步骤1中建立的ObservableOnSubscribe的subscribe这个方法。
//方法中调用的emitter.onNext(1);实际上是这里的parent这个类的onNext()方法
//最终调用了步骤3中建立的Observer的onNext方法,这样就走完了流程
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
return;
}
if (!isDisposed()) {
//调用真正的步骤3中建立的observer
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
}复制代码
这里有点绕,其实ObservableCreate是一个被观察者,有个内部类CreateEmitter,内部类持有观察者,被观察者要发送数据就是经过这个内部类调用onNext(T t),继而调用内部的观察者。