Android源码系列-解密RxJava

rxjava是什么?

ReactiveX

ReactiveX是Reactive Extensions的缩写,通常简写为Rx,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持.NET、JavaScript和C++,Rx近几年愈来愈流行了,如今已经支持几乎所有的流行编程语言了,Rx的大部分语言库由ReactiveX这个组织负责维护,比较流行的有RxJava/RxJS/Rx.NET。html

rxjava

rxjava是ReactiveX在java平台的一个实现。是一个编程模型,以观察者模式提供链式的接口调用,动态控制线程的切换,使得能够简便的处理异步数据流。java

简介

Github:rxjavareact

中文文档:ReactiveX/RxJava文档中文版android

官网:reactivexgit

特色

  • 链式调用,使用简单
  • 简化逻辑
  • 灵活的线程调度
  • 提供完善的数据操做符,功能强大

观察者模式

观察者模式定义对象间一种一对多的依赖关系,使得每当一个对象改变状态,则因此依赖于它的对象都会获得通知并被自动更新。rxjava的核心设计就是采用观察者模式。Observable是被观察者,Observer是观察者,经过subscribe方法进行订阅。github

  • 优势

观察者和被观察者之间是抽象解耦,应对业务变化数据库

加强系统灵活性、可扩展性编程

具体代码示例可参考设计模式-观察者模式json

  • 缺点

在应用观察者模式时须要考虑一下开发效率和运行效率问题,程序中包括一个被观察者、多个观察者、开发和调试等内容会比较复杂,并且在Java中消息的通知默认是顺序执行,一个观察者卡顿,会影响总体的执行效率,在这种状况下,通常考虑采用异步的方式设计模式

rxjava怎么用?

gradle引入版本

implementation 'io.reactivex.rxjava2:rxjava:2.2.0'
   implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
复制代码

接着举一个经常使用的rxjava使用的例子,咱们在项目常常须要请求服务端接口,而后获取数据,将数据进行缓存,而后处理ui上的显示。示例的代码以下:

Observable.create(new ObservableOnSubscribe<Response>() {
            @Override
            public void subscribe(ObservableEmitter<Response> e) throws Exception {
                //获取服务端的接口数据
                Request.Builder builder = new Request.Builder()
                        .url("http://xxx.com")
                        .get();
                Request request = builder.build();
                Call call = new OkHttpClient().newCall(request);
                Response response = call.execute();
                e.onNext(response);
            }
        }).map(new Function<Response, Model>() {
            @Override
            public Model apply( Response response) throws Exception {
                //将json数据转化为对应的Model
                if (response.isSuccessful()) {
                    ResponseBody body = response.body();
                    if (body != null) {
                        Log.e(TAG, "map:转换前:" + response.body());
                        return new Gson().fromJson(body.string(), Model.class);
                    }
                }
                return null;
            }
        }).doOnNext(new Consumer<Model>() {
                    @Override
                    public void accept( Model s) throws Exception {
                        //对数据进行其余缓存的处理
                        Log.e(TAG, "doOnNext: 保存网络加载的数据:" + s.toString() + "\n");
                    }
                }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Model>() {
                    @Override
                    public void accept(Model model) throws Exception {
                        //刷新ui
                        Log.e(TAG, "成功刷新界面:" + data.toString() + "\n");
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        //进行失败的异常提示
                        Log.e(TAG, "失败处理异常:" + throwable.getMessage() + "\n");
                    }
                });

复制代码

本文主要对rxjava的源码进行梳理分析,关于rxjava操做符的使用,推荐参考中文的文档,以及下面的博文介绍。

这多是最好的RxJava 2.x 教程(完结版)

rxjava核心执行流程是怎样?

rxjava主要是采用观察者模式进行设计,当执行相关的操做符是会生成新的Observable及Observer。Observable会持有上游被观察者,Observer会持有下游的观察者。当执行subscribe订阅方法的时候,经过持有上游的被观察者对象,会往上游逐步执行订阅方法。当执行到起始的被观察者回调方法时,若是执行ObservableEmitter的onNext方法时,因为Observer会持有下游的Observer对象,会逐步调用下游的onNext方法,直到最终subscribe传入的观察者实例。这是rxjava链式调用的核心执行流程。

