一张图看懂Rxjava的原理

前言

Rxjava是NetFlix出品的Java框架, 官方描述为 a library for composing asynchronous and event-based programs using observable sequences for the Java VM,翻译过来就是“使用可观察序列组成的一个异步地、基于事件的响应式编程框架”。一个典型的使用示范以下:java

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                String s = "1234";
                //执行耗时任务
                emitter.onNext(s);
            }
        }).map(new Function<String, Integer>() {
            @Override
            public Integer apply(String s) throws Exception {
                return Integer.parseInt(s);
            }
        }).subscribeOn(Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread())
          .subscribe();

本文要讲的主要内容是Rxjava的核心思路,利用一张图并结合源码分析Rxjava的实现原理,至于使用以及其比较深刻的内容,好比不经常使用的操做符,背压等,读者能够自行学习。另外提一句,本文采用的Rxjava版本是2.2.3,Rxjava最新版本是3.x.x,感兴趣的能够自行阅读,但相信其最核心的原理是不会变化的。另外,本文篇幅较长,最好的效果是边看本文边到源码中体会,若是读者没有耐心读完,能够只看图片和头尾。编程

正题

先放出本文最重要的图:api

Rxjava原理图

Rxjava的核心思路被总结在了图中,本文分为两部分,第一部分讲图中的三条流和事件传递,第二部分讲线程切换的原理,下面进入正题。网络

流式构建和事件传递

在讲以前,先提一点,在Rxjava中,有Observable和Observer这两个核心的概念,可是它们在发生订阅时,跟普通的观察者模式写法不太同样,由于常识来说,应该是观察者去订阅(subscribe)被观察者,可是Rxjava为了其基于事件的流式编程,只能反着来,observable去订阅observer,因此在rxjava中,subscribe能够理解“注入”观察者。app

首先咱们看上面的图片,先简单解释一下:图中方形的框表明的是Observable,由于它表明节点,因此用Ni表示,圆形框表明的是观察者Observer,用Oi标识,后面加括号的意思是Oi持有其下游Observer的引用,左侧表明上游,右侧表明下游。图片里有三条有方向的彩色粗线,表明三个不一样的流,这三个流是咱们为了分析问题而抽象出来的的,表明从构建到订阅整个事件的流向,按照时间顺序从上到下依次流过,它们的含义分别是:框架

  1. 从左往右的构建流:用来构建整个事件序列,这个流表征了整个链路的构建过程,至关于构造方法。
  2. 从右往左的订阅流:当最终订阅(subscribe方法)这个行为发生的时候,每一个节点从右向左依次执行订阅行为。
  3. 从左往右的观察者回调流:当事件发生之后,会经过这个流依次通知给各个观察者。

咱们依次分析这三条流:异步

构建流

在使用Rxjava时,其流式构建流程是很大的特点,避免了传统回调的繁琐。怎么实现的呢?使用过Rxjava的读者应该都知道,Rxjava的每一步构建过程api都是相同的,这是由于每一步的函数返回结果都是一个Observable,Observable提供了Rxjava全部的功能。那么Obsevable在Rxjava中到底扮演一个什么角色呢?事实上,其官方定义就已经告诉咱们答案了,前言里官方定义中有这样一段:“using Observable sequences”,因此说,Obsevable就是构建流的组件,咱们能够当作一个个节点,这些节点串起来组成整个链路。Observable这个类实现了一个接口:ObservableSource,这个接口只有一个方法:subscribe(observer),也就是说,全部的Obsevable节点都具备订阅这个功能,这个功能很重要,是订阅流的关键,待会会讲。总结一下:async

在咱们编写Rxjava代码时,每一步操做都会生成一个新的Observable节点(没错,包括ObserveOn和SubscribeOn线程变换操做),并将新生成的Observable返回,直到最后一步执行subscribe方法ide

不管是构建的第一步 create方法,仍是observeOn,subscribeOn变换线程方法,仍是各类操做符好比map,flatMap等,都会生成对应的Observable,每一个Observble中要实现一个最重要的方法就是subscribe,咱们看其实现:函数

