RxJava简单实现源码分析--简单的调度

1、RxJava的介绍:

 RxJava是Reactive Extensions的Java VM实现:一个库,用于经过使用可观察序列来编写异步和基于事件的程序。它扩展了观察者模式,以支持数据/事件序列,并添加了容许您以声明方式组合序列的运算符,同时抽象出对低级线程,同步,线程安全和并发数据结构等问题的关注。编程

简单理解就是,ReactiveX用更好的方式为异步编程提供一套API接口。为何说更好的方式呢?这里就得说说现有的Android实现异步的方式AsyncTask和Handler了。这两种方式均可以处理异步操做,而不阻塞线程。可是它们抽象不够,实现比较复杂,并且难以处理比较复杂的业务逻辑好比页面有两个请求,当两个请求都成功的时候才能展现页面。像这种须要处理多个异步线程的时候,AsyncTask和Handler就比较难以实现。这时RxJava就登场了,它能够很好的解决线程切换,处理多个线程的关系,同时可使业务逻辑扁平化,逻辑更清晰。安全

1、RxJava的简单使用:

       RxJava是基于观察者模式的,因此有Observable和Observer,经过subscribe来实现订阅。bash

public abstract class Observable<T> implements ObservableSource<T> {
     void subscribe(@NonNull Observer<? super T> observer);
}复制代码
public interface Observer<T> {
    void onSubscribe(@NonNull Disposable d);
    void onNext(@NonNull T t);
    void onError(@NonNull Throwable e);    
    void onComplete();}复制代码

Observable经过subscribe拿到Observer的引用,当Observable有数据的时候通知Observer,数据从Observable(被观察者)流向Observer(观察者)数据结构

简单的代码实现并发

//1.建立Observable
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onComplete();
    }
})
//2.订阅观察者
.subscribe(
//3.建立观察者
new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe");
    }
    @Override
    public void onNext(Integer integer) {
        Log.i(TAG, "onNext:" + integer);
    }
    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError:" + e.getMessage());
    }
    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete");
    }});复制代码

源码分析流程:异步

步骤1. Observable.create()建立了一个被观察者,经过subscribe方法订阅了一个观察者ide

Observable.create():
异步编程

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}复制代码

返回的是ObservableCreate这个对象就是返回的Observable对象,走到这一步只是建立了Observable,并把建立的ObservableOnSubscribe对象传递进来。类中subscribeActual()这个方法和CreateEmitter这个类如今尚未调用,后面分析。源码分析

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    //在Observable调用subscribe(Observer)方法时候调用,这里先不看    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        
    }
}复制代码

步骤2.如今建立了ObservableCreate这个Observable,而后它调用了subscribe()方法订阅观察者ui

public final void subscribe(Observer<? super T> observer) {
    //检查是否为空
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        //用来调试的,,真实环境返回的仍是observer,忽略此步骤       
        observer = RxJavaPlugins.onSubscribe(this, observer);
        ObjectHelper.requireNonNull(observer, "");
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        
    }
}复制代码

能够看到这里就是调用了subscribeActual(observer);这个抽象方法。其实调用的是步骤1中建立的ObservableCreate这个对象的subscribeActual()方法,并把第3步建立的Observer传入

如今后头看看步骤1中建立的ObservableCreate这个类subscribeActual()

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //这是一个封装了Observer的类,是步骤1中onSubscribe方法中传入的参数
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //这里调用了步骤3中的onSubscribe回调。        
        observer.onSubscribe(parent);
        try {
            //这里调用了步骤1中建立的ObservableOnSubscribe的subscribe这个方法。
            //方法中调用的emitter.onNext(1);实际上是这里的parent这个类的onNext()方法
            //最终调用了步骤3中建立的Observer的onNext方法,这样就走完了流程
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
        private static final long serialVersionUID = -3434801548987643227L;
        final Observer<? super T> observer;
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
        @Override
        public void onNext(T t) {
            if (t == null) {
                return;
            }
            if (!isDisposed()) {
                //调用真正的步骤3中建立的observer
                observer.onNext(t);
            }
        }
        @Override
        public void onError(Throwable t) {
            
        }复制代码

这里有点绕,其实ObservableCreate是一个被观察者,有个内部类CreateEmitter,内部类持有观察者,被观察者要发送数据就是经过这个内部类调用onNext(T t),继而调用内部的观察者。

相关文章
相关标签/搜索