固然rxjava还涉及到线程的调度及数据的背压处理,关于这些实现的原理会再后续进行梳理。但rxjava的链式调用的核心执行流程都是一致。下面咱们将经过2个部分来梳理rxjava的核心执行流程,包含一些关键类的说明,及经过示例的代码相关的执行流程图进行梳理。

关键类功能说明

说明
ObservableSource 接口类,只有一个subscribe方法,参数是Observer对象
Observer 接口类,观察者。有onSubscribe、onNext、onError、onComplete方法
Consumer 接口有,观察者。只有一个accept方法,在被订阅时最终也会转换成Observer,设计这个类是为了简化调用
Observable 抽象类,继承了ObservableSource接口,操做符的实现都是继承与它。内部封装了大量的操做符调用方法,主要是有一个核心的抽象方法abstract void subscribeActual(Observer<? super T> observer),用于实现相关的订阅分发逻辑。
AbstractObservableWithUpstream 继承于Observable,构造方法须要传入ObservableSource source对象,source是父被观察者。
ObservableCreate 继承于AbstractObservableWithUpstream,source为ObservableOnSubscribe。subscribeActual方法会实例化一个CreateEmitter对象,执行ObservableOnSubscribe的subscribe方法
ObservableMap 继承于AbstractObservableWithUpstream,订阅会新生产一个观察者MapObserver
MapObserver ObservableMap的内部类,onNext方法会触发mapper.apply(t)回调,而后执行下游观察者的onNext方法
ObservableDoOnEach 继承于AbstractObservableWithUpstream,订阅会新生产一个观察者DoOnEachObserver
DoOnEachObserver ObservableDoOnEach的内部类,onNext会执行onNext.accept(t)方法,而后执行下游观察者的onNext方法
ObservableSubscribeOn 继承于AbstractObservableWithUpstream,被观察者线程调度控制。subscribeActual会执行scheduler.scheduleDirect(new SubscribeTask(parent)),SubscribeTask的run方法会执行source.subscribe(parent)。ObservableSubscribeOn根据线程调度器的策略去执行上游的订阅方法实现。
ObservableObserveOn 继承于AbstractObservableWithUpstream,观察者线程调度控制。subscribeActual方法会判断scheduler是否为TrampolineScheduler。如果则执行下游的观察者,否会建立新的ObserveOnObserver,并传入schedule的work。
ObserveOnObserver ObservableObserveOn内部类,onNext会触发执行schedule()方法,根据worker去控制下游观察者的回调线程

代码执行流程

首先咱们根据上面demo例子,梳理出rxjava的简单执行流程,以下图:

image

经过流程图可知,rxjava当执行相关的操做符是会生成新的Observable及Observer。Observable会持有上游被观察者,Observer会持有下游的观察者。当执行subscribe订阅方法的时候,经过持有上游的被观察者对象,会往上游逐步执行订阅方法。当执行到起始的被观察者回调方法时,若是执行ObservableEmitter的onNext方法时,因为Observer会持有下游的Observer对象,会逐步调用下游的onNext方法,直到最终subscribe传入的观察者实例。

了解了rxjava大体的执行流程,下面咱们来详细的看看源码的执行流程。首先仍是先上一下总体的流程图,因为图片较大,建议结合上述的demo及rxjava的源码进行查看。

image

下面咱们分配经过几个操做符来看看rxjava源码具体的实现。

create

create的操做符会返回一个ObservableCreate的被观察者。

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
复制代码

接下来看看ObservableCreate对象的关键实现代码,以下:

//构造方法会传入ObservableOnSubscribe接口的引用,指定为该被观察者的source。
  public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

  //核心的subscribeActual 
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
      //建立了CreateEmitter发射器
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
          //执行了ObservableOnSubscribe的subscribe回调方法,传入了CreateEmitter对象
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
复制代码

当咱们在业务代码执行了ObservableEmitter的onNext方法,咱们看一下CreateEmitter的onNext的实现代码,以下:

//持有下游的观察者引用
   CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            //若是没有取消订阅,则会执行下游的观察者的onNext方法,达到链式调用的效果
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
复制代码

map

map的操做符会返回一个ObservableMap的被观察者。

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }
复制代码