public final void subscribe(Observer<? super T> observer) {
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            RxJavaPlugins.onError(e);
            throw npe;
        }
    }

这里提一点,你们看源码时遇到RxJavaPlugins时直接略过看里面的代码就行了,它是hook用的,不影响主要流程。因此上面代码其实只有一行有用:

subscribeActual(observer);

也就是说,每一个节点在执行subscribe时,其实就是在调用该节点的subscribeActual方法,这个方法是抽象的,每一个节点的实现都不同。咱们举个栗子,拿ObseverOn这个操做生成的ObservableSubscribeOn瞧瞧:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
        observer.onSubscribe(parent);
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
  //xxx省略
}

其中其父类继承Observable,因此它是一个Observble。

整个过程有点像builder模式,不一样之处是它是生成了新的节点,而builder模式返回的自身。若是你读过okHttp的源码,okHttp中拦截器跟这里有些类似,okHttp中会构建多个Chain节点,而后用相应的Intercepter去处理Chain。

咱们理解了编写Rxjava代码的过程其实就是构建一个一个Observable节点的过程,接下来咱们看第二条流。

订阅流

构建过程只是经过构造函数将一些配置传给了各个节点,实际尚未执行任何代码,只有最后一步才真正的执行订阅行为。当最后一个节点调用subscribe方法时,是构建流向订阅流变化的转折点,咱们以图中为例:最后一个节点是N5,N5节点是最后一个flatmap操做符方法产生的,也就是说,最后是调用这个节点的subscribe方法,这个方法最终也是会调用到subscribeActual方法中去,咱们看其源码:

public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
    final boolean delayErrors;
    final int maxConcurrency;
    final int bufferSize;

    public ObservableFlatMap(ObservableSource<T> source,
            Function<? super T, ? extends ObservableSource<? extends U>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        super(source);
        this.mapper = mapper;
        this.delayErrors = delayErrors;
        this.maxConcurrency = maxConcurrency;
        this.bufferSize = bufferSize;
    }
    @Override
    public void subscribeActual(Observer<? super U> t) {
        if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
            return;
        }
        source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }

    static final class MergeObserver<T, U> extends AtomicInteger implements Disposable, Observer<T> {
        final Observer<? super U> downstream;
        final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
    }

刚才咱们分析了,N5节点是Observable节点,其subscribe方法最后调用的是subscribeActual方法,咱们看上面代码中它的这个方法:前面的判断语句跳过,第二行:

source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));

这行代码须要注意两点:

  1. 生成了一个新的Observer,请注意其构造函数中第一个参数t,保存到了downstream这个“下游”变量中,这个t从哪儿传进来的呢?对于N5节点来讲,这个t就是咱们代码中最后一步编写的Observer,好比咱们经常使用的网络请求返回后的回调。也就是说,这个新生成的Observer包含了它的“下游”观察者的引用,在图片中对应最右边的圆形框O1(observer)。
  2. 执行订阅行为,这里的source是该节点构造函数传入的source,经过源码得知其实就是N5节点的上一个节点N4,所以,这里的订阅行为本质上是让当前节点的上一个节点订阅当前节点新生成的Observer

到这里,咱们分析了最后一个节点执行subscribe方法的过程,事实上,每一个节点的执行流程都是相似的(subscribeOn节点有些特殊,等会线程调度会将),也就是说,N5会调用N4的subscribe方法,而在N4的subscribe方法中,又去调用了N3的subscribe....一直到N0会调用source的subscribe方法。总结下来就是:

从最后一个N5节点的订阅行为开始,依次执行前面各个节点真正的订阅方法。在每一个节点的订阅方法中,都会生成一个新的Observer,这个Observer会包含“下游”的Observer,这样当每一个节点都执行完订阅(subscribeActual)后,也就生成了一串Observer,它们经过downstream,upstream引用链接。

以上就是订阅流的发生过程,简单讲就是下游节点调用上游节点的subscribeActual方法,从而造成了一个调用链。

观察者回调流