接下来看看ObservableMap对象的关键实现代码,以下:

@Override
    public void subscribeActual(Observer<? super U> t) {
        //将上游的被观察者订阅MapObserver观察者
        source.subscribe(new MapObserver<T, U>(t, function));
    }
复制代码

接下来主要看看MapObserver的onNext方法,该方法会在ObservableEmitter的onNext方法触发后被调用,以下:

//持有下游的观察者和回调函数mapper
   MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }
        
 @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }

            U v;

            try {
              //map的核心执行代码,mapper.apply(t)会执行数据的转换,并将转换后的结果v继续交由下游的观察者执行
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            //将转换后的结果v继续交由下游的观察者执行
            actual.onNext(v);
        }
复制代码

doOnNext

doOnNext的操做符会返回一个ObservableDoOnEach的被观察者。

private Observable<T> doOnEach(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Action onAfterTerminate) {
        ObjectHelper.requireNonNull(onNext, "onNext is null");
        ObjectHelper.requireNonNull(onError, "onError is null");
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        ObjectHelper.requireNonNull(onAfterTerminate, "onAfterTerminate is null");
        return RxJavaPlugins.onAssembly(new ObservableDoOnEach<T>(this, onNext, onError, onComplete, onAfterTerminate));
    }
复制代码

接下来看看ObservableDoOnEach对象的关键实现代码,以下:

@Override
    public void subscribeActual(Observer<? super T> t) {
       //实例化一个DoOnEachObserver的观察者对象
        source.subscribe(new DoOnEachObserver<T>(t, onNext, onError, onComplete, onAfterTerminate));
    }
复制代码

这里核心咱们仍是要看DoOnEachObserver的onNext对于数据的处理,以下:

@Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            try {
                //回调accept方法
                onNext.accept(t);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                s.dispose();
                onError(e);
                return;
            }
            //继续往下游调用观察者的onNext
            actual.onNext(t);
        }
复制代码

subscribeOn

subscribeOn的操做符会返回一个ObservableSubscribeOn的被观察者,并传入scheduler线程调度参数。

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
复制代码

接下来看看ObservableSubscribeOn对象的关键实现代码,以下:

public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        //建立了SubscribeOnObserver的观察者
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);
        
        //这个是核心方法,调用了线程调度去的scheduleDirect方法,并传入SubscribeTask任务
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
复制代码

接下来咱们看看SubscribeTask的实现,以下:

final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            //执行上游被观察的订阅方法,这里就是subscribeOn将上游的订阅方法控制在scheduler指定线程执行的核心
            source.subscribe(parent);
        }
    }
复制代码

最后看下SubscribeOnObserver的onNext方法,比较简单,直接执行下游观察者的onNext方法,以下:

@Override
        public void onNext(T t) {
            actual.onNext(t);
        }
复制代码

关于scheduler的具体实现,在后续的线程原理进行分析。这里咱们只须要知道上游的被观察者的订阅在指定的scheduler线程策略中执行就能够了。

observerOn

observerOn 的操做符会返回一个ObservableObserveOn的被观察者,并传入scheduler线程调度参数。

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
复制代码

接下来看看ObservableObserveOn对象的关键实现代码,以下:

@Override
    protected void subscribeActual(Observer<? super T> observer) {
       //TrampolineScheduler 若是是当前的线程 则直接将下游的观察者与上游的被观察订阅
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            //其余线程策略
            Scheduler.Worker w = scheduler.createWorker();
            //将线程策略的worker传入ObserveOnObserver观察者
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

复制代码

接下来关键仍是看ObserveOnObserver的实现,以下:

@Override
public void onNext(T t) {
    // 上一级的模式若是不是异步的,加入队列
    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    //进行线程调度
    schedule();
}

void schedule() {
    // 判断当前正在执行的任务数目
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}


复制代码

ObserveOnObserver自己继承了Runnable接口,run方法实现以下:

@Override
public void run() {
    //输出结果是否融合
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}
复制代码

咱们先进入drainNormal方法:

void drainNormal() {
    int missed = 1;
    final SimpleQueue<T> q = queue;
    final Observer<? super T> a = actual;
    //第一层循环
    for (;;) {
        // 检查异常处理
        if (checkTerminated(done, q.isEmpty(), a)) {
            return;
        }
        //第二层循环
        for (;;) {
            boolean d = done;
            T v;
            //从队列中获取数据
            v = q.poll();
            boolean empty = v == null;
            // 检查异常
            if (checkTerminated(d, empty, a)) {
                return;
            }
            //若是没有数据了,跳出
            if (empty) {
                break;
            }
            //执行下一次操做。
            a.onNext(v);
        }
        //减掉执行的次数,并获取剩于任务数量,而后再次循环
        //直到获取剩余任务量为0,跳出循环
        missed = addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}

复制代码

关于scheduler的具体实现,在后续的线程原理进行分析。这里咱们只须要知道下游的观察者的onNext在指定的scheduler线程策略中执行就能够了。

subscribe

@SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

复制代码

最后的订阅方法在作了非空检查后,会调用subscribeActual方法,开始往上游逐层执行订阅。

被观察者Observable是如何发送数据?

经过上面的流程分析,咱们能够知道。若是使用create建立了Observable,在ObservableOnSubscribe的subscribe方法中会经过ObservableEmitter的onNext去发送数据,onNext会触发开始往下游观察者传递数据。固然rxjava的建立型操做符还有不少,如just、from等,本质最后都是触发下游观察者的onNext进行数据的发送。

观察者Observer是如何接收到数据的?

经过源码分析,每个链层的Observer都会持有相邻下游的Observer对象,当开始发送数据时,会依次链式执行Observer的onNext方法,最后执行到subscribe方法中建立的Observer对象。

被观察者和观察者之间是如何实现订阅?

每个链层的Observable 都会持有相邻上游的Observable对象,在subscribe方法开始调用后,最后会执行到subscribeActual方法,在subscribeActual方法中会将观察者与上游的被观察执行订阅。

rxjava是如何进行线程的调度?

rxjava的Scheduler有不少种实现,下面咱们介绍Scheduler的相关说明,而后经过最经常使用的.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())来分析具体的线程调度流程。

Scheduler

咱们在调用subscribeOn与observeOn时,都会传入Scheduler对象,首先咱们先看一下Scheduler的种类及其功能

Scheduler种类 说明
Schedulers.io( ) 用于IO密集型的操做,例如读写SD卡文件,查询数据库,访问网络等,具备线程缓存机制,在此调度器接收到任务后,先检查线程缓存池中,是否有空闲的线程,若是有,则复用,若是没有则建立新的线程,并加入到线程池中,若是每次都没有空闲线程使用,能够无上限的建立新线程
Schedulers.newThread( ) 在每执行一个任务时建立一个新的线程,不具备线程缓存机制,由于建立一个新的线程比复用一个线程更耗时耗力,虽然使用Schedulers.io( )的地方,均可以使用Schedulers.newThread( ),可是,Schedulers.newThread( )的效率没有Schedulers.io( )高
Schedulers.computation() 用于CPU 密集型计算任务,即不会被 I/O 等操做限制性能的耗时操做,例如xml,json文件的解析,Bitmap图片的压缩取样等,具备固定的线程池,大小为CPU的核数。不能够用于I/O操做,由于I/O操做的等待时间会浪费CPU
Schedulers.trampoline() 在当前线程当即执行任务,若是当前线程有任务在执行,则会将其暂停,等插入进来的任务执行完以后,再将未完成的任务接着执行
Schedulers.single() 拥有一个线程单例,全部的任务都在这一个线程中执行,当此线程中有任务执行时,其余任务将会按照先进先出的顺序依次执行
Scheduler.from(Executor executor) 指定一个线程调度器,由此调度器来控制任务的执行策略
AndroidSchedulers.mainThread() 在Android UI线程中执行任务,为Android开发定制

subscribeOn(Schedulers.io())

根据上面的分析,subscribeOn()方法最后会执行到subscribeActual方法,SubscribeTask上面分析了,继承了Runnable接口, run方法最后会执行source.subscribe(parent)方法。

@Override
   public void subscribeActual(final Observer<? super T> s) {
       final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

       s.onSubscribe(parent);
       
       parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
   }
复制代码

这里咱们主要要分析scheduler.scheduleDirect()方法。

@NonNull
  public Disposable scheduleDirect(@NonNull Runnable run) {
      return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
  }
  
   @NonNull
  public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    //建立一个Worker对象
      final Worker w = createWorker();


      final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
      
      //DisposeTasky也是一个包装类 继承了Runnable接口
      DisposeTask task = new DisposeTask(decoratedRun, w);

      //这里是关键的实现,执行了worker的schedule方法
      w.schedule(task, delay, unit);

      return task;
  }