当订阅流执行到最后,也就是第一个节点N0时,咱们看发生了什么,首先看看N0节点怎么创建的:

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

生成了ObservableCreate实例,咱们看这个类(简化):

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
        source.subscribe(parent);
    }
}

因此订阅流的最终会掉到上面的subscrbeActual方法,它其实仍是和其余节点同样,最主要的仍是执行了

source.subscribe(parent)

这行代码,那么这个节点的source是什么呢?它就是咱们事件的源头啊!

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                String s = "1234";
                //执行耗时任务
                emitter.onNext(s);
            }
        })

上面代码直接拿的开头的例子,这个source是一个ObservableOnSubscribe,看它的subscribe方法里,这里很重要,这个函数里面实际上是订阅流和观察者流的转折点,也就是流在这儿“转向了”。这里,这个事件源没有像节点那样,调用上一个节点的订阅方法,而是调用了其参数的emitter的onNext方法,这个emitter对应N0节点的什么呢?看代码知道,时CreateEmitter这个类,咱们看这个类里面

static final class CreateEmitter<T> extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
      
        final Observer<? super T> observer;
      
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
        @Override
        public void onNext(T t) {
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
      //省略
    }

看它的onNext方法,执行的是

observer.onNext(t)

observer是谁?构造函数传进来的,也就是N0节点subscribeActual方法中的observer,这个observer是谁呢?仔细回想一下,前面订阅流的时候不就是一次订阅上一个节点生成的Observer吗,因此这个observer就是前一个节点N1生成的Observer,咱们看N1节点,是一个Map,对应的Observable节点里的Observer源码以下:

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;
      
        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }
            U v;
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
           downstream.onNext(v);
          //省略后续

名为MapObserver,看它的onNext方法,忽略前面两个判断语句,核心就两句,一个是mapper.apply(t),另外一个就是downstream.onNext(v)。也就是说,这个mapObserver干了两件事,一个是把上个节点返回的数据进行一次map变换,另外一个就是将map后的结果传递给下游,下游是什么呢?看了订阅流的读者天然知道,就是N2节点的Observer,对应图中O4,依次类推,咱们知道了,事件发生之后,经过各个节点的Observer事件源被层层处理并传递给下游,一直到最后一个观察者执行完毕,整个事件处理完成。

至此,咱们三个流分析完毕,接下来,咱们开始分析线程调度是怎么实现的。

线程调度

观察仔细的读者可能已经看到了,图中N2节点左侧的全部节点和右侧的节点颜色不一样,我为何要这样画呢?其实里面的玄机就是线程调度,接下来咱们分别看subscribeOn和observeOn的线程切换玄机吧。

SubscribeOn

在订阅流发生的的时候,大多数节点都是直接调用上一个节点的subscribe方法,实现虽有差异,但大同小异。惟一有个最大的不一样就是subscribeOn这个节点,咱们看源码:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

        observer.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

普通的节点执行时,大多只是简单的执行source.subscribe(observer),可是这个不同。先看第二行,它调用了观察者的onSubscribe方法,熟悉Rxjava的人知道,咱们在自定义Observer的时候,里面有这个回调,其发生时机就在此刻。咱们接着看最后一行,忽略parent.setDisposable这个逻辑,咱们直接看参数里面的东西。

scheduler.scheduleDirect(new SubscribeTask(parent))

看看干了什么:

@NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }

继续:

@NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        DisposeTask task = new DisposeTask(decoratedRun, w);
        w.schedule(task, delay, unit);
        return task;
    }

建立了一个worker,一个runnable,而后将两者封装到一个DisposeTask中,最后用worker执行这个task,那么这个worker是什么呢?

@NonNull
    public abstract Worker createWorker();

createworker是一个抽象方法,因此须要去找Scheduler的子类,咱们回想一下rxjava的使用,若是在子线程中执行,咱们通常设置调度器为Schedulers.io(),咱们看这个子类的实现:

在IOSchedluer类中:

@Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }
            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }

继续:

@NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }
        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }
        return sr;
    }