复制代码

Worker的schedule是一个抽象的方法,Schedulers.io()对应的Worker实现为EventLoopWorker。咱们看看EventLoopWorker的schedule实现以下:

static final class EventLoopWorker extends Scheduler.Worker {
      private final CompositeDisposable tasks;
      private final CachedWorkerPool pool;
      private final ThreadWorker threadWorker;

      final AtomicBoolean once = new AtomicBoolean();

      EventLoopWorker(CachedWorkerPool pool) {
          this.pool = pool;
          this.tasks = new CompositeDisposable();
          this.threadWorker = pool.get();
      }

      @Override
      public void dispose() {
          if (once.compareAndSet(false, true)) {
              tasks.dispose();

              // releasing the pool should be the last action
              pool.release(threadWorker);
          }
      }

      @Override
      public boolean isDisposed() {
          return once.get();
      }

      @NonNull
      @Override
      public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
          if (tasks.isDisposed()) {
              // don't schedule, we are unsubscribed return EmptyDisposable.INSTANCE; } return threadWorker.scheduleActual(action, delayTime, unit, tasks); } } 复制代码

这里会执行到 threadWorker的scheduleActual方法,继续往下看

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
      Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

      ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

      if (parent != null) {
          if (!parent.add(sr)) {
              return sr;
          }
      }

      Future<?> f;
      try {
          if (delayTime <= 0) {
              f = executor.submit((Callable<Object>)sr);
          } else {
              f = executor.schedule((Callable<Object>)sr, delayTime, unit);
          }
          sr.setFuture(f);
      } catch (RejectedExecutionException ex) {
          if (parent != null) {
              parent.remove(sr);
          }
          RxJavaPlugins.onError(ex);
      }

      return sr;
  }
复制代码

在这里会使用executor最终去执行run方法。固然看到这里有一个疑问IoScheduler在这里是怎么实现线程的复用呢?咱们看看threadWorker在IoScheduler中的线程的建立,以下:

EventLoopWorker(CachedWorkerPool pool) {
          this.pool = pool;
          this.tasks = new CompositeDisposable();
          this.threadWorker = pool.get();
      }
复制代码

这里会经过维护一个Worker的线程池来达到线程复用的效果,具体咱们看看CachedWorkerPool的get方法,以下:

ThreadWorker get() {
          if (allWorkers.isDisposed()) {
              return SHUTDOWN_THREAD_WORKER;
          }
          //从已经release的work线程队列中获取缓存
          while (!expiringWorkerQueue.isEmpty()) {
              ThreadWorker threadWorker = expiringWorkerQueue.poll();
              //若是找到,返回复用的线程
              if (threadWorker != null) {
                  return threadWorker;
              }
          }

          // 若是没有,则会建立一个新的ThreadWorker
          ThreadWorker w = new ThreadWorker(threadFactory);
          allWorkers.add(w);
          return w;
      }
复制代码

observeOn(AndroidSchedulers.mainThread())

@Override
   protected void subscribeActual(Observer<? super T> observer) {
       //若是指定当前线程 则不进行调度
       if (scheduler instanceof TrampolineScheduler) {
           source.subscribe(observer);
       } else {
           //建立Worker
           Scheduler.Worker w = scheduler.createWorker();
           //实例化ObserveOnObserver观察者并传入Worker
           source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
       }
   }
复制代码

这里咱们主要须要分析ObserveOnObserver对象,onNext实现以下:

@Override
       public void onNext(T t) {
           if (done) {
               return;
           }

           if (sourceMode != QueueDisposable.ASYNC) {
               queue.offer(t);
           }
           schedule();
       }
       