这里的executor就是一个ExecutorService,熟悉线程池的读者应该知道,这里的submit方法,就是将callable丢到线程池中去执行任务了。

咱们回到主线

scheduler.scheduleDirect(new SubscribeTask(parent))

对于io线程的调度器来讲,上面的代码就是将new SubscribeTask(parent)丢到线程池中执行,咱们看参数里面的SubscribeTask:

final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }

看run方法:source.subscribe(parent),这里的parent跟普通节点同样,仍然是本节点生成的新的Observer,对于本节点来讲,是一个SubscribeOnObserver。所以,咱们就知道了,对于subscribeOn这个节点,它跟普通的节点不一样之处在于:

SubscribeOn节点在订阅的时候,将它的上游节点的订阅行为,以runnable的形式扔给了一个线程池(对于IO调度器来讲),也就是说,当订阅流流到SubscribeOn节点时,线程发生了切换,以后流向的节点都在切换后的线程中执行。

分析到这里,咱们就知道了subscribeOn的线程切换原理了,原来是在订阅流中塞了一个线程变化操做。咱们再看图中的颜色问题,为何这个节点上游的节点都是红色的呢?由于当订阅流流过这个节点后,后面的节点只是单纯的传递给上游节点而已,不管是普通的操做符,仍是ObserveOn节点,都是简单的传递给上游,没有作线程切换(注意,ObserveOn是在观察者流中作的线程切换,待会会讲)。

咱们再思考一个问题,若是上游还有别的subscribeOn,会发生什么?

咱们假设N1节点的map修改程subscribeOn(AndroidScheduler.Main),也就是说,切换到主线程。咱们仍是从N2节点开始分析,刚才说到最后会执行到SubscribeTask里的Run方法,注意此时source.subscribe(parent)发生在子线程中,接下来,回调用N1节点的subscribe,N1节点回调用scheduler.scheduleDirect(new SubscribeTask(parent)),方法,此时,由于线程调度器是主线程的,咱们看它的代码:

private static final class MainHolder {
        static final Scheduler DEFAULT
            = new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
    }

看看这个HandlerScheduler的方法:

@Override
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        run = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        handler.postDelayed(scheduled, unit.toMillis(delay));
        return scheduled;
    }

熟悉Android Handler机制的读者应该很清楚,这里会把N1节点上游的操做,经过Handler机制,扔给主线程操做,虽然这一步是在N2节点的子线程中执行的,可是它以前的事件仍然会在主线程中执行。所以咱们有如下结论:

subscribeOn节点影响它前面的节点的线程,若是前面还有多个subscribeOn节点,最终只有第一个,也就是最上游的那个节点生效

接下来咱们分析observeOn

ObserveOn

前面的subscribeOn线程切换是在订阅流中发生的,接下来的ObserveOn比较简单,它发生在第三条流-观察者回调流中,咱们看ObserveOn节点的源码:

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
    //简化
       @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }
      }

在前面的观察者流分析时,咱们知道,观察者流是经过onNext()方法传递的,咱们看最后一行,schedule(),线程切换,因此这个ObserveOn节点其实没干啥事,就是切换线程了,并且是在onNext回调中切换的。咱们进到这个方法:

void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }

worker是这个节点订阅时指定的 scheduler.createWorker(), 以主线程观察为例:

public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
     
            run = RxJavaPlugins.onSchedule(run);
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.
            if (async) {
                message.setAsynchronous(true);
            }
            handler.sendMessageDelayed(message, unit.toMillis(delay));
            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }
            return scheduled;
        }

一样,经过Handler机制,将runnable扔给主线程执行,runnable是谁呢,是this,this就是这个ObserveOnObserver,咱们看它的run方法:

@Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }

继续看drainNormal

void drainNorml() {
    //简化
  final Observer<? super T> a = downstream;
  T v;
  v = q.poll();
  a.onNext(v);
}

抓重点,仍是把上游的处理结果扔给下游。也就是说observeOn会将它下游的onNext操做扔给它切换的线程中,所以ObserveOn影响的是它的下游,因此咱们途中observeOn后面的颜色都是蓝的。