        void schedule() {
           if (getAndIncrement() == 0) {
               worker.schedule(this);
           }
       }
复制代码

关键仍是执行了worker的schedule,AndroidSchedulers的实现主要为HandlerScheduler,HandlerScheduler中关于Worker的实现为HandlerWorker,咱们看下schedule的实现以下:

public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
           if (run == null) throw new NullPointerException("run == null");
           if (unit == null) throw new NullPointerException("unit == null");

           if (disposed) {
               return Disposables.disposed();
           }

           run = RxJavaPlugins.onSchedule(run);

           ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

           Message message = Message.obtain(handler, scheduled);
           message.obj = this; // Used as token for batch disposal of this worker's runnables. if (async) { message.setAsynchronous(true); } //经过handler发送消息执行run接口 handler.sendMessageDelayed(message, unit.toMillis(delay)); // Re-check disposed state for removing in case we were racing a call to dispose(). if (disposed) { handler.removeCallbacks(scheduled); return Disposables.disposed(); } return scheduled; } 复制代码

关于handler的实例,咱们看AndroidSchedulers中的建立以下:

private static final class MainHolder {
       static final Scheduler DEFAULT
           = new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
   }
复制代码

综上可知AndroidSchedulers.mainThread()是经过消息将run方法的实现交由主线程Looper进行处理,达到将观察者的数据处理在主线程中执行的效果

rxjava背压策略实现原理是怎样的?

背压(backpressure)

当上下游在不一样的线程中,经过Observable发射,处理,响应数据流时,若是上游发射数据的速度快于下游接收处理数据的速度,这样对于那些没来及处理的数据就会形成积压,这些数据既不会丢失,也不会被垃圾回收机制回收,而是存放在一个异步缓存池中,若是缓存池中的数据一直得不处处理,越积越多,最后就会形成内存溢出,这即是响应式编程中的背压(backpressure)问题。

背压处理机制

rxjava2.x使用Flowable来支持背压的机制,调用create方法时须要传入BackpressureStrategy策略。

Strategy 做用
MISSING 此策略表示,经过Create方法建立的Flowable没有指定背压策略,不会对经过OnNext发射的数据作缓存或丢弃处理,须要下游经过背压操做符(onBackpressureBuffer()/onBackpressureDrop()/onBackpressureLatest())指定背压策略
ERROR 在此策略下,若是放入Flowable的异步缓存池中的数据超限了,则会抛出MissingBackpressureException异常
BUFFER 此策略下,Flowable的异步缓存池同Observable的同样,没有固定大小,能够无限制向里添加数据,不会抛出MissingBackpressureException异常,但会致使OOM
DROP 在此策略下,若是Flowable的异步缓存池满了,会丢掉上游发送的数据
LATEST 与Drop策略同样,若是缓存池满了,会丢掉将要放入缓存池中的数据,不一样的是,无论缓存池的状态如何,LATEST都会将最后一条数据强行放入缓存池中

实现原理

首先看看Flowable的create实现

public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
        ObjectHelper.requireNonNull(source, "source is null");
        ObjectHelper.requireNonNull(mode, "mode is null");
        return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
    }
复制代码

这里会建立一个FlowableCreate对象,并传入指定的BackpressureStrategy策略。接着看看FlowableCreate的订阅方法

@Override
    public void subscribeActual(Subscriber<? super T> t) {
        BaseEmitter<T> emitter;
        //根据不一样的策略初始化不一样的数据发射器
        switch (backpressure) {
        case MISSING: {
            emitter = new MissingEmitter<T>(t);
            break;
        }
        case ERROR: {
            emitter = new ErrorAsyncEmitter<T>(t);
            break;
        }
        case DROP: {
            emitter = new DropAsyncEmitter<T>(t);
            break;
        }
        case LATEST: {
            emitter = new LatestAsyncEmitter<T>(t);
            break;
        }
        default: {
            emitter = new BufferAsyncEmitter<T>(t, bufferSize());
            break;
        }
        }

        t.onSubscribe(emitter);
        try {
            source.subscribe(emitter);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            emitter.onError(ex);
        }
    }
复制代码

BaseEmitter

abstract static class BaseEmitter<T>
    extends AtomicLong
    implements FlowableEmitter<T>, Subscription {
        private static final long serialVersionUID = 7326289992464377023L;

        final Subscriber<? super T> actual;

        final SequentialDisposable serial;

        BaseEmitter(Subscriber<? super T> actual) {
            this.actual = actual;
            this.serial = new SequentialDisposable();
        }
}
    
    //这里须要注意的是,Request最终会把n负责给AtomicLong
   @Override
        public final void request(long n) {
            if (SubscriptionHelper.validate(n)) {
                BackpressureHelper.add(this, n);
                onRequested();
            }
        }
//省略其余若干方法
复制代码

经过上面的结束咱们知道Flowable有一个缓冲池,那个这个大小是多少,在哪里进行复制给发射器呢?

//长度是128
    static final int BUFFER_SIZE;
    static {
        BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
    }
    
   public static int bufferSize() {
        return BUFFER_SIZE;
    }
    
    //在调用observeOn时,会将长度最后传给emitter发射器,具体能够打断的追踪查看调用链
    public final Flowable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
复制代码

MissingEmitter

不会对经过OnNext发射的数据作缓存或丢弃处理

@Override
        public void onNext(T t) {
            if (isCancelled()) {
                return;
            }

            if (t != null) {
                actual.onNext(t);
            } else {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }

            for (;;) {
                long r = get();
                if (r == 0L || compareAndSet(r, r - 1)) {
                    return;
                }
            }
        }
复制代码

NoOverflowBaseAsyncEmitter

DropAsyncEmitter和ErrorAsyncEmitter继承了NoOverflowBaseAsyncEmitter

@Override
        public final void onNext(T t) {
            if (isCancelled()) {
                return;
            }

            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            //若是数量不为0则减1,经过上面的Request,能够知道get()为Flowable的BUFFER_SIZE 128
            if (get() != 0) {
                actual.onNext(t);
                BackpressureHelper.produced(this, 1);
            } else {
                //超出阈值 执行onOverflow
                onOverflow();
            }
        }
复制代码

DropAsyncEmitter

若是Flowable的异步缓存池满了,会丢掉上游发送的数据

static final class DropAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {


        private static final long serialVersionUID = 8360058422307496563L;

        DropAsyncEmitter(Subscriber<? super T> actual) {
            super(actual);
        }

        @Override
        void onOverflow() {
            // nothing to do
        }

    }

复制代码

ErrorAsyncEmitter

若是Flowable的异步缓存池满了,会抛出异常

static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {


        private static final long serialVersionUID = 338953216916120960L;

        ErrorAsyncEmitter(Subscriber<? super T> actual) {
            super(actual);
        }

        @Override
        void onOverflow() {
            onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
        }

    }
复制代码

BufferAsyncEmitter

Flowable的异步缓存池同Observable的同样,没有固定大小,能够无限制向里添加数据

@Override
        public void onNext(T t) {
            if (done || isCancelled()) {
                return;
            }

            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            //加入队列 queue为SpscLinkedArrayQueue队列
            queue.offer(t);
            //通知消费
            drain();
        }
复制代码

LatestAsyncEmitter

Flowable的异步缓存池同Observable的同样,没有固定大小,能够无限制向里添加数据

@Override
        public void onNext(T t) {
           if (done || isCancelled()) {
                return;
            }

            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            //进行覆盖 queue为AtomicReference
            queue.set(t);
             //通知消费
            drain();
        }
复制代码

总结

思考

本文主要对rxjava的链式执行流程、线程调度以及背压机制进行梳理分析。rxjava的库还有很是多的操做符及功能,但愿后续有时间继续进行分析。rxjava的源码及一些概念命名仍是相对比较复杂,前先后后大约花了2周的时间进行源码的学习,坚持下来了,仍是收获满满。

参考资料

这多是最好的RxJava 2.x 教程(完结版)

ReactiveX中文文档

Rxjava2入门教程五:Flowable背压支持——对Flowable最全面而详细的讲解

RxJava2 源码解析——线程调度 Scheduler

推荐

Android源码系列-解密OkHttp

Android源码系列-解密Retrofit

Android源码系列-解密Glide

Android源码系列-解密EventBus

Android源码系列-解密RxJava

Android源码系列-解密LeakCanary

Android源码系列-解密BlockCanary

关于

欢迎关注个人我的公众号

微信搜索:一码一浮生,或者搜索公众号ID:life2code

image

相关文章
相关标签/搜索