一样咱们思考,若是有多个observeOn会发生什么?很简单,思路同subscribeOn,每一个ObserveOn只会影响它下游一直到下一个obseveOn节点的线程,也就是分段的。

总结

到此为止咱们就讲完了所有内容,包括三条流的原理和线程切换的原理,至于Rxjava的其余功能和原理,限于篇幅,本文不会讲解,感兴趣的读者自行阅读源码。本文主要为读者提供了理解Rxjava的思路,真正要去理解它,仍是要多看源码。

在我看来,Rxjava有点像观察者模式和责任链模式的结合,普通的观察者模式通常是被观察者通知多个观察者,而Rxjava则是被观察者通知第一个Obsever,接下来Observer依次通知其余节点的Observer,造成一个“观察链”,将观察者模式进行了一种相似链式的变换,每一个节点又会执行它不一样的“职责”,很是巧妙,总结如下就是:
最原始的订阅事件从最后一个节点开始,沿着Obsevable节点往上游传递,事件源头处理完任务后,通知给最上游的观察者,而后通知沿着Observer链条往下游传递,直到最后一个观察者结束。


后记,关于flatmap

关于flatmap这个操做符,读者可能会用到,但理解起来又比较难,咱们经过本文,其实就很容易从源码中理解这个操做符的含义。这里我顺便给你们解释一下,仍是看图:

上图简化了整个事件流向,咱们对事件源进行了flatmap操做,flatmap在订阅流的时候跟其余的操做符基本一致,可是在观察者回调流中却很不同,它在回调流中作了如下内容:
flatmap将上游传过来的数据进行了一次变换,变成了一个Observable,如何变的是由开发者自定义的,好比图中下面三个竖着的三个Observable节点流,这条流跟上面的四个Observable节点本质上是同样的。flatmap这个节点的Obsever将上游的数据转化成了一个新的Observable流,而后执行这条新的流,当这条新的流走完时,会接着原来的观察者流继续走下去。也就是说,flatMap这个操做符将一条新的Observable节点流“插入”到原始的观察者回调流上去了。
那图中的橘黄色和紫色的虚线是什么意思呢?
实际上它是flatmap的一种特殊状况,当新插入的流的事件源有多个的时候,这是会产生分流,每一个流都会执行一遍下游的原始节点。咱们拿下面这个例子来看:

String[] mainArmy = {"第一大队", "第二大队", "第三大队"};
        Observable.fromArray(mainArmy)
                .flatMap(new Function<String, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(String s) throws Exception {
                        String[] littleArmy = {s + "的第一小队", s + "的第二小队", s + "的第三小队"};
                        return Observable.fromArray(littleArmy);
                    }
                }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String little) throws Exception {
                System.out.println(little);
            }
        });

这个代码运行结果是

第一大队的第一小队
第一大队的第二小队
第一大队的第三小队
第二大队的第一小队
第二大队的第二小队
第二大队的第三小队
第三大队的第一小队
第三大队的第二小队
第三大队的第三小队

这个例子也许很好的体现了flatmap这个操做符的意义,把list铺平展开,并且防止了繁琐的嵌套循环。可是,虽然flatmap很擅长处理这种问题(我不知道这个操做符是否是为了解决这个问题而设计出来的),但flatmap的功能却远不只如此,它的本质是在合并Obsevable流,能够作不少事情,好比咱们网络请求的“连环请求”,举个例子,首先经过书本的Id获取出版商名字,而后拿到出版商名字后获取出版社信息。

api.getBookPublisherNameById("01102").flatmap(new Function<String, ObservableSource<PublisherInfo>>() {
        @Override
            public ObservableSource<PublisherInfo> apply(String s) throws Exception {
                return api.getPublisherInfoByName(s);
        }
    }).subscribe(new COnsume<PublisherInfo>() {
                 @Override
            public void accept(PublisherInfo little) throws Exception {
                //获取到出版社信息
            }
})

看完这里,flatmap是否是也蛮好理解的~

相关文章
相关标签/搜索